import multiprocessing
import operator
import traceback
from collections import deque
from pathlib import Path

import samplelib.PackedFaceset
from DFLIMG import *
from facelib import FaceType, LandmarksProcessor
from interact import interact as io
from joblib import Subprocessor
from utils import Path_utils, mp_utils

from .Sample import Sample, SampleType


class SampleHost:
    """Process-wide cache of loaded samples, keyed by samples directory.

    Both caches map ``str(samples_path)`` to a list with ``SampleType.QTY``
    slots, indexed by sample type; a slot stays ``None`` until that type is
    first requested for that path.
    """

    # str(samples_path) -> per-SampleType list of loaded samples
    samples_cache = dict()
    # str(samples_path) -> per-SampleType list of mp_utils.ListHost wrappers
    host_cache = dict()

    @staticmethod
    def _load_packed_faceset(samples_path):
        """Return ``samplelib.PackedFaceset.load(samples_path)``, or None on error.

        Any exception raised by the loader is logged together with its
        traceback and swallowed so that callers can fall back to another
        loading strategy.
        """
        try:
            return samplelib.PackedFaceset.load(samples_path)
        except Exception:
            # The original message interpolated an undefined name
            # (samples_dat_path), which raised NameError inside this handler.
            io.log_err(f"Error occured while loading samplelib.PackedFaceset.load {str(samples_path)}, {traceback.format_exc()}")
            return None

    @staticmethod
    def get_person_id_max_count(samples_path):
        """Return the number of distinct ``person_name`` values in the packed faceset.

        Raises:
            ValueError: if no packed faceset could be loaded from
                ``samples_path``.
        """
        samples = SampleHost._load_packed_faceset(samples_path)
        if samples is None:
            raise ValueError("packed faceset not found.")

        return len({sample.person_name for sample in samples})

    @staticmethod
    def load(sample_type, samples_path):
        """Return the samples of ``sample_type`` found under ``samples_path``.

        Results are memoised in ``SampleHost.samples_cache``, so each
        (path, type) pair is loaded at most once.

        * ``SampleType.IMAGE``: one ``Sample`` per image path.
        * ``SampleType.FACE``: the packed faceset if it loads, otherwise the
          per-image metadata loaded through ``load_face_samples``.
        * ``SampleType.FACE_TEMPORAL_SORTED``: the FACE samples ordered by
          ``source_filename``.

        For any other sample type the cache slot is returned untouched.
        """
        samples = SampleHost.samples_cache.setdefault(
            str(samples_path), [None] * SampleType.QTY)

        if samples[sample_type] is None:
            if sample_type == SampleType.IMAGE:
                samples[sample_type] = [
                    Sample(filename=filename)
                    for filename in io.progress_bar_generator(
                        Path_utils.get_image_paths(samples_path), "Loading")
                ]
            elif sample_type == SampleType.FACE:
                faces = SampleHost._load_packed_faceset(samples_path)
                if faces is not None:
                    io.log_info(f"Loaded {len(faces)} packed faces from {samples_path}")
                else:
                    # No usable packed faceset: read metadata image by image.
                    faces = SampleHost.load_face_samples(
                        Path_utils.get_image_paths(samples_path))
                samples[sample_type] = faces
            elif sample_type == SampleType.FACE_TEMPORAL_SORTED:
                samples[sample_type] = SampleHost.upgradeToFaceTemporalSortedSamples(
                    SampleHost.load(SampleType.FACE, samples_path))

        return samples[sample_type]

    @staticmethod
    def mp_host(sample_type, samples_path):
        """Return a cached ``mp_utils.ListHost`` wrapping the loaded samples.

        The samples are loaded (or fetched from cache) via ``load`` and the
        host is created once per (path, type) pair.
        """
        loaded = SampleHost.load(sample_type, samples_path)

        hosts = SampleHost.host_cache.setdefault(
            str(samples_path), [None] * SampleType.QTY)
        if hosts[sample_type] is None:
            hosts[sample_type] = mp_utils.ListHost(loaded)

        return hosts[sample_type]

    @staticmethod
    def load_face_samples(image_paths):
        """Build FACE ``Sample`` objects from the metadata of ``image_paths``.

        Metadata is read in worker processes by
        ``FaceSamplesLoaderSubprocessor``. Images for which no metadata was
        produced are skipped instead of aborting the whole load.
        """
        result = FaceSamplesLoaderSubprocessor(image_paths).run()
        sample_list = []

        for entry in io.progress_bar_generator(result, "Processing"):
            # A slot is None when no result arrived for that index.
            if entry is None:
                continue
            filename, data = entry
            # data is None when the worker reported the file as not being a
            # DFL image; unpacking it would raise TypeError.
            if data is None:
                continue

            (face_type,
             shape,
             landmarks,
             ie_polys,
             eyebrows_expand_mod,
             source_filename) = data

            sample_list.append(
                Sample(filename=filename,
                       sample_type=SampleType.FACE,
                       face_type=FaceType.fromString(face_type),
                       shape=shape,
                       landmarks=landmarks,
                       ie_polys=ie_polys,
                       eyebrows_expand_mod=eyebrows_expand_mod,
                       source_filename=source_filename,
                       ))
        return sample_list

    @staticmethod
    def upgradeToFaceTemporalSortedSamples(samples):
        """Return ``samples`` as a new list ordered by ``source_filename``.

        The sort is stable, so samples sharing a source filename keep their
        relative order.
        """
        return sorted(samples, key=operator.attrgetter('source_filename'))


class FaceSamplesLoaderSubprocessor(Subprocessor):
    """Reads DFL image metadata for a list of image paths in worker processes.

    ``get_result`` returns a list aligned with ``image_paths``; each filled
    slot is ``(image_path, metadata)`` where ``metadata`` is the tuple built
    by ``Cli.process_data`` or ``None`` for a non-DFL image.
    """

    class Cli(Subprocessor.Cli):
        """Worker-side half: loads one image and extracts its metadata."""

        #override
        def on_initialize(self, client_dict):
            self.log_info('Running on %s.' % (client_dict['device_name']))

        #override
        def process_data(self, data):
            """Return ``(idx, metadata)`` for the ``(idx, filename)`` work item.

            ``metadata`` is ``None`` (and an error is logged) when
            ``DFLIMG.load`` returns ``None`` for the file.
            """
            idx, filename = data
            dflimg = DFLIMG.load(Path(filename))

            if dflimg is None:
                self.log_err(f"FaceSamplesLoader: {filename} is not a dfl image file.")
                data = None
            else:
                data = (dflimg.get_face_type(),
                        dflimg.get_shape(),
                        dflimg.get_landmarks(),
                        dflimg.get_ie_polys(),
                        dflimg.get_eyebrows_expand_mod(),
                        dflimg.get_source_filename())

            return idx, data

        #override
        def get_data_name(self, data):
            # Return a string identifier for the work item: its filename.
            return data[1]

    #override
    def __init__(self, image_paths):
        self.image_paths = image_paths
        self.image_paths_len = len(image_paths)
        # Pending indices; a deque gives O(1) pops and pushes at the front.
        self.idxs = deque(range(self.image_paths_len))
        # One slot per image, filled in on_result.
        self.result = [None] * self.image_paths_len
        # NOTE(review): 60 is presumably a timeout in seconds — confirm
        # against Subprocessor.__init__.
        super().__init__('FaceSamplesLoader', FaceSamplesLoaderSubprocessor.Cli, 60,
                         initialize_subprocesses_in_serial=False)

    #override
    def on_clients_initialized(self):
        io.progress_bar("Loading", len(self.image_paths))

    #override
    def on_clients_finalized(self):
        io.progress_bar_close()

    #override
    def process_info_generator(self):
        """Yield one worker description per CPU, capped at 8 workers."""
        for i in range(min(multiprocessing.cpu_count(), 8)):
            yield 'CPU%d' % (i), {}, {'device_idx': i,
                                      'device_name': 'CPU%d' % (i),
                                      }

    #override
    def get_data(self, host_dict):
        """Return the next ``(idx, image_path)`` work item, or None when done."""
        if self.idxs:
            idx = self.idxs.popleft()
            return idx, self.image_paths[idx]
        return None

    #override
    def on_data_return(self, host_dict, data):
        # Put the returned item's index back at the front of the queue.
        self.idxs.appendleft(data[0])

    #override
    def on_result(self, host_dict, data, result):
        idx, metadata = result
        self.result[idx] = (self.image_paths[idx], metadata)
        io.progress_bar_inc(1)

    #override
    def get_result(self):
        return self.result