import traceback
import json
import multiprocessing
import shutil
from pathlib import Path
import cv2
import numpy as np

from core import imagelib, pathex
from core.cv2ex import *
from core.interact import interact as io
from core.joblib import Subprocessor
from core.leras import nn
from DFLIMG import *
from facelib import FaceType, LandmarksProcessor

from . import Extractor, Sorter
from .Extractor import ExtractSubprocessor


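# Extracts and aligns faces from a VGGFace2-style dataset.
# Assumes <input_dir>/loose_bb_train.csv (as shipped with VGGFace2): a header
# row followed by "NAME_ID",X,Y,W,H lines, where the quoted NAME_ID maps to
# <subdir>/<image stem>. Aligned faces are written to a sibling
# <input_dir>_out directory; boxes smaller than 128 px are skipped.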
def extract_vggface2_dataset(input_dir, device_args={} ):
    multi_gpu = device_args.get('multi_gpu', False)
    cpu_only = device_args.get('cpu_only', False)

    input_path = Path(input_dir)
    if not input_path.exists():
        raise ValueError('Input directory not found. Please ensure it exists.')

    bb_csv = input_path / 'loose_bb_train.csv'
    if not bb_csv.exists():
        raise ValueError('loose_bb_train.csv not found. Please ensure it exists.')

    bb_lines = bb_csv.read_text().split('\n')
    bb_lines.pop(0)

    bb_dict = {}
    for line in bb_lines:
        if not line:
            continue # a trailing newline yields an empty entry
        name, l, t, w, h = line.split(',')
        name = name[1:-1]
        l, t, w, h = [ int(x) for x in (l, t, w, h) ]
        bb_dict[name] = (l, t, w, h)

    output_path = input_path.parent / (input_path.name + '_out')

    dir_names = pathex.get_all_dir_names(input_path)

    if not output_path.exists():
        output_path.mkdir(parents=True, exist_ok=True)

    data = []
    for dir_name in io.progress_bar_generator(dir_names, "Collecting"):
        cur_input_path = input_path / dir_name
        cur_output_path = output_path / dir_name

        if not cur_output_path.exists():
            cur_output_path.mkdir(parents=True, exist_ok=True)

        input_path_image_paths = pathex.get_image_paths(cur_input_path)

        for filename in input_path_image_paths:
            filename_path = Path(filename)

            name = filename_path.parent.name + '/' + filename_path.stem
            if name not in bb_dict:
                continue

            l, t, w, h = bb_dict[name]
            if min(w, h) < 128:
                continue

            data += [ ExtractSubprocessor.Data(filename=filename, rects=[ (l, t, l+w, t+h) ], landmarks_accurate=False, force_output_path=cur_output_path ) ]

    face_type = FaceType.fromString('full_face')

    io.log_info ('Performing 2nd pass...')
    data = ExtractSubprocessor (data, 'landmarks', 256, face_type, debug_dir=None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False).run()

    io.log_info ('Performing 3rd pass...')
    ExtractSubprocessor (data, 'final', 256, face_type, debug_dir=None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False, final_output_path=None).run()

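# The triple-quoted block below is an earlier revision of the VGGFace2
# extractor, kept inert as a module-level string for reference.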
"""
|
|
import code
|
|
code.interact(local=dict(globals(), **locals()))
|
|
|
|
data_len = len(data)
|
|
i = 0
|
|
while i < data_len-1:
|
|
i_name = Path(data[i].filename).parent.name
|
|
|
|
sub_data = []
|
|
|
|
for j in range (i, data_len):
|
|
j_name = Path(data[j].filename).parent.name
|
|
if i_name == j_name:
|
|
sub_data += [ data[j] ]
|
|
else:
|
|
break
|
|
i = j
|
|
|
|
cur_output_path = output_path / i_name
|
|
|
|
io.log_info (f"Processing: {str(cur_output_path)}, {i}/{data_len} ")
|
|
|
|
if not cur_output_path.exists():
|
|
cur_output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for dir_name in dir_names:
|
|
|
|
cur_input_path = input_path / dir_name
|
|
cur_output_path = output_path / dir_name
|
|
|
|
input_path_image_paths = pathex.get_image_paths(cur_input_path)
|
|
l = len(input_path_image_paths)
|
|
#if l < 250 or l > 350:
|
|
# continue
|
|
|
|
io.log_info (f"Processing: {str(cur_input_path)} ")
|
|
|
|
if not cur_output_path.exists():
|
|
cur_output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
data = []
|
|
for filename in input_path_image_paths:
|
|
filename_path = Path(filename)
|
|
|
|
name = filename_path.parent.name + '/' + filename_path.stem
|
|
if name not in bb_dict:
|
|
continue
|
|
|
|
bb = bb_dict[name]
|
|
l,t,w,h = bb
|
|
if min(w,h) < 128:
|
|
continue
|
|
|
|
data += [ ExtractSubprocessor.Data(filename=filename,rects=[ (l,t,l+w,t+h) ], landmarks_accurate=False ) ]
|
|
|
|
|
|
|
|
io.log_info ('Performing 2nd pass...')
|
|
data = ExtractSubprocessor (data, 'landmarks', 256, face_type, debug_dir=None, multi_gpu=False, cpu_only=False, manual=False).run()
|
|
|
|
io.log_info ('Performing 3rd pass...')
|
|
data = ExtractSubprocessor (data, 'final', 256, face_type, debug_dir=None, multi_gpu=False, cpu_only=False, manual=False, final_output_path=cur_output_path).run()
|
|
|
|
|
|
io.log_info (f"Sorting: {str(cur_output_path)} ")
|
|
Sorter.main (input_path=str(cur_output_path), sort_by_method='hist')
|
|
|
|
import code
|
|
code.interact(local=dict(globals(), **locals()))
|
|
|
|
#try:
|
|
# io.log_info (f"Removing: {str(cur_input_path)} ")
|
|
# shutil.rmtree(cur_input_path)
|
|
#except:
|
|
# io.log_info (f"unable to remove: {str(cur_input_path)} ")
|
|
|
|
|
|
|
|
|
|
def extract_vggface2_dataset(input_dir, device_args={} ):
|
|
multi_gpu = device_args.get('multi_gpu', False)
|
|
cpu_only = device_args.get('cpu_only', False)
|
|
|
|
input_path = Path(input_dir)
|
|
if not input_path.exists():
|
|
raise ValueError('Input directory not found. Please ensure it exists.')
|
|
|
|
output_path = input_path.parent / (input_path.name + '_out')
|
|
|
|
dir_names = pathex.get_all_dir_names(input_path)
|
|
|
|
if not output_path.exists():
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
for dir_name in dir_names:
|
|
|
|
cur_input_path = input_path / dir_name
|
|
cur_output_path = output_path / dir_name
|
|
|
|
l = len(pathex.get_image_paths(cur_input_path))
|
|
if l < 250 or l > 350:
|
|
continue
|
|
|
|
io.log_info (f"Processing: {str(cur_input_path)} ")
|
|
|
|
if not cur_output_path.exists():
|
|
cur_output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
Extractor.main( str(cur_input_path),
|
|
str(cur_output_path),
|
|
detector='s3fd',
|
|
image_size=256,
|
|
face_type='full_face',
|
|
max_faces_from_image=1,
|
|
device_args=device_args )
|
|
|
|
io.log_info (f"Sorting: {str(cur_input_path)} ")
|
|
Sorter.main (input_path=str(cur_output_path), sort_by_method='hist')
|
|
|
|
try:
|
|
io.log_info (f"Removing: {str(cur_input_path)} ")
|
|
shutil.rmtree(cur_input_path)
|
|
except:
|
|
io.log_info (f"unable to remove: {str(cur_input_path)} ")
|
|
|
|
"""
|
|
|
|
#unused in end user workflow
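# Expects ibug/300-W style .pts annotation files next to each image: a short
# header, then 68 "x y" coordinate lines enclosed in '{' ... '}'.
# Note: embedding the landmarks into the output jpg is still unimplemented,
# so the function currently raises after saving the first matching image.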
def dev_test_68(input_dir ):
    # process 68 landmarks dataset with .pts files
    input_path = Path(input_dir)
    if not input_path.exists():
        raise ValueError('input_dir not found. Please ensure it exists.')

    output_path = input_path.parent / (input_path.name+'_aligned')

    io.log_info(f'Output dir is {output_path}')

    if output_path.exists():
        output_images_paths = pathex.get_image_paths(output_path)
        if len(output_images_paths) > 0:
            io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False )
            for filename in output_images_paths:
                Path(filename).unlink()
    else:
        output_path.mkdir(parents=True, exist_ok=True)

    images_paths = pathex.get_image_paths(input_path)

    for filepath in io.progress_bar_generator(images_paths, "Processing"):
        filepath = Path(filepath)

        pts_filepath = filepath.parent / (filepath.stem+'.pts')
        if pts_filepath.exists():
            pts = pts_filepath.read_text()
            pts_lines = pts.split('\n')

            lmrk_lines = None
            for pts_line in pts_lines:
                if pts_line == '{':
                    lmrk_lines = []
                elif pts_line == '}':
                    break
                else:
                    if lmrk_lines is not None:
                        lmrk_lines.append (pts_line)

            if lmrk_lines is not None and len(lmrk_lines) == 68:
                try:
                    lmrks = [ np.array ( lmrk_line.strip().split(' ') ).astype(np.float32).tolist() for lmrk_line in lmrk_lines ]
                except Exception as e:
                    print(e)
                    print(filepath)
                    continue

                rect = LandmarksProcessor.get_rect_from_landmarks(lmrks)

                output_filepath = output_path / (filepath.stem+'.jpg')

                img = cv2_imread(filepath)
                img = imagelib.normalize_channels(img, 3)
                cv2_imwrite(output_filepath, img, [int(cv2.IMWRITE_JPEG_QUALITY), 95] )

                raise Exception("unimplemented")
                #DFLJPG.x(output_filepath, face_type=FaceType.toString(FaceType.MARK_ONLY),
                #                landmarks=lmrks,
                #                source_filename=filepath.name,
                #                source_rect=rect,
                #                source_landmarks=lmrks
                #                )

    io.log_info("Done.")

#unused in end user workflow
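# The csv is expected to provide at least FILE, FACE_X, FACE_Y, FACE_WIDTH
# and FACE_HEIGHT columns; each row becomes one detection rect for the
# landmark and final extraction passes.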
def extract_umd_csv(input_file_csv,
                    face_type='full_face',
                    device_args={} ):

    #extract faces from umdfaces.io dataset csv file with pitch,yaw,roll info.
    multi_gpu = device_args.get('multi_gpu', False)
    cpu_only = device_args.get('cpu_only', False)
    face_type = FaceType.fromString(face_type)

    input_file_csv_path = Path(input_file_csv)
    if not input_file_csv_path.exists():
        raise ValueError('input_file_csv not found. Please ensure it exists.')

    input_file_csv_root_path = input_file_csv_path.parent
    output_path = input_file_csv_path.parent / ('aligned_' + input_file_csv_path.name)

    io.log_info("Output dir is %s." % (str(output_path)) )

    if output_path.exists():
        output_images_paths = pathex.get_image_paths(output_path)
        if len(output_images_paths) > 0:
            io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False )
            for filename in output_images_paths:
                Path(filename).unlink()
    else:
        output_path.mkdir(parents=True, exist_ok=True)

    try:
        with open( str(input_file_csv_path), 'r') as f:
            csv_file = f.read()
    except Exception as e:
        io.log_err("Unable to open or read file " + str(input_file_csv_path) + ": " + str(e) )
        return

    strings = csv_file.split('\n')
    keys = strings[0].split(',')
    keys_len = len(keys)
    csv_data = []
    for i in range(1, len(strings)):
        values = strings[i].split(',')
        if keys_len != len(values):
            io.log_err("Malformed line in csv file, skipping.")
            continue

        csv_data += [ { keys[n] : values[n] for n in range(keys_len) } ]

    data = []
    for d in csv_data:
        filename = input_file_csv_root_path / d['FILE']

        x, y, w, h = float(d['FACE_X']), float(d['FACE_Y']), float(d['FACE_WIDTH']), float(d['FACE_HEIGHT'])

        data += [ ExtractSubprocessor.Data(filename=filename, rects=[ [x, y, x+w, y+h] ]) ]

    images_found = len(data)
    faces_detected = 0
    if len(data) > 0:
        io.log_info ("Performing 2nd pass from csv file...")
        data = ExtractSubprocessor (data, 'landmarks', multi_gpu=multi_gpu, cpu_only=cpu_only).run()

        io.log_info ('Performing 3rd pass...')
        data = ExtractSubprocessor (data, 'final', face_type, None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False, final_output_path=output_path).run()
        faces_detected += sum([d.faces_detected for d in data])

    io.log_info ('-------------------------')
    io.log_info ('Images found: %d' % (images_found) )
    io.log_info ('Faces detected: %d' % (faces_detected) )
    io.log_info ('-------------------------')

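# Dev sanity check: verifies that every image in every subdirectory of
# input_dir carries DFL metadata, raising on the first file that does not.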
def dev_test1(input_dir):
    input_path = Path(input_dir)

    dir_names = pathex.get_all_dir_names(input_path)

    for dir_name in io.progress_bar_generator(dir_names, desc="Processing"):

        img_paths = pathex.get_image_paths (input_path / dir_name)
        for filename in img_paths:
            filepath = Path(filename)

            dflimg = DFLIMG.x (filepath)
            if dflimg is None:
                raise ValueError(f'{filepath} is not a DFL image file')

            #dflimg.x(filename, person_name=dir_name)

    #import code
    #code.interact(local=dict(globals(), **locals()))

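# Rewrites every .png under input_dir in place (decode + re-encode with
# OpenCV), e.g. to normalize PNGs produced by other tools.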
def dev_resave_pngs(input_dir):
    input_path = Path(input_dir)
    if not input_path.exists():
        raise ValueError('input_dir not found. Please ensure it exists.')

    images_paths = pathex.get_image_paths(input_path, image_extensions=['.png'], subdirs=True, return_Path_class=True)

    for filepath in io.progress_bar_generator(images_paths, "Processing"):
        cv2_imwrite(filepath, cv2_imread(filepath))

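# Moves every image in input_dir that has no matching .json segmentation
# label into a sibling <input_dir>_trash directory.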
def dev_segmented_trash(input_dir):
    input_path = Path(input_dir)
    if not input_path.exists():
        raise ValueError('input_dir not found. Please ensure it exists.')

    output_path = input_path.parent / (input_path.name+'_trash')
    output_path.mkdir(parents=True, exist_ok=True)

    images_paths = pathex.get_image_paths(input_path, return_Path_class=True)

    trash_paths = []
    for filepath in images_paths:
        json_file = filepath.parent / (filepath.stem +'.json')
        if not json_file.exists():
            trash_paths.append(filepath)

    for filepath in trash_paths:
        try:
            filepath.rename ( output_path / filepath.name )
        except:
            io.log_info ('Failed to trash %s' % (filepath.name) )
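
# Example invocations (hypothetical; in the repository these dev_* helpers
# are normally dispatched from main.py rather than imported directly, and
# the module path below assumes this file lives in the mainscripts package,
# as its relative imports suggest):
#
#   from mainscripts import dev_misc
#   dev_misc.dev_resave_pngs('workspace/data_src')
#   dev_misc.dev_segmented_trash('workspace/data_dst/aligned')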