upd xlib.mp

This commit is contained in:
iperov 2021-11-11 23:08:13 +04:00
parent 30ba51edf7
commit 3231b9ed95
3 changed files with 144 additions and 158 deletions

143
xlib/mp/MPWorker.py Normal file
View file

@ -0,0 +1,143 @@
import multiprocessing
import threading
import time
import traceback
import weakref
from typing import List
class MPWorker:
def __init__(self, sub_args : List = None,
process_count : int = None ):
"""
base class for multi process worker
provides messaging interface between host and subprocesses
sub_args a list of args will be passed to _on_sub_initialize
process_count number of subprocesses. Default : number of cpu count
starts immediatelly after construction.
"""
if process_count is None:
process_count = multiprocessing.cpu_count()
pipes = []
ps = []
for i in range(process_count):
host_pipe, sub_pipe = multiprocessing.Pipe()
p = multiprocessing.Process(target=self._sub_process, args=(i, process_count, sub_pipe, sub_args), daemon=True)
p.start()
pipes += [host_pipe]
ps += [p]
self._process_id = -1
self._process_count = process_count
self._process_working_count = process_count
self._pipes = pipes
self._ps = ps
threading.Thread(target=_host_thread_proc, args=(weakref.ref(self),), daemon=True).start()
# overridable
def _on_host_sub_message(self, process_id, name, *args, **kwargs):
"""a message from subprocess"""
# overridable
def _on_sub_host_message(self, name, *args, **kwargs):
"""a message from host"""
# overridable
def _on_sub_initialize(self, *args):
"""on subprocess initialization"""
# overridable
def _on_sub_finalize(self):
"""on graceful subprocess finalization"""
# overridable
def _on_sub_tick(self, process_id):
""""""
def get_process_count(self) -> int: return self._process_count
def get_process_id(self) -> int: return self._process_id
def kill(self):
"""
kill subprocess
"""
for p in self._ps:
p.kill()
self._ps = []
def stop(self):
"""
graceful stop subprocess, will wait all subprocess finalization
"""
self._send_msg('__stop')
for p in self._ps:
p.join()
self._ps = []
def _host_process_messages(self, timeout : float = 0) -> bool:
"""
process messages on host side
"""
for process_id, pipe in enumerate(self._pipes):
try:
if pipe.poll(timeout):
name, args, kwargs = pipe.recv()
if name == '__stopped':
self._process_working_count -= 1
else:
self._on_host_sub_message(process_id, name, *args, **kwargs)
except:
...
def _send_msg(self, name, *args, process_id=-1, **kwargs):
"""
send message to other side
process_id -1 mean send to all sub processes
on subprocess side - ignore this param
"""
try:
for i, pipe in enumerate(self._pipes):
if process_id == -1 or i == process_id:
pipe.send( (name, args, kwargs) )
except:
...
def _sub_process(self, process_id, process_count, pipe, sub_args):
self._process_id = process_id
self._process_count = process_count
self._pipes = [pipe]
self._on_sub_initialize(*sub_args)
working = True
while working:
self._on_sub_tick(process_id)
if pipe.poll(0.005):
while True:
name, args, kwargs = pipe.recv()
if name == '__stop':
working = False
else:
try:
self._on_sub_host_message(name, *args, **kwargs)
except:
print(f'Error during handling host message {name} : {traceback.format_exc()}')
if not pipe.poll():
break
self._on_sub_finalize()
def _host_thread_proc(wref):
while True:
ref = wref()
if ref is None:
break
ref._host_process_messages(0)
del ref
time.sleep(0.005)

View file

@ -1,157 +0,0 @@
import multiprocessing
import threading
import time
import traceback
import weakref
def _host_thread_proc(wref):
while True:
ref = wref()
if ref is None:
break
ref._host_process_messages(0.005)
del ref
class SPMTWorker:
def __init__(self, *sub_args, **sub_kwargs):
"""
base class for single subprocess multi thread worker
provides messaging interface between host and subprocess
"""
host_pipe, sub_pipe = multiprocessing.Pipe()
p = multiprocessing.Process(target=self._subprocess_proc, args=(sub_pipe, sub_args, sub_kwargs), daemon=True)
p.start()
self._p = p
self._pipe = host_pipe
threading.Thread(target=_host_thread_proc, args=(weakref.ref(self),), daemon=True).start()
def kill(self):
"""
kill subprocess
"""
self._p.terminate()
self._p.join()
def stop(self):
"""
graceful stop subprocess, will wait all thread finalization
"""
self._send_msg('_stop')
self._p.join()
# overridable
def _on_host_sub_message(self, name, *args, **kwargs):
"""
a message from subprocess
"""
def _host_process_messages(self, timeout : float = 0) -> bool:
"""
process messages on host side
"""
try:
pipe = self._pipe
if pipe.poll(timeout):
while True:
name, args, kwargs = pipe.recv()
self._on_host_sub_message(name, *args, **kwargs)
if not pipe.poll():
break
except:
...
# overridable
def _on_sub_host_message(self, name, *args, **kwargs):
"""
a message from host
"""
# overridable
def _on_sub_initialize(self):
"""
on subprocess initialization
"""
def _on_sub_finalize(self):
"""
on graceful subprocess finalization
"""
print('_on_sub_finalize')
# overridable
def _on_sub_thread_initialize(self, thread_id):
"""
called on subprocess thread initialization
"""
# overridable
def _on_sub_thread_finalize(self, thread_id):
"""
called on subprocess thread finalization
"""
# overridable
def _on_sub_thread_tick(self, thread_id):
"""
called on subprocess thread tick
"""
def _send_msg(self, name, *args, **kwargs):
"""
send message to other side (to host or to sub)
"""
try:
self._pipe.send( (name, args, kwargs) )
except:
...
def _sub_thread_proc(self, thread_id):
self._on_sub_thread_initialize(thread_id)
while self._threads_running:
self._on_sub_thread_tick(thread_id)
time.sleep(0.005)
self._on_sub_thread_finalize(thread_id)
self._threads_exit_barrier.wait()
def _sub_get_thread_count(self) -> int:
return self._thread_count
def _subprocess_proc(self, pipe, sub_args, sub_kwargs):
self._pipe = pipe
self._thread_count = multiprocessing.cpu_count()
self._on_sub_initialize(*sub_args, **sub_kwargs)
self._threads = []
self._threads_running = True
self._threads_exit_barrier = threading.Barrier(self._thread_count+1)
for thread_id in range(self._thread_count):
t = threading.Thread(target=self._sub_thread_proc, args=(thread_id,), daemon=True)
t.start()
self._threads.append(t)
working = True
while working:
if pipe.poll(0.005):
while True:
name, args, kwargs = pipe.recv()
if name == '_stop':
working = False
else:
try:
self._on_sub_host_message(name, *args, **kwargs)
except:
print(f'Error during handling host message {name} : {traceback.format_exc()}')
if not pipe.poll():
break
self._threads_running = False
self._threads_exit_barrier.wait()
self._on_sub_finalize()

View file

@ -6,4 +6,4 @@ from .PMPI import PMPI
from .MPAtomicInt32 import MPAtomicInt32
from .MPSPSCMRRingData import MPSPSCMRRingData
from .MPWeakHeap import MPWeakHeap
from .SPMTWorker import SPMTWorker
from .MPWorker import MPWorker