diff --git a/core/joblib/SubprocessorBase.py b/core/joblib/SubprocessorBase.py index 181c8cf..c7703d1 100644 --- a/core/joblib/SubprocessorBase.py +++ b/core/joblib/SubprocessorBase.py @@ -81,11 +81,8 @@ class Subprocessor(object): except Subprocessor.SilenceException as e: c2s.put ( {'op': 'error', 'data' : data} ) except Exception as e: - c2s.put ( {'op': 'error', 'data' : data} ) - if data is not None: - print ('Exception while process data [%s]: %s' % (self.get_data_name(data), traceback.format_exc()) ) - else: - print ('Exception: %s' % (traceback.format_exc()) ) + err_msg = traceback.format_exc() + c2s.put ( {'op': 'error', 'data' : data, 'err_msg' : err_msg} ) c2s.close() s2c.close() @@ -159,6 +156,24 @@ class Subprocessor(object): self.clis = [] + def cli_init_dispatcher(cli): + while not cli.c2s.empty(): + obj = cli.c2s.get() + op = obj.get('op','') + if op == 'init_ok': + cli.state = 0 + elif op == 'log_info': + io.log_info(obj['msg']) + elif op == 'log_err': + io.log_err(obj['msg']) + elif op == 'error': + err_msg = obj.get('err_msg', None) + if err_msg is not None: + io.log_info(f'Error while subprocess initialization: {err_msg}') + cli.kill() + self.clis.remove(cli) + break + #getting info about name of subprocesses, host and client dicts, and spawning them for name, host_dict, client_dict in self.process_info_generator(): try: @@ -173,19 +188,7 @@ class Subprocessor(object): if self.initialize_subprocesses_in_serial: while True: - while not cli.c2s.empty(): - obj = cli.c2s.get() - op = obj.get('op','') - if op == 'init_ok': - cli.state = 0 - elif op == 'log_info': - io.log_info(obj['msg']) - elif op == 'log_err': - io.log_err(obj['msg']) - elif op == 'error': - cli.kill() - self.clis.remove(cli) - break + cli_init_dispatcher(cli) if cli.state == 0: break io.process_messages(0.005) @@ -198,19 +201,7 @@ class Subprocessor(object): #waiting subprocesses their success(or not) initialization while True: for cli in self.clis[:]: - while not cli.c2s.empty(): - obj = cli.c2s.get() - op = obj.get('op','') - if op == 'init_ok': - cli.state = 0 - elif op == 'log_info': - io.log_info(obj['msg']) - elif op == 'log_err': - io.log_err(obj['msg']) - elif op == 'error': - cli.kill() - self.clis.remove(cli) - break + cli_init_dispatcher(cli) if all ([cli.state == 0 for cli in self.clis]): break io.process_messages(0.005)