diff --git a/models/ModelBase.py b/models/ModelBase.py index a597653..25489e4 100644 --- a/models/ModelBase.py +++ b/models/ModelBase.py @@ -10,7 +10,7 @@ from utils import image_utils import numpy as np import cv2 import gpufmkmgr -from .TrainingDataGeneratorBase import TrainingDataGeneratorBase +from samples import SampleGeneratorBase ''' You can implement your own model. Check examples. @@ -47,13 +47,11 @@ class ModelBase(object): self.epoch = model_data['epoch'] self.options = model_data['options'] self.loss_history = model_data['loss_history'] if 'loss_history' in model_data.keys() else [] - self.generator_dict_states = model_data['generator_dict_states'] if 'generator_dict_states' in model_data.keys() else None self.sample_for_preview = model_data['sample_for_preview'] if 'sample_for_preview' in model_data.keys() else None else: self.epoch = 0 self.options = {} self.loss_history = [] - self.generator_dict_states = None self.sample_for_preview = None if self.write_preview_history: @@ -97,11 +95,8 @@ class ModelBase(object): raise Exception( 'You didnt set_training_data_generators()') else: for i, generator in enumerate(self.generator_list): - if not isinstance(generator, TrainingDataGeneratorBase): - raise Exception('training data generator is not subclass of TrainingDataGeneratorBase') - - if self.generator_dict_states is not None and i < len(self.generator_dict_states): - generator.set_dict_state ( self.generator_dict_states[i] ) + if not isinstance(generator, SampleGeneratorBase): + raise Exception('training data generator is not subclass of SampleGeneratorBase') if self.sample_for_preview is None: self.sample_for_preview = self.generate_next_sample() @@ -212,7 +207,6 @@ class ModelBase(object): 'epoch': self.epoch, 'options': self.options, 'loss_history': self.loss_history, - 'generator_dict_states' : [generator.get_dict_state() for generator in self.generator_list], 'sample_for_preview' : self.sample_for_preview } self.model_data_path.write_bytes( pickle.dumps(model_data) ) diff --git a/models/Model_AVATAR/Model.py b/models/Model_AVATAR/Model.py index 6843095..e030ea3 100644 --- a/models/Model_AVATAR/Model.py +++ b/models/Model_AVATAR/Model.py @@ -1,7 +1,7 @@ -from models import ModelBase -from models import TrainingDataType import numpy as np import cv2 +from models import ModelBase +from samples import * from nnlib import tf_dssim from nnlib import DSSIMLossClass from nnlib import conv @@ -72,21 +72,20 @@ class Model(ModelBase): self.BA256_view = K.function ([input_B_warped64], [BA_rec256]) if self.is_training_mode: - from models import TrainingDataGenerator - f = TrainingDataGenerator.SampleTypeFlags + f = SampleProcessor.TypeFlags self.set_training_data_generators ([ - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ - [f.WARPED_TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], - [f.TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], - [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 256], - [f.SOURCE | f.HALF_FACE | f.MODE_BGR, 64], - [f.SOURCE | f.HALF_FACE | f.MODE_BGR, 256] ] ), + SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ + [f.WARPED_TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 256], + [f.SOURCE | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.SOURCE | f.FACE_ALIGN_HALF | f.MODE_BGR, 256] ] ), - 
TrainingDataGenerator(TrainingDataType.FACE, self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ - [f.WARPED_TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], - [f.TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], - [f.SOURCE | f.HALF_FACE | f.MODE_BGR, 64], - [f.SOURCE | f.HALF_FACE | f.MODE_BGR, 256] ] ) + SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ + [f.WARPED_TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.SOURCE | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.SOURCE | f.FACE_ALIGN_HALF | f.MODE_BGR, 256] ] ) ]) #override def onSave(self): diff --git a/models/Model_DF/Model.py b/models/Model_DF/Model.py index 76e6897..414b7c0 100644 --- a/models/Model_DF/Model.py +++ b/models/Model_DF/Model.py @@ -1,5 +1,4 @@ from models import ModelBase -from models import TrainingDataType import numpy as np import cv2 @@ -7,7 +6,7 @@ from nnlib import DSSIMMaskLossClass from nnlib import conv from nnlib import upscale from facelib import FaceType - +from samples import * class Model(ModelBase): encoderH5 = 'encoder.h5' @@ -42,11 +41,17 @@ class Model(ModelBase): self.autoencoder_dst.compile(optimizer=optimizer, loss=[dssimloss, 'mse'] ) if self.is_training_mode: - from models import TrainingDataGenerator - f = TrainingDataGenerator.SampleTypeFlags + f = SampleProcessor.TypeFlags self.set_training_data_generators ([ - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ), - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ) + SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128] ] ), + + SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128] ] ) ]) #override def onSave(self): diff --git a/models/Model_H128/Model.py b/models/Model_H128/Model.py index a24ce7e..9fb6b6a 100644 --- a/models/Model_H128/Model.py +++ b/models/Model_H128/Model.py @@ -1,13 +1,12 @@ -from models import ModelBase -from models import TrainingDataType import numpy as np +import cv2 from nnlib import DSSIMMaskLossClass from nnlib import conv from nnlib import upscale +from models import ModelBase from facelib import FaceType - -import cv2 +from samples import * class Model(ModelBase): @@ -48,11 +47,17 @@ class Model(ModelBase): self.dst_view = K.function([input_dst_bgr],[rec_dst_bgr, rec_dst_mask]) if self.is_training_mode: - from models import TrainingDataGenerator - f = 
TrainingDataGenerator.SampleTypeFlags + f = SampleProcessor.TypeFlags self.set_training_data_generators ([ - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.HALF_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ), - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.HALF_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ) + SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_M | f.FACE_MASK_FULL, 128] ] ), + + SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_M | f.FACE_MASK_FULL, 128] ] ) ]) #override diff --git a/models/Model_H64/Model.py b/models/Model_H64/Model.py index 20335cb..b203b63 100644 --- a/models/Model_H64/Model.py +++ b/models/Model_H64/Model.py @@ -1,6 +1,6 @@ from models import ModelBase -from models import TrainingDataType import numpy as np +from samples import * from nnlib import DSSIMMaskLossClass from nnlib import conv @@ -46,11 +46,17 @@ class Model(ModelBase): self.dst_view = K.function([input_dst_bgr],[rec_dst_bgr, rec_dst_mask]) if self.is_training_mode: - from models import TrainingDataGenerator - f = TrainingDataGenerator.SampleTypeFlags + f = SampleProcessor.TypeFlags self.set_training_data_generators ([ - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], [f.TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], [f.TRANSFORMED | f.HALF_FACE | f.MODE_M | f.MASK_FULL, 64] ], random_flip=True ), - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], [f.TRANSFORMED | f.HALF_FACE | f.MODE_BGR, 64], [f.TRANSFORMED | f.HALF_FACE | f.MODE_M | f.MASK_FULL, 64] ], random_flip=True ) + SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_M | f.FACE_MASK_FULL, 64] ] ), + + SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_BGR, 64], + [f.TRANSFORMED | f.FACE_ALIGN_HALF | f.MODE_M | f.FACE_MASK_FULL, 64] ] ) ]) #override diff --git a/models/Model_LIAEF128/Model.py b/models/Model_LIAEF128/Model.py index 215564e..dc2838f 100644 --- a/models/Model_LIAEF128/Model.py +++ 
b/models/Model_LIAEF128/Model.py @@ -1,5 +1,4 @@ from models import ModelBase -from models import TrainingDataType import numpy as np import cv2 @@ -7,6 +6,7 @@ from nnlib import DSSIMMaskLossClass from nnlib import conv from nnlib import upscale from facelib import FaceType +from samples import * class Model(ModelBase): @@ -48,11 +48,17 @@ class Model(ModelBase): self.autoencoder_dst.compile(optimizer=optimizer, loss=[dssimloss, 'mse'] ) if self.is_training_mode: - from models import TrainingDataGenerator - f = TrainingDataGenerator.SampleTypeFlags + f = SampleProcessor.TypeFlags self.set_training_data_generators ([ - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ), - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ) + SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128] ] ), + + SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128] ] ) ]) #override diff --git a/models/Model_LIAEF128YAW/Model.py b/models/Model_LIAEF128YAW/Model.py index fc06344..9a9d147 100644 --- a/models/Model_LIAEF128YAW/Model.py +++ b/models/Model_LIAEF128YAW/Model.py @@ -1,5 +1,4 @@ from models import ModelBase -from models import TrainingDataType import numpy as np import cv2 @@ -7,7 +6,8 @@ from nnlib import DSSIMMaskLossClass from nnlib import conv from nnlib import upscale from facelib import FaceType - +from samples import * + class Model(ModelBase): encoderH5 = 'encoder.h5' @@ -47,12 +47,18 @@ class Model(ModelBase): self.autoencoder_src.compile(optimizer=optimizer, loss=[dssimloss, 'mse'] ) self.autoencoder_dst.compile(optimizer=optimizer, loss=[dssimloss, 'mse'] ) - if self.is_training_mode: - from models import TrainingDataGenerator - f = TrainingDataGenerator.SampleTypeFlags + if self.is_training_mode: + f = SampleProcessor.TypeFlags self.set_training_data_generators ([ - TrainingDataGenerator(TrainingDataType.FACE_YAW_SORTED_AS_TARGET, self.training_data_src_path, target_training_data_path=self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ), - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], 
[f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128] ], random_flip=True ) + SampleGeneratorFace(self.training_data_src_path, sort_by_yaw_target_samples_path=self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128] ] ), + + SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128] ] ) ]) #override diff --git a/models/Model_MIAEF128/Model.py b/models/Model_MIAEF128/Model.py index 042c905..f309e86 100644 --- a/models/Model_MIAEF128/Model.py +++ b/models/Model_MIAEF128/Model.py @@ -1,12 +1,12 @@ -from models import ModelBase -from models import TrainingDataType import numpy as np import cv2 +from models import ModelBase from nnlib import DSSIMMaskLossClass from nnlib import conv from nnlib import upscale from facelib import FaceType +from samples import * class Model(ModelBase): @@ -82,11 +82,18 @@ class Model(ModelBase): self.autoencoder_dst.compile(optimizer=optimizer, loss=[dssimloss, 'mse'] ) if self.is_training_mode: - from models import TrainingDataGenerator - f = TrainingDataGenerator.SampleTypeFlags + f = SampleProcessor.TypeFlags self.set_training_data_generators ([ - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_GGG, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_G , 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_GGG, 128] ], random_flip=True ), - TrainingDataGenerator(TrainingDataType.FACE, self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, output_sample_types=[ [f.WARPED_TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_BGR, 128], [f.TRANSFORMED | f.FULL_FACE | f.MODE_M | f.MASK_FULL, 128]], random_flip=True ) + SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_GGG, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_G , 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_GGG, 128] ] ), + + SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, + output_sample_types=[ [f.WARPED_TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_BGR, 128], + [f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL, 128]] ) ]) #override def onSave(self): diff --git a/models/TrainingDataGenerator.py b/models/TrainingDataGenerator.py deleted file mode 100644 index 624abfa..0000000 --- a/models/TrainingDataGenerator.py +++ /dev/null @@ -1,149 +0,0 @@ -from facelib import FaceType -from facelib import LandmarksProcessor -import cv2 -import numpy as np -from models import TrainingDataGeneratorBase -from utils import image_utils -from utils import random_utils -from enum import IntEnum -from models import TrainingDataType - -class 
TrainingDataGenerator(TrainingDataGeneratorBase): - class SampleTypeFlags(IntEnum): - SOURCE = 0x000001, - WARPED = 0x000002, - WARPED_TRANSFORMED = 0x000004, - TRANSFORMED = 0x000008, - - HALF_FACE = 0x000010, - FULL_FACE = 0x000020, - HEAD_FACE = 0x000040, - AVATAR_FACE = 0x000080, - MARK_ONLY_FACE = 0x000100, - - MODE_BGR = 0x001000, #BGR - MODE_G = 0x002000, #Grayscale - MODE_GGG = 0x004000, #3xGrayscale - MODE_M = 0x008000, #mask only - MODE_BGR_SHUFFLE = 0x010000, #BGR shuffle - - MASK_FULL = 0x100000, - MASK_EYES = 0x200000, - - #overrided - def onInitialize(self, random_flip=False, normalize_tanh=False, rotation_range=[-10,10], scale_range=[-0.05, 0.05], tx_range=[-0.05, 0.05], ty_range=[-0.05, 0.05], output_sample_types=[], **kwargs): - self.random_flip = random_flip - self.normalize_tanh = normalize_tanh - self.output_sample_types = output_sample_types - self.rotation_range = rotation_range - self.scale_range = scale_range - self.tx_range = tx_range - self.ty_range = ty_range - - #overrided - def onProcessSample(self, sample, debug): - source = sample.load_bgr() - h,w,c = source.shape - - is_face_sample = self.trainingdatatype >= TrainingDataType.FACE_BEGIN and self.trainingdatatype <= TrainingDataType.FACE_END - - if debug and is_face_sample: - LandmarksProcessor.draw_landmarks (source, sample.landmarks, (0, 1, 0)) - - params = image_utils.gen_warp_params(source, self.random_flip, rotation_range=self.rotation_range, scale_range=self.scale_range, tx_range=self.tx_range, ty_range=self.ty_range ) - - images = [[None]*3 for _ in range(4)] - - outputs = [] - for t,size in self.output_sample_types: - if t & self.SampleTypeFlags.SOURCE != 0: - img_type = 0 - elif t & self.SampleTypeFlags.WARPED != 0: - img_type = 1 - elif t & self.SampleTypeFlags.WARPED_TRANSFORMED != 0: - img_type = 2 - elif t & self.SampleTypeFlags.TRANSFORMED != 0: - img_type = 3 - else: - raise ValueError ('expected SampleTypeFlags type') - - mask_type = 0 - if t & self.SampleTypeFlags.MASK_FULL != 0: - mask_type = 1 - elif t & self.SampleTypeFlags.MASK_EYES != 0: - mask_type = 2 - - if images[img_type][mask_type] is None: - img = source - if is_face_sample: - if mask_type == 1: - img = np.concatenate( (img, LandmarksProcessor.get_image_hull_mask (source, sample.landmarks) ), -1 ) - elif mask_type == 2: - mask = LandmarksProcessor.get_image_eye_mask (source, sample.landmarks) - mask = np.expand_dims (cv2.blur (mask, ( w // 32, w // 32 ) ), -1) - mask[mask > 0.0] = 1.0 - img = np.concatenate( (img, mask ), -1 ) - - images[img_type][mask_type] = image_utils.warp_by_params (params, img, (img_type==1 or img_type==2), (img_type==2 or img_type==3), img_type != 0) - - img = images[img_type][mask_type] - - target_face_type = -1 - if t & self.SampleTypeFlags.HALF_FACE != 0: - target_face_type = FaceType.HALF - elif t & self.SampleTypeFlags.FULL_FACE != 0: - target_face_type = FaceType.FULL - elif t & self.SampleTypeFlags.HEAD_FACE != 0: - target_face_type = FaceType.HEAD - elif t & self.SampleTypeFlags.AVATAR_FACE != 0: - target_face_type = FaceType.AVATAR - elif t & self.SampleTypeFlags.MARK_ONLY_FACE != 0: - target_face_type = FaceType.MARK_ONLY - - if is_face_sample and target_face_type != -1 and target_face_type != FaceType.MARK_ONLY: - if target_face_type > sample.face_type: - raise Exception ('sample %s type %s does not match model requirement %s. Consider extract necessary type of faces.' 
% (sample.filename, sample.face_type, target_face_type) ) - - img = cv2.warpAffine( img, LandmarksProcessor.get_transform_mat (sample.landmarks, size, target_face_type), (size,size), flags=cv2.INTER_LANCZOS4 ) - else: - img = cv2.resize( img, (size,size), cv2.INTER_LANCZOS4 ) - - img_bgr = img[...,0:3] - img_mask = img[...,3:4] - - if t & self.SampleTypeFlags.MODE_BGR != 0: - img = img - elif t & self.SampleTypeFlags.MODE_BGR_SHUFFLE != 0: - img_bgr = np.take (img_bgr, np.random.permutation(img_bgr.shape[-1]), axis=-1) - img = np.concatenate ( (img_bgr,img_mask) , -1 ) - elif t & self.SampleTypeFlags.MODE_G != 0: - img = np.concatenate ( (np.expand_dims(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),-1),img_mask) , -1 ) - elif t & self.SampleTypeFlags.MODE_GGG != 0: - img = np.concatenate ( ( np.repeat ( np.expand_dims(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),-1), (3,), -1), img_mask), -1) - elif is_face_sample and t & self.SampleTypeFlags.MODE_M != 0: - if mask_type== 0: - raise ValueError ('no mask mode defined') - img = img_mask - else: - raise ValueError ('expected SampleTypeFlags mode') - - if not debug and self.normalize_tanh: - img = img * 2.0 - 1.0 - - outputs.append ( img ) - - if debug: - result = () - - for output in outputs: - if output.shape[2] < 4: - result += (output,) - elif output.shape[2] == 4: - result += (output[...,0:3]*output[...,3:4],) - - return result - else: - return outputs - - - \ No newline at end of file diff --git a/models/TrainingDataGeneratorBase.py b/models/TrainingDataGeneratorBase.py deleted file mode 100644 index 2f78685..0000000 --- a/models/TrainingDataGeneratorBase.py +++ /dev/null @@ -1,240 +0,0 @@ -import traceback -import random -from pathlib import Path -from tqdm import tqdm -import numpy as np -import cv2 -from utils.DFLPNG import DFLPNG -from utils import iter_utils -from utils import Path_utils -from .BaseTypes import TrainingDataType -from .BaseTypes import TrainingDataSample -from facelib import FaceType -from facelib import LandmarksProcessor - -''' -You can implement your own TrainingDataGenerator -''' -class TrainingDataGeneratorBase(object): - cache = dict() - - #DONT OVERRIDE - #use YourOwnTrainingDataGenerator (..., your_opt=1) - #and then this opt will be passed in YourOwnTrainingDataGenerator.onInitialize ( your_opt ) - def __init__ (self, trainingdatatype, training_data_path, target_training_data_path=None, debug=False, batch_size=1, **kwargs): - if not isinstance(trainingdatatype, TrainingDataType): - raise Exception('TrainingDataGeneratorBase() trainingdatatype is not TrainingDataType') - - if training_data_path is None: - raise Exception('training_data_path is None') - - self.training_data_path = Path(training_data_path) - self.target_training_data_path = Path(target_training_data_path) if target_training_data_path is not None else None - - self.debug = debug - self.batch_size = 1 if self.debug else batch_size - self.trainingdatatype = trainingdatatype - self.data = TrainingDataGeneratorBase.load (trainingdatatype, self.training_data_path, self.target_training_data_path) - - if self.debug: - self.generators = [iter_utils.ThisThreadGenerator ( self.batch_func, self.data)] - else: - if len(self.data) > 1: - self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, self.data[0::2] ), - iter_utils.SubprocessGenerator ( self.batch_func, self.data[1::2] )] - else: - self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, self.data )] - - self.generator_counter = -1 - self.onInitialize(**kwargs) - - #overridable - 
def onInitialize(self, **kwargs): - #your TrainingDataGenerator initialization here - pass - - #overridable - def onProcessSample(self, sample, debug): - #process sample and return tuple of images for your model in onTrainOneEpoch - return ( np.zeros( (64,64,4), dtype=np.float32 ), ) - - def __iter__(self): - return self - - def __next__(self): - self.generator_counter += 1 - generator = self.generators[self.generator_counter % len(self.generators) ] - x = next(generator) - return x - - def batch_func(self, data): - data_len = len(data) - if data_len == 0: - raise ValueError('No training data provided.') - - if self.trainingdatatype == TrainingDataType.FACE_YAW_SORTED or self.trainingdatatype == TrainingDataType.FACE_YAW_SORTED_AS_TARGET: - if all ( [ x == None for x in data] ): - raise ValueError('Not enough training data. Gather more faces!') - - if self.trainingdatatype == TrainingDataType.IMAGE or self.trainingdatatype == TrainingDataType.FACE: - shuffle_idxs = [] - elif self.trainingdatatype == TrainingDataType.FACE_YAW_SORTED or self.trainingdatatype == TrainingDataType.FACE_YAW_SORTED_AS_TARGET: - shuffle_idxs = [] - shuffle_idxs_2D = [[]]*data_len - - while True: - - batches = None - for n_batch in range(0, self.batch_size): - while True: - sample = None - - if self.trainingdatatype == TrainingDataType.IMAGE or self.trainingdatatype == TrainingDataType.FACE: - if len(shuffle_idxs) == 0: - shuffle_idxs = [ i for i in range(0, data_len) ] - random.shuffle(shuffle_idxs) - idx = shuffle_idxs.pop() - sample = data[ idx ] - elif self.trainingdatatype == TrainingDataType.FACE_YAW_SORTED or self.trainingdatatype == TrainingDataType.FACE_YAW_SORTED_AS_TARGET: - if len(shuffle_idxs) == 0: - shuffle_idxs = [ i for i in range(0, data_len) ] - random.shuffle(shuffle_idxs) - - idx = shuffle_idxs.pop() - if data[idx] != None: - if len(shuffle_idxs_2D[idx]) == 0: - shuffle_idxs_2D[idx] = [ i for i in range(0, len(data[idx])) ] - random.shuffle(shuffle_idxs_2D[idx]) - - idx2 = shuffle_idxs_2D[idx].pop() - sample = data[idx][idx2] - - if sample is not None: - try: - x = self.onProcessSample (sample, self.debug) - except: - raise Exception ("Exception occured in sample %s. 
Error: %s" % (sample.filename, traceback.format_exc() ) ) - - if type(x) != tuple and type(x) != list: - raise Exception('TrainingDataGenerator.onProcessSample() returns NOT tuple/list') - - x_len = len(x) - if batches is None: - batches = [ [] for _ in range(0,x_len) ] - - for i in range(0,x_len): - batches[i].append ( x[i] ) - - break - - yield [ np.array(batch) for batch in batches] - - def get_dict_state(self): - return {} - - def set_dict_state(self, state): - pass - - @staticmethod - def load(trainingdatatype, training_data_path, target_training_data_path=None): - cache = TrainingDataGeneratorBase.cache - - if str(training_data_path) not in cache.keys(): - cache[str(training_data_path)] = [None]*TrainingDataType.QTY - - if target_training_data_path is not None and str(target_training_data_path) not in cache.keys(): - cache[str(target_training_data_path)] = [None]*TrainingDataType.QTY - - datas = cache[str(training_data_path)] - - if trainingdatatype == TrainingDataType.IMAGE: - if datas[trainingdatatype] is None: - datas[trainingdatatype] = [ TrainingDataSample(filename=filename) for filename in tqdm( Path_utils.get_image_paths(training_data_path), desc="Loading" ) ] - - elif trainingdatatype == TrainingDataType.FACE: - if datas[trainingdatatype] is None: - datas[trainingdatatype] = X_LOAD( [ TrainingDataSample(filename=filename) for filename in Path_utils.get_image_paths(training_data_path) ] ) - - elif trainingdatatype == TrainingDataType.FACE_YAW_SORTED: - if datas[trainingdatatype] is None: - datas[trainingdatatype] = X_YAW_SORTED( TrainingDataGeneratorBase.load(TrainingDataType.FACE, training_data_path) ) - - elif trainingdatatype == TrainingDataType.FACE_YAW_SORTED_AS_TARGET: - if datas[trainingdatatype] is None: - if target_training_data_path is None: - raise Exception('target_training_data_path is None for FACE_YAW_SORTED_AS_TARGET') - datas[trainingdatatype] = X_YAW_AS_Y_SORTED( TrainingDataGeneratorBase.load(TrainingDataType.FACE_YAW_SORTED, training_data_path), TrainingDataGeneratorBase.load(TrainingDataType.FACE_YAW_SORTED, target_training_data_path) ) - - return datas[trainingdatatype] - -def X_LOAD ( RAWS ): - sample_list = [] - - for s in tqdm( RAWS, desc="Loading" ): - - s_filename_path = Path(s.filename) - if s_filename_path.suffix != '.png': - print ("%s is not a png file required for training" % (s_filename_path.name) ) - continue - - dflpng = DFLPNG.load ( str(s_filename_path), print_on_no_embedded_data=True ) - if dflpng is None: - continue - - sample_list.append( s.copy_and_set(face_type=FaceType.fromString (dflpng.get_face_type()), - shape=dflpng.get_shape(), - landmarks=dflpng.get_landmarks(), - yaw=dflpng.get_yaw_value()) ) - - return sample_list - -def X_YAW_SORTED( YAW_RAWS ): - - lowest_yaw, highest_yaw = -32, +32 - gradations = 64 - diff_rot_per_grad = abs(highest_yaw-lowest_yaw) / gradations - - yaws_sample_list = [None]*gradations - - for i in tqdm( range(0, gradations), desc="Sorting" ): - yaw = lowest_yaw + i*diff_rot_per_grad - next_yaw = lowest_yaw + (i+1)*diff_rot_per_grad - - yaw_samples = [] - for s in YAW_RAWS: - s_yaw = s.yaw - if (i == 0 and s_yaw < next_yaw) or \ - (i < gradations-1 and s_yaw >= yaw and s_yaw < next_yaw) or \ - (i == gradations-1 and s_yaw >= yaw): - yaw_samples.append ( s ) - - if len(yaw_samples) > 0: - yaws_sample_list[i] = yaw_samples - - return yaws_sample_list - -def X_YAW_AS_Y_SORTED (s, t): - l = len(s) - if l != len(t): - raise Exception('X_YAW_AS_Y_SORTED() s_len != t_len') - b = l // 2 - - s_idxs = np.argwhere ( 
np.array ( [ 1 if x != None else 0 for x in s] ) == 1 )[:,0] - t_idxs = np.argwhere ( np.array ( [ 1 if x != None else 0 for x in t] ) == 1 )[:,0] - - new_s = [None]*l - - for t_idx in t_idxs: - search_idxs = [] - for i in range(0,l): - search_idxs += [t_idx - i, (l-t_idx-1) - i, t_idx + i, (l-t_idx-1) + i] - - for search_idx in search_idxs: - if search_idx in s_idxs: - mirrored = ( t_idx != search_idx and ((t_idx < b and search_idx >= b) or (search_idx < b and t_idx >= b)) ) - new_s[t_idx] = [ sample.copy_and_set(mirror=True, yaw=-sample.yaw, landmarks=LandmarksProcessor.mirror_landmarks (sample.landmarks, sample.shape[1] )) - for sample in s[search_idx] - ] if mirrored else s[search_idx] - break - - return new_s diff --git a/models/__init__.py b/models/__init__.py index 0515eb8..e20adde 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -1,12 +1,7 @@ -from .BaseTypes import TrainingDataType -from .BaseTypes import TrainingDataSample - from .ModelBase import ModelBase from .ConverterBase import ConverterBase from .ConverterMasked import ConverterMasked from .ConverterImage import ConverterImage -from .TrainingDataGeneratorBase import TrainingDataGeneratorBase -from .TrainingDataGenerator import TrainingDataGenerator def import_model(name): module = __import__('Model_'+name, globals(), locals(), [], 1) diff --git a/nnlib/__init__.py b/nnlib/__init__.py index 3942dd3..84004c0 100644 --- a/nnlib/__init__.py +++ b/nnlib/__init__.py @@ -43,7 +43,36 @@ def DSSIMMaskLossClass(tf): return total_loss return DSSIMMaskLoss - + +def DSSIMPatchMaskLossClass(tf): + class DSSIMPatchMaskLoss(object): + def __init__(self, mask_list, is_tanh=False): + self.mask_list = mask_list + self.is_tanh = is_tanh + + def __call__(self,y_true, y_pred): + total_loss = None + for mask in self.mask_list: + #import code + #code.interact(local=dict(globals(), **locals())) + + y_true = tf.extract_image_patches ( y_true, (1,9,9,1), (1,1,1,1), (1,8,8,1), 'VALID' ) + y_pred = tf.extract_image_patches ( y_pred, (1,9,9,1), (1,1,1,1), (1,8,8,1), 'VALID' ) + mask = tf.extract_image_patches ( tf.tile(mask,[1,1,1,3]) , (1,9,9,1), (1,1,1,1), (1,8,8,1), 'VALID' ) + if not self.is_tanh: + loss = (1.0 - tf.image.ssim (y_true*mask, y_pred*mask, 1.0)) / 2.0 + else: + loss = (1.0 - tf.image.ssim ( (y_true/2+0.5)*(mask/2+0.5), (y_pred/2+0.5)*(mask/2+0.5), 1.0)) / 2.0 + + if total_loss is None: + total_loss = loss + else: + total_loss += loss + + return total_loss + + return DSSIMPatchMaskLoss + def DSSIMLossClass(tf): class DSSIMLoss(object): def __init__(self, is_tanh=False): @@ -57,6 +86,125 @@ def DSSIMLossClass(tf): return DSSIMLoss +def rgb_to_lab(tf, rgb_input): + with tf.name_scope("rgb_to_lab"): + srgb_pixels = tf.reshape(rgb_input, [-1, 3]) + + with tf.name_scope("srgb_to_xyz"): + linear_mask = tf.cast(srgb_pixels <= 0.04045, dtype=tf.float32) + exponential_mask = tf.cast(srgb_pixels > 0.04045, dtype=tf.float32) + rgb_pixels = (srgb_pixels / 12.92 * linear_mask) + (((srgb_pixels + 0.055) / 1.055) ** 2.4) * exponential_mask + rgb_to_xyz = tf.constant([ + # X Y Z + [0.412453, 0.212671, 0.019334], # R + [0.357580, 0.715160, 0.119193], # G + [0.180423, 0.072169, 0.950227], # B + ]) + xyz_pixels = tf.matmul(rgb_pixels, rgb_to_xyz) + + # https://en.wikipedia.org/wiki/Lab_color_space#CIELAB-CIEXYZ_conversions + with tf.name_scope("xyz_to_cielab"): + # convert to fx = f(X/Xn), fy = f(Y/Yn), fz = f(Z/Zn) + + # normalize for D65 white point + xyz_normalized_pixels = tf.multiply(xyz_pixels, [1/0.950456, 1.0, 1/1.088754]) + + epsilon 
= 6/29 + linear_mask = tf.cast(xyz_normalized_pixels <= (epsilon**3), dtype=tf.float32) + exponential_mask = tf.cast(xyz_normalized_pixels > (epsilon**3), dtype=tf.float32) + fxfyfz_pixels = (xyz_normalized_pixels / (3 * epsilon**2) + 4/29) * linear_mask + (xyz_normalized_pixels ** (1/3)) * exponential_mask + + # convert to lab + fxfyfz_to_lab = tf.constant([ + # l a b + [ 0.0, 500.0, 0.0], # fx + [116.0, -500.0, 200.0], # fy + [ 0.0, 0.0, -200.0], # fz + ]) + lab_pixels = tf.matmul(fxfyfz_pixels, fxfyfz_to_lab) + tf.constant([-16.0, 0.0, 0.0]) + #output [0, 100] , ~[-110, 110], ~[-110, 110] + lab_pixels = lab_pixels / tf.constant([100.0, 220.0, 220.0 ]) + tf.constant([0.0, 0.5, 0.5]) + #output [0-1, 0-1, 0-1] + return tf.reshape(lab_pixels, tf.shape(rgb_input)) + +def lab_to_rgb(tf, lab): + with tf.name_scope("lab_to_rgb"): + lab_pixels = tf.reshape(lab, [-1, 3]) + + # https://en.wikipedia.org/wiki/Lab_color_space#CIELAB-CIEXYZ_conversions + with tf.name_scope("cielab_to_xyz"): + # convert to fxfyfz + lab_to_fxfyfz = tf.constant([ + # fx fy fz + [1/116.0, 1/116.0, 1/116.0], # l + [1/500.0, 0.0, 0.0], # a + [ 0.0, 0.0, -1/200.0], # b + ]) + fxfyfz_pixels = tf.matmul(lab_pixels + tf.constant([16.0, 0.0, 0.0]), lab_to_fxfyfz) + + # convert to xyz + epsilon = 6/29 + linear_mask = tf.cast(fxfyfz_pixels <= epsilon, dtype=tf.float32) + exponential_mask = tf.cast(fxfyfz_pixels > epsilon, dtype=tf.float32) + xyz_pixels = (3 * epsilon**2 * (fxfyfz_pixels - 4/29)) * linear_mask + (fxfyfz_pixels ** 3) * exponential_mask + + # denormalize for D65 white point + xyz_pixels = tf.multiply(xyz_pixels, [0.950456, 1.0, 1.088754]) + + with tf.name_scope("xyz_to_srgb"): + xyz_to_rgb = tf.constant([ + # r g b + [ 3.2404542, -0.9692660, 0.0556434], # x + [-1.5371385, 1.8760108, -0.2040259], # y + [-0.4985314, 0.0415560, 1.0572252], # z + ]) + rgb_pixels = tf.matmul(xyz_pixels, xyz_to_rgb) + # avoid a slightly negative number messing up the conversion + rgb_pixels = tf.clip_by_value(rgb_pixels, 0.0, 1.0) + linear_mask = tf.cast(rgb_pixels <= 0.0031308, dtype=tf.float32) + exponential_mask = tf.cast(rgb_pixels > 0.0031308, dtype=tf.float32) + srgb_pixels = (rgb_pixels * 12.92 * linear_mask) + ((rgb_pixels ** (1/2.4) * 1.055) - 0.055) * exponential_mask + + return tf.reshape(srgb_pixels, tf.shape(lab)) + +def DSSIMPatchLossClass(tf, keras): + class DSSIMPatchLoss(object): + def __init__(self, is_tanh=False): + self.is_tanh = is_tanh + + def __call__(self,y_true, y_pred): + + y_pred_lab = rgb_to_lab(tf, y_pred) + y_true_lab = rgb_to_lab(tf, y_true) + + + #import code + #code.interact(local=dict(globals(), **locals())) + + return keras.backend.mean ( keras.backend.square(y_true_lab - y_pred_lab) ) # + (1.0 - tf.image.ssim (y_true, y_pred, 1.0)) / 2.0 + + if not self.is_tanh: + return (1.0 - tf.image.ssim (y_true, y_pred, 1.0)) / 2.0 + else: + return (1.0 - tf.image.ssim ((y_true/2+0.5), (y_pred/2+0.5), 1.0)) / 2.0 + + #y_true_72 = tf.extract_image_patches ( y_true, (1,8,8,1), (1,1,1,1), (1,8,8,1), 'VALID' ) + #y_pred_72 = tf.extract_image_patches ( y_pred, (1,8,8,1), (1,1,1,1), (1,8,8,1), 'VALID' ) + + #y_true_36 = tf.extract_image_patches ( y_true, (1,8,8,1), (1,2,2,1), (1,8,8,1), 'VALID' ) + #y_pred_36 = tf.extract_image_patches ( y_pred, (1,8,8,1), (1,2,2,1), (1,8,8,1), 'VALID' ) + + #if not self.is_tanh: + # return (1.0 - tf.image.ssim (y_true_72, y_pred_72, 1.0)) / 2.0 + \ + # (1.0 - tf.image.ssim (y_true_36, y_pred_36, 1.0)) / 2.0 + # + #else: + # return (1.0 - tf.image.ssim ((y_true_72/2+0.5), 
(y_pred_72/2+0.5), 1.0)) / 2.0 + \ + # (1.0 - tf.image.ssim ((y_true_36/2+0.5), (y_pred_36/2+0.5), 1.0)) / 2.0 + + + return DSSIMPatchLoss + def MSEMaskLossClass(keras): class MSEMaskLoss(object): def __init__(self, mask_list, is_tanh=False): @@ -208,4 +356,291 @@ def total_variation_loss(keras, x): a = K.square(x[:, :H - 1, :W - 1, :] - x[:, 1:, :W - 1, :]) b = K.square(x[:, :H - 1, :W - 1, :] - x[:, :H - 1, 1:, :]) - return K.mean (a+b) \ No newline at end of file + return K.mean (a+b) + + +# Defines the Unet generator. +# |num_downs|: number of downsamplings in UNet. For example, +# if |num_downs| == 7, image of size 128x128 will become of size 1x1 +# at the bottleneck +def UNet(keras, output_nc, num_downs, ngf=64, use_dropout=False): + Conv2D = keras.layers.convolutional.Conv2D + Conv2DTranspose = keras.layers.convolutional.Conv2DTranspose + LeakyReLU = keras.layers.advanced_activations.LeakyReLU + BatchNormalization = keras.layers.BatchNormalization + ReLU = keras.layers.ReLU + tanh = keras.layers.Activation('tanh') + Dropout = keras.layers.Dropout + Concatenate = keras.layers.Concatenate + ZeroPadding2D = keras.layers.ZeroPadding2D + + conv_kernel_initializer = keras.initializers.RandomNormal(0, 0.02) + norm_gamma_initializer = keras.initializers.RandomNormal(1, 0.02) + + def UNetSkipConnection(outer_nc, inner_nc, sub_model=None, outermost=False, innermost=False, use_dropout=False): + def func(inp): + downconv_pad = ZeroPadding2D( (1,1) ) + downconv = Conv2D(inner_nc, kernel_size=4, kernel_initializer=conv_kernel_initializer, strides=2, padding='valid', use_bias=False) + downrelu = LeakyReLU(0.2) + downnorm = BatchNormalization( gamma_initializer=norm_gamma_initializer ) + + #upconv_pad = ZeroPadding2D( (0,0) ) + upconv = Conv2DTranspose(outer_nc, kernel_size=4, kernel_initializer=conv_kernel_initializer, strides=2, padding='same', use_bias=False) + uprelu = ReLU() + upnorm = BatchNormalization( gamma_initializer=norm_gamma_initializer ) + + if outermost: + x = inp + x = downconv(downconv_pad(x)) + x = sub_model(x) + x = uprelu(x) + #x = upconv(upconv_pad(x)) + x = upconv(x) + x = tanh(x) + elif innermost: + x = inp + x = downrelu(x) + x = downconv(downconv_pad(x)) + x = uprelu(x) + # + # + #x = upconv(upconv_pad(x)) + x = upconv(x) + x = upnorm(x) + + #import code + #code.interact(local=dict(globals(), **locals())) + x = Concatenate(axis=3)([inp, x]) + + else: + x = inp + x = downrelu(x) + x = downconv(downconv_pad(x)) + x = downnorm(x) + x = sub_model(x) + x = uprelu(x) + #x = upconv(upconv_pad(x)) + x = upconv(x) + x = upnorm(x) + if use_dropout: + x = Dropout(0.5)(x) + x = Concatenate(axis=3)([inp, x]) + + return x + + return func + + def func(inp): + unet_block = UNetSkipConnection(ngf * 8, ngf * 8, sub_model=None, innermost=True) + + for i in range(num_downs - 5): + unet_block = UNetSkipConnection(ngf * 8, ngf * 8, sub_model=unet_block, use_dropout=use_dropout) + + unet_block = UNetSkipConnection(ngf * 4 , ngf * 8, sub_model=unet_block) + unet_block = UNetSkipConnection(ngf * 2 , ngf * 4, sub_model=unet_block) + unet_block = UNetSkipConnection(ngf , ngf * 2, sub_model=unet_block) + unet_block = UNetSkipConnection(output_nc, ngf , sub_model=unet_block, outermost=True) + + return unet_block(inp) + + return func + +#predicts future_image_tensor based on past_image_tensor +def UNetStreamPredictor(keras, tf, output_nc, num_downs, ngf=32, use_dropout=False): + Conv2D = keras.layers.convolutional.Conv2D + Conv2DTranspose = keras.layers.convolutional.Conv2DTranspose + LeakyReLU = 
keras.layers.advanced_activations.LeakyReLU + BatchNormalization = keras.layers.BatchNormalization + ReLU = keras.layers.ReLU + tanh = keras.layers.Activation('tanh') + ReflectionPadding2D = ReflectionPadding2DClass(keras, tf) + ZeroPadding2D = keras.layers.ZeroPadding2D + Dropout = keras.layers.Dropout + Concatenate = keras.layers.Concatenate + + conv_kernel_initializer = keras.initializers.RandomNormal(0, 0.02) + norm_gamma_initializer = keras.initializers.RandomNormal(1, 0.02) + + def func(past_image_tensor, future_image_tensor): + def model1(inp): + x = inp + x = ReflectionPadding2D((3,3))(x) + x = Conv2D(ngf, kernel_size=7, kernel_initializer=conv_kernel_initializer, strides=1, padding='valid', use_bias=False)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + x = ZeroPadding2D((1,1))(x) + x = Conv2D(ngf*2, kernel_size=3, kernel_initializer=conv_kernel_initializer, strides=1, padding='valid', use_bias=False)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + x = ZeroPadding2D((1,1))(x) + x = Conv2D(ngf*4, kernel_size=3, kernel_initializer=conv_kernel_initializer, strides=1, padding='valid', use_bias=False)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + return x + + def model3(inp): + x = inp + + x = ZeroPadding2D((1,1))(x) + x = Conv2D(ngf*2, kernel_size=3, kernel_initializer=conv_kernel_initializer, strides=1, padding='valid', use_bias=False)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + x = ZeroPadding2D((1,1))(x) + x = Conv2D(ngf, kernel_size=3, kernel_initializer=conv_kernel_initializer, strides=1, padding='valid', use_bias=False)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + x = ReflectionPadding2D((3,3))(x) + x = Conv2D(output_nc, kernel_size=7, kernel_initializer=conv_kernel_initializer, strides=1, padding='valid', use_bias=False)(x) + x = tanh(x) + return x + + model = UNet(keras, ngf*4, num_downs=num_downs, ngf=ngf*4*4, #ngf=ngf*4*4, + use_dropout=use_dropout) + return model3 ( model( Concatenate(axis=3)([ model1(past_image_tensor), model1(future_image_tensor) ]) ) ) + + return func + + +def Resnet(keras, tf, output_nc, ngf=64, use_dropout=False, n_blocks=6): + Conv2D = keras.layers.convolutional.Conv2D + Conv2DTranspose = keras.layers.convolutional.Conv2DTranspose + LeakyReLU = keras.layers.advanced_activations.LeakyReLU + BatchNormalization = keras.layers.BatchNormalization + ReLU = keras.layers.ReLU + Add = keras.layers.Add + tanh = keras.layers.Activation('tanh') + ReflectionPadding2D = ReflectionPadding2DClass(keras, tf) + ZeroPadding2D = keras.layers.ZeroPadding2D + Dropout = keras.layers.Dropout + Concatenate = keras.layers.Concatenate + + conv_kernel_initializer = keras.initializers.RandomNormal(0, 0.02) + norm_gamma_initializer = keras.initializers.RandomNormal(1, 0.02) + use_bias = False + + def ResnetBlock(dim, use_dropout, use_bias): + + def func(inp): + x = inp + + x = ReflectionPadding2D((1,1))(x) + x = Conv2D(dim, kernel_size=3, kernel_initializer=conv_kernel_initializer, padding='valid', use_bias=use_bias)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + if use_dropout: + x = Dropout(0.5)(x) + + x = ReflectionPadding2D((1,1))(x) + x = Conv2D(dim, kernel_size=3, kernel_initializer=conv_kernel_initializer, padding='valid', use_bias=use_bias)(x) + x = BatchNormalization( 
gamma_initializer=norm_gamma_initializer )(x) + + return Add()([x,inp]) + + return func + + def func(inp): + x = inp + + x = ReflectionPadding2D((3,3))(x) + x = Conv2D(ngf, kernel_size=7, kernel_initializer=conv_kernel_initializer, padding='valid', use_bias=use_bias)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + n_downsampling = 2 + for i in range(n_downsampling): + x = ZeroPadding2D( (1,1) ) (x) + x = Conv2D(ngf * (2**i) * 2, kernel_size=3, kernel_initializer=conv_kernel_initializer, strides=2, padding='valid', use_bias=use_bias)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + for i in range(n_blocks): + x = ResnetBlock(ngf*(2**n_downsampling), use_dropout=use_dropout, use_bias=use_bias)(x) + + for i in range(n_downsampling): + x = ZeroPadding2D( (1,1) ) (x) + x = Conv2DTranspose( int(ngf* (2**(n_downsampling - i)) /2), kernel_size=3, kernel_initializer=conv_kernel_initializer, strides=2, padding='valid', output_padding=1, use_bias=use_bias)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = ReLU()(x) + + x = ReflectionPadding2D((3,3))(x) + x = Conv2D(output_nc, kernel_size=7, kernel_initializer=conv_kernel_initializer, padding='valid')(x) + x = tanh(x) + + return x + + return func + +def NLayerDiscriminator(keras, tf, ndf=64, n_layers=3, use_sigmoid=False): + Conv2D = keras.layers.convolutional.Conv2D + Conv2DTranspose = keras.layers.convolutional.Conv2DTranspose + LeakyReLU = keras.layers.advanced_activations.LeakyReLU + BatchNormalization = keras.layers.BatchNormalization + ReLU = keras.layers.ReLU + Add = keras.layers.Add + tanh = keras.layers.Activation('tanh') + sigmoid = keras.layers.Activation('sigmoid') + ZeroPadding2D = keras.layers.ZeroPadding2D + Dropout = keras.layers.Dropout + Concatenate = keras.layers.Concatenate + + conv_kernel_initializer = keras.initializers.RandomNormal(0, 0.02) + norm_gamma_initializer = keras.initializers.RandomNormal(1, 0.02) + use_bias = False + + def func(inp): + x = inp + + x = ZeroPadding2D( (1,1) ) (x) + x = Conv2D(ndf, kernel_size=4, kernel_initializer=conv_kernel_initializer, strides=2, padding='valid', use_bias=use_bias)(x) + x = LeakyReLU(0.2)(x) + + nf_mult = 1 + nf_mult_prev = 1 + for n in range(1, n_layers): + nf_mult = min(2**n, 8) + + x = ZeroPadding2D( (1,1) ) (x) + x = Conv2D(ndf * nf_mult, kernel_size=4, kernel_initializer=conv_kernel_initializer, strides=2, padding='valid', use_bias=use_bias)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = LeakyReLU(0.2)(x) + + nf_mult = min(2**n_layers, 8) + + #x = ZeroPadding2D( (1,1) ) (x) + x = Conv2D(ndf * nf_mult, kernel_size=4, kernel_initializer=conv_kernel_initializer, strides=1, padding='same', use_bias=use_bias)(x) + x = BatchNormalization( gamma_initializer=norm_gamma_initializer )(x) + x = LeakyReLU(0.2)(x) + + #x = ZeroPadding2D( (1,1) ) (x) + x = Conv2D(1, kernel_size=4, kernel_initializer=conv_kernel_initializer, strides=1, padding='same', use_bias=use_bias)(x) + + if use_sigmoid: + x = sigmoid(x) + + return x + + return func + +def ReflectionPadding2DClass(keras, tf): + + class ReflectionPadding2D(keras.layers.Layer): + def __init__(self, padding=(1, 1), **kwargs): + self.padding = tuple(padding) + self.input_spec = [keras.layers.InputSpec(ndim=4)] + super(ReflectionPadding2D, self).__init__(**kwargs) + + def compute_output_shape(self, s): + """ If you are using "channels_last" configuration""" + return (s[0], s[1] + 2 * 
self.padding[0], s[2] + 2 * self.padding[1], s[3]) + + def call(self, x, mask=None): + w_pad,h_pad = self.padding + return tf.pad(x, [[0,0], [h_pad,h_pad], [w_pad,w_pad], [0,0] ], 'REFLECT') + + return ReflectionPadding2D \ No newline at end of file diff --git a/models/BaseTypes.py b/samples/Sample.py similarity index 73% rename from models/BaseTypes.py rename to samples/Sample.py index 98f6136..ce456d7 100644 --- a/models/BaseTypes.py +++ b/samples/Sample.py @@ -1,11 +1,8 @@ from enum import IntEnum import cv2 import numpy as np -from random import randint -from facelib import FaceType - - -class TrainingDataType(IntEnum): + +class SampleType(IntEnum): IMAGE = 0 #raw image FACE_BEGIN = 1 @@ -16,10 +13,9 @@ class TrainingDataType(IntEnum): QTY = 4 - -class TrainingDataSample(object): - - def __init__(self, filename=None, face_type=None, shape=None, landmarks=None, yaw=None, mirror=None, nearest_target_list=None): +class Sample(object): + def __init__(self, sample_type=None, filename=None, face_type=None, shape=None, landmarks=None, yaw=None, mirror=None, nearest_target_list=None): + self.sample_type = sample_type if sample_type is not None else SampleType.IMAGE self.filename = filename self.face_type = face_type self.shape = shape @@ -28,8 +24,9 @@ class TrainingDataSample(object): self.mirror = mirror self.nearest_target_list = nearest_target_list - def copy_and_set(self, filename=None, face_type=None, shape=None, landmarks=None, yaw=None, mirror=None, nearest_target_list=None): - return TrainingDataSample( + def copy_and_set(self, sample_type=None, filename=None, face_type=None, shape=None, landmarks=None, yaw=None, mirror=None, nearest_target_list=None): + return Sample( + sample_type=sample_type if sample_type is not None else self.sample_type, filename=filename if filename is not None else self.filename, face_type=face_type if face_type is not None else self.face_type, shape=shape if shape is not None else self.shape, diff --git a/samples/SampleGeneratorBase.py b/samples/SampleGeneratorBase.py new file mode 100644 index 0000000..505fdb7 --- /dev/null +++ b/samples/SampleGeneratorBase.py @@ -0,0 +1,25 @@ +from pathlib import Path + +''' +You can implement your own SampleGenerator +''' +class SampleGeneratorBase(object): + + + def __init__ (self, samples_path, debug, batch_size): + if samples_path is None: + raise Exception('samples_path is None') + + self.samples_path = Path(samples_path) + self.debug = debug + self.batch_size = 1 if self.debug else batch_size + + #overridable + def __iter__(self): + #implement your own iterator + return self + + def __next__(self): + #implement your own iterator + return None + diff --git a/samples/SampleGeneratorFace.py b/samples/SampleGeneratorFace.py new file mode 100644 index 0000000..c8204cd --- /dev/null +++ b/samples/SampleGeneratorFace.py @@ -0,0 +1,114 @@ +import traceback +import numpy as np +import random +import cv2 + +from utils import iter_utils + +from samples import SampleType +from samples import SampleProcessor +from samples import SampleLoader +from samples import SampleGeneratorBase + +''' +output_sample_types = [ + [SampleProcessor.TypeFlags, size, (optional)random_sub_size] , + ... 
+ ] +''' +class SampleGeneratorFace(SampleGeneratorBase): + def __init__ (self, samples_path, debug, batch_size, sort_by_yaw=False, sort_by_yaw_target_samples_path=None, sample_process_options=SampleProcessor.Options(), output_sample_types=[], **kwargs): + super().__init__(samples_path, debug, batch_size) + self.sample_process_options = sample_process_options + self.output_sample_types = output_sample_types + + if sort_by_yaw_target_samples_path is not None: + self.sample_type = SampleType.FACE_YAW_SORTED_AS_TARGET + elif sort_by_yaw: + self.sample_type = SampleType.FACE_YAW_SORTED + else: + self.sample_type = SampleType.FACE + + self.samples = SampleLoader.load (self.sample_type, self.samples_path, sort_by_yaw_target_samples_path) + + if self.debug: + self.generator_samples = [ self.samples ] + self.generators = [iter_utils.ThisThreadGenerator ( self.batch_func, 0 )] + else: + if len(self.samples) > 1: + self.generator_samples = [ self.samples[0::2], + self.samples[1::2] ] + self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, 0 ), + iter_utils.SubprocessGenerator ( self.batch_func, 1 )] + else: + self.generator_samples = [ self.samples ] + self.generators = [iter_utils.SubprocessGenerator ( self.batch_func, 0 )] + + self.generator_counter = -1 + + def __iter__(self): + return self + + def __next__(self): + self.generator_counter += 1 + generator = self.generators[self.generator_counter % len(self.generators) ] + return next(generator) + + def batch_func(self, generator_id): + samples = self.generator_samples[generator_id] + data_len = len(samples) + if data_len == 0: + raise ValueError('No training data provided.') + + if self.sample_type == SampleType.FACE_YAW_SORTED or self.sample_type == SampleType.FACE_YAW_SORTED_AS_TARGET: + if all ( [ x == None for x in samples] ): + raise ValueError('Not enough training data. Gather more faces!') + + if self.sample_type == SampleType.FACE: + shuffle_idxs = [] + elif self.sample_type == SampleType.FACE_YAW_SORTED or self.sample_type == SampleType.FACE_YAW_SORTED_AS_TARGET: + shuffle_idxs = [] + shuffle_idxs_2D = [[]]*data_len + + while True: + + batches = None + for n_batch in range(self.batch_size): + while True: + sample = None + + if self.sample_type == SampleType.FACE: + if len(shuffle_idxs) == 0: + shuffle_idxs = random.sample( range(data_len), data_len ) + idx = shuffle_idxs.pop() + sample = samples[ idx ] + elif self.sample_type == SampleType.FACE_YAW_SORTED or self.sample_type == SampleType.FACE_YAW_SORTED_AS_TARGET: + if len(shuffle_idxs) == 0: + shuffle_idxs = random.sample( range(data_len), data_len ) + + idx = shuffle_idxs.pop() + if samples[idx] != None: + if len(shuffle_idxs_2D[idx]) == 0: + shuffle_idxs_2D[idx] = random.sample( range(len(samples[idx])), len(samples[idx]) ) + + idx2 = shuffle_idxs_2D[idx].pop() + sample = samples[idx][idx2] + + if sample is not None: + try: + x = SampleProcessor.process (sample, self.sample_process_options, self.output_sample_types, self.debug) + except: + raise Exception ("Exception occured in sample %s. 
Error: %s" % (sample.filename, traceback.format_exc() ) ) + + if type(x) != tuple and type(x) != list: + raise Exception('SampleProcessor.process returns NOT tuple/list') + + if batches is None: + batches = [ [] for _ in range(len(x)) ] + + for i in range(len(x)): + batches[i].append ( x[i] ) + + break + + yield [ np.array(batch) for batch in batches] diff --git a/samples/SampleLoader.py b/samples/SampleLoader.py new file mode 100644 index 0000000..b74f4da --- /dev/null +++ b/samples/SampleLoader.py @@ -0,0 +1,128 @@ +from enum import IntEnum +import cv2 +import numpy as np +from tqdm import tqdm +from pathlib import Path + +from utils import Path_utils +from utils.DFLPNG import DFLPNG + +from .Sample import Sample +from .Sample import SampleType + +from facelib import FaceType +from facelib import LandmarksProcessor + +class SampleLoader: + cache = dict() + + @staticmethod + def load(sample_type, samples_path, target_samples_path=None): + cache = SampleLoader.cache + + if str(samples_path) not in cache.keys(): + cache[str(samples_path)] = [None]*SampleType.QTY + + if target_samples_path is not None and str(target_samples_path) not in cache.keys(): + cache[str(target_samples_path)] = [None]*SampleType.QTY + + datas = cache[str(samples_path)] + + if sample_type == SampleType.IMAGE: + if datas[sample_type] is None: + datas[sample_type] = [ Sample(filename=filename) for filename in tqdm( Path_utils.get_image_paths(samples_path), desc="Loading" ) ] + + elif sample_type == SampleType.FACE: + if datas[sample_type] is None: + datas[sample_type] = SampleLoader.upgradeToFaceSamples( [ Sample(filename=filename) for filename in Path_utils.get_image_paths(samples_path) ] ) + + elif sample_type == SampleType.FACE_YAW_SORTED: + if datas[sample_type] is None: + datas[sample_type] = SampleLoader.upgradeToFaceYawSortedSamples( SampleLoader.load(SampleType.FACE, samples_path) ) + + elif sample_type == SampleType.FACE_YAW_SORTED_AS_TARGET: + if datas[sample_type] is None: + if target_samples_path is None: + raise Exception('target_samples_path is None for FACE_YAW_SORTED_AS_TARGET') + datas[sample_type] = SampleLoader.upgradeToFaceYawSortedAsTargetSamples( SampleLoader.load(SampleType.FACE_YAW_SORTED, samples_path), SampleLoader.load(SampleType.FACE_YAW_SORTED, target_samples_path) ) + + return datas[sample_type] + + @staticmethod + def upgradeToFaceSamples ( samples ): + sample_list = [] + + for s in tqdm( samples, desc="Loading" ): + + s_filename_path = Path(s.filename) + if s_filename_path.suffix != '.png': + print ("%s is not a png file required for training" % (s_filename_path.name) ) + continue + + dflpng = DFLPNG.load ( str(s_filename_path), print_on_no_embedded_data=True ) + if dflpng is None: + continue + + sample_list.append( s.copy_and_set(sample_type=SampleType.FACE, + face_type=FaceType.fromString (dflpng.get_face_type()), + shape=dflpng.get_shape(), + landmarks=dflpng.get_landmarks(), + yaw=dflpng.get_yaw_value()) ) + + return sample_list + + @staticmethod + def upgradeToFaceYawSortedSamples( YAW_RAWS ): + + lowest_yaw, highest_yaw = -32, +32 + gradations = 64 + diff_rot_per_grad = abs(highest_yaw-lowest_yaw) / gradations + + yaws_sample_list = [None]*gradations + + for i in tqdm( range(0, gradations), desc="Sorting" ): + yaw = lowest_yaw + i*diff_rot_per_grad + next_yaw = lowest_yaw + (i+1)*diff_rot_per_grad + + yaw_samples = [] + for s in YAW_RAWS: + s_yaw = s.yaw + if (i == 0 and s_yaw < next_yaw) or \ + (i < gradations-1 and s_yaw >= yaw and s_yaw < next_yaw) or \ + (i == gradations-1 and 
+    @staticmethod
+    def upgradeToFaceYawSortedAsTargetSamples (s, t):
+        l = len(s)
+        if l != len(t):
+            raise Exception('upgradeToFaceYawSortedAsTargetSamples() s_len != t_len')
+        b = l // 2
+
+        s_idxs = np.argwhere ( np.array ( [ 1 if x is not None else 0 for x in s] ) == 1 )[:,0]
+        t_idxs = np.argwhere ( np.array ( [ 1 if x is not None else 0 for x in t] ) == 1 )[:,0]
+
+        new_s = [None]*l
+
+        for t_idx in t_idxs:
+            #search for source samples outward from the target bin and from its mirrored bin
+            search_idxs = []
+            for i in range(0,l):
+                search_idxs += [t_idx - i, (l-t_idx-1) - i, t_idx + i, (l-t_idx-1) + i]
+
+            for search_idx in search_idxs:
+                if search_idx in s_idxs:
+                    #mirrored when the target and found source bins lie on opposite sides of center
+                    mirrored = ( t_idx != search_idx and ((t_idx < b and search_idx >= b) or (search_idx < b and t_idx >= b)) )
+                    new_s[t_idx] = [ sample.copy_and_set(sample_type=SampleType.FACE_YAW_SORTED_AS_TARGET,
+                                                         mirror=True,
+                                                         yaw=-sample.yaw,
+                                                         landmarks=LandmarksProcessor.mirror_landmarks (sample.landmarks, sample.shape[1] ))
+                                     for sample in s[search_idx]
+                                   ] if mirrored else s[search_idx]
+                    break
+
+        return new_s
\ No newline at end of file
diff --git a/samples/SampleProcessor.py b/samples/SampleProcessor.py
new file mode 100644
index 0000000..dd22715
--- /dev/null
+++ b/samples/SampleProcessor.py
@@ -0,0 +1,152 @@
+from enum import IntEnum
+import numpy as np
+import cv2
+from utils import image_utils
+from facelib import LandmarksProcessor
+from facelib import FaceType
+
+
+class SampleProcessor(object):
+    class TypeFlags(IntEnum):
+        SOURCE              = 0x00000001
+        WARPED              = 0x00000002
+        WARPED_TRANSFORMED  = 0x00000004
+        TRANSFORMED         = 0x00000008
+
+        FACE_ALIGN_HALF     = 0x00000010
+        FACE_ALIGN_FULL     = 0x00000020
+        FACE_ALIGN_HEAD     = 0x00000040
+        FACE_ALIGN_AVATAR   = 0x00000080
+        FACE_MASK_FULL      = 0x00000100
+        FACE_MASK_EYES      = 0x00000200
+
+        MODE_BGR            = 0x01000000 #BGR
+        MODE_G              = 0x02000000 #Grayscale
+        MODE_GGG            = 0x04000000 #3xGrayscale
+        MODE_M              = 0x08000000 #mask only
+        MODE_BGR_SHUFFLE    = 0x10000000 #BGR shuffle
+
+    class Options(object):
+        def __init__(self, random_flip = True, normalize_tanh = False, rotation_range=[-10,10], scale_range=[-0.05, 0.05], tx_range=[-0.05, 0.05], ty_range=[-0.05, 0.05]):
+            self.random_flip = random_flip
+            self.normalize_tanh = normalize_tanh
+            self.rotation_range = rotation_range
+            self.scale_range = scale_range
+            self.tx_range = tx_range
+            self.ty_range = ty_range
+
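Each output_sample_types entry is [flags, size], with an optional third random-crop size; the flags value packs four independent fields (image type, face alignment, mask, channel mode) into disjoint bit ranges, so they compose with |. A quick sketch using only names from this diff:

    from samples import SampleProcessor

    f = SampleProcessor.TypeFlags
    flags = f.TRANSFORMED | f.FACE_ALIGN_FULL | f.MODE_M | f.FACE_MASK_FULL

    assert flags & f.TRANSFORMED      # image type, low bits
    assert flags & f.FACE_ALIGN_FULL  # alignment field
    assert flags & f.FACE_MASK_FULL   # mask field
    assert flags & f.MODE_M           # channel mode, high bits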
+    @staticmethod
+    def process (sample, sample_process_options, output_sample_types, debug):
+        source = sample.load_bgr()
+        h,w,c = source.shape
+
+        is_face_sample = sample.landmarks is not None
+
+        if debug and is_face_sample:
+            LandmarksProcessor.draw_landmarks (source, sample.landmarks, (0, 1, 0))
+
+        params = image_utils.gen_warp_params(source, sample_process_options.random_flip, rotation_range=sample_process_options.rotation_range, scale_range=sample_process_options.scale_range, tx_range=sample_process_options.tx_range, ty_range=sample_process_options.ty_range )
+
+        images = [[None]*3 for _ in range(4)]
+
+        sample_rnd_seed = np.random.randint(0x80000000)
+
+        outputs = []
+        for sample_type in output_sample_types:
+            f = sample_type[0]
+            size = sample_type[1]
+            random_sub_size = 0 if len (sample_type) < 3 else min( sample_type[2] , size)
+
+            if f & SampleProcessor.TypeFlags.SOURCE != 0:
+                img_type = 0
+            elif f & SampleProcessor.TypeFlags.WARPED != 0:
+                img_type = 1
+            elif f & SampleProcessor.TypeFlags.WARPED_TRANSFORMED != 0:
+                img_type = 2
+            elif f & SampleProcessor.TypeFlags.TRANSFORMED != 0:
+                img_type = 3
+            else:
+                raise ValueError ('expected SampleProcessor.TypeFlags type')
+
+            face_mask_type = 0
+            if f & SampleProcessor.TypeFlags.FACE_MASK_FULL != 0:
+                face_mask_type = 1
+            elif f & SampleProcessor.TypeFlags.FACE_MASK_EYES != 0:
+                face_mask_type = 2
+
+            target_face_type = -1
+            if f & SampleProcessor.TypeFlags.FACE_ALIGN_HALF != 0:
+                target_face_type = FaceType.HALF
+            elif f & SampleProcessor.TypeFlags.FACE_ALIGN_FULL != 0:
+                target_face_type = FaceType.FULL
+            elif f & SampleProcessor.TypeFlags.FACE_ALIGN_HEAD != 0:
+                target_face_type = FaceType.HEAD
+            elif f & SampleProcessor.TypeFlags.FACE_ALIGN_AVATAR != 0:
+                target_face_type = FaceType.AVATAR
+
+            if images[img_type][face_mask_type] is None:
+                img = source
+                if is_face_sample:
+                    if face_mask_type == 1:
+                        img = np.concatenate( (img, LandmarksProcessor.get_image_hull_mask (source, sample.landmarks) ), -1 )
+                    elif face_mask_type == 2:
+                        mask = LandmarksProcessor.get_image_eye_mask (source, sample.landmarks)
+                        mask = np.expand_dims (cv2.blur (mask, ( w // 32, w // 32 ) ), -1)
+                        mask[mask > 0.0] = 1.0
+                        img = np.concatenate( (img, mask ), -1 )
+
+                images[img_type][face_mask_type] = image_utils.warp_by_params (params, img, (img_type==1 or img_type==2), (img_type==2 or img_type==3), img_type != 0)
+
+            img = images[img_type][face_mask_type]
+
+            if is_face_sample and target_face_type != -1:
+                if target_face_type > sample.face_type:
+                    raise Exception ('Sample %s face type %s does not match model requirement %s. Extract the necessary face type first.' % (sample.filename, sample.face_type, target_face_type) )
+
+                img = cv2.warpAffine( img, LandmarksProcessor.get_transform_mat (sample.landmarks, size, target_face_type), (size,size), flags=cv2.INTER_LANCZOS4 )
+            else:
+                img = cv2.resize( img, (size,size), interpolation=cv2.INTER_LANCZOS4 )
+
+            if random_sub_size != 0:
+                sub_size = size - random_sub_size
+                rnd_state = np.random.RandomState (sample_rnd_seed+random_sub_size)
+                start_x = rnd_state.randint(random_sub_size+1) #crop offset cannot exceed size - sub_size
+                start_y = rnd_state.randint(random_sub_size+1)
+                img = img[start_y:start_y+sub_size,start_x:start_x+sub_size,:]
+
+            img_bgr  = img[...,0:3]
+            img_mask = img[...,3:4]
+
+            if f & SampleProcessor.TypeFlags.MODE_BGR != 0:
+                pass #img is already BGR (plus the mask channel, if any)
+            elif f & SampleProcessor.TypeFlags.MODE_BGR_SHUFFLE != 0:
+                img_bgr = np.take (img_bgr, np.random.permutation(img_bgr.shape[-1]), axis=-1)
+                img = np.concatenate ( (img_bgr,img_mask) , -1 )
+            elif f & SampleProcessor.TypeFlags.MODE_G != 0:
+                img = np.concatenate ( (np.expand_dims(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),-1),img_mask) , -1 )
+            elif f & SampleProcessor.TypeFlags.MODE_GGG != 0:
+                img = np.concatenate ( ( np.repeat ( np.expand_dims(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),-1), (3,), -1), img_mask), -1)
+            elif is_face_sample and f & SampleProcessor.TypeFlags.MODE_M != 0:
+                if face_mask_type == 0:
+                    raise ValueError ('no face_mask_type defined')
+                img = img_mask
+            else:
+                raise ValueError ('expected SampleProcessor.TypeFlags mode')
+
+            if not debug and sample_process_options.normalize_tanh:
+                img = img * 2.0 - 1.0
+
+            outputs.append ( img )
+
+        if debug:
+            result = ()
+            for output in outputs:
+                if output.shape[2] < 4:
+                    result += (output,)
+                elif output.shape[2] == 4:
+                    result += (output[...,0:3]*output[...,3:4],)
+            return result
+        else:
+            return outputs
\ No newline at end of file
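On normalize_tanh: loader outputs are float images in [0, 1], and the option remaps them to [-1, 1] to match a tanh-activated decoder. The inverse mapping below is an assumption for preview code, not something this diff ships:

    import numpy as np

    img = np.random.rand(128, 128, 3).astype(np.float32)  # stand-in loader output in [0, 1]
    tanh_img = img * 2.0 - 1.0                             # what normalize_tanh produces
    restored = np.clip((tanh_img + 1.0) / 2.0, 0.0, 1.0)   # back to displayable range
    assert np.allclose(img, restored, atol=1e-6)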
diff --git a/samples/__init__.py b/samples/__init__.py
new file mode 100644
index 0000000..f54da4d
--- /dev/null
+++ b/samples/__init__.py
@@ -0,0 +1,6 @@
+from .Sample import Sample
+from .Sample import SampleType
+from .SampleLoader import SampleLoader
+from .SampleProcessor import SampleProcessor
+from .SampleGeneratorBase import SampleGeneratorBase
+from .SampleGeneratorFace import SampleGeneratorFace
\ No newline at end of file
diff --git a/utils/DFLPNG.py b/utils/DFLPNG.py
index 48ecf41..27254e5 100644
--- a/utils/DFLPNG.py
+++ b/utils/DFLPNG.py
@@ -5,6 +5,7 @@ import struct
 import zlib
 import pickle
 import numpy as np
+from facelib import FaceType
 
 class Chunk(object):
     def __init__(self, name=None, data=None):
@@ -251,6 +252,10 @@ class DFLPNG(object):
         inst = DFLPNG.load_raw (filename)
         inst.fcwp_dict = inst.getDFLDictData()
 
+        #default for legacy samples saved before face_type was stored; guard the dict first
+        if inst.fcwp_dict is not None and 'face_type' not in inst.fcwp_dict.keys():
+            inst.fcwp_dict['face_type'] = FaceType.toString (FaceType.FULL)
+
         if inst.fcwp_dict == None:
             if print_on_no_embedded_data:
                 print ( "No DFL data found in %s" % (filename) )
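Effect of the new face_type default, as a hedged sketch (DFLPNG.load, get_face_type and FaceType come from this repo; the file path is hypothetical):

    from utils.DFLPNG import DFLPNG
    from facelib import FaceType

    dflpng = DFLPNG.load('workspace/data_src/aligned/00001.png')  # hypothetical sample
    if dflpng is not None:
        # legacy samples written before face_type existed now report FULL
        print(FaceType.fromString(dflpng.get_face_type()) == FaceType.FULL)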