diff --git a/main.py b/main.py index 18b8528..03b4f46 100644 --- a/main.py +++ b/main.py @@ -66,8 +66,8 @@ if __name__ == "__main__": def process_dev_extract_umd_csv(arguments): os_utils.set_process_lowest_prio() - from mainscripts import Extractor - Extractor.extract_umd_csv( arguments.input_csv_file, + from mainscripts import dev_misc + dev_misc.extract_umd_csv( arguments.input_csv_file, device_args={'cpu_only' : arguments.cpu_only, 'multi_gpu' : arguments.multi_gpu, } diff --git a/mainscripts/Extractor.py b/mainscripts/Extractor.py index 8512f0e..0530ce8 100644 --- a/mainscripts/Extractor.py +++ b/mainscripts/Extractor.py @@ -789,105 +789,6 @@ def main(input_dir, faces_detected += sum([d.faces_detected for d in fix_data]) - io.log_info ('-------------------------') - io.log_info ('Images found: %d' % (images_found) ) - io.log_info ('Faces detected: %d' % (faces_detected) ) - io.log_info ('-------------------------') - -#unused in end user workflow -def extract_fanseg(input_dir, device_args={} ): - multi_gpu = device_args.get('multi_gpu', False) - cpu_only = device_args.get('cpu_only', False) - - input_path = Path(input_dir) - if not input_path.exists(): - raise ValueError('Input directory not found. Please ensure it exists.') - - paths_to_extract = [] - for filename in Path_utils.get_image_paths(input_path) : - filepath = Path(filename) - dflimg = DFLIMG.load ( filepath ) - if dflimg is not None: - paths_to_extract.append (filepath) - - paths_to_extract_len = len(paths_to_extract) - if paths_to_extract_len > 0: - io.log_info ("Performing extract fanseg for %d files..." % (paths_to_extract_len) ) - data = ExtractSubprocessor ([ ExtractSubprocessor.Data(filename) for filename in paths_to_extract ], 'fanseg', multi_gpu=multi_gpu, cpu_only=cpu_only).run() - -#unused in end user workflow -def extract_umd_csv(input_file_csv, - image_size=256, - face_type='full_face', - device_args={} ): - - #extract faces from umdfaces.io dataset csv file with pitch,yaw,roll info. - multi_gpu = device_args.get('multi_gpu', False) - cpu_only = device_args.get('cpu_only', False) - face_type = FaceType.fromString(face_type) - - input_file_csv_path = Path(input_file_csv) - if not input_file_csv_path.exists(): - raise ValueError('input_file_csv not found. Please ensure it exists.') - - input_file_csv_root_path = input_file_csv_path.parent - output_path = input_file_csv_path.parent / ('aligned_' + input_file_csv_path.name) - - io.log_info("Output dir is %s." % (str(output_path)) ) - - if output_path.exists(): - output_images_paths = Path_utils.get_image_paths(output_path) - if len(output_images_paths) > 0: - io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False ) - for filename in output_images_paths: - Path(filename).unlink() - else: - output_path.mkdir(parents=True, exist_ok=True) - - try: - with open( str(input_file_csv_path), 'r') as f: - csv_file = f.read() - except Exception as e: - io.log_err("Unable to open or read file " + str(input_file_csv_path) + ": " + str(e) ) - return - - strings = csv_file.split('\n') - keys = strings[0].split(',') - keys_len = len(keys) - csv_data = [] - for i in range(1, len(strings)): - values = strings[i].split(',') - if keys_len != len(values): - io.log_err("Wrong string in csv file, skipping.") - continue - - csv_data += [ { keys[n] : values[n] for n in range(keys_len) } ] - - data = [] - for d in csv_data: - filename = input_file_csv_root_path / d['FILE'] - - pitch, yaw, roll = float(d['PITCH']), float(d['YAW']), float(d['ROLL']) - if pitch < -90 or pitch > 90 or yaw < -90 or yaw > 90 or roll < -90 or roll > 90: - continue - - pitch_yaw_roll = pitch/90.0, yaw/90.0, roll/90.0 - - x,y,w,h = float(d['FACE_X']), float(d['FACE_Y']), float(d['FACE_WIDTH']), float(d['FACE_HEIGHT']) - - data += [ ExtractSubprocessor.Data(filename=filename, rects=[ [x,y,x+w,y+h] ], pitch_yaw_roll=pitch_yaw_roll) ] - - images_found = len(data) - faces_detected = 0 - if len(data) > 0: - io.log_info ("Performing 2nd pass from csv file...") - data = ExtractSubprocessor (data, 'landmarks', multi_gpu=multi_gpu, cpu_only=cpu_only).run() - - io.log_info ('Performing 3rd pass...') - data = ExtractSubprocessor (data, 'final', image_size, face_type, None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False, final_output_path=output_path).run() - faces_detected += sum([d.faces_detected for d in data]) - - io.log_info ('-------------------------') io.log_info ('Images found: %d' % (images_found) ) io.log_info ('Faces detected: %d' % (faces_detected) ) diff --git a/mainscripts/dev_misc.py b/mainscripts/dev_misc.py index a9897d2..4d4f32a 100644 --- a/mainscripts/dev_misc.py +++ b/mainscripts/dev_misc.py @@ -368,3 +368,104 @@ def apply_celebamaskhq(input_dir ): #import code #code.interact(local=dict(globals(), **locals())) + + + +#unused in end user workflow +def extract_fanseg(input_dir, device_args={} ): + multi_gpu = device_args.get('multi_gpu', False) + cpu_only = device_args.get('cpu_only', False) + + input_path = Path(input_dir) + if not input_path.exists(): + raise ValueError('Input directory not found. Please ensure it exists.') + + paths_to_extract = [] + for filename in Path_utils.get_image_paths(input_path) : + filepath = Path(filename) + dflimg = DFLIMG.load ( filepath ) + if dflimg is not None: + paths_to_extract.append (filepath) + + paths_to_extract_len = len(paths_to_extract) + if paths_to_extract_len > 0: + io.log_info ("Performing extract fanseg for %d files..." % (paths_to_extract_len) ) + data = ExtractSubprocessor ([ ExtractSubprocessor.Data(filename) for filename in paths_to_extract ], 'fanseg', multi_gpu=multi_gpu, cpu_only=cpu_only).run() + +#unused in end user workflow +def extract_umd_csv(input_file_csv, + image_size=256, + face_type='full_face', + device_args={} ): + + #extract faces from umdfaces.io dataset csv file with pitch,yaw,roll info. + multi_gpu = device_args.get('multi_gpu', False) + cpu_only = device_args.get('cpu_only', False) + face_type = FaceType.fromString(face_type) + + input_file_csv_path = Path(input_file_csv) + if not input_file_csv_path.exists(): + raise ValueError('input_file_csv not found. Please ensure it exists.') + + input_file_csv_root_path = input_file_csv_path.parent + output_path = input_file_csv_path.parent / ('aligned_' + input_file_csv_path.name) + + io.log_info("Output dir is %s." % (str(output_path)) ) + + if output_path.exists(): + output_images_paths = Path_utils.get_image_paths(output_path) + if len(output_images_paths) > 0: + io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False ) + for filename in output_images_paths: + Path(filename).unlink() + else: + output_path.mkdir(parents=True, exist_ok=True) + + try: + with open( str(input_file_csv_path), 'r') as f: + csv_file = f.read() + except Exception as e: + io.log_err("Unable to open or read file " + str(input_file_csv_path) + ": " + str(e) ) + return + + strings = csv_file.split('\n') + keys = strings[0].split(',') + keys_len = len(keys) + csv_data = [] + for i in range(1, len(strings)): + values = strings[i].split(',') + if keys_len != len(values): + io.log_err("Wrong string in csv file, skipping.") + continue + + csv_data += [ { keys[n] : values[n] for n in range(keys_len) } ] + + data = [] + for d in csv_data: + filename = input_file_csv_root_path / d['FILE'] + + #pitch, yaw, roll = float(d['PITCH']), float(d['YAW']), float(d['ROLL']) + #if pitch < -90 or pitch > 90 or yaw < -90 or yaw > 90 or roll < -90 or roll > 90: + # continue + # + #pitch_yaw_roll = pitch/90.0, yaw/90.0, roll/90.0 + + x,y,w,h = float(d['FACE_X']), float(d['FACE_Y']), float(d['FACE_WIDTH']), float(d['FACE_HEIGHT']) + + data += [ ExtractSubprocessor.Data(filename=filename, rects=[ [x,y,x+w,y+h] ]) ] + + images_found = len(data) + faces_detected = 0 + if len(data) > 0: + io.log_info ("Performing 2nd pass from csv file...") + data = ExtractSubprocessor (data, 'landmarks', multi_gpu=multi_gpu, cpu_only=cpu_only).run() + + io.log_info ('Performing 3rd pass...') + data = ExtractSubprocessor (data, 'final', image_size, face_type, None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False, final_output_path=output_path).run() + faces_detected += sum([d.faces_detected for d in data]) + + + io.log_info ('-------------------------') + io.log_info ('Images found: %d' % (images_found) ) + io.log_info ('Faces detected: %d' % (faces_detected) ) + io.log_info ('-------------------------')