increased speed of sort by hist sim for ten thousands of faces

This commit is contained in:
iperov 2019-01-04 22:54:09 +04:00
parent 08550ac856
commit b5ba7d52cb
6 changed files with 226 additions and 140 deletions

View file

@ -7,9 +7,10 @@ import sys
class SubprocessorBase(object):
#overridable
def __init__(self, name, no_response_time_sec = 60):
def __init__(self, name, no_response_time_sec = 60):
self.name = name
self.no_response_time_sec = no_response_time_sec
self.is_host = True
#overridable
def process_info_generator(self):
@ -29,12 +30,12 @@ class SubprocessorBase(object):
return 0
#overridable
def onHostGetData(self):
def onHostGetData(self, host_dict):
#return data here
return None
#overridable
def onHostDataReturn (self, data):
def onHostDataReturn (self, host_dict, data):
#input_data.insert(0, obj['data'])
pass
@ -62,21 +63,20 @@ class SubprocessorBase(object):
pass
#overridable
def onHostResult (self, data, result):
def onHostResult (self, host_dict, data, result):
#return count of progress bar update
return 1
#overridable
def onHostProcessEnd(self):
pass
#overridable
def get_start_return(self):
def onFinalizeAndGetResult(self):
return None
def inc_progress_bar(self, c):
self.progress_bar.n += c
self.progress_bar.refresh()
def inc_progress_bar(self, c):
if self.is_host:
self.progress_bar.n += c
self.progress_bar.refresh()
else:
self.cq.put ( {'op': 'inc_bar', 'c':c} )
def safe_print(self, msg):
self.print_lock.acquire()
@ -111,7 +111,7 @@ class SubprocessorBase(object):
if len(self.processes) == 0:
raise Exception ("Unable to start Subprocessor '%s' " % (self.name))
while True:
for p in self.processes[:]:
while not p['cq'].empty():
@ -132,11 +132,10 @@ class SubprocessorBase(object):
if len(self.processes) == 0:
print ( self.get_no_process_started_message() )
return self.get_start_return()
self.onHostClientsInitialized()
return None
self.progress_bar = tqdm( total=self.onHostGetProgressBarLen(), desc=self.onHostGetProgressBarDesc() )
self.progress_bar = tqdm( total=self.onHostGetProgressBarLen(), desc=self.onHostGetProgressBarDesc() )
self.onHostClientsInitialized()
try:
while True:
@ -149,15 +148,16 @@ class SubprocessorBase(object):
data = obj['data']
result = obj['result']
c = self.onHostResult (data, result)
c = self.onHostResult (p['host_dict'], data, result)
if c > 0:
self.progress_bar.update(c)
p['state'] = 'free'
elif obj_op == 'inc_bar':
self.inc_progress_bar(obj['c'])
elif obj_op == 'error':
if 'data' in obj.keys():
self.onHostDataReturn ( obj['data'] )
self.onHostDataReturn (p['host_dict'], obj['data'] )
if obj['close'] == True:
p['sq'].put ( {'op': 'close'} )
@ -168,7 +168,7 @@ class SubprocessorBase(object):
for p in self.processes[:]:
if p['state'] == 'free':
data = self.onHostGetData()
data = self.onHostGetData(p['host_dict'])
if data is not None:
p['sq'].put ( {'op': 'data', 'data' : data} )
p['sent_time'] = time.time()
@ -176,9 +176,9 @@ class SubprocessorBase(object):
p['state'] = 'busy'
elif p['state'] == 'busy':
if (time.time() - p['sent_time']) > self.no_response_time_sec:
if self.no_response_time_sec != 0 and (time.time() - p['sent_time']) > self.no_response_time_sec:
print ( '%s doesnt response, terminating it.' % (p['name']) )
self.onHostDataReturn ( p['sent_data'] )
self.onHostDataReturn (p['host_dict'], p['sent_data'] )
p['process'].terminate()
self.processes.remove(p)
@ -205,7 +205,7 @@ class SubprocessorBase(object):
terminate_it = True
break
if (time.time() - p['sent_time']) > self.no_response_time_sec:
if self.no_response_time_sec != 0 and (time.time() - p['sent_time']) > self.no_response_time_sec:
terminate_it = True
if terminate_it:
@ -214,14 +214,13 @@ class SubprocessorBase(object):
if all ([p['state'] == 'finalized' for p in self.processes]):
break
return self.onFinalizeAndGetResult()
self.onHostProcessEnd()
return self.get_start_return()
def subprocess(self, sq, cq, client_dict):
self.is_host = False
self.print_lock = client_dict['print_lock']
self.cq = cq
try:
fail_message = self.onClientInitialize(client_dict)
except:
@ -241,7 +240,7 @@ class SubprocessorBase(object):
if obj_op == 'data':
data = obj['data']
try:
result = self.onClientProcessData (data)
result = self.onClientProcessData (data)
cq.put ( {'op': 'success', 'data' : data, 'result' : result} )
except: