import os import pickle from functools import partial from pathlib import Path import cv2 import numpy as np from interact import interact as io from nnlib import nnlib """ PoseEstimator estimates pitch, yaw, roll, from FAN aligned face. trained on https://www.umdfaces.io based on https://arxiv.org/pdf/1901.06778.pdf HYBRID COARSE-FINE CLASSIFICATION FOR HEAD POSE ESTIMATION """ class PoseEstimator(object): VERSION = 1 def __init__ (self, resolution, face_type_str, load_weights=True, weights_file_root=None, training=False): exec( nnlib.import_all(), locals(), globals() ) self.resolution = resolution self.angles = [60, 45, 30, 10, 2] self.alpha_cat_losses = [7,5,3,1,1] self.class_nums = [ angle+1 for angle in self.angles ] self.encoder, self.decoder, self.model_l = PoseEstimator.BuildModels(resolution, class_nums=self.class_nums) if weights_file_root is not None: weights_file_root = Path(weights_file_root) else: weights_file_root = Path(__file__).parent self.encoder_weights_path = weights_file_root / ('PoseEst_%d_%s_enc.h5' % (resolution, face_type_str) ) self.decoder_weights_path = weights_file_root / ('PoseEst_%d_%s_dec.h5' % (resolution, face_type_str) ) self.l_weights_path = weights_file_root / ('PoseEst_%d_%s_l.h5' % (resolution, face_type_str) ) self.model_weights_path = weights_file_root / ('PoseEst_%d_%s.h5' % (resolution, face_type_str) ) self.input_bgr_shape = (resolution, resolution, 3) def ResamplerFunc(input): mean_t, logvar_t = input return mean_t + K.exp(0.5*logvar_t)*K.random_normal(K.shape(mean_t)) self.BVAEResampler = Lambda ( lambda x: x[0] + K.random_normal(K.shape(x[0])) * K.sqrt(K.exp(0.5*x[1])), output_shape=K.int_shape(self.encoder.outputs[0])[1:] ) inp_t = Input (self.input_bgr_shape) inp_real_t = Input (self.input_bgr_shape) inp_pitch_t = Input ( (1,) ) inp_yaw_t = Input ( (1,) ) inp_roll_t = Input ( (1,) ) mean_t, logvar_t = self.encoder(inp_t) latent_t = self.BVAEResampler([mean_t, logvar_t]) if training: bgr_t = self.decoder (latent_t) pyrs_t = self.model_l(latent_t) else: self.model = Model(inp_t, self.model_l(latent_t) ) pyrs_t = self.model(inp_t) if load_weights: if training: self.encoder.load_weights (str(self.encoder_weights_path)) self.decoder.load_weights (str(self.decoder_weights_path)) self.model_l.load_weights (str(self.l_weights_path)) else: self.model.load_weights (str(self.model_weights_path)) else: def gather_Conv2D_layers(models_list): conv_weights_list = [] for model in models_list: for layer in model.layers: layer_type = type(layer) if layer_type == keras.layers.Conv2D: conv_weights_list += [layer.weights[0]] #Conv2D kernel_weights elif layer_type == keras.engine.training.Model: conv_weights_list += gather_Conv2D_layers ([layer]) return conv_weights_list CAInitializerMP ( gather_Conv2D_layers( [self.encoder, self.decoder] ) ) if training: inp_pyrs_t = [] for class_num in self.class_nums: inp_pyrs_t += [ Input ((3,)) ] pyr_loss = [] for i,class_num in enumerate(self.class_nums): a = self.alpha_cat_losses[i] pyr_loss += [ a*K.mean( K.square ( inp_pyrs_t[i] - pyrs_t[i]) ) ] def BVAELoss(beta=4): def func(input): mean_t, logvar_t = input return beta * K.mean ( K.sum( 0.5*(K.exp(logvar_t)+ K.square(mean_t)-logvar_t-1), axis=1) ) return func BVAE_loss = BVAELoss()([mean_t, logvar_t]) bgr_loss = K.mean(K.sum(K.abs(inp_real_t-bgr_t), axis=[1,2,3])) G_loss = BVAE_loss+bgr_loss pyr_loss = sum(pyr_loss) self.train = K.function ([inp_t, inp_real_t], [ G_loss ], Adam(lr=0.0005, beta_1=0.9, beta_2=0.999).get_updates( G_loss, self.encoder.trainable_weights+self.decoder.trainable_weights ) ) self.train_l = K.function ([inp_t] + inp_pyrs_t, [pyr_loss], Adam(lr=0.0001).get_updates( pyr_loss, self.model_l.trainable_weights) ) self.view = K.function ([inp_t], [ bgr_t, pyrs_t[0] ] ) def __enter__(self): return self def __exit__(self, exc_type=None, exc_value=None, traceback=None): return False #pass exception between __enter__ and __exit__ to outter level def save_weights(self): self.encoder.save_weights (str(self.encoder_weights_path)) self.decoder.save_weights (str(self.decoder_weights_path)) self.model_l.save_weights (str(self.l_weights_path)) inp_t = Input (self.input_bgr_shape) Model(inp_t, self.model_l(self.BVAEResampler(self.encoder(inp_t))) ).save_weights (str(self.model_weights_path)) def train_on_batch(self, warps, imgs, pyr_tanh, skip_bgr_train=False): if not skip_bgr_train: bgr_loss, = self.train( [warps, imgs] ) pyr_loss = 0 else: bgr_loss = 0 feed = [imgs] for i, (angle, class_num) in enumerate(zip(self.angles, self.class_nums)): a = angle / 2 c = np.round( (pyr_tanh+1) * a ) / a -1 #.astype(K.floatx()) feed += [c] pyr_loss, = self.train_l(feed) return bgr_loss, pyr_loss def extract (self, input_image, is_input_tanh=False): if is_input_tanh: raise NotImplemented("is_input_tanh") input_shape_len = len(input_image.shape) if input_shape_len == 3: input_image = input_image[np.newaxis,...] bgr, result, = self.view( [input_image] ) #result = np.clip ( result / (self.angles[0] / 2) - 1, 0.0, 1.0 ) if input_shape_len == 3: bgr = bgr[0] result = result[0] return bgr, result @staticmethod def BuildModels ( resolution, class_nums, ae_dims=128): exec( nnlib.import_all(), locals(), globals() ) x = inp = Input ( (resolution,resolution,3) ) x = PoseEstimator.EncFlow(ae_dims)(x) encoder = Model(inp,x) x = inp = Input ( K.int_shape(encoder.outputs[0][1:]) ) x = PoseEstimator.DecFlow(resolution, ae_dims)(x) decoder = Model(inp,x) x = inp = Input ( K.int_shape(encoder.outputs[0][1:]) ) x = PoseEstimator.LatentFlow(class_nums=class_nums)(x) model_l = Model(inp, x ) return encoder, decoder, model_l @staticmethod def EncFlow(ae_dims): exec( nnlib.import_all(), locals(), globals() ) def downscale (dim, **kwargs): def func(x): return ReLU() ( Conv2D(dim, kernel_size=5, strides=2, padding='same')(x)) return func downscale = partial(downscale) ed_ch_dims = 128 def func(input): x = input x = downscale(64)(x) x = downscale(128)(x) x = downscale(256)(x) x = downscale(512)(x) x = Flatten()(x) x = Dense(256)(x) x = ReLU()(x) x = Dense(256)(x) x = ReLU()(x) mean = Dense(ae_dims)(x) logvar = Dense(ae_dims)(x) return mean, logvar return func @staticmethod def DecFlow(resolution, ae_dims): exec( nnlib.import_all(), locals(), globals() ) def upscale (dim, strides=2, **kwargs): def func(x): return ReLU()( ( Conv2DTranspose(dim, kernel_size=3, strides=strides, padding='same')(x)) ) return func def to_bgr (output_nc, **kwargs): def func(x): return Conv2D(output_nc, kernel_size=5, padding='same', activation='sigmoid')(x) return func upscale = partial(upscale) lowest_dense_res = resolution // 16 def func(input): x = input x = Dense(256)(x) x = ReLU()(x) x = Dense(256)(x) x = ReLU()(x) x = Dense( (lowest_dense_res*lowest_dense_res*256) ) (x) x = ReLU()(x) x = Reshape( (lowest_dense_res,lowest_dense_res,256) )(x) x = upscale(512)(x) x = upscale(256)(x) x = upscale(128)(x) x = upscale(64)(x) x = to_bgr(3)(x) return x return func @staticmethod def LatentFlow(class_nums): exec( nnlib.import_all(), locals(), globals() ) def func(latent): x = latent x = Dense(1024, activation='relu')(x) x = Dropout(0.5)(x) x = Dense(1024, activation='relu')(x) # x = Dropout(0.5)(x) # x = Dense(4096, activation='relu')(x) output = [] for class_num in class_nums: pyr = Dense(3, activation='tanh')(x) output += [pyr] return output #y = Dropout(0.5)(y) #y = Dense(1024, activation='relu')(y) return func # resnet50 = keras.applications.ResNet50(include_top=False, weights=None, input_shape=K.int_shape(x)[1:], pooling='avg') # x = resnet50(x) # output = [] # for class_num in class_nums: # pitch = Dense(class_num)(x) # yaw = Dense(class_num)(x) # roll = Dense(class_num)(x) # output += [pitch,yaw,roll] # return output