From 4e39172b2ff5a61cc566e9f9a7037712b7b7a0a7 Mon Sep 17 00:00:00 2001 From: iperov Date: Sat, 13 Apr 2019 16:58:01 +0400 Subject: [PATCH] superb improved fanseg --- converters/ConverterMasked.py | 4 +- facelib/FANSegmentator.py | 145 ++++++++++++++++----------- models/Model_FANSegmentator/Model.py | 20 ++-- nnlib/nnlib.py | 22 +++- 4 files changed, 118 insertions(+), 73 deletions(-) diff --git a/converters/ConverterMasked.py b/converters/ConverterMasked.py index 1b7e3cc..d9b8c1c 100644 --- a/converters/ConverterMasked.py +++ b/converters/ConverterMasked.py @@ -171,13 +171,13 @@ class ConverterMasked(Converter): if self.mask_mode == 3 or self.mask_mode == 5 or self.mask_mode == 6: prd_face_bgr_256 = cv2.resize (prd_face_bgr, (256,256) ) - prd_face_bgr_256_mask = self.fan_seg.extract_from_bgr( prd_face_bgr_256[np.newaxis,...] ) [0] + prd_face_bgr_256_mask = self.fan_seg.extract( prd_face_bgr_256 ) FAN_prd_face_mask_a_0 = cv2.resize (prd_face_bgr_256_mask, (output_size,output_size), cv2.INTER_CUBIC) if self.mask_mode == 4 or self.mask_mode == 5 or self.mask_mode == 6: face_256_mat = LandmarksProcessor.get_transform_mat (img_face_landmarks, 256, face_type=FaceType.FULL) dst_face_256_bgr = cv2.warpAffine(img_bgr, face_256_mat, (256, 256), flags=cv2.INTER_LANCZOS4 ) - dst_face_256_mask = self.fan_seg.extract_from_bgr( dst_face_256_bgr[np.newaxis,...] ) [0] + dst_face_256_mask = self.fan_seg.extract( dst_face_256_bgr ) FAN_dst_face_mask_a_0 = cv2.resize (dst_face_256_mask, (output_size,output_size), cv2.INTER_CUBIC) if self.mask_mode == 3: #FAN-prd diff --git a/facelib/FANSegmentator.py b/facelib/FANSegmentator.py index 09e14af..531b829 100644 --- a/facelib/FANSegmentator.py +++ b/facelib/FANSegmentator.py @@ -1,15 +1,27 @@ -import numpy as np import os -import cv2 +import pickle +from functools import partial from pathlib import Path -from nnlib import nnlib + +import cv2 +import numpy as np + from interact import interact as io +from nnlib import nnlib + +""" +FANSegmentator is designed to segment faces aligned by 2DFAN-4 landmarks extractor. + +using https://github.com/ternaus/TernausNet +TernausNet: U-Net with VGG11 Encoder Pre-Trained on ImageNet for Image Segmentation +""" class FANSegmentator(object): + VERSION = 1 def __init__ (self, resolution, face_type_str, load_weights=True, weights_file_root=None, training=False): exec( nnlib.import_all(), locals(), globals() ) - self.model = FANSegmentator.BuildModel(resolution, ngf=32) + self.model = FANSegmentator.BuildModel(resolution, ngf=64) if weights_file_root: weights_file_root = Path(weights_file_root) @@ -21,12 +33,18 @@ class FANSegmentator(object): if load_weights: self.model.load_weights (str(self.weights_path)) else: - if training: - conv_weights_list = [] - for layer in self.model.layers: - if type(layer) == keras.layers.Conv2D: - conv_weights_list += [layer.weights[0]] # Conv2D kernel_weights - CAInitializerMP(conv_weights_list) + if training: + try: + with open( Path(__file__).parent / 'vgg11_enc_weights.npy', 'rb' ) as f: + d = pickle.loads (f.read()) + + for i in [0,3,6,8,11,13,16,18]: + s = 'features.%d' % i + + self.model.get_layer (s).set_weights ( d[s] ) + except: + io.log_err("Unable to load VGG11 pretrained weights from vgg11_enc_weights.npy") + if training: self.model.compile(loss='mse', optimizer=Adam(tf_cpu_mode=2)) @@ -42,66 +60,75 @@ class FANSegmentator(object): def train_on_batch(self, inp, outp): return self.model.train_on_batch(inp, outp) - def extract_from_bgr (self, input_image): - return np.clip ( (self.model.predict(input_image) + 1) / 2.0, 0, 1.0 ) - + def extract (self, input_image, is_input_tanh=False): + input_shape_len = len(input_image.shape) + if input_shape_len == 3: + input_image = input_image[np.newaxis,...] + + result = np.clip ( self.model.predict( [input_image] ), 0, 1.0 ) + + if input_shape_len == 3: + result = result[0] + + return result + @staticmethod - def BuildModel ( resolution, ngf=64): + def BuildModel ( resolution, ngf=64, norm='', act='lrelu'): exec( nnlib.import_all(), locals(), globals() ) inp = Input ( (resolution,resolution,3) ) x = inp - x = FANSegmentator.EncFlow(ngf=ngf)(x) - x = FANSegmentator.DecFlow(ngf=ngf)(x) + x = FANSegmentator.Flow(ngf=ngf, norm=norm, act=act)(x) model = Model(inp,x) return model @staticmethod - def EncFlow(ngf=64, num_downs=4): + def Flow(ngf=64, num_downs=4, norm='', act='lrelu'): exec( nnlib.import_all(), locals(), globals() ) - use_bias = True - def XNormalization(x): - return InstanceNormalization (axis=3, gamma_initializer=RandomNormal(1., 0.02))(x) - - def downscale (dim): - def func(x): - return LeakyReLU(0.1)(XNormalization(Conv2D(dim, kernel_size=5, strides=2, padding='same', kernel_initializer=RandomNormal(0, 0.02))(x))) - return func - def func(input): x = input - result = [] - for i in range(num_downs): - x = downscale ( min(ngf*(2**i), ngf*8) )(x) - result += [x] - - return result + x0 = x = Conv2D(ngf, kernel_size=3, strides=1, padding='same', activation='relu', name='features.0')(x) + x = MaxPooling2D()(x) + + x1 = x = Conv2D(ngf*2, kernel_size=3, strides=1, padding='same', activation='relu', name='features.3')(x) + x = MaxPooling2D()(x) + + x = Conv2D(ngf*4, kernel_size=3, strides=1, padding='same', activation='relu', name='features.6')(x) + x2 = x = Conv2D(ngf*4, kernel_size=3, strides=1, padding='same', activation='relu', name='features.8')(x) + x = MaxPooling2D()(x) + + x = Conv2D(ngf*8, kernel_size=3, strides=1, padding='same', activation='relu', name='features.11')(x) + x3 = x = Conv2D(ngf*8, kernel_size=3, strides=1, padding='same', activation='relu', name='features.13')(x) + x = MaxPooling2D()(x) + + x = Conv2D(ngf*8, kernel_size=3, strides=1, padding='same', activation='relu', name='features.16')(x) + x4 = x = Conv2D(ngf*8, kernel_size=3, strides=1, padding='same', activation='relu', name='features.18')(x) + x = MaxPooling2D()(x) + + x = Conv2D(ngf*8, kernel_size=3, strides=1, padding='same')(x) + + x = Conv2DTranspose (ngf*4, 3, strides=2, padding='same', activation='relu') (x) + x = Concatenate(axis=3)([ x, x4]) + x = Conv2D (ngf*8, 3, strides=1, padding='same', activation='relu') (x) + + x = Conv2DTranspose (ngf*4, 3, strides=2, padding='same', activation='relu') (x) + x = Concatenate(axis=3)([ x, x3]) + x = Conv2D (ngf*8, 3, strides=1, padding='same', activation='relu') (x) + + x = Conv2DTranspose (ngf*2, 3, strides=2, padding='same', activation='relu') (x) + x = Concatenate(axis=3)([ x, x2]) + x = Conv2D (ngf*4, 3, strides=1, padding='same', activation='relu') (x) + + x = Conv2DTranspose (ngf, 3, strides=2, padding='same', activation='relu') (x) + x = Concatenate(axis=3)([ x, x1]) + x = Conv2D (ngf*2, 3, strides=1, padding='same', activation='relu') (x) + + x = Conv2DTranspose (ngf // 2, 3, strides=2, padding='same', activation='relu') (x) + x = Concatenate(axis=3)([ x, x0]) + x = Conv2D (ngf, 3, strides=1, padding='same', activation='relu') (x) + + return Conv2D(1, 3, strides=1, padding='same', activation='sigmoid')(x) + + return func - - @staticmethod - def DecFlow(output_nc=1, ngf=64, activation='tanh'): - exec (nnlib.import_all(), locals(), globals()) - - use_bias = True - def XNormalization(x): - return InstanceNormalization (axis=3, gamma_initializer=RandomNormal(1., 0.02))(x) - - def Conv2D (filters, kernel_size, strides=(1, 1), padding='valid', data_format=None, dilation_rate=(1, 1), activation=None, use_bias=use_bias, kernel_initializer=RandomNormal(0, 0.02), bias_initializer='zeros', kernel_regularizer=None, bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, bias_constraint=None): - return keras.layers.Conv2D( filters=filters, kernel_size=kernel_size, strides=strides, padding=padding, data_format=data_format, dilation_rate=dilation_rate, activation=activation, use_bias=use_bias, kernel_initializer=kernel_initializer, bias_initializer=bias_initializer, kernel_regularizer=kernel_regularizer, bias_regularizer=bias_regularizer, activity_regularizer=activity_regularizer, kernel_constraint=kernel_constraint, bias_constraint=bias_constraint ) - - def upscale (dim): - def func(x): - return SubpixelUpscaler()( LeakyReLU(0.1)(XNormalization(Conv2D(dim, kernel_size=3, strides=1, padding='same', kernel_initializer=RandomNormal(0, 0.02))(x)))) - return func - - def func(input): - input_len = len(input) - x = input[input_len-1] - for i in range(input_len-1, -1, -1): - x = upscale( min(ngf* (2**i) *4, ngf*8 *4 ) )(x) - if i != 0: - x = Concatenate(axis=3)([ input[i-1] , x]) - - return Conv2D(output_nc, 3, 1, 'same', activation=activation)(x) - return func \ No newline at end of file diff --git a/models/Model_FANSegmentator/Model.py b/models/Model_FANSegmentator/Model.py index 0b0d58d..0153bab 100644 --- a/models/Model_FANSegmentator/Model.py +++ b/models/Model_FANSegmentator/Model.py @@ -37,13 +37,13 @@ class Model(ModelBase): self.set_training_data_generators ([ SampleGeneratorFace(self.training_data_src_path, debug=self.is_debug(), batch_size=self.batch_size, - sample_process_options=SampleProcessor.Options(random_flip=True, motion_blur = [25, 1], normalize_tanh = True ), - output_sample_types=[ [f.TRANSFORMED | f_type | f.MODE_BGR_SHUFFLE | f.OPT_APPLY_MOTION_BLUR, self.resolution], - [f.TRANSFORMED | f_type | f.MODE_M | f.FACE_MASK_FULL, self.resolution] + sample_process_options=SampleProcessor.Options(random_flip=True, motion_blur = [25, 1] ), + output_sample_types=[ [f.WARPED_TRANSFORMED | f_type | f.MODE_BGR_SHUFFLE | f.OPT_APPLY_MOTION_BLUR, self.resolution], + [f.WARPED_TRANSFORMED | f_type | f.MODE_M | f.FACE_MASK_FULL, self.resolution] ]), SampleGeneratorFace(self.training_data_dst_path, debug=self.is_debug(), batch_size=self.batch_size, - sample_process_options=SampleProcessor.Options(random_flip=True, normalize_tanh = True ), + sample_process_options=SampleProcessor.Options(random_flip=True ), output_sample_types=[ [f.TRANSFORMED | f_type | f.MODE_BGR_SHUFFLE, self.resolution] ]) ]) @@ -65,11 +65,9 @@ class Model(ModelBase): test_A = sample[0][0][0:4] #first 4 samples test_B = sample[1][0][0:4] #first 4 samples - mAA = self.fan_seg.extract_from_bgr([test_A]) - mBB = self.fan_seg.extract_from_bgr([test_B]) - - test_A, test_B, = [ np.clip( (x + 1.0)/2.0, 0.0, 1.0) for x in [test_A, test_B] ] - + mAA = self.fan_seg.extract(test_A) + mBB = self.fan_seg.extract(test_B) + mAA = np.repeat ( mAA, (3,), -1) mBB = np.repeat ( mBB, (3,), -1) @@ -89,6 +87,6 @@ class Model(ModelBase): test_B[i,:,:,0:3]*mBB[i], ), axis=1) ) - return [ ('FANSegmentator', np.concatenate ( st, axis=0 ) ), - ('never seen', np.concatenate ( st2, axis=0 ) ), + return [ ('training data', np.concatenate ( st, axis=0 ) ), + ('evaluating data', np.concatenate ( st2, axis=0 ) ), ] diff --git a/nnlib/nnlib.py b/nnlib/nnlib.py index 50003a6..7279218 100644 --- a/nnlib/nnlib.py +++ b/nnlib/nnlib.py @@ -52,7 +52,7 @@ Input = KL.Input Dense = KL.Dense Conv2D = nnlib.Conv2D -Conv2DTranspose = KL.Conv2DTranspose +Conv2DTranspose = nnlib.Conv2DTranspose SeparableConv2D = KL.SeparableConv2D MaxPooling2D = KL.MaxPooling2D UpSampling2D = KL.UpSampling2D @@ -695,6 +695,26 @@ NLayerDiscriminator = nnlib.NLayerDiscriminator x = ReflectionPadding2D( self.pad ) (x) return self.func(x) nnlib.Conv2D = Conv2D + + class Conv2DTranspose(): + def __init__ (self, *args, **kwargs): + self.reflect_pad = False + padding = kwargs.get('padding','') + if padding == 'zero': + kwargs['padding'] = 'same' + if padding == 'reflect': + kernel_size = kwargs['kernel_size'] + if (kernel_size % 2) == 1: + self.pad = (kernel_size // 2,)*2 + kwargs['padding'] = 'valid' + self.reflect_pad = True + self.func = keras.layers.Conv2DTranspose (*args, **kwargs) + + def __call__(self,x): + if self.reflect_pad: + x = ReflectionPadding2D( self.pad ) (x) + return self.func(x) + nnlib.Conv2DTranspose = Conv2DTranspose @staticmethod def import_keras_contrib(device_config):