trained FANSeg_256_full_face.h5,

new experimental models: AVATAR, RecycleGAN
iperov 2019-04-17 23:45:08 +04:00
commit a0f38ac3e9
17 changed files with 1071 additions and 120 deletions

converters/Converter.py

@@ -5,8 +5,9 @@ You can implement your own Converter, check example ConverterMasked.py
 class Converter(object):
     TYPE_FACE = 0 #calls convert_face
-    TYPE_IMAGE = 1 #calls convert_image without landmarks
-    TYPE_IMAGE_WITH_LANDMARKS = 2 #calls convert_image with landmarks
+    TYPE_FACE_AVATAR = 1 #calls convert_face with avatar_operator_face
+    TYPE_IMAGE = 2 #calls convert_image without landmarks
+    TYPE_IMAGE_WITH_LANDMARKS = 3 #calls convert_image with landmarks
 
     #overridable
     def __init__(self, predictor_func, type):
@@ -23,13 +24,13 @@ class Converter(object):
         pass
 
     #overridable
-    def cli_convert_face (self, img_bgr, img_face_landmarks, debug):
+    def cli_convert_face (self, img_bgr, img_face_landmarks, debug, avaperator_face_bgr=None, **kwargs):
         #return float32 image
         #if debug , return tuple ( images of any size and channels, ...)
         return image
 
     #overridable
-    def convert_image (self, img_bgr, img_landmarks, debug):
+    def cli_convert_image (self, img_bgr, img_landmarks, debug):
         #img_landmarks not None, if input image is png with embedded data
         #return float32 image
         #if debug , return tuple ( images of any size and channels, ...)
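As the header note says, a custom converter subclasses Converter and picks one of these types. A minimal sketch under the new naming (the class name and predictor stub are illustrative, not part of the commit):

    import numpy as np
    from converters import Converter

    # illustrative subclass: whole-image conversion, no landmarks needed
    class MyConverter(Converter):
        def __init__(self, predictor_func):
            super().__init__(predictor_func, Converter.TYPE_IMAGE)

        def cli_convert_image(self, img_bgr, img_landmarks, debug):
            out = self.predictor_func(img_bgr)   # float32 BGR in, float32 BGR out
            return (img_bgr, out) if debug else out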

converters/ConverterAvatar.py (new file)

@@ -0,0 +1,70 @@
import time

import cv2
import numpy as np

from facelib import FaceType, LandmarksProcessor
from joblib import SubprocessFunctionCaller
from utils.pickle_utils import AntiPickler

from .Converter import Converter

class ConverterAvatar(Converter):

    #override
    def __init__(self, predictor_func,
                       predictor_input_size=0):

        super().__init__(predictor_func, Converter.TYPE_FACE_AVATAR)

        self.predictor_input_size = predictor_input_size

        #dummy predict and sleep, tensorflow caching kernels. If remove it, conversion speed will be x2 slower
        predictor_func ( np.zeros ( (predictor_input_size,predictor_input_size,3), dtype=np.float32 ),
                         np.zeros ( (predictor_input_size,predictor_input_size,1), dtype=np.float32 ) )
        time.sleep(2)

        predictor_func_host, predictor_func = SubprocessFunctionCaller.make_pair(predictor_func)
        self.predictor_func_host = AntiPickler(predictor_func_host)
        self.predictor_func = predictor_func

    #overridable
    def on_host_tick(self):
        self.predictor_func_host.obj.process_messages()

    #override
    def cli_convert_face (self, img_bgr, img_face_landmarks, debug, avaperator_face_bgr=None, **kwargs):
        if debug:
            debugs = [img_bgr.copy()]

        img_size = img_bgr.shape[1], img_bgr.shape[0]

        img_face_mask_a = LandmarksProcessor.get_image_hull_mask (img_bgr.shape, img_face_landmarks)
        img_face_mask_aaa = np.repeat(img_face_mask_a, 3, -1)

        output_size = self.predictor_input_size
        face_mat = LandmarksProcessor.get_transform_mat (img_face_landmarks, output_size, face_type=FaceType.FULL)

        dst_face_mask_a_0 = cv2.warpAffine( img_face_mask_a, face_mat, (output_size, output_size), flags=cv2.INTER_CUBIC )

        predictor_input_dst_face_mask_a_0 = cv2.resize (dst_face_mask_a_0, (self.predictor_input_size,self.predictor_input_size), cv2.INTER_CUBIC )
        prd_inp_dst_face_mask_a = predictor_input_dst_face_mask_a_0[...,np.newaxis]

        prd_inp_avaperator_face_bgr = cv2.resize (avaperator_face_bgr, (self.predictor_input_size,self.predictor_input_size), cv2.INTER_CUBIC )

        prd_face_bgr = self.predictor_func ( prd_inp_avaperator_face_bgr, prd_inp_dst_face_mask_a )

        out_img = img_bgr.copy()
        out_img = cv2.warpAffine( prd_face_bgr, face_mat, img_size, out_img, cv2.WARP_INVERSE_MAP | cv2.INTER_LANCZOS4, cv2.BORDER_TRANSPARENT )
        out_img = np.clip(out_img, 0.0, 1.0)

        if debug:
            debugs += [out_img.copy()]

        out_img = np.clip( img_bgr*(1-img_face_mask_aaa) + (out_img*img_face_mask_aaa) , 0, 1.0 )

        if debug:
            debugs += [out_img.copy()]

        return debugs if debug else out_img
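A TYPE_FACE_AVATAR predictor therefore takes two inputs, the avatar operator's face and the destination face mask, both float32 at predictor_input_size. A conforming stub (identity behaviour, purely illustrative):

    import numpy as np

    def dummy_avatar_predictor(avaperator_face_bgr, dst_face_mask_a):
        # shapes: (S,S,3) and (S,S,1), float32 in [0,1] -- see the warm-up call above
        return np.clip(avaperator_face_bgr * dst_face_mask_a, 0.0, 1.0)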

converters/ConverterImage.py

@@ -1,40 +1,50 @@
-from .Converter import Converter
-from facelib import LandmarksProcessor
-from facelib import FaceType
 import time
 import cv2
 import numpy as np
-'''
-predictor_func:
-    input:  [predictor_input_size, predictor_input_size, BGR]
-    output: [predictor_input_size, predictor_input_size, BGR]
-'''
+from facelib import FaceType, LandmarksProcessor
+from joblib import SubprocessFunctionCaller
+from utils.pickle_utils import AntiPickler
+from .Converter import Converter
 
 class ConverterImage(Converter):
 
     #override
     def __init__(self, predictor_func,
-                       predictor_input_size=0,
-                       output_size=0):
+                       predictor_input_size=0):
 
         super().__init__(predictor_func, Converter.TYPE_IMAGE)
 
         self.predictor_input_size = predictor_input_size
-        self.output_size = output_size
 
-    #override
-    def dummy_predict(self):
-        self.predictor_func ( np.zeros ( (self.predictor_input_size, self.predictor_input_size,3), dtype=np.float32) )
+        #dummy predict and sleep, tensorflow caching kernels. If remove it, conversion speed will be x2 slower
+        predictor_func ( np.zeros ( (predictor_input_size,predictor_input_size,3), dtype=np.float32 ) )
+        time.sleep(2)
+
+        predictor_func_host, predictor_func = SubprocessFunctionCaller.make_pair(predictor_func)
+        self.predictor_func_host = AntiPickler(predictor_func_host)
+        self.predictor_func = predictor_func
+
+    #overridable
+    def on_host_tick(self):
+        self.predictor_func_host.obj.process_messages()
 
     #override
-    def convert_image (self, img_bgr, img_landmarks, debug):
+    def cli_convert_image (self, img_bgr, img_landmarks, debug):
         img_size = img_bgr.shape[1], img_bgr.shape[0]
 
         predictor_input_bgr = cv2.resize ( img_bgr, (self.predictor_input_size, self.predictor_input_size), cv2.INTER_LANCZOS4 )
-        predicted_bgr = self.predictor_func ( predictor_input_bgr )
 
-        output = cv2.resize ( predicted_bgr, (self.output_size, self.output_size), cv2.INTER_LANCZOS4 )
         if debug:
-            return (predictor_input_bgr,output,)
-        return output
+            debugs = [predictor_input_bgr]
+
+        output = self.predictor_func ( predictor_input_bgr )
+
+        if debug:
+            debugs += [output.copy()]
+
+        return debugs if debug else output
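Both reworked converters wrap the host-side caller in AntiPickler so the TensorFlow-bound function is not serialized when the converter object is shipped to worker processes. A sketch of the assumed contract (see utils/pickle_utils.py for the real implementation):

    # sketch of the AntiPickler idea: hold an object, but drop it on pickling
    class AntiPickler:
        def __init__(self, obj):
            self.obj = obj
        def __getstate__(self):
            return {}            # nothing is serialized
        def __setstate__(self, state):
            self.obj = None      # workers receive an empty shell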

converters/ConverterMasked.py

@@ -30,7 +30,8 @@ class ConverterMasked(Converter):
                        base_blur_mask_modifier = 0,
                        default_erode_mask_modifier = 0,
                        default_blur_mask_modifier = 0,
-                       clip_hborder_mask_per = 0):
+                       clip_hborder_mask_per = 0,
+                       force_mask_mode=-1):
 
         super().__init__(predictor_func, Converter.TYPE_FACE)
@@ -76,10 +77,13 @@ class ConverterMasked(Converter):
         if self.mode == 'hist-match' or self.mode == 'hist-match-bw' or self.mode == 'seamless-hist-match':
             self.hist_match_threshold = np.clip ( io.input_int("Hist match threshold [0..255] (skip:255) : ", 255), 0, 255)
 
-        if face_type == FaceType.FULL:
-            self.mask_mode = np.clip ( io.input_int ("Mask mode: (1) learned, (2) dst, (3) FAN-prd, (4) FAN-dst , (5) FAN-prd*FAN-dst (6) learned*FAN-prd*FAN-dst (?) help. Default - %d : " % (1) , 1, help_message="If you learned mask, then option 1 should be choosed. 'dst' mask is raw shaky mask from dst aligned images. 'FAN-prd' - using super smooth mask by pretrained FAN-model from predicted face. 'FAN-dst' - using super smooth mask by pretrained FAN-model from dst face. 'FAN-prd*FAN-dst' or 'learned*FAN-prd*FAN-dst' - using multiplied masks."), 1, 6 )
-        else:
-            self.mask_mode = np.clip ( io.input_int ("Mask mode: (1) learned, (2) dst . Default - %d : " % (1) , 1), 1, 2 )
+        if force_mask_mode != -1:
+            self.mask_mode = force_mask_mode
+        else:
+            if face_type == FaceType.FULL:
+                self.mask_mode = np.clip ( io.input_int ("Mask mode: (1) learned, (2) dst, (3) FAN-prd, (4) FAN-dst , (5) FAN-prd*FAN-dst (6) learned*FAN-prd*FAN-dst (?) help. Default - %d : " % (1) , 1, help_message="If you learned mask, then option 1 should be choosed. 'dst' mask is raw shaky mask from dst aligned images. 'FAN-prd' - using super smooth mask by pretrained FAN-model from predicted face. 'FAN-dst' - using super smooth mask by pretrained FAN-model from dst face. 'FAN-prd*FAN-dst' or 'learned*FAN-prd*FAN-dst' - using multiplied masks."), 1, 6 )
+            else:
+                self.mask_mode = np.clip ( io.input_int ("Mask mode: (1) learned, (2) dst . Default - %d : " % (1) , 1), 1, 2 )
 
         if self.mask_mode >= 3 and self.mask_mode <= 6:
             self.fan_seg = None
@@ -118,10 +122,10 @@ class ConverterMasked(Converter):
     #overridable
     def on_cli_initialize(self):
         if (self.mask_mode >= 3 and self.mask_mode <= 6) and self.fan_seg == None:
-            self.fan_seg = FANSegmentator(256, FaceType.toString(FaceType.FULL) )
+            self.fan_seg = FANSegmentator(256, FaceType.toString( self.face_type ) )
 
     #override
-    def cli_convert_face (self, img_bgr, img_face_landmarks, debug):
+    def cli_convert_face (self, img_bgr, img_face_landmarks, debug, **kwargs):
         if debug:
            debugs = [img_bgr.copy()]
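The new force_mask_mode lets a caller pin the mask mode programmatically and skip the interactive prompt. The selection logic reduces to this (a restatement, not code from the commit):

    def resolve_mask_mode(force_mask_mode, ask_user):
        # mirrors the branch above: -1 means 'prompt the user', anything else wins
        return ask_user() if force_mask_mode == -1 else force_mask_mode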

converters/__init__.py

@@ -1,3 +1,4 @@
 from .Converter import Converter
 from .ConverterMasked import ConverterMasked
 from .ConverterImage import ConverterImage
+from .ConverterAvatar import ConverterAvatar

facelib/FANSeg_256_full_face.h5 (binary file not shown)

joblib/SubprocessFunctionCaller.py

@@ -8,9 +8,9 @@ class SubprocessFunctionCaller(object):
             self.c2s = c2s
             self.lock = lock
 
-        def __call__(self, value):
+        def __call__(self, *args, **kwargs):
             self.lock.acquire()
-            self.c2s.put (value)
+            self.c2s.put ( {'args':args, 'kwargs':kwargs} )
             while True:
                 if not self.s2c.empty():
                     obj = self.s2c.get()
@@ -27,7 +27,7 @@ class SubprocessFunctionCaller(object):
         def process_messages(self):
             while not self.c2s.empty():
                 obj = self.c2s.get()
-                result = self.func (obj)
+                result = self.func ( *obj['args'], **obj['kwargs'] )
                 self.s2c.put (result)
 
     @staticmethod
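The caller now marshals arbitrary positional and keyword arguments instead of a single value, which is what lets cli_convert_face grow the avaperator_face_bgr keyword. A self-contained sketch of the same queue-pair pattern (names are illustrative, not the module's API):

    import multiprocessing

    class Proxy:
        # picklable forwarder: ships (*args, **kwargs) to the host, waits for the result
        def __init__(self, c2s, s2c, lock):
            self.c2s, self.s2c, self.lock = c2s, s2c, lock
        def __call__(self, *args, **kwargs):
            with self.lock:
                self.c2s.put({'args': args, 'kwargs': kwargs})
                return self.s2c.get()

    class Host:
        # lives in the parent process, next to the real function
        def __init__(self, func, c2s, s2c):
            self.func, self.c2s, self.s2c = func, c2s, s2c
        def process_messages(self):
            while not self.c2s.empty():
                msg = self.c2s.get()
                self.s2c.put(self.func(*msg['args'], **msg['kwargs']))

    def make_pair(func):
        c2s, s2c = multiprocessing.Queue(), multiprocessing.Queue()
        return Host(func, c2s, s2c), Proxy(c2s, s2c, multiprocessing.Lock())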

main.py

@@ -134,6 +134,7 @@ if __name__ == "__main__":
        args = {'input_dir'  : arguments.input_dir,
                'output_dir' : arguments.output_dir,
                'aligned_dir' : arguments.aligned_dir,
+               'avaperator_aligned_dir' : arguments.avaperator_aligned_dir,
                'model_dir' : arguments.model_dir,
                'model_name' : arguments.model_name,
                'debug' : arguments.debug,
@@ -147,7 +148,8 @@ if __name__ == "__main__":
     p = subparsers.add_parser( "convert", help="Converter")
     p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir", help="Input directory. A directory containing the files you wish to process.")
     p.add_argument('--output-dir', required=True, action=fixPathAction, dest="output_dir", help="Output directory. This is where the converted files will be stored.")
-    p.add_argument('--aligned-dir', action=fixPathAction, dest="aligned_dir", help="Aligned directory. This is where the extracted dst faces are stored. Not used in AVATAR model.")
+    p.add_argument('--aligned-dir', action=fixPathAction, dest="aligned_dir", help="Aligned directory. This is where the extracted dst faces are stored.")
+    p.add_argument('--avaperator-aligned-dir', action=fixPathAction, dest="avaperator_aligned_dir", help="Only for AVATAR model. Directory of aligned avatar operator faces.")
     p.add_argument('--model-dir', required=True, action=fixPathAction, dest="model_dir", help="Model dir.")
     p.add_argument('--model', required=True, dest="model_name", choices=Path_utils.get_all_dir_names_startswith ( Path(__file__).parent / 'models' , 'Model_'), help="Type of model")
     p.add_argument('--debug', action="store_true", dest="debug", default=False, help="Debug converter.")
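Put together, a hypothetical AVATAR conversion run would pass both aligned directories (the workspace paths are placeholders):

    python main.py convert \
        --input-dir workspace/data_dst \
        --output-dir workspace/data_dst/merged \
        --aligned-dir workspace/data_dst/aligned \
        --avaperator-aligned-dir workspace/data_src/aligned \
        --model-dir workspace/model \
        --model AVATAR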

mainscripts/Converter.py

@@ -1,19 +1,23 @@
 import sys
+import multiprocessing
+import operator
+import os
+import shutil
+import time
 import traceback
 from pathlib import Path
-from utils import Path_utils
 import cv2
-from utils.DFLPNG import DFLPNG
-from utils.DFLJPG import DFLJPG
-from utils.cv2_utils import *
-import shutil
 import numpy as np
-import time
-import multiprocessing
 from converters import Converter
-from joblib import Subprocessor, SubprocessFunctionCaller
 from interact import interact as io
+from joblib import SubprocessFunctionCaller, Subprocessor
+from utils import Path_utils
+from utils.cv2_utils import *
+from utils.DFLJPG import DFLJPG
+from utils.DFLPNG import DFLPNG
+from imagelib import normalize_channels
 
 class ConvertSubprocessor(Subprocessor):
     class Cli(Subprocessor.Cli):
@@ -26,6 +30,7 @@ class ConvertSubprocessor(Subprocessor):
             self.converter      = client_dict['converter']
             self.output_path    = Path(client_dict['output_dir']) if 'output_dir' in client_dict.keys() else None
             self.alignments     = client_dict['alignments']
+            self.avatar_image_paths = client_dict['avatar_image_paths']
             self.debug          = client_dict['debug']
 
             #transfer and set stdin in order to work code.interact in debug subprocess
@@ -45,13 +50,15 @@ class ConvertSubprocessor(Subprocessor):
         #override
         def process_data(self, data):
-            filename_path = Path(data)
+            idx, filename = data
+            filename_path = Path(filename)
 
             files_processed = 1
             faces_processed = 0
 
             output_filename_path = self.output_path / (filename_path.stem + '.png')
 
-            if self.converter.type == Converter.TYPE_FACE and filename_path.stem not in self.alignments.keys():
+            if (self.converter.type == Converter.TYPE_FACE or self.converter.type == Converter.TYPE_FACE_AVATAR ) \
+                and filename_path.stem not in self.alignments.keys():
                 if not self.debug:
                     self.log_info ( 'no faces found for %s, copying without faces' % (filename_path.name) )
@@ -62,19 +69,18 @@ class ConvertSubprocessor(Subprocessor):
                     cv2_imwrite ( str(output_filename_path), image )
             else:
                 image = (cv2_imread(str(filename_path)) / 255.0).astype(np.float32)
-                h,w,c = image.shape
-                if c > 3:
-                    image = image[...,0:3]
+                image = normalize_channels (image, 3)
 
                 if self.converter.type == Converter.TYPE_IMAGE:
-                    image = self.converter.convert_image(image, None, self.debug)
+                    image = self.converter.cli_convert_image(image, None, self.debug)
 
                     if self.debug:
                         raise NotImplementedError
                         #for img in image:
                         #    io.show_image ('Debug convert', img )
                         #    cv2.waitKey(0)
-                        return (1, image)
 
                     faces_processed = 1
 
                 elif self.converter.type == Converter.TYPE_IMAGE_WITH_LANDMARKS:
+                    #currently unused
                     if filename_path.suffix == '.png':
                         dflimg = DFLPNG.load( str(filename_path) )
                     elif filename_path.suffix == '.jpg':
@@ -85,7 +91,7 @@ class ConvertSubprocessor(Subprocessor):
                     if dflimg is not None:
                         image_landmarks = dflimg.get_landmarks()
 
-                        image = self.converter.convert_image(image, image_landmarks, self.debug)
+                        image = self.converter.convert_image(image, image_landmarks, self.debug)
 
                         if self.debug:
                             raise NotImplementedError
@@ -96,7 +102,13 @@ class ConvertSubprocessor(Subprocessor):
                     else:
                         self.log_err ("%s is not a dfl image file" % (filename_path.name) )
 
-                elif self.converter.type == Converter.TYPE_FACE:
+                elif self.converter.type == Converter.TYPE_FACE or self.converter.type == Converter.TYPE_FACE_AVATAR:
+                    ava_face = None
+                    if self.converter.type == Converter.TYPE_FACE_AVATAR:
+                        ava_filename_path = self.avatar_image_paths[idx]
+                        ava_face = (cv2_imread(str(ava_filename_path)) / 255.0).astype(np.float32)
+                        ava_face = normalize_channels (ava_face, 3)
+
                     faces = self.alignments[filename_path.stem]
 
                     if self.debug:
@@ -108,9 +120,9 @@ class ConvertSubprocessor(Subprocessor):
                             self.log_info ( '\nConverting face_num [%d] in file [%s]' % (face_num, filename_path) )
 
                         if self.debug:
-                            debug_images += self.converter.cli_convert_face(image, image_landmarks, self.debug)
+                            debug_images += self.converter.cli_convert_face(image, image_landmarks, self.debug, avaperator_face_bgr=ava_face)
                         else:
-                            image = self.converter.cli_convert_face(image, image_landmarks, self.debug)
+                            image = self.converter.cli_convert_face(image, image_landmarks, self.debug, avaperator_face_bgr=ava_face)
 
             except Exception as e:
                 e_str = traceback.format_exc()
@@ -133,16 +145,19 @@ class ConvertSubprocessor(Subprocessor):
         #overridable
         def get_data_name (self, data):
             #return string identificator of your data
-            return data
+            idx, filename = data
+            return filename
 
     #override
-    def __init__(self, converter, input_path_image_paths, output_path, alignments, debug = False):
+    def __init__(self, converter, input_path_image_paths, output_path, alignments, avatar_image_paths=None, debug = False):
         super().__init__('Converter', ConvertSubprocessor.Cli, 86400 if debug == True else 60)
 
         self.converter = converter
         self.input_data = self.input_path_image_paths = input_path_image_paths
+        self.input_data_idxs = [ *range(len(self.input_data)) ]
         self.output_path = output_path
         self.alignments = alignments
+        self.avatar_image_paths = avatar_image_paths
         self.debug = debug
 
         self.files_processed = 0
@@ -158,6 +173,7 @@ class ConvertSubprocessor(Subprocessor):
                      'converter' : self.converter,
                      'output_dir' : str(self.output_path),
                      'alignments' : self.alignments,
+                     'avatar_image_paths' : self.avatar_image_paths,
                      'debug': self.debug,
                      'stdin_fd': sys.stdin.fileno() if self.debug else None
                      }
@@ -167,7 +183,7 @@ class ConvertSubprocessor(Subprocessor):
         if self.debug:
             io.named_window ("Debug convert")
 
-        io.progress_bar ("Converting", len (self.input_data) )
+        io.progress_bar ("Converting", len (self.input_data_idxs) )
 
     #overridable optional
     def on_clients_finalized(self):
@@ -178,13 +194,15 @@ class ConvertSubprocessor(Subprocessor):
     #override
     def get_data(self, host_dict):
-        if len (self.input_data) > 0:
-            return self.input_data.pop(0)
+        if len (self.input_data_idxs) > 0:
+            idx = self.input_data_idxs.pop(0)
+            return (idx, self.input_data[idx])
         return None
 
     #override
     def on_data_return (self, host_dict, data):
-        self.input_data.insert(0, data)
+        idx, filename = data
+        self.input_data_idxs.insert(0, idx)
 
     #override
     def on_result (self, host_dict, data, result):
@@ -209,7 +227,8 @@ def main (args, device_args):
     io.log_info ("Running converter.\r\n")
 
     aligned_dir = args.get('aligned_dir', None)
+    avaperator_aligned_dir = args.get('avaperator_aligned_dir', None)
 
     try:
         input_path = Path(args['input_dir'])
         output_path = Path(args['output_dir'])
@@ -233,9 +252,10 @@ def main (args, device_args):
         model = models.import_model( args['model_name'] )(model_path, device_args=device_args)
         converter = model.get_converter()
 
+        input_path_image_paths = Path_utils.get_image_paths(input_path)
         alignments = None
+        avatar_image_paths = None
 
-        if converter.type == Converter.TYPE_FACE:
+        if converter.type == Converter.TYPE_FACE or converter.type == Converter.TYPE_FACE_AVATAR:
             if aligned_dir is None:
                 io.log_err('Aligned directory not found. Please ensure it exists.')
                 return
@@ -267,12 +287,45 @@ def main (args, device_args):
                         alignments[ source_filename_stem ] = []
                     alignments[ source_filename_stem ].append (dflimg.get_source_landmarks())
 
+        if converter.type == Converter.TYPE_FACE_AVATAR:
+            if avaperator_aligned_dir is None:
+                io.log_err('Avatar operator aligned directory not found. Please ensure it exists.')
+                return
+
+            avaperator_aligned_path = Path(avaperator_aligned_dir)
+            if not avaperator_aligned_path.exists():
+                io.log_err('Avatar operator aligned directory not found. Please ensure it exists.')
+                return
+
+            avatar_image_paths = []
+            for filename in io.progress_bar_generator( Path_utils.get_image_paths(avaperator_aligned_path) , "Sorting avaperator faces"):
+                filepath = Path(filename)
+                if filepath.suffix == '.png':
+                    dflimg = DFLPNG.load( str(filepath) )
+                elif filepath.suffix == '.jpg':
+                    dflimg = DFLJPG.load ( str(filepath) )
+                else:
+                    dflimg = None
+
+                if dflimg is None:
+                    io.log_err ("Fatal error: %s is not a dfl image file" % (filepath.name) )
+                    return
+
+                avatar_image_paths += [ (filename, dflimg.get_source_filename() ) ]
+            avatar_image_paths = [ p[0] for p in sorted(avatar_image_paths, key=operator.itemgetter(1)) ]
+
+            if len(input_path_image_paths) < len(avatar_image_paths):
+                io.log_err("Input faces count must be >= avatar operator faces count.")
+                return
+
         files_processed, faces_processed = ConvertSubprocessor (
                     converter              = converter,
-                    input_path_image_paths = Path_utils.get_image_paths(input_path),
+                    input_path_image_paths = input_path_image_paths,
                     output_path            = output_path,
                     alignments             = alignments,
+                    avatar_image_paths     = avatar_image_paths,
                     debug                  = args.get('debug',False)
                     ).run()
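The indexed work queue is what keeps AVATAR output temporally coherent: avatar_image_paths is sorted by each operator face's source filename, and the worker that takes input frame i converts it against operator face i. The pairing reduces to this sketch:

    # sketch: destination frame i is driven by operator face i
    # (avatar_image_paths was sorted by dflimg.get_source_filename() above)
    for idx, filename in enumerate(input_path_image_paths):
        ava_face_path = avatar_image_paths[idx]
        # ... convert `filename` using the face loaded from `ava_face_path`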

models/Model_AVATAR/Model.py (new file)

@@ -0,0 +1,408 @@
from functools import partial

import cv2
import numpy as np

from facelib import FaceType
from interact import interact as io
from mathlib import get_power_of_two
from models import ModelBase
from nnlib import nnlib
from samplelib import *

class AVATARModel(ModelBase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs,
                         ask_sort_by_yaw=False,
                         ask_random_flip=False,
                         ask_src_scale_mod=False)

    #override
    def onInitializeOptions(self, is_first_run, ask_override):
        default_face_type = 'f'
        if is_first_run:
            self.options['resolution'] = io.input_int("Resolution ( 128,256 ?:help skip:128) : ", 128, [128,256], help_message="More resolution requires more VRAM and time to train. Value will be adjusted to multiple of 16.")
        else:
            self.options['resolution'] = self.options.get('resolution', 128)

    #override
    def onInitialize(self, batch_size=-1, **in_options):
        exec(nnlib.code_import_all, locals(), globals())
        self.set_vram_batch_requirements({4:4})

        resolution = self.options['resolution']
        bgr_shape = (resolution, resolution, 3)
        mask_shape = (resolution, resolution, 1)
        bgrm_shape = (resolution, resolution, 4)

        ngf = 64
        ndf = 64
        lambda_A = 10
        lambda_B = 10

        use_batch_norm = True #created_batch_size > 1

        self.GA = modelify(AVATARModel.ResNet (bgr_shape[2], use_batch_norm, n_blocks=6, ngf=ngf, use_dropout=True))( Input(bgrm_shape) )
        self.GB = modelify(AVATARModel.ResNet (bgr_shape[2], use_batch_norm, n_blocks=6, ngf=ngf, use_dropout=True))( Input(bgrm_shape) )

        #self.GA = modelify(UNet (bgr_shape[2], use_batch_norm, num_downs=get_power_of_two(resolution)-1, ngf=ngf, use_dropout=True))(Input(bgr_shape))
        #self.GB = modelify(UNet (bgr_shape[2], use_batch_norm, num_downs=get_power_of_two(resolution)-1, ngf=ngf, use_dropout=True))(Input(bgr_shape))

        self.DA = modelify(AVATARModel.PatchDiscriminator(ndf=ndf) ) ( Input(bgrm_shape) )
        self.DB = modelify(AVATARModel.PatchDiscriminator(ndf=ndf) ) ( Input(bgrm_shape) )

        if not self.is_first_run():
            weights_to_load = [
                (self.GA, 'GA.h5'),
                (self.DA, 'DA.h5'),
                (self.GB, 'GB.h5'),
                (self.DB, 'DB.h5'),
            ]
            self.load_weights_safe(weights_to_load)

        real_A0 = Input(bgr_shape)
        real_A0m = Input(mask_shape)
        real_B0 = Input(bgr_shape)
        real_B0m = Input(mask_shape)

        real_A0_A0m = K.concatenate([real_A0, real_A0m])
        real_A0_B0m = K.concatenate([real_A0, real_B0m])
        real_B0_B0m = K.concatenate([real_B0, real_B0m])
        real_B0_A0m = K.concatenate([real_B0, real_A0m])

        DA_ones  = K.ones_like ( K.shape(self.DA.outputs[0]) )
        DA_zeros = K.zeros_like ( K.shape(self.DA.outputs[0] ))
        DB_ones  = K.ones_like ( K.shape(self.DB.outputs[0] ))
        DB_zeros = K.zeros_like ( K.shape(self.DB.outputs[0] ))

        def DLoss(labels,logits):
            return K.mean(K.binary_crossentropy(labels,logits))

        def CycleLOSS(t1,t2):
            return K.mean(K.abs(t1 - t2))

        fake_B0 = self.GA(real_A0_B0m)
        fake_A0 = self.GB(real_B0_A0m)

        real_A0_d = self.DA(real_A0_A0m)
        real_A0_d_ones = K.ones_like(real_A0_d)

        fake_A0_d = self.DA( K.concatenate([fake_A0, real_A0m]) )
        fake_A0_d_ones = K.ones_like(fake_A0_d)
        fake_A0_d_zeros = K.zeros_like(fake_A0_d)

        real_B0_d = self.DB(real_B0_B0m)
        real_B0_d_ones = K.ones_like(real_B0_d)

        fake_B0_d = self.DB( K.concatenate([fake_B0, real_B0m]) )
        fake_B0_d_ones = K.ones_like(fake_B0_d)
        fake_B0_d_zeros = K.zeros_like(fake_B0_d)

        rec_A0 = self.GB ( K.concatenate([fake_B0, real_A0m]) )
        rec_B0 = self.GA ( K.concatenate([fake_A0, real_B0m]) )

        loss_GA = DLoss(fake_B0_d_ones, fake_B0_d ) + \
                  lambda_A * (CycleLOSS(rec_B0, real_B0) )

        weights_GA = self.GA.trainable_weights

        loss_GB = DLoss(fake_A0_d_ones, fake_A0_d ) + \
                  lambda_B * (CycleLOSS(rec_A0, real_A0) )

        weights_GB = self.GB.trainable_weights

        def opt():
            return Adam(lr=2e-4, beta_1=0.5, beta_2=0.999, tf_cpu_mode=2)#, clipnorm=1)

        self.GA_train = K.function ([real_A0, real_A0m, real_B0, real_B0m],[loss_GA],
                                    opt().get_updates(loss_GA, weights_GA) )

        self.GB_train = K.function ([real_A0, real_A0m, real_B0, real_B0m],[loss_GB],
                                    opt().get_updates(loss_GB, weights_GB) )

        ###########

        loss_D_A = ( DLoss(real_A0_d_ones, real_A0_d ) + \
                     DLoss(fake_A0_d_zeros, fake_A0_d ) ) * 0.5

        self.DA_train = K.function ([real_A0, real_A0m, real_B0, real_B0m],[loss_D_A],
                                    opt().get_updates(loss_D_A, self.DA.trainable_weights) )

        ############

        loss_D_B = ( DLoss(real_B0_d_ones, real_B0_d ) + \
                     DLoss(fake_B0_d_zeros, fake_B0_d ) ) * 0.5

        self.DB_train = K.function ([real_A0, real_A0m, real_B0, real_B0m],[loss_D_B],
                                    opt().get_updates(loss_D_B, self.DB.trainable_weights) )

        ############

        self.G_view = K.function([real_A0, real_A0m, real_B0, real_B0m],[fake_A0, rec_A0, fake_B0, rec_B0 ])

        if self.is_training_mode:
            f = SampleProcessor.TypeFlags
            face_type = f.FACE_TYPE_FULL

            output_sample_types=[ [f.SOURCE | face_type | f.MODE_BGR, resolution],
                                  [f.SOURCE | face_type | f.MODE_M | f.FACE_MASK_FULL, resolution],
                                ]

            self.set_training_data_generators ([
                        SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size,
                                            sample_process_options=SampleProcessor.Options(random_flip=self.random_flip, normalize_tanh = True),
                                            output_sample_types=output_sample_types ),

                        SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size,
                                            sample_process_options=SampleProcessor.Options(random_flip=self.random_flip, normalize_tanh = True),
                                            output_sample_types=output_sample_types )
                   ])
        else:
            self.G_convert = K.function([real_A0, real_B0m],[fake_B0])

    #override
    def onSave(self):
        self.save_weights_safe( [[self.GA, 'GA.h5'],
                                 [self.GB, 'GB.h5'],
                                 [self.DA, 'DA.h5'],
                                 [self.DB, 'DB.h5'] ])

    #override
    def onTrainOneIter(self, generators_samples, generators_list):
        src, srcm = generators_samples[0]
        dst, dstm = generators_samples[1]

        feed = [src, srcm, dst, dstm]

        loss_GA, = self.GA_train ( feed )
        loss_GB, = self.GB_train ( feed )
        loss_DA, = self.DA_train( feed )
        loss_DB, = self.DB_train( feed )

        return ( ('GA', loss_GA), ('GB', loss_GB), ('DA', loss_DA), ('DB', loss_DB) )

    #override
    def onGetPreview(self, sample):
        test_A0  = sample[0][0][0:4]
        test_A0m = sample[0][1][0:4]

        test_B0  = sample[1][0][0:4]
        test_B0m = sample[1][1][0:4]

        G_view_result = self.G_view([test_A0, test_A0m, test_B0, test_B0m])

        fake_A0, rec_A0, fake_B0, rec_B0 = [ x[0] / 2 + 0.5 for x in G_view_result]
        test_A0, test_A0m, test_B0, test_B0m = [ x[0] / 2 + 0.5 for x in [test_A0, test_A0m, test_B0, test_B0m] ]

        r = np.concatenate ((np.concatenate ( (test_A0, fake_B0, rec_A0), axis=1),
                             np.concatenate ( (test_B0, fake_A0, rec_B0), axis=1)
                             ), axis=0)

        return [ ('AVATAR', r ) ]

    def predictor_func (self, avaperator_face, target_face_mask):
        feed = [ avaperator_face[np.newaxis,...]*2-1, target_face_mask[np.newaxis,...]*2-1 ]
        x = self.G_convert (feed)[0]
        return np.clip ( x[0] / 2 + 0.5 , 0, 1)

    # #override
    # def get_converter(self, **in_options):
    #     from models import ConverterImage
    #     return ConverterImage(self.predictor_func,
    #                           predictor_input_size=self.options['resolution'],
    #                           **in_options)

    #override
    def get_converter(self):
        base_erode_mask_modifier = 30
        base_blur_mask_modifier = 0

        default_erode_mask_modifier = 0
        default_blur_mask_modifier = 0

        face_type = FaceType.FULL

        from converters import ConverterAvatar
        return ConverterAvatar(self.predictor_func,
                               predictor_input_size=self.options['resolution'])

    @staticmethod
    def ResNet(output_nc, use_batch_norm, ngf=64, n_blocks=6, use_dropout=False):
        exec (nnlib.import_all(), locals(), globals())

        if not use_batch_norm:
            use_bias = True
            def XNormalization(x):
                return InstanceNormalization (axis=-1)(x)
        else:
            use_bias = False
            def XNormalization(x):
                return BatchNormalization (axis=-1)(x)

        XConv2D = partial(Conv2D, padding='same', use_bias=use_bias)
        XConv2DTranspose = partial(Conv2DTranspose, padding='same', use_bias=use_bias)

        def func(input):
            def ResnetBlock(dim, use_dropout=False):
                def func(input):
                    x = input

                    x = XConv2D(dim, 3, strides=1)(x)
                    x = XNormalization(x)
                    x = ReLU()(x)

                    if use_dropout:
                        x = Dropout(0.5)(x)

                    x = XConv2D(dim, 3, strides=1)(x)
                    x = XNormalization(x)
                    x = ReLU()(x)
                    return Add()([x,input])
                return func

            x = input

            x = XConv2D(ngf, 7, strides=1, use_bias=True)(x)

            x = ReLU()(XNormalization(XConv2D(ngf*2, 4, strides=2)(x)))
            x = ReLU()(XNormalization(XConv2D(ngf*4, 4, strides=2)(x)))

            for i in range(n_blocks):
                x = ResnetBlock(ngf*4, use_dropout=use_dropout)(x)

            x = ReLU()(XNormalization(XConv2DTranspose(ngf*2, 3, strides=2)(x)))
            x = ReLU()(XNormalization(XConv2DTranspose(ngf  , 3, strides=2)(x)))

            x = XConv2D(output_nc, 7, strides=1, activation='tanh', use_bias=True)(x)

            return x

        return func

    @staticmethod
    def UNet(output_nc, use_batch_norm, ngf=64, use_dropout=False):
        exec (nnlib.import_all(), locals(), globals())

        if not use_batch_norm:
            use_bias = True
            def XNormalizationL():
                return InstanceNormalization (axis=-1)
        else:
            use_bias = False
            def XNormalizationL():
                return BatchNormalization (axis=-1)

        def XNormalization(x):
            return XNormalizationL()(x)

        XConv2D = partial(Conv2D, padding='same', use_bias=use_bias)
        XConv2DTranspose = partial(Conv2DTranspose, padding='same', use_bias=use_bias)

        def func(input):
            b,h,w,c = K.int_shape(input)

            n_downs = get_power_of_two(w) - 4

            Norm = XNormalizationL()
            Norm2 = XNormalizationL()
            Norm4 = XNormalizationL()
            Norm8 = XNormalizationL()

            x = input

            x = e1 = XConv2D( ngf, 4, strides=2, use_bias=True ) (x)

            x = e2 = Norm2( XConv2D( ngf*2, 4, strides=2 )( LeakyReLU(0.2)(x) ) )
            x = e3 = Norm4( XConv2D( ngf*4, 4, strides=2 )( LeakyReLU(0.2)(x) ) )

            l = []
            for i in range(n_downs):
                x = Norm8( XConv2D( ngf*8, 4, strides=2 )( LeakyReLU(0.2)(x) ) )
                l += [x]

            x = XConv2D( ngf*8, 4, strides=2, use_bias=True )( LeakyReLU(0.2)(x) )

            for i in range(n_downs):
                x = Norm8( XConv2DTranspose( ngf*8, 4, strides=2 )( ReLU()(x) ) )
                if i <= n_downs-2:
                    x = Dropout(0.5)(x)
                x = Concatenate(axis=-1)([x, l[-i-1] ])

            x = Norm4( XConv2DTranspose( ngf*4, 4, strides=2 )( ReLU()(x) ) )
            x = Concatenate(axis=-1)([x, e3])

            x = Norm2( XConv2DTranspose( ngf*2, 4, strides=2 )( ReLU()(x) ) )
            x = Concatenate(axis=-1)([x, e2])

            x = Norm( XConv2DTranspose( ngf, 4, strides=2 )( ReLU()(x) ) )
            x = Concatenate(axis=-1)([x, e1])

            x = XConv2DTranspose(output_nc, 4, strides=2, activation='tanh', use_bias=True)( ReLU()(x) )

            return x

        return func
    nnlib.UNet = UNet

    @staticmethod
    def UNetTemporalPredictor(output_nc, use_batch_norm, ngf=64, use_dropout=False):
        exec (nnlib.import_all(), locals(), globals())

        def func(inputs):
            past_2_image_tensor, past_1_image_tensor = inputs

            x = Concatenate(axis=-1)([ past_2_image_tensor, past_1_image_tensor ])
            x = UNet(3, use_batch_norm, ngf=ngf, use_dropout=use_dropout) (x)

            return x

        return func

    @staticmethod
    def PatchDiscriminator(ndf=64, n_layers=3):
        exec (nnlib.import_all(), locals(), globals())

        use_bias = True
        def XNormalization(x):
            return InstanceNormalization (axis=-1)(x)

        XConv2D = partial(Conv2D, use_bias=use_bias)

        def func(input):
            x = input

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf, 4, strides=2, padding='valid')(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf*2, 4, strides=2, padding='valid')(x)
            x = XNormalization(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf*4, 4, strides=2, padding='valid')(x)
            x = XNormalization(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf*8, 4, strides=1, padding='valid')(x)
            x = XNormalization(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            return XConv2D( 1, 4, strides=1, padding='valid', activation='sigmoid')(x)#

        return func

Model = AVATARModel
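In loss terms, each generator above is trained with a binary-cross-entropy adversarial term plus an L1 cycle term scaled by lambda (10 here), with the mask concatenated onto every generator and discriminator input. The two small losses restated with numpy for clarity (a sketch of the math, not the Keras graph itself):

    import numpy as np

    def d_loss(labels, logits, eps=1e-7):      # DLoss above: mean binary cross-entropy
        logits = np.clip(logits, eps, 1 - eps)
        return np.mean(-(labels * np.log(logits) + (1 - labels) * np.log(1 - logits)))

    def cycle_loss(t1, t2):                    # CycleLOSS above: mean absolute error
        return np.mean(np.abs(t1 - t2))

    # loss_GA = d_loss(ones, DB(fake_B0)) + 10 * cycle_loss(rec_B0, real_B0)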

models/Model_AVATAR/__init__.py (new file)

@@ -0,0 +1 @@
from .Model import Model

models/Model_RecycleGAN/Model.py (new file)

@@ -0,0 +1,426 @@
from functools import partial

import cv2
import numpy as np

from facelib import FaceType
from interact import interact as io
from mathlib import get_power_of_two
from models import ModelBase
from nnlib import nnlib
from samplelib import *

class RecycleGANModel(ModelBase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs,
                         ask_sort_by_yaw=False,
                         ask_random_flip=False,
                         ask_src_scale_mod=False)

    #override
    def onInitializeOptions(self, is_first_run, ask_override):
        if is_first_run:
            self.options['resolution'] = io.input_int("Resolution ( 128,256 ?:help skip:128) : ", 128, [128,256], help_message="More resolution requires more VRAM and time to train. Value will be adjusted to multiple of 16.")
        else:
            self.options['resolution'] = self.options.get('resolution', 128)

    #override
    def onInitialize(self, batch_size=-1, **in_options):
        exec(nnlib.code_import_all, locals(), globals())
        self.set_vram_batch_requirements({6:16})

        resolution = self.options['resolution']
        bgr_shape = (resolution, resolution, 3)

        ngf = 64
        npf = 32
        ndf = 64

        lambda_A = 10
        lambda_B = 10

        use_batch_norm = True #created_batch_size > 1

        self.GA = modelify(RecycleGANModel.ResNet (bgr_shape[2], use_batch_norm, n_blocks=6, ngf=ngf, use_dropout=True))(Input(bgr_shape))
        self.GB = modelify(RecycleGANModel.ResNet (bgr_shape[2], use_batch_norm, n_blocks=6, ngf=ngf, use_dropout=True))(Input(bgr_shape))

        #self.GA = modelify(UNet (bgr_shape[2], use_batch_norm, num_downs=get_power_of_two(resolution)-1, ngf=ngf, use_dropout=True))(Input(bgr_shape))
        #self.GB = modelify(UNet (bgr_shape[2], use_batch_norm, num_downs=get_power_of_two(resolution)-1, ngf=ngf, use_dropout=True))(Input(bgr_shape))

        self.PA = modelify(RecycleGANModel.UNetTemporalPredictor(bgr_shape[2], use_batch_norm, ngf=npf))([Input(bgr_shape), Input(bgr_shape)])
        self.PB = modelify(RecycleGANModel.UNetTemporalPredictor(bgr_shape[2], use_batch_norm, ngf=npf))([Input(bgr_shape), Input(bgr_shape)])

        self.DA = modelify(RecycleGANModel.PatchDiscriminator(ndf=ndf) ) (Input(bgr_shape))
        self.DB = modelify(RecycleGANModel.PatchDiscriminator(ndf=ndf) ) (Input(bgr_shape))

        if not self.is_first_run():
            weights_to_load = [
                (self.GA, 'GA.h5'),
                (self.DA, 'DA.h5'),
                (self.PA, 'PA.h5'),
                (self.GB, 'GB.h5'),
                (self.DB, 'DB.h5'),
                (self.PB, 'PB.h5'),
            ]
            self.load_weights_safe(weights_to_load)

        real_A0 = Input(bgr_shape, name="real_A0")
        real_A1 = Input(bgr_shape, name="real_A1")
        real_A2 = Input(bgr_shape, name="real_A2")

        real_B0 = Input(bgr_shape, name="real_B0")
        real_B1 = Input(bgr_shape, name="real_B1")
        real_B2 = Input(bgr_shape, name="real_B2")

        DA_ones  = K.ones_like ( K.shape(self.DA.outputs[0]) )
        DA_zeros = K.zeros_like ( K.shape(self.DA.outputs[0] ))
        DB_ones  = K.ones_like ( K.shape(self.DB.outputs[0] ))
        DB_zeros = K.zeros_like ( K.shape(self.DB.outputs[0] ))

        def DLoss(labels,logits):
            return K.mean(K.binary_crossentropy(labels,logits))

        def CycleLoss (t1,t2):
            return K.mean(K.abs(t1 - t2))

        def RecurrentLOSS(t1,t2):
            return K.mean(K.abs(t1 - t2))

        def RecycleLOSS(t1,t2):
            return K.mean(K.abs(t1 - t2))

        fake_B0 = self.GA(real_A0)
        fake_B1 = self.GA(real_A1)

        fake_A0 = self.GB(real_B0)
        fake_A1 = self.GB(real_B1)

        real_A0_d = self.DA(real_A0)
        real_A0_d_ones = K.ones_like(real_A0_d)
        real_A1_d = self.DA(real_A1)
        real_A1_d_ones = K.ones_like(real_A1_d)

        fake_A0_d = self.DA(fake_A0)
        fake_A0_d_ones = K.ones_like(fake_A0_d)
        fake_A0_d_zeros = K.zeros_like(fake_A0_d)

        fake_A1_d = self.DA(fake_A1)
        fake_A1_d_ones = K.ones_like(fake_A1_d)
        fake_A1_d_zeros = K.zeros_like(fake_A1_d)

        real_B0_d = self.DB(real_B0)
        real_B0_d_ones = K.ones_like(real_B0_d)

        real_B1_d = self.DB(real_B1)
        real_B1_d_ones = K.ones_like(real_B1_d)

        fake_B0_d = self.DB(fake_B0)
        fake_B0_d_ones = K.ones_like(fake_B0_d)
        fake_B0_d_zeros = K.zeros_like(fake_B0_d)

        fake_B1_d = self.DB(fake_B1)
        fake_B1_d_ones = K.ones_like(fake_B1_d)
        fake_B1_d_zeros = K.zeros_like(fake_B1_d)

        pred_A2 = self.PA ( [real_A0, real_A1])
        pred_B2 = self.PB ( [real_B0, real_B1])
        rec_A2 = self.GB ( self.PB ( [fake_B0, fake_B1]) )
        rec_B2 = self.GA ( self.PA ( [fake_A0, fake_A1]))

        loss_GA = DLoss(fake_B0_d_ones, fake_B0_d ) + \
                  DLoss(fake_B1_d_ones, fake_B1_d ) + \
                  lambda_A * (RecurrentLOSS(pred_A2, real_A2) + \
                              RecycleLOSS(rec_B2, real_B2) )

        weights_GA = self.GA.trainable_weights + self.PA.trainable_weights

        loss_GB = DLoss(fake_A0_d_ones, fake_A0_d ) + \
                  DLoss(fake_A1_d_ones, fake_A1_d ) + \
                  lambda_B * (RecurrentLOSS(pred_B2, real_B2) + \
                              RecycleLOSS(rec_A2, real_A2) )

        weights_GB = self.GB.trainable_weights + self.PB.trainable_weights

        def opt():
            return Adam(lr=2e-4, beta_1=0.5, beta_2=0.999, tf_cpu_mode=2)#, clipnorm=1)

        self.GA_train = K.function ([real_A0, real_A1, real_A2, real_B0, real_B1, real_B2],[loss_GA],
                                    opt().get_updates(loss_GA, weights_GA) )

        self.GB_train = K.function ([real_A0, real_A1, real_A2, real_B0, real_B1, real_B2],[loss_GB],
                                    opt().get_updates(loss_GB, weights_GB) )

        ###########

        loss_D_A0 = ( DLoss(real_A0_d_ones, real_A0_d ) + \
                      DLoss(fake_A0_d_zeros, fake_A0_d ) ) * 0.5

        loss_D_A1 = ( DLoss(real_A1_d_ones, real_A1_d ) + \
                      DLoss(fake_A1_d_zeros, fake_A1_d ) ) * 0.5

        loss_D_A = loss_D_A0 + loss_D_A1

        self.DA_train = K.function ([real_A0, real_A1, real_A2, real_B0, real_B1, real_B2],[loss_D_A],
                                    opt().get_updates(loss_D_A, self.DA.trainable_weights) )

        ############

        loss_D_B0 = ( DLoss(real_B0_d_ones, real_B0_d ) + \
                      DLoss(fake_B0_d_zeros, fake_B0_d ) ) * 0.5

        loss_D_B1 = ( DLoss(real_B1_d_ones, real_B1_d ) + \
                      DLoss(fake_B1_d_zeros, fake_B1_d ) ) * 0.5

        loss_D_B = loss_D_B0 + loss_D_B1

        self.DB_train = K.function ([real_A0, real_A1, real_A2, real_B0, real_B1, real_B2],[loss_D_B],
                                    opt().get_updates(loss_D_B, self.DB.trainable_weights) )

        ############

        self.G_view = K.function([real_A0, real_A1, real_A2, real_B0, real_B1, real_B2],[fake_A0, fake_A1, pred_A2, rec_A2, fake_B0, fake_B1, pred_B2, rec_B2 ])

        if self.is_training_mode:
            f = SampleProcessor.TypeFlags
            self.set_training_data_generators ([
                        SampleGeneratorImageTemporal(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size,
                                                     temporal_image_count=3,
                                                     sample_process_options=SampleProcessor.Options(random_flip = False, normalize_tanh = True),
                                                     output_sample_types=[ [f.SOURCE | f.MODE_BGR, resolution] ] ),

                        SampleGeneratorImageTemporal(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size,
                                                     temporal_image_count=3,
                                                     sample_process_options=SampleProcessor.Options(random_flip = False, normalize_tanh = True),
                                                     output_sample_types=[ [f.SOURCE | f.MODE_BGR, resolution] ] ),
                   ])
        else:
            self.G_convert = K.function([real_B0],[fake_A0])

    #override
    def onSave(self):
        self.save_weights_safe( [[self.GA, 'GA.h5'],
                                 [self.GB, 'GB.h5'],
                                 [self.DA, 'DA.h5'],
                                 [self.DB, 'DB.h5'],
                                 [self.PA, 'PA.h5'],
                                 [self.PB, 'PB.h5'] ])

    #override
    def onTrainOneIter(self, generators_samples, generators_list):
        source_src_0, source_src_1, source_src_2, = generators_samples[0]
        source_dst_0, source_dst_1, source_dst_2, = generators_samples[1]

        feed = [source_src_0, source_src_1, source_src_2, source_dst_0, source_dst_1, source_dst_2]

        loss_GA, = self.GA_train ( feed )
        loss_GB, = self.GB_train ( feed )
        loss_DA, = self.DA_train( feed )
        loss_DB, = self.DB_train( feed )

        return ( ('GA', loss_GA), ('GB', loss_GB), ('DA', loss_DA), ('DB', loss_DB) )

    #override
    def onGetPreview(self, sample):
        test_A0 = sample[0][0]
        test_A1 = sample[0][1]
        test_A2 = sample[0][2]

        test_B0 = sample[1][0]
        test_B1 = sample[1][1]
        test_B2 = sample[1][2]

        G_view_result = self.G_view([test_A0, test_A1, test_A2, test_B0, test_B1, test_B2])

        fake_A0, fake_A1, pred_A2, rec_A2, fake_B0, fake_B1, pred_B2, rec_B2 = [ x[0] / 2 + 0.5 for x in G_view_result]
        test_A0, test_A1, test_A2, test_B0, test_B1, test_B2 = [ x[0] / 2 + 0.5 for x in [test_A0, test_A1, test_A2, test_B0, test_B1, test_B2] ]

        r = np.concatenate ((np.concatenate ( (test_A0, test_A1, test_A2, pred_A2, fake_B0, fake_B1, rec_A2), axis=1),
                             np.concatenate ( (test_B0, test_B1, test_B2, pred_B2, fake_A0, fake_A1, rec_B2), axis=1)
                             ), axis=0)

        return [ ('RecycleGAN', r ) ]

    def predictor_func (self, face):
        x = self.G_convert ( [ face[np.newaxis,...]*2-1 ] )[0]
        return np.clip ( x[0] / 2 + 0.5 , 0, 1)

    #override
    def get_converter(self, **in_options):
        from converters import ConverterImage
        return ConverterImage(self.predictor_func,
                              predictor_input_size=self.options['resolution'],
                              **in_options)

    @staticmethod
    def ResNet(output_nc, use_batch_norm, ngf=64, n_blocks=6, use_dropout=False):
        exec (nnlib.import_all(), locals(), globals())

        if not use_batch_norm:
            use_bias = True
            def XNormalization(x):
                return InstanceNormalization (axis=-1)(x)
        else:
            use_bias = False
            def XNormalization(x):
                return BatchNormalization (axis=-1)(x)

        XConv2D = partial(Conv2D, padding='same', use_bias=use_bias)
        XConv2DTranspose = partial(Conv2DTranspose, padding='same', use_bias=use_bias)

        def func(input):
            def ResnetBlock(dim, use_dropout=False):
                def func(input):
                    x = input

                    x = XConv2D(dim, 3, strides=1)(x)
                    x = XNormalization(x)
                    x = ReLU()(x)

                    if use_dropout:
                        x = Dropout(0.5)(x)

                    x = XConv2D(dim, 3, strides=1)(x)
                    x = XNormalization(x)
                    x = ReLU()(x)
                    return Add()([x,input])
                return func

            x = input

            x = XConv2D(ngf, 7, strides=1, use_bias=True)(x)

            x = ReLU()(XNormalization(XConv2D(ngf*2, 4, strides=2)(x)))
            x = ReLU()(XNormalization(XConv2D(ngf*4, 4, strides=2)(x)))

            for i in range(n_blocks):
                x = ResnetBlock(ngf*4, use_dropout=use_dropout)(x)

            x = ReLU()(XNormalization(XConv2DTranspose(ngf*2, 3, strides=2)(x)))
            x = ReLU()(XNormalization(XConv2DTranspose(ngf  , 3, strides=2)(x)))

            x = XConv2D(output_nc, 7, strides=1, activation='tanh', use_bias=True)(x)

            return x

        return func

    @staticmethod
    def UNet(output_nc, use_batch_norm, ngf=64, use_dropout=False):
        exec (nnlib.import_all(), locals(), globals())

        if not use_batch_norm:
            use_bias = True
            def XNormalizationL():
                return InstanceNormalization (axis=-1)
        else:
            use_bias = False
            def XNormalizationL():
                return BatchNormalization (axis=-1)

        def XNormalization(x):
            return XNormalizationL()(x)

        XConv2D = partial(Conv2D, padding='same', use_bias=use_bias)
        XConv2DTranspose = partial(Conv2DTranspose, padding='same', use_bias=use_bias)

        def func(input):
            b,h,w,c = K.int_shape(input)

            n_downs = get_power_of_two(w) - 4

            Norm = XNormalizationL()
            Norm2 = XNormalizationL()
            Norm4 = XNormalizationL()
            Norm8 = XNormalizationL()

            x = input

            x = e1 = XConv2D( ngf, 4, strides=2, use_bias=True ) (x)

            x = e2 = Norm2( XConv2D( ngf*2, 4, strides=2 )( LeakyReLU(0.2)(x) ) )
            x = e3 = Norm4( XConv2D( ngf*4, 4, strides=2 )( LeakyReLU(0.2)(x) ) )

            l = []
            for i in range(n_downs):
                x = Norm8( XConv2D( ngf*8, 4, strides=2 )( LeakyReLU(0.2)(x) ) )
                l += [x]

            x = XConv2D( ngf*8, 4, strides=2, use_bias=True )( LeakyReLU(0.2)(x) )

            for i in range(n_downs):
                x = Norm8( XConv2DTranspose( ngf*8, 4, strides=2 )( ReLU()(x) ) )
                if i <= n_downs-2:
                    x = Dropout(0.5)(x)
                x = Concatenate(axis=-1)([x, l[-i-1] ])

            x = Norm4( XConv2DTranspose( ngf*4, 4, strides=2 )( ReLU()(x) ) )
            x = Concatenate(axis=-1)([x, e3])

            x = Norm2( XConv2DTranspose( ngf*2, 4, strides=2 )( ReLU()(x) ) )
            x = Concatenate(axis=-1)([x, e2])

            x = Norm( XConv2DTranspose( ngf, 4, strides=2 )( ReLU()(x) ) )
            x = Concatenate(axis=-1)([x, e1])

            x = XConv2DTranspose(output_nc, 4, strides=2, activation='tanh', use_bias=True)( ReLU()(x) )

            return x

        return func
    nnlib.UNet = UNet

    @staticmethod
    def UNetTemporalPredictor(output_nc, use_batch_norm, ngf=64, use_dropout=False):
        exec (nnlib.import_all(), locals(), globals())

        def func(inputs):
            past_2_image_tensor, past_1_image_tensor = inputs

            x = Concatenate(axis=-1)([ past_2_image_tensor, past_1_image_tensor ])
            x = UNet(3, use_batch_norm, ngf=ngf, use_dropout=use_dropout) (x)

            return x

        return func

    @staticmethod
    def PatchDiscriminator(ndf=64, n_layers=3):
        exec (nnlib.import_all(), locals(), globals())

        use_bias = True
        def XNormalization(x):
            return InstanceNormalization (axis=-1)(x)

        XConv2D = partial(Conv2D, use_bias=use_bias)

        def func(input):
            x = input

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf, 4, strides=2, padding='valid')(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf*2, 4, strides=2, padding='valid')(x)
            x = XNormalization(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf*4, 4, strides=2, padding='valid')(x)
            x = XNormalization(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            x = XConv2D( ndf*8, 4, strides=1, padding='valid')(x)
            x = XNormalization(x)
            x = LeakyReLU(0.2)(x)

            x = ZeroPadding2D((1,1))(x)
            return XConv2D( 1, 4, strides=1, padding='valid', activation='sigmoid')(x)#

        return func

Model = RecycleGANModel
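RecycleGAN extends the cycle idea across time: PA and PB are temporal predictors that synthesize frame t+2 from frames t and t+1, so each generator is trained with adversarial, recurrent, and recycle terms. Schematically, for generator A as wired above:

    # adversarial: DB should accept GA(A0) and GA(A1)
    # recurrent:   PA([A0, A1]) should match the real next frame A2
    # recycle:     GA(PA([GB(B0), GB(B1)])) should reconstruct B2
    #
    # loss_GA = BCE(1, DB(GA(A0))) + BCE(1, DB(GA(A1)))
    #           + 10 * ( L1(PA([A0,A1]), A2) + L1(GA(PA([GB(B0),GB(B1)])), B2) )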

models/Model_RecycleGAN/__init__.py (new file)

@@ -0,0 +1 @@
from .Model import Model

samplelib/Sample.py

@@ -16,13 +16,12 @@ class SampleType(IntEnum):
     FACE = 1 #aligned face unsorted
     FACE_YAW_SORTED = 2 #sorted by yaw
     FACE_YAW_SORTED_AS_TARGET = 3 #sorted by yaw and included only yaws which exist in TARGET also automatic mirrored
-    FACE_WITH_CLOSE_TO_SELF = 4
-    FACE_END = 4
+    FACE_END = 3
 
-    QTY = 5
+    QTY = 4
 
 class Sample(object):
-    def __init__(self, sample_type=None, filename=None, face_type=None, shape=None, landmarks=None, ie_polys=None, pitch=None, yaw=None, mirror=None, close_target_list=None, fanseg_mask_exist=False):
+    def __init__(self, sample_type=None, filename=None, face_type=None, shape=None, landmarks=None, ie_polys=None, pitch=None, yaw=None, source_filename=None, mirror=None, close_target_list=None, fanseg_mask_exist=False):
         self.sample_type = sample_type if sample_type is not None else SampleType.IMAGE
         self.filename = filename
         self.face_type = face_type
@@ -31,11 +30,12 @@ class Sample(object):
         self.ie_polys = ie_polys
         self.pitch = pitch
         self.yaw = yaw
+        self.source_filename = source_filename
         self.mirror = mirror
         self.close_target_list = close_target_list
         self.fanseg_mask_exist = fanseg_mask_exist
 
-    def copy_and_set(self, sample_type=None, filename=None, face_type=None, shape=None, landmarks=None, ie_polys=None, pitch=None, yaw=None, mirror=None, close_target_list=None, fanseg_mask=None, fanseg_mask_exist=None):
+    def copy_and_set(self, sample_type=None, filename=None, face_type=None, shape=None, landmarks=None, ie_polys=None, pitch=None, yaw=None, source_filename=None, mirror=None, close_target_list=None, fanseg_mask=None, fanseg_mask_exist=None):
         return Sample(
             sample_type=sample_type if sample_type is not None else self.sample_type,
             filename=filename if filename is not None else self.filename,
@@ -45,6 +45,7 @@ class Sample(object):
             ie_polys=ie_polys if ie_polys is not None else self.ie_polys,
             pitch=pitch if pitch is not None else self.pitch,
             yaw=yaw if yaw is not None else self.yaw,
+            source_filename=source_filename if source_filename is not None else self.source_filename,
             mirror=mirror if mirror is not None else self.mirror,
             close_target_list=close_target_list if close_target_list is not None else self.close_target_list,
             fanseg_mask_exist=fanseg_mask_exist if fanseg_mask_exist is not None else self.fanseg_mask_exist)

samplelib/SampleGeneratorFace.py

@@ -15,7 +15,7 @@ output_sample_types = [
                       ]
 '''
 class SampleGeneratorFace(SampleGeneratorBase):
-    def __init__ (self, samples_path, debug, batch_size, sort_by_yaw=False, sort_by_yaw_target_samples_path=None, with_close_to_self=False, sample_process_options=SampleProcessor.Options(), output_sample_types=[], add_sample_idx=False, add_pitch=False, add_yaw=False, generators_count=2, generators_random_seed=None, **kwargs):
+    def __init__ (self, samples_path, debug, batch_size, sort_by_yaw=False, sort_by_yaw_target_samples_path=None, sample_process_options=SampleProcessor.Options(), output_sample_types=[], add_sample_idx=False, add_pitch=False, add_yaw=False, generators_count=2, generators_random_seed=None, **kwargs):
         super().__init__(samples_path, debug, batch_size)
         self.sample_process_options = sample_process_options
         self.output_sample_types = output_sample_types
@@ -27,8 +27,6 @@ class SampleGeneratorFace(SampleGeneratorBase):
             self.sample_type = SampleType.FACE_YAW_SORTED_AS_TARGET
         elif sort_by_yaw:
             self.sample_type = SampleType.FACE_YAW_SORTED
-        elif with_close_to_self:
-            self.sample_type = SampleType.FACE_WITH_CLOSE_TO_SELF
         else:
             self.sample_type = SampleType.FACE

samplelib/SampleLoader.py

@@ -1,19 +1,19 @@
+import operator
 import traceback
 from enum import IntEnum
+
+import cv2
+import numpy as np
+
 from pathlib import Path
-from utils import Path_utils
-from utils.DFLPNG import DFLPNG
-from utils.DFLJPG import DFLJPG
-import cv2
-import numpy as np
-from .Sample import Sample
-from .Sample import SampleType
-from facelib import FaceType
-from facelib import LandmarksProcessor
+from facelib import FaceType, LandmarksProcessor
 from interact import interact as io
+from utils import Path_utils
+from utils.DFLJPG import DFLJPG
+from utils.DFLPNG import DFLPNG
+from .Sample import Sample, SampleType
 
 class SampleLoader:
     cache = dict()
@@ -35,6 +35,10 @@ class SampleLoader:
             if datas[sample_type] is None:
                 datas[sample_type] = SampleLoader.upgradeToFaceSamples( [ Sample(filename=filename) for filename in Path_utils.get_image_paths(samples_path) ] )
 
+        # elif sample_type == SampleType.FACE_TEMPORAL_SORTED:
+        #     if datas[sample_type] is None:
+        #         datas[sample_type] = SampleLoader.upgradeToFaceTemporalSortedSamples( SampleLoader.load(SampleType.FACE, samples_path) )
+
         elif sample_type == SampleType.FACE_YAW_SORTED:
             if datas[sample_type] is None:
                 datas[sample_type] = SampleLoader.upgradeToFaceYawSortedSamples( SampleLoader.load(SampleType.FACE, samples_path) )
@@ -44,10 +48,6 @@ class SampleLoader:
                 if target_samples_path is None:
                     raise Exception('target_samples_path is None for FACE_YAW_SORTED_AS_TARGET')
                 datas[sample_type] = SampleLoader.upgradeToFaceYawSortedAsTargetSamples( SampleLoader.load(SampleType.FACE_YAW_SORTED, samples_path), SampleLoader.load(SampleType.FACE_YAW_SORTED, target_samples_path) )
-        elif sample_type == SampleType.FACE_WITH_CLOSE_TO_SELF:
-            if datas[sample_type] is None:
-                datas[sample_type] = SampleLoader.upgradeToFaceCloseToSelfSamples( SampleLoader.load(SampleType.FACE, samples_path) )
 
         return datas[sample_type]
@@ -78,44 +78,19 @@ class SampleLoader:
                                             ie_polys=dflimg.get_ie_polys(),
                                             pitch=pitch,
                                             yaw=yaw,
+                                            source_filename=dflimg.get_source_filename(),
                                             fanseg_mask_exist=dflimg.get_fanseg_mask() is not None, ) )
             except:
                 print ("Unable to load %s , error: %s" % (str(s_filename_path), traceback.format_exc() ) )
 
         return sample_list
 
-    @staticmethod
-    def upgradeToFaceCloseToSelfSamples (samples):
-        yaw_samples = SampleLoader.upgradeToFaceYawSortedSamples(samples)
-        yaw_samples_len = len(yaw_samples)
-
-        sample_list = []
-        for i in io.progress_bar_generator( range(yaw_samples_len), "Sorting"):
-            if yaw_samples[i] is not None:
-                for s in yaw_samples[i]:
-                    s_t = []
-
-                    for n in range(2000):
-                        yaw_idx = np.clip ( i-10 +np.random.randint(20), 0, yaw_samples_len-1 )
-                        if yaw_samples[yaw_idx] is None:
-                            continue
-
-                        yaw_idx_samples_len = len(yaw_samples[yaw_idx])
-
-                        yaw_idx_sample = yaw_samples[yaw_idx][ np.random.randint(yaw_idx_samples_len) ]
-                        if s.filename == yaw_idx_sample.filename:
-                            continue
-
-                        s_t.append ( yaw_idx_sample )
-                        if len(s_t) >= 50:
-                            break
-
-                    if len(s_t) == 0:
-                        s_t = [s]
-
-                    sample_list.append( s.copy_and_set(close_target_list = s_t) )
-
-        return sample_list
+    # @staticmethod
+    # def upgradeToFaceTemporalSortedSamples( samples ):
+    #     new_s = [ (s, s.source_filename) for s in samples]
+    #     new_s = sorted(new_s, key=operator.itemgetter(1))
+    #
+    #     return [ s[0] for s in new_s]
 
     @staticmethod
     def upgradeToFaceYawSortedSamples( samples ):
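The commented-out helper documents what the new source_filename field is for: restoring the original frame order of aligned faces, which temporal training needs. Uncommented, it is just a sort by the remembered source frame name:

    import operator

    def sort_by_source_filename(samples):
        # aligned face files are unordered on disk; each sample remembers
        # the source frame it was extracted from
        pairs = sorted(((s, s.source_filename) for s in samples),
                       key=operator.itemgetter(1))
        return [s for s, _ in pairs]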

samplelib/SampleProcessor.py

@@ -209,7 +209,7 @@ class SampleProcessor(object):
                     img_mask = img[...,3:4]
 
                 if f & SPTF.MODE_BGR != 0:
-                    img = img
+                    img = img_bgr
                 elif f & SPTF.MODE_BGR_SHUFFLE != 0:
                     img_bgr = np.take (img_bgr, np.random.permutation(img_bgr.shape[-1]), axis=-1)
                     img = np.concatenate ( (img_bgr,img_mask) , -1 )
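The one-line fix is easy to miss: the old MODE_BGR branch returned the working image unchanged, which at that point apparently still carried the mask as a fourth channel, while the corrected branch returns the 3-channel img_bgr. For comparison, the shuffle branch's np.take call only permutes the colour planes:

    import numpy as np

    img_bgr = np.random.rand(128, 128, 3).astype(np.float32)
    shuffled = np.take(img_bgr, np.random.permutation(img_bgr.shape[-1]), axis=-1)
    assert shuffled.shape == img_bgr.shape   # same data, channels reordered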