This commit is contained in:
Colombo 2020-02-29 14:31:21 +04:00
parent 10ffca37c0
commit fcbfbdd560

View file

@ -26,14 +26,9 @@ class Subprocessor(object):
self.name = None
self.host_dict = None
def close(self, terminate=False):
if terminate:
self.p.terminate()
def kill(self):
self.p.terminate()
self.p.join()
self.s2c.close()
self.c2s.close()
self.s2c = None
self.c2s = None
#overridable optional
def on_initialize(self, client_dict):
@ -55,14 +50,22 @@ class Subprocessor(object):
#return string identificator of your 'data'
return "undefined"
def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } )
def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } )
def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } )
def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } )
def progress_bar_inc(self, c): self.c2s.put ( {'op': 'progress_bar_inc' , 'c':c } )
def _subprocess_run(self, client_dict, s2c, c2s):
def _subprocess_run(self, client_dict, s2c, c2s):
import sys
if sys.platform != 'win32':
# fix for Linux , Ignoring :
# /usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143:
# UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
import warnings
warnings.simplefilter(action='ignore', category=UserWarning)
self.s2c = s2c
self.c2s = c2s
data = None
is_error = False
try:
self.on_initialize(client_dict)
@ -83,24 +86,16 @@ class Subprocessor(object):
self.on_finalize()
c2s.put ( {'op': 'finalized'} )
return
except Subprocessor.SilenceException as e:
is_error = True
pass
except Exception as e:
is_error = True
if data is not None:
print ('Exception while process data [%s]: %s' % (self.get_data_name(data), traceback.format_exc()) )
else:
print ('Exception: %s' % (traceback.format_exc()) )
if is_error:
c2s.put ( {'op': 'error', 'data' : data} )
s2c.close()
c2s.close()
self.c2s = None
import gc
gc.collect()
c2s.put ( {'op': 'error', 'data' : data} )
# disable pickling
def __getstate__(self):
@ -194,7 +189,7 @@ class Subprocessor(object):
elif op == 'log_err':
io.log_err(obj['msg'])
elif op == 'error':
cli.close()
cli.kill()
self.clis.remove(cli)
break
if cli.state == 0:
@ -219,7 +214,7 @@ class Subprocessor(object):
elif op == 'log_err':
io.log_err(obj['msg'])
elif op == 'error':
cli.close()
cli.kill()
self.clis.remove(cli)
break
if all ([cli.state == 0 for cli in self.clis]):
@ -248,7 +243,8 @@ class Subprocessor(object):
#some error occured while process data, returning chunk to on_data_return
if 'data' in obj.keys():
self.on_data_return (cli.host_dict, obj['data'] )
cli.close()
#and killing process
cli.kill()
self.clis.remove(cli)
elif op == 'log_info':
io.log_info(obj['msg'])
@ -263,7 +259,7 @@ class Subprocessor(object):
#subprocess busy too long
print ( '%s doesnt response, terminating it.' % (cli.name) )
self.on_data_return (cli.host_dict, cli.sent_data )
cli.close(kill=True)
cli.kill()
self.clis.remove(cli)
for cli in self.clis[:]:
@ -293,18 +289,20 @@ class Subprocessor(object):
while True:
for cli in self.clis[:]:
if cli.state != 2:
while not cli.c2s.empty():
obj = cli.c2s.get()
obj_op = obj['op']
if obj_op == 'finalized':
cli.state = 2
cli.close()
break
terminate_it = False
while not cli.c2s.empty():
obj = cli.c2s.get()
obj_op = obj['op']
if obj_op == 'finalized':
terminate_it = True
break
if (time.time() - cli.sent_time) > 30:
cli.state = 2
cli.close(kill=True)
if (time.time() - cli.sent_time) > 30:
terminate_it = True
if terminate_it:
cli.state = 2
cli.kill()
if all ([cli.state == 2 for cli in self.clis]):
break