diff --git a/core/joblib/SubprocessorBase.py b/core/joblib/SubprocessorBase.py index a7213f9..af96f21 100644 --- a/core/joblib/SubprocessorBase.py +++ b/core/joblib/SubprocessorBase.py @@ -29,7 +29,9 @@ class Subprocessor(object): def kill(self): self.p.terminate() self.p.join() - + self.s2c.close() + self.c2s.close() + #overridable optional def on_initialize(self, client_dict): #initialize your subprocess here using client_dict @@ -77,7 +79,12 @@ class Subprocessor(object): time.sleep(0.001) self.on_finalize() + + c2s.put ( {'op': 'finalized'} ) + self.s2c.close() + self.c2s.close() + return except Subprocessor.SilenceException as e: pass @@ -281,20 +288,21 @@ class Subprocessor(object): while True: for cli in self.clis[:]: - terminate_it = False - while not cli.c2s.empty(): - obj = cli.c2s.get() - obj_op = obj['op'] - if obj_op == 'finalized': + if cli.state != 2: + terminate_it = False + while not cli.c2s.empty(): + obj = cli.c2s.get() + obj_op = obj['op'] + if obj_op == 'finalized': + terminate_it = True + break + + if (time.time() - cli.sent_time) > 30: terminate_it = True - break - if (time.time() - cli.sent_time) > 30: - terminate_it = True - - if terminate_it: - cli.state = 2 - cli.kill() + if terminate_it: + cli.state = 2 + cli.kill() if all ([cli.state == 2 for cli in self.clis]): break