From 3231b9ed9545020fd3743228682df23829f37b2a Mon Sep 17 00:00:00 2001 From: iperov Date: Thu, 11 Nov 2021 23:08:13 +0400 Subject: [PATCH] upd xlib.mp --- xlib/mp/MPWorker.py | 143 ++++++++++++++++++++++++++++++++++++++ xlib/mp/SPMTWorker.py | 157 ------------------------------------------ xlib/mp/__init__.py | 2 +- 3 files changed, 144 insertions(+), 158 deletions(-) create mode 100644 xlib/mp/MPWorker.py delete mode 100644 xlib/mp/SPMTWorker.py diff --git a/xlib/mp/MPWorker.py b/xlib/mp/MPWorker.py new file mode 100644 index 0000000..a17e618 --- /dev/null +++ b/xlib/mp/MPWorker.py @@ -0,0 +1,143 @@ +import multiprocessing +import threading +import time +import traceback +import weakref +from typing import List + +class MPWorker: + def __init__(self, sub_args : List = None, + process_count : int = None ): + """ + base class for multi process worker + + provides messaging interface between host and subprocesses + + + sub_args a list of args will be passed to _on_sub_initialize + + process_count number of subprocesses. Default : number of cpu count + + starts immediatelly after construction. + """ + if process_count is None: + process_count = multiprocessing.cpu_count() + + pipes = [] + ps = [] + for i in range(process_count): + host_pipe, sub_pipe = multiprocessing.Pipe() + + p = multiprocessing.Process(target=self._sub_process, args=(i, process_count, sub_pipe, sub_args), daemon=True) + p.start() + + pipes += [host_pipe] + ps += [p] + + self._process_id = -1 + self._process_count = process_count + self._process_working_count = process_count + self._pipes = pipes + self._ps = ps + + threading.Thread(target=_host_thread_proc, args=(weakref.ref(self),), daemon=True).start() + + # overridable + def _on_host_sub_message(self, process_id, name, *args, **kwargs): + """a message from subprocess""" + # overridable + def _on_sub_host_message(self, name, *args, **kwargs): + """a message from host""" + # overridable + def _on_sub_initialize(self, *args): + """on subprocess initialization""" + # overridable + def _on_sub_finalize(self): + """on graceful subprocess finalization""" + # overridable + def _on_sub_tick(self, process_id): + """""" + def get_process_count(self) -> int: return self._process_count + def get_process_id(self) -> int: return self._process_id + + def kill(self): + """ + kill subprocess + """ + for p in self._ps: + p.kill() + self._ps = [] + + def stop(self): + """ + graceful stop subprocess, will wait all subprocess finalization + """ + self._send_msg('__stop') + for p in self._ps: + p.join() + self._ps = [] + + def _host_process_messages(self, timeout : float = 0) -> bool: + """ + process messages on host side + """ + for process_id, pipe in enumerate(self._pipes): + try: + if pipe.poll(timeout): + name, args, kwargs = pipe.recv() + if name == '__stopped': + self._process_working_count -= 1 + else: + self._on_host_sub_message(process_id, name, *args, **kwargs) + except: + ... + + def _send_msg(self, name, *args, process_id=-1, **kwargs): + """ + send message to other side + + process_id -1 mean send to all sub processes + on subprocess side - ignore this param + """ + try: + for i, pipe in enumerate(self._pipes): + if process_id == -1 or i == process_id: + pipe.send( (name, args, kwargs) ) + except: + ... + + def _sub_process(self, process_id, process_count, pipe, sub_args): + self._process_id = process_id + self._process_count = process_count + self._pipes = [pipe] + + self._on_sub_initialize(*sub_args) + + working = True + while working: + self._on_sub_tick(process_id) + + if pipe.poll(0.005): + while True: + name, args, kwargs = pipe.recv() + if name == '__stop': + working = False + else: + try: + self._on_sub_host_message(name, *args, **kwargs) + except: + print(f'Error during handling host message {name} : {traceback.format_exc()}') + + if not pipe.poll(): + break + + self._on_sub_finalize() + +def _host_thread_proc(wref): + while True: + ref = wref() + if ref is None: + break + ref._host_process_messages(0) + del ref + time.sleep(0.005) \ No newline at end of file diff --git a/xlib/mp/SPMTWorker.py b/xlib/mp/SPMTWorker.py deleted file mode 100644 index cbb96ce..0000000 --- a/xlib/mp/SPMTWorker.py +++ /dev/null @@ -1,157 +0,0 @@ -import multiprocessing -import threading -import time -import traceback -import weakref - -def _host_thread_proc(wref): - while True: - ref = wref() - if ref is None: - break - ref._host_process_messages(0.005) - del ref - -class SPMTWorker: - def __init__(self, *sub_args, **sub_kwargs): - """ - base class for single subprocess multi thread worker - - provides messaging interface between host and subprocess - """ - host_pipe, sub_pipe = multiprocessing.Pipe() - p = multiprocessing.Process(target=self._subprocess_proc, args=(sub_pipe, sub_args, sub_kwargs), daemon=True) - p.start() - self._p = p - self._pipe = host_pipe - - threading.Thread(target=_host_thread_proc, args=(weakref.ref(self),), daemon=True).start() - - def kill(self): - """ - kill subprocess - """ - self._p.terminate() - self._p.join() - - def stop(self): - """ - graceful stop subprocess, will wait all thread finalization - """ - self._send_msg('_stop') - self._p.join() - - # overridable - def _on_host_sub_message(self, name, *args, **kwargs): - """ - a message from subprocess - """ - - def _host_process_messages(self, timeout : float = 0) -> bool: - """ - process messages on host side - """ - try: - pipe = self._pipe - if pipe.poll(timeout): - while True: - name, args, kwargs = pipe.recv() - self._on_host_sub_message(name, *args, **kwargs) - if not pipe.poll(): - break - except: - ... - - # overridable - def _on_sub_host_message(self, name, *args, **kwargs): - """ - a message from host - """ - - # overridable - def _on_sub_initialize(self): - """ - on subprocess initialization - """ - - def _on_sub_finalize(self): - """ - on graceful subprocess finalization - """ - print('_on_sub_finalize') - - # overridable - def _on_sub_thread_initialize(self, thread_id): - """ - called on subprocess thread initialization - """ - # overridable - def _on_sub_thread_finalize(self, thread_id): - """ - called on subprocess thread finalization - """ - # overridable - def _on_sub_thread_tick(self, thread_id): - """ - called on subprocess thread tick - """ - - - def _send_msg(self, name, *args, **kwargs): - """ - send message to other side (to host or to sub) - """ - try: - self._pipe.send( (name, args, kwargs) ) - except: - ... - - - def _sub_thread_proc(self, thread_id): - self._on_sub_thread_initialize(thread_id) - while self._threads_running: - self._on_sub_thread_tick(thread_id) - time.sleep(0.005) - self._on_sub_thread_finalize(thread_id) - - self._threads_exit_barrier.wait() - - def _sub_get_thread_count(self) -> int: - return self._thread_count - - def _subprocess_proc(self, pipe, sub_args, sub_kwargs): - self._pipe = pipe - self._thread_count = multiprocessing.cpu_count() - - self._on_sub_initialize(*sub_args, **sub_kwargs) - - self._threads = [] - self._threads_running = True - self._threads_exit_barrier = threading.Barrier(self._thread_count+1) - - for thread_id in range(self._thread_count): - t = threading.Thread(target=self._sub_thread_proc, args=(thread_id,), daemon=True) - t.start() - self._threads.append(t) - - working = True - while working: - if pipe.poll(0.005): - while True: - name, args, kwargs = pipe.recv() - if name == '_stop': - working = False - else: - try: - self._on_sub_host_message(name, *args, **kwargs) - except: - print(f'Error during handling host message {name} : {traceback.format_exc()}') - - if not pipe.poll(): - break - - self._threads_running = False - - self._threads_exit_barrier.wait() - - self._on_sub_finalize() \ No newline at end of file diff --git a/xlib/mp/__init__.py b/xlib/mp/__init__.py index ec49a8c..65e1420 100644 --- a/xlib/mp/__init__.py +++ b/xlib/mp/__init__.py @@ -6,4 +6,4 @@ from .PMPI import PMPI from .MPAtomicInt32 import MPAtomicInt32 from .MPSPSCMRRingData import MPSPSCMRRingData from .MPWeakHeap import MPWeakHeap -from .SPMTWorker import SPMTWorker \ No newline at end of file +from .MPWorker import MPWorker \ No newline at end of file