diff --git a/mainscripts/FacesetResizer.py b/mainscripts/FacesetResizer.py
new file mode 100644
index 0000000..f2ec391
--- /dev/null
+++ b/mainscripts/FacesetResizer.py
@@ -0,0 +1,149 @@
+import multiprocessing
+import shutil
+import traceback
+from pathlib import Path
+
+from core import pathex
+from core.cv2ex import *
+from core.interact import interact as io
+from core.joblib import Subprocessor
+from DFLIMG import *
+from facelib import LandmarksProcessor, FaceType
+
+
+class FacesetResizerSubprocessor(Subprocessor):
+
+    #override
+    def __init__(self, image_paths, output_dirpath, image_size):
+        self.image_paths = image_paths
+        self.output_dirpath = output_dirpath
+        self.image_size = image_size
+        self.result = []
+
+        super().__init__('FacesetResizer', FacesetResizerSubprocessor.Cli, 600)
+
+    #override
+    def on_clients_initialized(self):
+        io.progress_bar (None, len (self.image_paths))
+
+    #override
+    def on_clients_finalized(self):
+        io.progress_bar_close()
+
+    #override
+    def process_info_generator(self):
+        base_dict = {'output_dirpath':self.output_dirpath, 'image_size':self.image_size}
+
+        for device_idx in range( min(8, multiprocessing.cpu_count()) ):
+            client_dict = base_dict.copy()
+            device_name = f'CPU #{device_idx}'
+            client_dict['device_name'] = device_name
+            yield device_name, {}, client_dict
+
+    #override
+    def get_data(self, host_dict):
+        if len (self.image_paths) > 0:
+            return self.image_paths.pop(0)
+
+    #override
+    def on_data_return (self, host_dict, data):
+        self.image_paths.insert(0, data)
+
+    #override
+    def on_result (self, host_dict, data, result):
+        io.progress_bar_inc(1)
+        if result[0] == 1:
+            self.result += [ (result[1], result[2]) ]
+
+    #override
+    def get_result(self):
+        return self.result
+
+    class Cli(Subprocessor.Cli):
+
+        #override
+        def on_initialize(self, client_dict):
+            self.output_dirpath = client_dict['output_dirpath']
+            self.image_size = client_dict['image_size']
+            self.log_info (f"Running on { client_dict['device_name'] }")
+
+        #override
+        def process_data(self, filepath):
+            try:
+                dflimg = DFLIMG.load (filepath)
+                if dflimg is None or not dflimg.has_data():
+                    self.log_err (f"{filepath.name} is not a dfl image file")
+                else:
+                    dfl_dict = dflimg.get_dict()
+
+                    img = cv2_imread(filepath)
+                    h,w = img.shape[:2]
+                    if h != w:
+                        raise Exception(f'w != h in {filepath}')
+
+                    image_size = self.image_size
+                    scale = w / image_size  # source size / target size
+
+                    img = cv2.resize(img, (image_size, image_size), interpolation=cv2.INTER_LANCZOS4)
+
+                    output_filepath = self.output_dirpath / filepath.name
+                    cv2_imwrite ( str(output_filepath), img, [int(cv2.IMWRITE_JPEG_QUALITY), 100] )
+
+                    dflimg = DFLIMG.load (output_filepath)
+                    dflimg.set_dict(dfl_dict)
+
+                    # landmarks and seg polygons are stored in pixel coordinates, so they follow the resize
+                    lmrks = dflimg.get_landmarks()
+                    lmrks /= scale
+                    dflimg.set_landmarks(lmrks)
+
+                    seg_ie_polys = dflimg.get_seg_ie_polys()
+                    seg_ie_polys.mult_points( 1.0 / scale)
+                    dflimg.set_seg_ie_polys(seg_ie_polys)
+
+                    mat = dflimg.get_image_to_face_mat()
+                    if mat is not None:  # recompute the alignment matrix for the new resolution
+                        face_type = FaceType.fromString ( dflimg.get_face_type() )
+                        mat = LandmarksProcessor.get_transform_mat ( dflimg.get_source_landmarks(), image_size, face_type )
+                        dflimg.set_image_to_face_mat(mat)
+                    dflimg.save()
+
+                return (1, filepath, output_filepath)
+            except:
+                self.log_err (f"Exception occurred while processing file {filepath}. Error: {traceback.format_exc()}")
+
+            return (0, filepath, None)
+
+def process_folder ( dirpath):
+
+    image_size = io.input_int(f"New image size", 512, valid_range=[256,2048])
+
+
+    output_dirpath = dirpath.parent / (dirpath.name + '_resized')
+    output_dirpath.mkdir (exist_ok=True, parents=True)
+
+    dirpath_parts = '/'.join( dirpath.parts[-2:])
+    output_dirpath_parts = '/'.join( output_dirpath.parts[-2:] )
+    io.log_info (f"Resizing faceset in {dirpath_parts}")
+    io.log_info ( f"Processing to {output_dirpath_parts}")
+
+    output_images_paths = pathex.get_image_paths(output_dirpath)
+    if len(output_images_paths) > 0:
+        for filename in output_images_paths:
+            Path(filename).unlink()
+
+    image_paths = [Path(x) for x in pathex.get_image_paths( dirpath )]
+    result = FacesetResizerSubprocessor ( image_paths, output_dirpath, image_size).run()
+
+    is_merge = io.input_bool (f"\r\nMerge {output_dirpath_parts} to {dirpath_parts} ?", True)
+    if is_merge:
+        io.log_info (f"Copying processed files to {dirpath_parts}")
+
+        for (filepath, output_filepath) in result:
+            try:
+                shutil.copy (output_filepath, filepath)
+            except:
+                pass
+
+        io.log_info (f"Removing {output_dirpath_parts}")
+        shutil.rmtree(output_dirpath)
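
Reviewer note, not part of the patch: the per-image math in Cli.process_data is a square resize plus a rescale of all pixel-space annotations by image_size / w, i.e. division by scale = w / image_size. A minimal standalone sketch with synthetic data follows; it uses only numpy/OpenCV, and resize_square_face is a hypothetical helper name, not a DeepFaceLab API.

    import cv2
    import numpy as np

    def resize_square_face(img, lmrks, image_size):
        # Mirrors the patch: scale = w / image_size, landmarks follow the resize (lmrks / scale).
        h, w = img.shape[:2]
        assert h == w, "face crops are expected to be square"
        scale = w / image_size
        out = cv2.resize(img, (image_size, image_size), interpolation=cv2.INTER_LANCZOS4)
        return out, lmrks / scale

    # A 1024px crop resized to 512px moves a landmark at (900, 100) to (450, 50).
    img = np.zeros((1024, 1024, 3), np.uint8)
    lmrks = np.array([[900.0, 100.0]])
    out, new_lmrks = resize_square_face(img, lmrks, 512)
    print(out.shape, new_lmrks)  # -> (512, 512, 3), [[450., 50.]]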