From 7180076ecfd6b98638292fdc6f3d446fa45014ea Mon Sep 17 00:00:00 2001 From: Colombo Date: Sat, 29 Feb 2020 13:31:36 +0400 Subject: [PATCH] fix --- core/joblib/SubprocessorBase.py | 34 ++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/core/joblib/SubprocessorBase.py b/core/joblib/SubprocessorBase.py index a7213f9..af96f21 100644 --- a/core/joblib/SubprocessorBase.py +++ b/core/joblib/SubprocessorBase.py @@ -29,7 +29,9 @@ class Subprocessor(object): def kill(self): self.p.terminate() self.p.join() - + self.s2c.close() + self.c2s.close() + #overridable optional def on_initialize(self, client_dict): #initialize your subprocess here using client_dict @@ -77,7 +79,12 @@ class Subprocessor(object): time.sleep(0.001) self.on_finalize() + + c2s.put ( {'op': 'finalized'} ) + self.s2c.close() + self.c2s.close() + return except Subprocessor.SilenceException as e: pass @@ -281,20 +288,21 @@ class Subprocessor(object): while True: for cli in self.clis[:]: - terminate_it = False - while not cli.c2s.empty(): - obj = cli.c2s.get() - obj_op = obj['op'] - if obj_op == 'finalized': + if cli.state != 2: + terminate_it = False + while not cli.c2s.empty(): + obj = cli.c2s.get() + obj_op = obj['op'] + if obj_op == 'finalized': + terminate_it = True + break + + if (time.time() - cli.sent_time) > 30: terminate_it = True - break - if (time.time() - cli.sent_time) > 30: - terminate_it = True - - if terminate_it: - cli.state = 2 - cli.kill() + if terminate_it: + cli.state = 2 + cli.kill() if all ([cli.state == 2 for cli in self.clis]): break