diff --git a/core/joblib/SubprocessorBase.py b/core/joblib/SubprocessorBase.py index af96f21..e4544a0 100644 --- a/core/joblib/SubprocessorBase.py +++ b/core/joblib/SubprocessorBase.py @@ -26,12 +26,15 @@ class Subprocessor(object): self.name = None self.host_dict = None - def kill(self): - self.p.terminate() + def close(self, terminate=False): + if terminate: + self.p.terminate() self.p.join() self.s2c.close() self.c2s.close() - + self.s2c = None + self.c2s = None + #overridable optional def on_initialize(self, client_dict): #initialize your subprocess here using client_dict @@ -52,14 +55,14 @@ class Subprocessor(object): #return string identificator of your 'data' return "undefined" - def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } ) - def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } ) + def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } ) + def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } ) def progress_bar_inc(self, c): self.c2s.put ( {'op': 'progress_bar_inc' , 'c':c } ) def _subprocess_run(self, client_dict, s2c, c2s): - self.s2c = s2c self.c2s = c2s data = None + is_error = False try: self.on_initialize(client_dict) @@ -79,22 +82,25 @@ class Subprocessor(object): time.sleep(0.001) self.on_finalize() - - c2s.put ( {'op': 'finalized'} ) - self.s2c.close() - self.c2s.close() - - return + except Subprocessor.SilenceException as e: - pass + is_error = True except Exception as e: + is_error = True if data is not None: print ('Exception while process data [%s]: %s' % (self.get_data_name(data), traceback.format_exc()) ) else: print ('Exception: %s' % (traceback.format_exc()) ) - c2s.put ( {'op': 'error', 'data' : data} ) + if is_error: + c2s.put ( {'op': 'error', 'data' : data} ) + + s2c.close() + c2s.close() + self.c2s = None + import gc + gc.collect() # disable pickling def __getstate__(self): @@ -188,7 +194,7 @@ class Subprocessor(object): elif op == 'log_err': io.log_err(obj['msg']) elif op == 'error': - cli.kill() + cli.close() self.clis.remove(cli) break if cli.state == 0: @@ -213,7 +219,7 @@ class Subprocessor(object): elif op == 'log_err': io.log_err(obj['msg']) elif op == 'error': - cli.kill() + cli.close() self.clis.remove(cli) break if all ([cli.state == 0 for cli in self.clis]): @@ -242,8 +248,7 @@ class Subprocessor(object): #some error occured while process data, returning chunk to on_data_return if 'data' in obj.keys(): self.on_data_return (cli.host_dict, obj['data'] ) - #and killing process - cli.kill() + cli.close() self.clis.remove(cli) elif op == 'log_info': io.log_info(obj['msg']) @@ -258,7 +263,7 @@ class Subprocessor(object): #subprocess busy too long print ( '%s doesnt response, terminating it.' % (cli.name) ) self.on_data_return (cli.host_dict, cli.sent_data ) - cli.kill() + cli.close(kill=True) self.clis.remove(cli) for cli in self.clis[:]: @@ -289,20 +294,17 @@ class Subprocessor(object): while True: for cli in self.clis[:]: if cli.state != 2: - terminate_it = False while not cli.c2s.empty(): obj = cli.c2s.get() obj_op = obj['op'] if obj_op == 'finalized': - terminate_it = True + cli.state = 2 + cli.close() break if (time.time() - cli.sent_time) > 30: - terminate_it = True - - if terminate_it: cli.state = 2 - cli.kill() + cli.close(kill=True) if all ([cli.state == 2 for cli in self.clis]): break