# Mirror of https://github.com/iperov/DeepFaceLab.git (synced 2025-07-06)
import math
import multiprocessing
import operator
import os
import shutil
import sys
import time
import traceback
from pathlib import Path

import cv2
import numpy as np
import numpy.linalg as npla

import imagelib
from converters import FrameInfo, ConverterConfig, ConvertMasked, ConvertFaceAvatar
from facelib import FaceType, FANSegmentator, LandmarksProcessor
from interact import interact as io
from joblib import SubprocessFunctionCaller, Subprocessor
from utils import Path_utils
from utils.cv2_utils import *
from utils.DFLJPG import DFLJPG
from utils.DFLPNG import DFLPNG

from .ConverterScreen import Screen, ScreenManager

CONVERTER_DEBUG = False

class ConvertSubprocessor(Subprocessor):
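    """
    Multiprocess frame converter.

    Splits the list of frames across CPU worker subprocesses, feeds each
    worker a ProcessingFrame together with a copy of the ConverterConfig,
    and (in interactive mode) previews results on screen and lets the user
    tweak the config per frame from the keyboard.
    """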

    class Frame(object):
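        """
        Host-side state for a single frame: its FrameInfo (plus temporal
        neighbors for avatar conversion), the per-frame ConverterConfig,
        and the bookkeeping flags used by the interactive loop.
        """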
        def __init__(self, prev_temporal_frame_infos=None,
                           frame_info=None,
                           next_temporal_frame_infos=None):
            self.prev_temporal_frame_infos = prev_temporal_frame_infos
            self.frame_info = frame_info
            self.next_temporal_frame_infos = next_temporal_frame_infos
            self.output_filename = None

            self.idx = None
            self.cfg = None
            self.is_done = False
            self.is_processing = False
            self.is_shown = False
            self.image = None

    class ProcessingFrame(object):
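        """
        Picklable payload sent to a worker subprocess: everything needed to
        convert one frame, plus need_return_image to request the converted
        image back for on-screen preview.
        """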
        def __init__(self, idx=None,
                           cfg=None,
                           prev_temporal_frame_infos=None,
                           frame_info=None,
                           next_temporal_frame_infos=None,
                           output_filename=None,
                           need_return_image=False):
            self.idx = idx
            self.cfg = cfg
            self.prev_temporal_frame_infos = prev_temporal_frame_infos
            self.frame_info = frame_info
            self.next_temporal_frame_infos = next_temporal_frame_infos
            self.output_filename = output_filename

            self.need_return_image = need_return_image
            if self.need_return_image:
                self.image = None

    class Cli(Subprocessor.Cli):
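        """
        Worker-side client. Runs in a subprocess, receives ProcessingFrame
        objects from the host, performs the actual conversion on CPU, and
        returns the result.
        """
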
        #override
        def on_initialize(self, client_dict):
            self.log_info ('Running on %s.' % (client_dict['device_name']) )
            self.device_idx = client_dict['device_idx']
            self.device_name = client_dict['device_name']
            self.predictor_func = client_dict['predictor_func']
            self.dcscn_upscale_func = client_dict['dcscn_upscale_func']

            # transfer and set stdin so that code.interact works inside a debug subprocess
            stdin_fd = client_dict['stdin_fd']
            if stdin_fd is not None:
                sys.stdin = os.fdopen(stdin_fd)

            from nnlib import nnlib
            # The model process has already claimed all GPU memory, so we
            # cannot use the GPU for any TF operations in converter processes;
            # therefore force active_DeviceConfig to CPU only.
            nnlib.active_DeviceConfig = nnlib.DeviceConfig (cpu_only=True)

            self.fanseg_by_face_type = {}
            self.fanseg_input_size = 256

            def fanseg_extract(face_type, *args, **kwargs):
                # lazily create one FANSegmentator per face type and cache it
                fanseg = self.fanseg_by_face_type.get(face_type, None)
                if fanseg is None:
                    fanseg = FANSegmentator( self.fanseg_input_size , FaceType.toString( face_type ) )
                    self.fanseg_by_face_type[face_type] = fanseg

                return fanseg.extract(*args, **kwargs)

            self.fanseg_extract_func = fanseg_extract

            return None

        #override
        def process_data(self, pf):  # pf is a ProcessingFrame
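            """
            Convert a single frame. If no faces were found, the source image
            is copied through unchanged; otherwise the frame is dispatched to
            ConvertMasked or ConvertFaceAvatar according to cfg.type.
            """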
            cfg = pf.cfg.copy()
            cfg.predictor_func = self.predictor_func

            frame_info = pf.frame_info

            filename = frame_info.filename
            landmarks_list = frame_info.landmarks_list

            filename_path = Path(filename)
            output_filename = pf.output_filename
            need_return_image = pf.need_return_image

            if len(landmarks_list) == 0:
                self.log_info ( 'no faces found for %s, copying without faces' % (filename_path.name) )

                if filename_path.suffix == '.png':
                    shutil.copy (filename, output_filename )
                else:
                    img_bgr = cv2_imread(filename)
                    cv2_imwrite (output_filename, img_bgr)

                if need_return_image:
                    img_bgr = cv2_imread(filename)
                    pf.image = img_bgr
            else:
                final_img = None  # guard against an unknown cfg.type leaving final_img unbound
                if cfg.type == ConverterConfig.TYPE_MASKED:
                    cfg.dcscn_upscale_func = self.dcscn_upscale_func
                    cfg.fanseg_input_size = self.fanseg_input_size
                    cfg.fanseg_extract_func = self.fanseg_extract_func

                    try:
                        final_img = ConvertMasked (cfg, frame_info)
                    except Exception as e:
                        e_str = traceback.format_exc()
                        if 'MemoryError' in e_str:
                            raise Subprocessor.SilenceException
                        else:
                            raise Exception( 'Error while converting file [%s]: %s' % (filename, e_str) )

                elif cfg.type == ConverterConfig.TYPE_FACE_AVATAR:
                    final_img = ConvertFaceAvatar (cfg, pf.prev_temporal_frame_infos,
                                                        pf.frame_info,
                                                        pf.next_temporal_frame_infos )

                if output_filename is not None and final_img is not None:
                    cv2_imwrite (output_filename, final_img )

                if need_return_image:
                    pf.image = final_img

            return pf

        #overridable
        def get_data_name (self, pf):
            # return a string identifier of the data
            return pf.frame_info.filename

    #override
    def __init__(self, is_interactive, converter_config, frames, output_path):
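        """
        Host-side setup: a dummy prediction warms up the model, the predictor
        and DCSCN upscaler are wrapped for cross-process calls, and every
        frame is assigned an index and a numbered PNG output filename.
        """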
        if len (frames) == 0:
            raise ValueError ("len (frames) == 0")

        super().__init__('Converter', ConvertSubprocessor.Cli, 86400 if CONVERTER_DEBUG else 60, io_loop_sleep_time=0.001, initialize_subprocesses_in_serial=False)

        self.is_interactive = is_interactive
        self.converter_config = converter_config

        # Dummy predict and sleep while TensorFlow caches its kernels.
        # Without this, conversion can sometimes be up to 2x slower.
        self.converter_config.predictor_func (dummy_predict=True)
        time.sleep(2)

        self.predictor_func_host, self.predictor_func = SubprocessFunctionCaller.make_pair(self.converter_config.predictor_func)
        self.converter_config.predictor_func = None

        self.dcscn = None
        def DCSCN_upscale(*args, **kwargs):
            # lazily instantiate the DCSCN super-resolution model on first use
            if self.dcscn is None:
                self.dcscn = imagelib.DCSCN()
            return self.dcscn.upscale(*args, **kwargs)

        self.dcscn_host, self.dcscn_upscale_func = SubprocessFunctionCaller.make_pair(DCSCN_upscale)

        self.frames = frames
        self.output_path = output_path
        self.prefetch_frame_count = self.process_count = min(6, multiprocessing.cpu_count())

        self.frames_idxs = [ *range(len(self.frames)) ]
        self.frames_done_idxs = []

        for i in range( len(self.frames) ):
            frame = self.frames[i]
            frame.idx = i
            frame.output_filename = self.output_path / ('%.5d.png' % i)

        frames[0].cfg = self.converter_config.copy()

    #override
    def process_info_generator(self):
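        """
        Yield one worker description per CPU process (a single worker when
        CONVERTER_DEBUG is set, so stdin can be forwarded for debugging).
        """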
        r = [0] if CONVERTER_DEBUG else range(self.process_count)

        for i in r:
            yield 'CPU%d' % (i), {}, {'device_idx': i,
                                      'device_name': 'CPU%d' % (i),
                                      'predictor_func': self.predictor_func,
                                      'dcscn_upscale_func': self.dcscn_upscale_func,
                                      'stdin_fd': sys.stdin.fileno() if CONVERTER_DEBUG else None
                                      }

    #overridable optional
    def on_clients_initialized(self):
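        """
        Open the progress bar and, in interactive mode, create the main and
        help screens and show the help screen first.
        """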
        io.progress_bar ("Converting", len (self.frames_idxs) )

        self.process_remain_frames = not self.is_interactive
        self.is_interactive_quitting = not self.is_interactive

        if self.is_interactive:
            help_images = {
                    ConverterConfig.TYPE_MASKED :      cv2_imread ( str(Path(__file__).parent / 'gfx' / 'help_converter_masked.jpg') ),
                    ConverterConfig.TYPE_FACE_AVATAR : cv2_imread ( str(Path(__file__).parent / 'gfx' / 'help_converter_face_avatar.jpg') ),
                }

            self.main_screen = Screen(initial_scale_to_width=1368, image=None, waiting_icon=True)
            self.help_screen = Screen(initial_scale_to_height=768, image=help_images[self.converter_config.type], waiting_icon=False)
            self.screen_manager = ScreenManager( "Converter", [self.main_screen, self.help_screen], capture_keys=True )
            self.screen_manager.set_current (self.help_screen)
            self.screen_manager.show_current()

    #overridable optional
    def on_clients_finalized(self):
        if self.is_interactive:
            self.screen_manager.finalize()

        io.progress_bar_close()

    # keys that modify the current frame's ConverterConfig in interactive mode
    cfg_change_keys = ['`','1', '2', '3', '4', '5', '6', '7', '8', '9',
                       'q', 'a', 'w', 's', 'e', 'd', 'r', 'f', 't', 'g','y','h',
                       'z', 'x', 'c', 'v', 'b' ]

    #override
    def on_tick(self):
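        """
        One iteration of the host loop: pump cross-process function calls,
        refresh the preview screen, handle keyboard input, and advance or
        rewind the frame queue. Returns True when conversion should stop.
        """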
        self.predictor_func_host.process_messages()
        self.dcscn_host.process_messages()

        go_prev_frame = False
        go_prev_frame_overriding_cfg = False
        go_next_frame = self.process_remain_frames
        go_next_frame_overriding_cfg = False

        cur_frame = None
        if len(self.frames_idxs) != 0:
            cur_frame = self.frames[self.frames_idxs[0]]

        if self.is_interactive:
            self.main_screen.set_waiting_icon(False)

            if not self.is_interactive_quitting and not self.process_remain_frames:
                if cur_frame is not None:
                    if not cur_frame.is_shown:
                        if cur_frame.is_done:
                            cur_frame.is_shown = True
                            io.log_info (cur_frame.cfg)

                            if cur_frame.image is None:
                                cur_frame.image = cv2_imread ( cur_frame.output_filename )
                                if cur_frame.image is None:
                                    cur_frame.is_done = False #unable to read? recompute then
                                    cur_frame.is_shown = False
                            self.main_screen.set_image(cur_frame.image)
                        else:
                            self.main_screen.set_waiting_icon(True)
                else:
                    self.main_screen.set_image(None)
            else:
                self.main_screen.set_image(None)
                self.main_screen.set_waiting_icon(True)

            self.screen_manager.show_current()

            key_events = self.screen_manager.get_key_events()
            key, chr_key, ctrl_pressed, alt_pressed, shift_pressed = key_events[-1] if len(key_events) > 0 else (0,0,False,False,False)
            if key == 9: #tab
                self.screen_manager.switch_screens()
            else:
                if key == 27: #esc
                    self.is_interactive_quitting = True
                elif self.screen_manager.get_current() is self.main_screen:
                    if chr_key in self.cfg_change_keys:
                        self.process_remain_frames = False

                        if cur_frame is not None:
                            cfg = cur_frame.cfg
                            prev_cfg = cfg.copy()

                            if cfg.type == ConverterConfig.TYPE_MASKED:
                                if chr_key == '`':
                                    cfg.set_mode(0)
                                elif key >= ord('1') and key <= ord('9'):
                                    cfg.set_mode( key - ord('0') )
                                elif chr_key == 'q':
                                    cfg.add_hist_match_threshold(1 if not shift_pressed else 5)
                                elif chr_key == 'a':
                                    cfg.add_hist_match_threshold(-1 if not shift_pressed else -5)
                                elif chr_key == 'w':
                                    cfg.add_erode_mask_modifier(1 if not shift_pressed else 5)
                                elif chr_key == 's':
                                    cfg.add_erode_mask_modifier(-1 if not shift_pressed else -5)
                                elif chr_key == 'e':
                                    cfg.add_blur_mask_modifier(1 if not shift_pressed else 5)
                                elif chr_key == 'd':
                                    cfg.add_blur_mask_modifier(-1 if not shift_pressed else -5)
                                elif chr_key == 'r':
                                    cfg.add_motion_blur_power(1 if not shift_pressed else 5)
                                elif chr_key == 'f':
                                    cfg.add_motion_blur_power(-1 if not shift_pressed else -5)
                                elif chr_key == 't':
                                    cfg.add_color_degrade_power(1 if not shift_pressed else 5)
                                elif chr_key == 'g':
                                    cfg.add_color_degrade_power(-1 if not shift_pressed else -5)
                                elif chr_key == 'y':
                                    cfg.add_output_face_scale(1 if not shift_pressed else 5)
                                elif chr_key == 'h':
                                    cfg.add_output_face_scale(-1 if not shift_pressed else -5)
                                elif chr_key == 'z':
                                    cfg.toggle_masked_hist_match()
                                elif chr_key == 'x':
                                    cfg.toggle_mask_mode()
                                elif chr_key == 'c':
                                    cfg.toggle_color_transfer_mode()
                                elif chr_key == 'v':
                                    cfg.toggle_super_resolution()
                                elif chr_key == 'b':
                                    cfg.toggle_export_mask_alpha()
                            else:
                                if chr_key == 's':
                                    cfg.toggle_add_source_image()

                            if prev_cfg != cfg:
                                io.log_info (cfg)
                                cur_frame.is_done = False
                                cur_frame.is_shown = False
                    else:
                        if chr_key == ',' or chr_key == 'm':
                            self.process_remain_frames = False
                            go_prev_frame = True
                            go_prev_frame_overriding_cfg = chr_key == 'm'
                        elif chr_key == '.' or chr_key == '/':
                            self.process_remain_frames = False
                            go_next_frame = True
                            go_next_frame_overriding_cfg = chr_key == '/'
                        elif chr_key == '\r' or chr_key == '\n':
                            self.process_remain_frames = not self.process_remain_frames
                        elif chr_key == '-':
                            self.screen_manager.get_current().diff_scale(-0.1)
                        elif chr_key == '=':
                            self.screen_manager.get_current().diff_scale(0.1)
        if go_prev_frame:
            if cur_frame is not None and cur_frame.is_done:
                cur_frame.image = None

            if len(self.frames_done_idxs) > 0:
                prev_frame = self.frames[self.frames_done_idxs.pop()]
                self.frames_idxs.insert(0, prev_frame.idx)
                prev_frame.is_shown = False
                io.progress_bar_inc(-1)

                if go_prev_frame_overriding_cfg:
                    if cur_frame is not None:
                        if prev_frame.cfg != cur_frame.cfg:
                            prev_frame.cfg = cur_frame.cfg
                            prev_frame.is_done = False

        elif go_next_frame:
            if cur_frame is not None and cur_frame.is_done:
                cur_frame.image = None
                cur_frame.is_shown = True
                self.frames_done_idxs.append(cur_frame.idx)
                self.frames_idxs.pop(0)
                io.progress_bar_inc(1)

                if len(self.frames_idxs) != 0:
                    next_frame = self.frames[ self.frames_idxs[0] ]

                    if go_next_frame_overriding_cfg:
                        f = self.frames
                        for i in range( next_frame.idx, len(self.frames) ):
                            f[i].cfg = None
                            f[i].is_shown = False

                    if next_frame.cfg is None or not next_frame.is_shown:
                        # the next frame has never been shown, or the current cfg
                        # must be propagated to the following (prefetched) frames
                        for i in range( min(len(self.frames_idxs), self.prefetch_frame_count) ):
                            frame = self.frames[ self.frames_idxs[i] ]

                            if frame.cfg is None or frame.cfg != cur_frame.cfg:
                                frame.cfg = cur_frame.cfg.copy()
                                frame.is_done = False #initiate solve again

                    next_frame.is_shown = False

            if len(self.frames_idxs) == 0:
                self.process_remain_frames = False

        return (self.is_interactive and self.is_interactive_quitting) or \
               (not self.is_interactive and not self.process_remain_frames)

    #override
    def on_data_return (self, host_dict, pf):
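        """
        Called when a worker returns a frame unprocessed; mark it as not done
        so it will be scheduled again.
        """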
        frame = self.frames[pf.idx]
        frame.is_done = False
        frame.is_processing = False

    #override
    def on_result (self, host_dict, pf_sent, pf_result):
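        """
        Accept a converted frame. The result only counts if the frame's cfg
        has not changed since the job was sent; otherwise it stays not-done
        and is recomputed with the new cfg.
        """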
        frame = self.frames[pf_result.idx]
        frame.is_processing = False
        if frame.cfg == pf_result.cfg:
            frame.is_done = True
            frame.image = pf_result.image

    #override
    def get_data(self, host_dict):
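        """
        Pick the next frame to process from the prefetch window, or return
        None when nothing is ready (or the user is quitting).
        """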
        if self.is_interactive and self.is_interactive_quitting:
            return None

        for i in range ( min(len(self.frames_idxs), self.prefetch_frame_count) ):
            frame = self.frames[ self.frames_idxs[i] ]

            if not frame.is_done and not frame.is_processing and frame.cfg is not None:
                frame.is_processing = True
                return ConvertSubprocessor.ProcessingFrame(idx=frame.idx,
                                                           cfg=frame.cfg.copy(),
                                                           prev_temporal_frame_infos=frame.prev_temporal_frame_infos,
                                                           frame_info=frame.frame_info,
                                                           next_temporal_frame_infos=frame.next_temporal_frame_infos,
                                                           output_filename=frame.output_filename,
                                                           need_return_image=True )

        return None

    #override
    def get_result(self):
        # results are written to disk per frame; there is nothing to aggregate
        return 0


def main (args, device_args):
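    """
    Entry point of the converter: validate the input/output/model paths,
    load the model and its ConverterConfig, collect per-frame alignment
    data, then run the ConvertSubprocessor over all frames.
    """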
io.log_info ("Running converter.\r\n")
|
|
|
|
aligned_dir = args.get('aligned_dir', None)
|
|
avaperator_aligned_dir = args.get('avaperator_aligned_dir', None)
|
|
|
|
try:
|
|
input_path = Path(args['input_dir'])
|
|
output_path = Path(args['output_dir'])
|
|
model_path = Path(args['model_dir'])
|
|
|
|
if not input_path.exists():
|
|
io.log_err('Input directory not found. Please ensure it exists.')
|
|
return
|
|
|
|
if output_path.exists():
|
|
for filename in Path_utils.get_image_paths(output_path):
|
|
Path(filename).unlink()
|
|
else:
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
if not model_path.exists():
|
|
io.log_err('Model directory not found. Please ensure it exists.')
|
|
return
|
|
|
|
is_interactive = io.input_bool ("Use interactive converter? (y/n skip:y) : ", True) if not io.is_colab() else False
|
|
|
|
import models
|
|
model = models.import_model( args['model_name'] )(model_path, device_args=device_args)
|
|
|
|
cfg = model.get_ConverterConfig()
|
|
|
|
if not is_interactive:
|
|
cfg.ask_settings()
|
|
|
|
input_path_image_paths = Path_utils.get_image_paths(input_path)
|
|
|
|

        if cfg.type == ConverterConfig.TYPE_MASKED:
            if aligned_dir is None:
                io.log_err('Aligned directory not specified. Please ensure it exists.')
                return

            aligned_path = Path(aligned_dir)
            if not aligned_path.exists():
                io.log_err('Aligned directory not found. Please ensure it exists.')
                return

            # map: source filename stem -> list of landmark arrays, one per detected face
            alignments = {}
            multiple_faces_detected = False

            aligned_path_image_paths = Path_utils.get_image_paths(aligned_path)
            for filepath in io.progress_bar_generator(aligned_path_image_paths, "Collecting alignments"):
                filepath = Path(filepath)

                if filepath.suffix == '.png':
                    dflimg = DFLPNG.load( str(filepath) )
                elif filepath.suffix == '.jpg':
                    dflimg = DFLJPG.load ( str(filepath) )
                else:
                    dflimg = None

                if dflimg is None:
                    io.log_err ("%s is not a dfl image file" % (filepath.name) )
                    continue

                source_filename_stem = Path( dflimg.get_source_filename() ).stem
                if source_filename_stem not in alignments.keys():
                    alignments[ source_filename_stem ] = []

                alignments_ar = alignments[ source_filename_stem ]
                alignments_ar.append (dflimg.get_source_landmarks())
                if len(alignments_ar) > 1:
                    multiple_faces_detected = True

            if multiple_faces_detected:
                io.log_info ("Warning: multiple faces detected. It is strongly recommended to process each face separately.")

            frames = [ ConvertSubprocessor.Frame( frame_info=FrameInfo(filename=p, landmarks_list=alignments.get(Path(p).stem, None)) ) for p in input_path_image_paths ]

            if multiple_faces_detected:
                io.log_info ("Warning: multiple faces detected. Motion blur will not be used.")
            else:
                # Estimate per-frame face motion from the previous and next
                # frames: project two reference points (face center and "up")
                # through each frame's face transform and measure how the
                # center moves across the prev->next span.
                s = 256
                local_pts = [ (s//2-1, s//2-1), (s//2-1, 0) ]  # center and up
                frames_len = len(frames)
                for i in io.progress_bar_generator( range(len(frames)) , "Computing motion vectors"):
                    fi_prev = frames[max(0, i-1)].frame_info
                    fi      = frames[i].frame_info
                    fi_next = frames[min(i+1, frames_len-1)].frame_info
                    if len(fi_prev.landmarks_list) == 0 or \
                       len(fi.landmarks_list) == 0 or \
                       len(fi_next.landmarks_list) == 0:
                        continue

                    mat_prev = LandmarksProcessor.get_transform_mat ( fi_prev.landmarks_list[0], s, face_type=FaceType.FULL)
                    mat      = LandmarksProcessor.get_transform_mat ( fi.landmarks_list[0]     , s, face_type=FaceType.FULL)
                    mat_next = LandmarksProcessor.get_transform_mat ( fi_next.landmarks_list[0], s, face_type=FaceType.FULL)

                    pts_prev = LandmarksProcessor.transform_points (local_pts, mat_prev, True)
                    pts      = LandmarksProcessor.transform_points (local_pts, mat     , True)
                    pts_next = LandmarksProcessor.transform_points (local_pts, mat_next, True)

                    prev_vector = pts[0]-pts_prev[0]   # (currently unused)
                    next_vector = pts_next[0]-pts[0]   # (currently unused)

                    # split the motion into magnitude (motion_power) and direction (motion_deg)
                    motion_vector = pts_next[0] - pts_prev[0]
                    fi.motion_power = npla.norm(motion_vector)

                    motion_vector = motion_vector / fi.motion_power if fi.motion_power != 0 else np.array([0,0], dtype=np.float32)

                    fi.motion_deg = -math.atan2(motion_vector[1], motion_vector[0])*180 / math.pi

        elif cfg.type == ConverterConfig.TYPE_FACE_AVATAR:
            filesdata = []
            for filepath in io.progress_bar_generator(input_path_image_paths, "Collecting info"):
                filepath = Path(filepath)

                if filepath.suffix == '.png':
                    dflimg = DFLPNG.load( str(filepath) )
                elif filepath.suffix == '.jpg':
                    dflimg = DFLJPG.load ( str(filepath) )
                else:
                    dflimg = None

                if dflimg is None:
                    io.log_err ("%s is not a dfl image file" % (filepath.name) )
                    continue
                filesdata += [ ( FrameInfo(filename=str(filepath), landmarks_list=[dflimg.get_landmarks()] ), dflimg.get_source_filename() ) ]

            filesdata = sorted(filesdata, key=operator.itemgetter(1))  # sort by source filename
            frames = []
            filesdata_len = len(filesdata)
            for i in range(len(filesdata)):
                frame_info = filesdata[i][0]

                # gather temporal neighbors for the avatar converter,
                # clamping at the first and last frames
                prev_temporal_frame_infos = []
                next_temporal_frame_infos = []

                for t in range (cfg.temporal_face_count):
                    prev_frame_info = filesdata[ max(i -t, 0) ][0]
                    next_frame_info = filesdata[ min(i +t, filesdata_len-1 )][0]

                    prev_temporal_frame_infos.insert (0, prev_frame_info )
                    next_temporal_frame_infos.append (   next_frame_info )

                frames.append ( ConvertSubprocessor.Frame(prev_temporal_frame_infos=prev_temporal_frame_infos,
                                                          frame_info=frame_info,
                                                          next_temporal_frame_infos=next_temporal_frame_infos) )

        if len(frames) == 0:
            io.log_info ("No frames to convert in input_dir.")
        else:
            ConvertSubprocessor (
                        is_interactive   = is_interactive,
                        converter_config = cfg,
                        frames           = frames,
                        output_path      = output_path,
                    ).run()

        model.finalize()

    except Exception as e:
        print ( 'Error: %s' % (str(e)))
        traceback.print_exc()

#interpolate landmarks
#from facelib import LandmarksProcessor
#from facelib import FaceType
#a = sorted(alignments.keys())
#a_len = len(a)
#
#box_pts = 3
#box = np.ones(box_pts)/box_pts
#for i in range( a_len ):
#    if i >= box_pts and i <= a_len-box_pts-1:
#        af0 = alignments[ a[i] ][0] ##first face
#        m0 = LandmarksProcessor.get_transform_mat (af0, 256, face_type=FaceType.FULL)
#
#        points = []
#
#        for j in range(-box_pts, box_pts+1):
#            af = alignments[ a[i+j] ][0] ##first face
#            m = LandmarksProcessor.get_transform_mat (af, 256, face_type=FaceType.FULL)
#            p = LandmarksProcessor.transform_points (af, m)
#            points.append (p)
#
#        points = np.array(points)
#        points_len = len(points)
#        t_points = np.transpose(points, [1,0,2])
#
#        p1 = np.array ( [ int(np.convolve(x[:,0], box, mode='same')[points_len//2]) for x in t_points ] )
#        p2 = np.array ( [ int(np.convolve(x[:,1], box, mode='same')[points_len//2]) for x in t_points ] )
#
#        new_points = np.concatenate( [np.expand_dims(p1,-1),np.expand_dims(p2,-1)], -1 )
#
#        alignments[ a[i] ][0] = LandmarksProcessor.transform_points (new_points, m0, True).astype(np.int32)