diff --git a/core/joblib/SubprocessorBase.py b/core/joblib/SubprocessorBase.py index e4544a0..48c7355 100644 --- a/core/joblib/SubprocessorBase.py +++ b/core/joblib/SubprocessorBase.py @@ -26,14 +26,9 @@ class Subprocessor(object): self.name = None self.host_dict = None - def close(self, terminate=False): - if terminate: - self.p.terminate() + def kill(self): + self.p.terminate() self.p.join() - self.s2c.close() - self.c2s.close() - self.s2c = None - self.c2s = None #overridable optional def on_initialize(self, client_dict): @@ -55,14 +50,22 @@ class Subprocessor(object): #return string identificator of your 'data' return "undefined" - def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } ) - def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } ) + def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } ) + def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } ) def progress_bar_inc(self, c): self.c2s.put ( {'op': 'progress_bar_inc' , 'c':c } ) - def _subprocess_run(self, client_dict, s2c, c2s): + def _subprocess_run(self, client_dict, s2c, c2s): + import sys + if sys.platform != 'win32': + # fix for Linux , Ignoring : + # /usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: + # UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown + import warnings + warnings.simplefilter(action='ignore', category=UserWarning) + + self.s2c = s2c self.c2s = c2s data = None - is_error = False try: self.on_initialize(client_dict) @@ -83,24 +86,16 @@ class Subprocessor(object): self.on_finalize() c2s.put ( {'op': 'finalized'} ) - + return except Subprocessor.SilenceException as e: - is_error = True + pass except Exception as e: - is_error = True if data is not None: print ('Exception while process data [%s]: %s' % (self.get_data_name(data), traceback.format_exc()) ) else: print ('Exception: %s' % (traceback.format_exc()) ) - if is_error: - c2s.put ( {'op': 'error', 'data' : data} ) - - s2c.close() - c2s.close() - self.c2s = None - import gc - gc.collect() + c2s.put ( {'op': 'error', 'data' : data} ) # disable pickling def __getstate__(self): @@ -194,7 +189,7 @@ class Subprocessor(object): elif op == 'log_err': io.log_err(obj['msg']) elif op == 'error': - cli.close() + cli.kill() self.clis.remove(cli) break if cli.state == 0: @@ -219,7 +214,7 @@ class Subprocessor(object): elif op == 'log_err': io.log_err(obj['msg']) elif op == 'error': - cli.close() + cli.kill() self.clis.remove(cli) break if all ([cli.state == 0 for cli in self.clis]): @@ -248,7 +243,8 @@ class Subprocessor(object): #some error occured while process data, returning chunk to on_data_return if 'data' in obj.keys(): self.on_data_return (cli.host_dict, obj['data'] ) - cli.close() + #and killing process + cli.kill() self.clis.remove(cli) elif op == 'log_info': io.log_info(obj['msg']) @@ -263,7 +259,7 @@ class Subprocessor(object): #subprocess busy too long print ( '%s doesnt response, terminating it.' % (cli.name) ) self.on_data_return (cli.host_dict, cli.sent_data ) - cli.close(kill=True) + cli.kill() self.clis.remove(cli) for cli in self.clis[:]: @@ -293,18 +289,20 @@ class Subprocessor(object): while True: for cli in self.clis[:]: - if cli.state != 2: - while not cli.c2s.empty(): - obj = cli.c2s.get() - obj_op = obj['op'] - if obj_op == 'finalized': - cli.state = 2 - cli.close() - break + terminate_it = False + while not cli.c2s.empty(): + obj = cli.c2s.get() + obj_op = obj['op'] + if obj_op == 'finalized': + terminate_it = True + break - if (time.time() - cli.sent_time) > 30: - cli.state = 2 - cli.close(kill=True) + if (time.time() - cli.sent_time) > 30: + terminate_it = True + + if terminate_it: + cli.state = 2 + cli.kill() if all ([cli.state == 2 for cli in self.clis]): break