From 21b25038acce35ede169b479090040474cb2c3dd Mon Sep 17 00:00:00 2001 From: Colombo Date: Sun, 5 Jan 2020 11:53:31 +0400 Subject: [PATCH] optimized sample generator --- samplelib/SampleGeneratorFace.py | 36 ++-- samplelib/SampleGeneratorFacePerson.py | 1 + samplelib/SampleGeneratorFaceTemporal.py | 20 +- samplelib/SampleHost.py | 59 +++--- utils/iter_utils.py | 2 +- utils/mp_utils.py | 243 +++++++++++++---------- 6 files changed, 201 insertions(+), 160 deletions(-) diff --git a/samplelib/SampleGeneratorFace.py b/samplelib/SampleGeneratorFace.py index 7ba6129..edee7ea 100644 --- a/samplelib/SampleGeneratorFace.py +++ b/samplelib/SampleGeneratorFace.py @@ -7,8 +7,8 @@ import numpy as np from facelib import LandmarksProcessor from samplelib import (SampleGeneratorBase, SampleHost, SampleProcessor, SampleType) -from utils import iter_utils -from utils import mp_utils +from utils import iter_utils, mp_utils + ''' arg @@ -30,8 +30,13 @@ class SampleGeneratorFace(SampleGeneratorBase): self.output_sample_types = output_sample_types self.add_sample_idx = add_sample_idx - samples_host = SampleHost.mp_host (SampleType.FACE, self.samples_path) - self.samples_len = len(samples_host.get_list()) + if self.debug: + self.generators_count = 1 + else: + self.generators_count = np.clip(multiprocessing.cpu_count(), 2, 6) + + samples_clis = SampleHost.host (SampleType.FACE, self.samples_path, number_of_clis=self.generators_count) + self.samples_len = len(samples_clis[0]) if self.samples_len == 0: raise ValueError('No training data provided.') @@ -39,18 +44,16 @@ class SampleGeneratorFace(SampleGeneratorBase): index_host = mp_utils.IndexHost(self.samples_len) if random_ct_samples_path is not None: - ct_samples_host = SampleHost.mp_host (SampleType.FACE, random_ct_samples_path) - ct_index_host = mp_utils.IndexHost( len(ct_samples_host.get_list()) ) + ct_samples_clis = SampleHost.host (SampleType.FACE, random_ct_samples_path, number_of_clis=self.generators_count) + ct_index_host = mp_utils.IndexHost( len(ct_samples_clis[0]) ) else: - ct_samples_host = None + ct_samples_clis = None ct_index_host = None if self.debug: - self.generators_count = 1 - self.generators = [iter_utils.ThisThreadGenerator ( self.batch_func, (samples_host.create_cli(), index_host.create_cli(), ct_samples_host.create_cli() if ct_index_host is not None else None, ct_index_host.create_cli() if ct_index_host is not None else None) )] + self.generators = [iter_utils.ThisThreadGenerator ( self.batch_func, (samples_clis[0], index_host.create_cli(), ct_samples_clis[0] if ct_index_host is not None else None, ct_index_host.create_cli() if ct_index_host is not None else None) )] else: - self.generators_count = np.clip(multiprocessing.cpu_count(), 2, 4) - self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, (samples_host.create_cli(), index_host.create_cli(), ct_samples_host.create_cli() if ct_index_host is not None else None, ct_index_host.create_cli() if ct_index_host is not None else None), start_now=True ) for i in range(self.generators_count) ] + self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, (samples_clis[i], index_host.create_cli(), ct_samples_clis[i] if ct_index_host is not None else None, ct_index_host.create_cli() if ct_index_host is not None else None), start_now=True ) for i in range(self.generators_count) ] self.generator_counter = -1 @@ -72,13 +75,16 @@ class SampleGeneratorFace(SampleGeneratorBase): while True: batches = None - indexes = index_host.get(bs) - ct_indexes = ct_index_host.get(bs) if ct_samples is not None else None + indexes = index_host.multi_get(bs) + ct_indexes = ct_index_host.multi_get(bs) if ct_samples is not None else None + batch_samples = samples.multi_get (indexes) + batch_ct_samples = ct_samples.multi_get (ct_indexes) if ct_samples is not None else None + for n_batch in range(bs): sample_idx = indexes[n_batch] - sample = samples[ sample_idx ] - ct_sample = ct_samples[ ct_indexes[n_batch] ] if ct_samples is not None else None + sample = batch_samples[n_batch] + ct_sample = batch_ct_samples[n_batch] if ct_samples is not None else None try: x, = SampleProcessor.process ([sample], self.sample_process_options, self.output_sample_types, self.debug, ct_sample=ct_sample) diff --git a/samplelib/SampleGeneratorFacePerson.py b/samplelib/SampleGeneratorFacePerson.py index f0a927f..d254063 100644 --- a/samplelib/SampleGeneratorFacePerson.py +++ b/samplelib/SampleGeneratorFacePerson.py @@ -30,6 +30,7 @@ class SampleGeneratorFacePerson(SampleGeneratorBase): self.output_sample_types = output_sample_types self.person_id_mode = person_id_mode + raise NotImplementedError("Currently SampleGeneratorFacePerson is not implemented.") samples_host = SampleHost.mp_host (SampleType.FACE, self.samples_path) samples = samples_host.get_list() diff --git a/samplelib/SampleGeneratorFaceTemporal.py b/samplelib/SampleGeneratorFaceTemporal.py index 9fc8ce9..1ba06c5 100644 --- a/samplelib/SampleGeneratorFaceTemporal.py +++ b/samplelib/SampleGeneratorFaceTemporal.py @@ -20,14 +20,17 @@ class SampleGeneratorFaceTemporal(SampleGeneratorBase): self.sample_process_options = sample_process_options self.output_sample_types = output_sample_types - self.samples = SampleHost.load (SampleType.FACE_TEMPORAL_SORTED, self.samples_path) - if self.debug: self.generators_count = 1 - self.generators = [iter_utils.ThisThreadGenerator ( self.batch_func, 0 )] else: - self.generators_count = min ( generators_count, len(self.samples) ) - self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, i ) for i in range(self.generators_count) ] + self.generators_count = generators_count + + samples_clis = SampleHost.host (SampleType.FACE_TEMPORAL_SORTED, self.samples_path, number_of_clis=self.generators_count) + + if self.debug: + self.generators = [iter_utils.ThisThreadGenerator ( self.batch_func, (0, samples_clis[0]) )] + else: + self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, (i, samples_clis[i]) ) for i in range(self.generators_count) ] self.generator_counter = -1 @@ -39,8 +42,9 @@ class SampleGeneratorFaceTemporal(SampleGeneratorBase): generator = self.generators[self.generator_counter % len(self.generators) ] return next(generator) - def batch_func(self, generator_id): - samples = self.samples + def batch_func(self, param): + generator_id, samples = param + samples_len = len(samples) if samples_len == 0: raise ValueError('No training data provided.') @@ -56,10 +60,8 @@ class SampleGeneratorFaceTemporal(SampleGeneratorBase): shuffle_idxs = [] while True: - batches = None for n_batch in range(self.batch_size): - if len(shuffle_idxs) == 0: shuffle_idxs = samples_idxs.copy() np.random.shuffle (shuffle_idxs) diff --git a/samplelib/SampleHost.py b/samplelib/SampleHost.py index b240d07..003419a 100644 --- a/samplelib/SampleHost.py +++ b/samplelib/SampleHost.py @@ -1,7 +1,5 @@ -import gc import multiprocessing import operator -import pickle import traceback from pathlib import Path @@ -16,9 +14,11 @@ from .Sample import Sample, SampleType class SampleHost: + + + + samples_cache = dict() - host_cache = dict() - @staticmethod def get_person_id_max_count(samples_path): samples = None @@ -35,7 +35,7 @@ class SampleHost: return len(list(persons_name_idxs.keys())) @staticmethod - def load(sample_type, samples_path): + def host(sample_type, samples_path, number_of_clis): samples_cache = SampleHost.samples_cache if str(samples_path) not in samples_cache.keys(): @@ -46,9 +46,11 @@ class SampleHost: if sample_type == SampleType.IMAGE: if samples[sample_type] is None: samples[sample_type] = [ Sample(filename=filename) for filename in io.progress_bar_generator( Path_utils.get_image_paths(samples_path), "Loading") ] - elif sample_type == SampleType.FACE: - if samples[sample_type] is None: - result = None + elif sample_type == SampleType.FACE or \ + sample_type == SampleType.FACE_TEMPORAL_SORTED: + result = None + + if samples[sample_type] is None: try: result = samplelib.PackedFaceset.load(samples_path) except: @@ -60,33 +62,26 @@ class SampleHost: if result is None: result = SampleHost.load_face_samples( Path_utils.get_image_paths(samples_path) ) - result_dmp = pickle.dumps(result) - del result - gc.collect() - result = pickle.loads(result_dmp) - - samples[sample_type] = result - - elif sample_type == SampleType.FACE_TEMPORAL_SORTED: - if samples[sample_type] is None: - samples[sample_type] = SampleHost.upgradeToFaceTemporalSortedSamples( SampleHost.load(SampleType.FACE, samples_path) ) + samples[sample_type] = mp_utils.ListHost() + + if sample_type == SampleType.FACE_TEMPORAL_SORTED: + result = SampleHost.upgradeToFaceTemporalSortedSamples(result) + + list_host = samples[sample_type] + + clis = [ list_host.create_cli() for _ in range(number_of_clis) ] + + if result is not None: + while True: + if len(result) == 0: + break + items = result[0:10000] + del result[0:10000] + clis[0].extend(items) + return clis return samples[sample_type] - @staticmethod - def mp_host(sample_type, samples_path): - result = SampleHost.load (sample_type, samples_path) - - host_cache = SampleHost.host_cache - if str(samples_path) not in host_cache.keys(): - host_cache[str(samples_path)] = [None]*SampleType.QTY - hosts = host_cache[str(samples_path)] - - if hosts[sample_type] is None: - hosts[sample_type] = mp_utils.ListHost(result) - - return hosts[sample_type] - @staticmethod def load_face_samples ( image_paths): result = FaceSamplesLoaderSubprocessor(image_paths).run() diff --git a/utils/iter_utils.py b/utils/iter_utils.py index e690e3b..5cb1b2b 100644 --- a/utils/iter_utils.py +++ b/utils/iter_utils.py @@ -22,7 +22,7 @@ class ThisThreadGenerator(object): return next(self.generator_func) class SubprocessGenerator(object): - def __init__(self, generator_func, user_param=None, prefetch=2, start_now=False): + def __init__(self, generator_func, user_param=None, prefetch=3, start_now=False): super().__init__() self.prefetch = prefetch self.generator_func = generator_func diff --git a/utils/mp_utils.py b/utils/mp_utils.py index 02b04bd..28f795c 100644 --- a/utils/mp_utils.py +++ b/utils/mp_utils.py @@ -1,232 +1,269 @@ import multiprocessing import threading import time - +import traceback import numpy as np -class Index2DHost(): + +class Index2DHost(): """ Provides random shuffled 2D indexes for multiprocesses """ - def __init__(self, indexes2D): + def __init__(self, indexes2D, max_number_of_clis=128): self.sq = multiprocessing.Queue() - self.cqs = [] - self.clis = [] - self.thread = threading.Thread(target=self.host_thread, args=(indexes2D,) ) - self.thread.daemon = True - self.thread.start() + self.cqs = [ multiprocessing.Queue() for _ in range(max_number_of_clis) ] + self.n_clis = 0 + self.max_number_of_clis = max_number_of_clis + + self.p = multiprocessing.Process(target=self.host_proc, args=(indexes2D, self.sq, self.cqs) ) + self.p.daemon = True + self.p.start() - def host_thread(self, indexes2D): + def host_proc(self, indexes2D, sq, cqs): indexes_counts_len = len(indexes2D) - + idxs = [*range(indexes_counts_len)] idxs_2D = [None]*indexes_counts_len - shuffle_idxs = [] + shuffle_idxs = [] shuffle_idxs_2D = [None]*indexes_counts_len for i in range(indexes_counts_len): idxs_2D[i] = indexes2D[i] shuffle_idxs_2D[i] = [] - sq = self.sq - - while True: + while True: while not sq.empty(): obj = sq.get() cq_id, cmd = obj[0], obj[1] - + if cmd == 0: #get_1D count = obj[2] - + result = [] for i in range(count): if len(shuffle_idxs) == 0: shuffle_idxs = idxs.copy() np.random.shuffle(shuffle_idxs) result.append(shuffle_idxs.pop()) - self.cqs[cq_id].put (result) + cqs[cq_id].put (result) elif cmd == 1: #get_2D targ_idxs,count = obj[2], obj[3] result = [] - + for targ_idx in targ_idxs: sub_idxs = [] for i in range(count): - ar = shuffle_idxs_2D[targ_idx] + ar = shuffle_idxs_2D[targ_idx] if len(ar) == 0: ar = shuffle_idxs_2D[targ_idx] = idxs_2D[targ_idx].copy() np.random.shuffle(ar) - sub_idxs.append(ar.pop()) + sub_idxs.append(ar.pop()) result.append (sub_idxs) - self.cqs[cq_id].put (result) - + cqs[cq_id].put (result) + time.sleep(0.005) - + def create_cli(self): cq = multiprocessing.Queue() - self.cqs.append ( cq ) + self.cqs.append ( cq ) cq_id = len(self.cqs)-1 return Index2DHost.Cli(self.sq, cq, cq_id) - + # disable pickling def __getstate__(self): return dict() def __setstate__(self, d): self.__dict__.update(d) - + class Cli(): def __init__(self, sq, cq, cq_id): self.sq = sq self.cq = cq self.cq_id = cq_id - + def get_1D(self, count): self.sq.put ( (self.cq_id,0, count) ) - + while True: if not self.cq.empty(): return self.cq.get() time.sleep(0.001) - + def get_2D(self, idxs, count): self.sq.put ( (self.cq_id,1,idxs,count) ) - + while True: if not self.cq.empty(): return self.cq.get() time.sleep(0.001) - -class IndexHost(): + +class IndexHost(): """ Provides random shuffled indexes for multiprocesses """ - def __init__(self, indexes_count): + def __init__(self, indexes_count, max_number_of_clis=128): self.sq = multiprocessing.Queue() - self.cqs = [] - self.clis = [] - self.thread = threading.Thread(target=self.host_thread, args=(indexes_count,) ) - self.thread.daemon = True - self.thread.start() - - def host_thread(self, indexes_count): + self.cqs = [ multiprocessing.Queue() for _ in range(max_number_of_clis) ] + self.n_clis = 0 + self.max_number_of_clis = max_number_of_clis + + self.p = multiprocessing.Process(target=self.host_proc, args=(indexes_count, self.sq, self.cqs) ) + self.p.daemon = True + self.p.start() + + def host_proc(self, indexes_count, sq, cqs): idxs = [*range(indexes_count)] shuffle_idxs = [] - sq = self.sq - - while True: + + while True: while not sq.empty(): obj = sq.get() cq_id, count = obj[0], obj[1] - + result = [] for i in range(count): if len(shuffle_idxs) == 0: shuffle_idxs = idxs.copy() np.random.shuffle(shuffle_idxs) result.append(shuffle_idxs.pop()) - self.cqs[cq_id].put (result) - + cqs[cq_id].put (result) + time.sleep(0.005) - + def create_cli(self): - cq = multiprocessing.Queue() - self.cqs.append ( cq ) - cq_id = len(self.cqs)-1 - return IndexHost.Cli(self.sq, cq, cq_id) + if self.n_clis == self.max_number_of_clis: + raise Exception("") + cq_id = self.n_clis + self.n_clis += 1 + + return IndexHost.Cli(self.sq, self.cqs[cq_id], cq_id) + # disable pickling def __getstate__(self): return dict() def __setstate__(self, d): self.__dict__.update(d) - + class Cli(): def __init__(self, sq, cq, cq_id): self.sq = sq self.cq = cq self.cq_id = cq_id - - def get(self, count): + + def multi_get(self, count): self.sq.put ( (self.cq_id,count) ) - + while True: if not self.cq.empty(): return self.cq.get() time.sleep(0.001) - -class ListHost(): - def __init__(self, list_): + +class ListHost(): + def __init__(self, list_=None, max_number_of_clis=128): self.sq = multiprocessing.Queue() - self.cqs = [] - self.clis = [] - self.list_ = list_ - self.thread = threading.Thread(target=self.host_thread) - self.thread.daemon = True - self.thread.start() + self.cqs = [ multiprocessing.Queue() for _ in range(max_number_of_clis) ] + self.n_clis = 0 + self.max_number_of_clis = max_number_of_clis - def host_thread(self): - sq = self.sq - while True: + self.p = multiprocessing.Process(target=self.host_proc, args=(self.sq, self.cqs) ) + self.p.daemon = True + self.p.start() + + def host_proc(self, sq, cqs): + m_list = list() + + while True: while not sq.empty(): obj = sq.get() cq_id, cmd = obj[0], obj[1] if cmd == 0: - item = self.list_[ obj[2] ] - self.cqs[cq_id].put ( item ) - + cqs[cq_id].put ( len(m_list) ) elif cmd == 1: - self.cqs[cq_id].put ( len(self.list_) ) + idx = obj[2] + item = m_list[idx ] + cqs[cq_id].put ( item ) + elif cmd == 2: + result = [] + for item in obj[2]: + result.append ( m_list[item] ) + cqs[cq_id].put ( result ) + elif cmd == 3: + m_list.insert(obj[2], obj[3]) + elif cmd == 4: + m_list.append(obj[2]) + elif cmd == 5: + m_list.extend(obj[2]) time.sleep(0.005) + + def create_cli(self): + if self.n_clis == self.max_number_of_clis: + raise Exception("") - def create_cli(self): - cq = multiprocessing.Queue() - self.cqs.append ( cq ) - cq_id = len(self.cqs)-1 - return ListHost.Cli(self.sq, cq, cq_id) - - def get_list(self): - return self.list_ + cq_id = self.n_clis + self.n_clis += 1 + return ListHost.Cli(self.sq, self.cqs[cq_id], cq_id) + # disable pickling def __getstate__(self): return dict() def __setstate__(self, d): self.__dict__.update(d) - + class Cli(): def __init__(self, sq, cq, cq_id): self.sq = sq self.cq = cq self.cq_id = cq_id - - def __getitem__(self, key): - self.sq.put ( (self.cq_id,0,key) ) - + + def __len__(self): + self.sq.put ( (self.cq_id,0) ) + while True: if not self.cq.empty(): return self.cq.get() time.sleep(0.001) - def __len__(self): - self.sq.put ( (self.cq_id,1) ) - + def __getitem__(self, key): + self.sq.put ( (self.cq_id,1,key) ) + while True: if not self.cq.empty(): return self.cq.get() - time.sleep(0.001) - -class DictHost(): - def __init__(self, d, num_users): + time.sleep(0.001) + + def multi_get(self, keys): + self.sq.put ( (self.cq_id,2,keys) ) + + while True: + if not self.cq.empty(): + return self.cq.get() + time.sleep(0.001) + + def insert(self, index, item): + self.sq.put ( (self.cq_id,3,index,item) ) + + def append(self, item): + self.sq.put ( (self.cq_id,4,item) ) + + def extend(self, items): + self.sq.put ( (self.cq_id,5,items) ) + + + +class DictHost(): + def __init__(self, d, num_users): self.sqs = [ multiprocessing.Queue() for _ in range(num_users) ] self.cqs = [ multiprocessing.Queue() for _ in range(num_users) ] - + self.thread = threading.Thread(target=self.host_thread, args=(d,) ) self.thread.daemon = True self.thread.start() - + self.clis = [ DictHostCli(sq,cq) for sq, cq in zip(self.sqs, self.cqs) ] - + def host_thread(self, d): - while True: + while True: for sq, cq in zip(self.sqs, self.cqs): if not sq.empty(): obj = sq.get() @@ -235,32 +272,32 @@ class DictHost(): cq.put (d[ obj[1] ]) elif cmd == 1: cq.put ( list(d.keys()) ) - + time.sleep(0.005) - - + + def get_cli(self, n_user): return self.clis[n_user] - + # disable pickling def __getstate__(self): return dict() def __setstate__(self, d): self.__dict__.update(d) - + class DictHostCli(): def __init__(self, sq, cq): self.sq = sq self.cq = cq - + def __getitem__(self, key): self.sq.put ( (0,key) ) - + while True: if not self.cq.empty(): return self.cq.get() time.sleep(0.001) - + def keys(self): self.sq.put ( (1,) ) while True: