mirror of https://github.com/iperov/DeepFaceLive, synced 2025-07-15 01:23:45 -07:00
+xlib.sjob
This commit is contained in:
parent a897cbb06e
commit bdd80e7cce
2 changed files with 100 additions and 0 deletions
9  xlib/sjob/__init__.py  Normal file
@@ -0,0 +1,9 @@
"""
Job processing in subprocesses.
"""

import multiprocessing
if multiprocessing.get_start_method() != 'spawn':
    multiprocessing.set_start_method("spawn", force=True)

from .run_sequence import run_sequence
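Because this module forces the 'spawn' start method at import time, worker processes re-import the caller's main module instead of forking it, so a script that uses the package has to keep its entry point behind an `if __name__ == '__main__':` guard. A minimal sketch of such a caller, assuming `xlib` is on the import path (the function names below are illustrative, not part of this commit):

# Hypothetical caller, not part of this commit.
from xlib import sjob

def _square(state, x):
    # process_func: runs inside a spawned worker, so it must be a
    # picklable top-level function.
    return x * x

if __name__ == '__main__':
    # Required with the 'spawn' start method: child processes re-import
    # this module, so nothing below may run at import time.
    out = sjob.run_sequence(list(range(100)), _square)
    print(len(out))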
91  xlib/sjob/run_sequence.py  Normal file
@@ -0,0 +1,91 @@
import multiprocessing
from typing import Callable, List

from .. import console as lib_con


def _run_sequence(barrier, init_func, init_kwargs, final_func, process_func, pipe):
    state = {}
    if init_func is not None:
        init_func(state, **init_kwargs)

    barrier.wait()

    while True:
        if pipe.poll(0.05):
            obj = pipe.recv()
            cmd = obj['cmd']
            if cmd == 'job':
                result = process_func(state, obj['data'])
                pipe.send({'cmd':'result', 'data': result})
            elif cmd == 'finalize':
                break

    if final_func is not None:
        final_func(state)

def run_sequence(data_list : List,
                 process_func : Callable,
                 init_func : Callable = None, init_kwargs : dict = None,
                 final_func : Callable = None,
                 mp_count : int = None, progress_bar_desc='Processing'):
    """
    Simple Job to process list of picklable data.

    init_func(state:dict, **init_kwargs)

    process_func(state:dict, data) -> object

    mp_count(None) number of subprocesses. Default - cores count.
    """
    if mp_count is None:
        mp_count = multiprocessing.cpu_count()

    barrier = multiprocessing.Barrier(mp_count)

    n_data_sent = [0]*mp_count
    conn_list = [None]*mp_count
    p_list = [None]*mp_count

    for i in range(mp_count):
        s_pipe, c_pipe = conn_list[i] = multiprocessing.Pipe()
        p = p_list[i] = multiprocessing.Process(target=_run_sequence, args=(barrier, init_func, init_kwargs, final_func, process_func, c_pipe), daemon=True)
        p.start()

    data_list_len = len(data_list)
    n_data_done = 0
    i_data = 0

    lib_con.progress_bar_print(0, data_list_len, desc=progress_bar_desc)
    result = []
    while n_data_done != data_list_len:

        for n_conn, (s_pipe, _) in enumerate(conn_list):

            if i_data < data_list_len:
                if n_data_sent[n_conn] < 2:
                    n_data_sent[n_conn] += 1

                    data = data_list[i_data]
                    i_data += 1

                    s_pipe.send( {'cmd':'job', 'data':data} )

            if s_pipe.poll(0):
                obj = s_pipe.recv()

                cmd = obj['cmd']
                if cmd == 'result':
                    n_data_done += 1
                    lib_con.progress_bar_print(n_data_done, data_list_len, desc=progress_bar_desc)

                    n_data_sent[n_conn] -= 1

                    data = obj['data']
                    if data is not None:
                        result.append(data)

    for n_conn, (s_pipe, _) in enumerate(conn_list):
        s_pipe.send( {'cmd':'finalize'} )

    return result
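For reference, a hedged usage sketch of the new run_sequence API with per-worker state (all names below are illustrative, not part of this commit). Note that init_kwargs is only unpacked into init_func, so a dict should be supplied whenever init_func is given; results are collected in completion order rather than input order, and None return values are dropped.

# Hypothetical example, not part of this commit.
from xlib import sjob

def _init(state, scale):
    # init_func: runs once per worker before the barrier is released.
    state['scale'] = scale

def _process(state, data):
    # process_func: called once per job item; the return value is collected.
    return data * state['scale']

def _final(state):
    # final_func: runs once per worker after the 'finalize' command.
    state.clear()

if __name__ == '__main__':
    result = sjob.run_sequence(list(range(16)),
                               _process,
                               init_func=_init, init_kwargs={'scale': 10},
                               final_func=_final,
                               mp_count=4, progress_bar_desc='Scaling')
    print(sorted(result))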