From 2605930dfea8db970f884a4f729784f3586173d4 Mon Sep 17 00:00:00 2001 From: iperov Date: Sun, 21 Nov 2021 13:33:04 +0400 Subject: [PATCH] _ --- .../FaceAligner/FaceAlignerTrainerApp.py | 19 +- .../FaceAligner/TrainingDataGenerator.py | 13 +- modelhub/torch/FaceAligner/FaceAligner.py | 191 ++++++++++++++++++ modelhub/torch/__init__.py | 2 +- xlib/path/path.py | 3 +- 5 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 modelhub/torch/FaceAligner/FaceAligner.py diff --git a/apps/trainers/FaceAligner/FaceAlignerTrainerApp.py b/apps/trainers/FaceAligner/FaceAlignerTrainerApp.py index 67e6be1..0106d25 100644 --- a/apps/trainers/FaceAligner/FaceAlignerTrainerApp.py +++ b/apps/trainers/FaceAligner/FaceAlignerTrainerApp.py @@ -53,6 +53,7 @@ class FaceAlignerTrainerApp: self._device_info = None self._batch_size = None self._resolution = None + self._random_warp = None self._iteration = None self._autosave_period = None self._is_training = False @@ -89,6 +90,11 @@ class FaceAlignerTrainerApp: self._resolution = resolution self._training_generator.set_resolution(resolution) + def get_random_warp(self) -> bool: return self._random_warp + def set_random_warp(self, random_warp : bool): + self._random_warp = random_warp + self._training_generator.set_random_warp(random_warp) + def get_iteration(self) -> int: return self._iteration def set_iteration(self, iteration : int): self._iteration = iteration @@ -112,6 +118,7 @@ class FaceAlignerTrainerApp: self.set_device_info( lib_torch.get_device_info_by_index(model_data.get('device_index', -1)) ) self.set_batch_size( model_data.get('batch_size', 64) ) self.set_resolution( model_data.get('resolution', 224) ) + self.set_random_warp( model_data.get('random_warp', True) ) self.set_iteration( model_data.get('iteration', 0) ) self.set_autosave_period( model_data.get('autosave_period', 25) ) self.set_is_training( model_data.get('training', False) ) @@ -123,8 +130,8 @@ class FaceAlignerTrainerApp: def reset_model(self, load : bool = True): while True: - model = tv.mobilenet.mobilenet_v3_large(num_classes=6) - + #model = tv.mobilenet.mobilenet_v3_large(num_classes=6) + model = torch_models.FaceAlignerNet() model.train() model.to(self._device) @@ -158,6 +165,7 @@ class FaceAlignerTrainerApp: d = {'device_index' : self._device_info.get_index(), 'batch_size' : self.get_batch_size(), 'resolution' : self.get_resolution(), + 'random_warp' : self.get_random_warp(), 'iteration' : self.get_iteration(), 'autosave_period' : self.get_autosave_period(), 'training' : self.get_is_training(), @@ -291,7 +299,7 @@ class FaceAlignerTrainerApp: self._is_training: # Inference for both preview and training img_aligned_shifted_t = torch.tensor(training_data.img_aligned_shifted).to(self._device) - shift_uni_mats_pred_t = self._model(img_aligned_shifted_t).view( (-1,2,3) ) + shift_uni_mats_pred_t = self._model(img_aligned_shifted_t)#.view( (-1,2,3) ) if self._is_training: # Training optimization step @@ -340,7 +348,7 @@ class FaceAlignerTrainerApp: dc.DlgChoice(short_name='l', row_def=f'| Print loss history | Last loss = {last_loss:.5f} ', on_choose=self.on_main_dlg_print_loss_history ), - + dc.DlgChoice(short_name='p', row_def='| Show current preview.', on_choose=lambda dlg: (self._ev_request_preview.set(), dlg.recreate().set_current())), @@ -393,6 +401,9 @@ class FaceAlignerTrainerApp: dc.DlgChoice(short_name='v', row_def=f'| Previewing samples | {self._is_previewing_samples}', on_choose=self.on_sample_generator_dlg_previewing_last_samples, ), + + dc.DlgChoice(short_name='rw', row_def=f'| Random warp | {self.get_random_warp()}', + on_choose=lambda dlg: (self.set_random_warp(not self.get_random_warp()), dlg.recreate().set_current()) ), dc.DlgChoice(short_name='r', row_def=f'| Running | {self._training_generator.is_running()}', on_choose=lambda dlg: (self._training_generator.set_running(not self._training_generator.is_running()), dlg.recreate().set_current()) ), diff --git a/apps/trainers/FaceAligner/TrainingDataGenerator.py b/apps/trainers/FaceAligner/TrainingDataGenerator.py index 119c9e5..e6f20b3 100644 --- a/apps/trainers/FaceAligner/TrainingDataGenerator.py +++ b/apps/trainers/FaceAligner/TrainingDataGenerator.py @@ -60,6 +60,9 @@ class TrainingDataGenerator(lib_mp.MPWorker): def set_resolution(self, resolution): self._send_msg('resolution', resolution) + + def set_random_warp(self, random_warp): + self._send_msg('random_warp', random_warp) ###### IMPL HOST def _on_host_sub_message(self, process_id, name, *args, **kwargs): @@ -78,6 +81,7 @@ class TrainingDataGenerator(lib_mp.MPWorker): self._running = False self._batch_size = None self._resolution = None + self._random_warp = None self._n_batch = 0 self._img_aligned_list = [] @@ -97,6 +101,8 @@ class TrainingDataGenerator(lib_mp.MPWorker): self._batch_size, = args elif name == 'resolution': self._resolution, = args + elif name == 'random_warp': + self._random_warp, = args elif name == 'running': self._running , = args @@ -110,7 +116,10 @@ class TrainingDataGenerator(lib_mp.MPWorker): if self._resolution is None: print('Unable to start TrainingGenerator: resolution must be set') running = False - + if self._random_warp is None: + print('Unable to start TrainingGenerator: random_warp must be set') + running = False + if running: if self._sent_buffers_count < 2: batch_size = self._batch_size @@ -182,7 +191,7 @@ class TrainingDataGenerator(lib_mp.MPWorker): rw_grid_ty=rw_grid_ty_range, ) - img_aligned_shifted = fw1.transform(img1, resolution, random_warp=True).astype(np.float32) / 255.0 + img_aligned_shifted = fw1.transform(img1, resolution, random_warp=self._random_warp).astype(np.float32) / 255.0 ip = lib_img.ImageProcessor(img_aligned_shifted) rnd = np.random diff --git a/modelhub/torch/FaceAligner/FaceAligner.py b/modelhub/torch/FaceAligner/FaceAligner.py new file mode 100644 index 0000000..5013651 --- /dev/null +++ b/modelhub/torch/FaceAligner/FaceAligner.py @@ -0,0 +1,191 @@ +from functools import partial +from pathlib import Path +from typing import Union + +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F +from xlib.file import SplittedFile +from xlib.torch import TorchDeviceInfo, get_cpu_device_info + +def _make_divisible(v: float, divisor: int, min_value = None) -> int: + if min_value is None: + min_value = divisor + new_v = max(min_value, int(v + divisor / 2) // divisor * divisor) + if new_v < 0.9 * v: + new_v += divisor + return new_v + +class SqueezeExcitation(nn.Module): + def __init__( self, in_ch: int, squeeze_channels: int, activation = nn.ReLU, scale_activation = nn.Sigmoid): + super().__init__() + self.avgpool = nn.AdaptiveAvgPool2d(1) + self.fc1 = nn.Conv2d(in_ch, squeeze_channels, 1) + self.fc2 = nn.Conv2d(squeeze_channels, in_ch, 1) + self.activation = activation() + self.scale_activation = scale_activation() + + def forward(self, input): + scale = self.avgpool(input) + scale = self.fc1(scale) + scale = self.activation(scale) + scale = self.fc2(scale) + scale = self.scale_activation(scale) + return scale * input + +class ConvNormActivation(nn.Sequential): + def __init__(self, in_ch: int, out_ch: int, kernel_size: int = 3, stride: int = 1, padding = None, groups: int = 1, norm_layer = nn.BatchNorm2d, activation_layer = nn.ReLU,) -> None: + if padding is None: + padding = (kernel_size - 1) // 2 + layers = [torch.nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding, groups=groups, bias=norm_layer is None)] + if norm_layer is not None: + layers.append(norm_layer(out_ch)) + if activation_layer is not None: + layers.append(activation_layer()) + super().__init__(*layers) + + +class InvertedResidual(nn.Module): + def __init__(self, in_ch: int, mid_ch: int, out_ch: int, kernel: int, stride: int, use_se: bool, + hs_act : bool, width_mult: float = 1.0, + norm_layer = None,): + super().__init__() + + in_ch = _make_divisible(in_ch * width_mult, 8) + mid_ch = _make_divisible(mid_ch * width_mult, 8) + out_ch = _make_divisible(out_ch * width_mult, 8) + self._is_res_connect = stride == 1 and in_ch == out_ch + activation_layer = nn.Hardswish if hs_act else nn.ReLU + + layers = [] + + if mid_ch != in_ch: + layers.append(ConvNormActivation(in_ch, mid_ch, kernel_size=1, norm_layer=norm_layer, activation_layer=activation_layer)) + + layers.append(ConvNormActivation(mid_ch, mid_ch, kernel_size=kernel, stride=stride, groups=mid_ch, norm_layer=norm_layer, activation_layer=activation_layer)) + + if use_se: + layers.append( SqueezeExcitation(mid_ch, _make_divisible(mid_ch // 4, 8), scale_activation=nn.Hardsigmoid) ) + + layers.append(ConvNormActivation(mid_ch, out_ch, kernel_size=1, norm_layer=norm_layer, activation_layer=None)) + + self.block = nn.Sequential(*layers) + self.out_ch = out_ch + + def forward(self, input): + result = self.block(input) + if self._is_res_connect: + result = result + input + return result + +class FaceAlignerNet(nn.Module): + def __init__(self): + super().__init__() + + norm_layer = partial(nn.BatchNorm2d, eps=0.001, momentum=0.01) + + self.c0 = ConvNormActivation(3, 16, kernel_size=3, stride=2, norm_layer=norm_layer, activation_layer=nn.Hardswish) + + self.c1 = c1 = InvertedResidual ( 16, 16, 16, 3, 1, use_se=False, hs_act=False, norm_layer=norm_layer) + self.c2 = c2 = InvertedResidual ( 16, 64, 24, 3, 2, use_se=False, hs_act=False, norm_layer=norm_layer) + self.c3 = c3 = InvertedResidual ( 24, 72, 24, 3, 1, use_se=False, hs_act=False, norm_layer=norm_layer) + self.c4 = c4 = InvertedResidual ( 24, 72, 40, 5, 2, use_se=True, hs_act=False, norm_layer=norm_layer) + self.c5 = c5 = InvertedResidual ( 40, 120, 40, 5, 1, use_se=True, hs_act=False, norm_layer=norm_layer) + self.c6 = c6 = InvertedResidual ( 40, 120, 40, 5, 1, use_se=True, hs_act=False, norm_layer=norm_layer) + self.c7 = c7 = InvertedResidual ( 40, 240, 80, 3, 2, use_se=False, hs_act=True, norm_layer=norm_layer) + self.c8 = c8 = InvertedResidual ( 80, 200, 80, 3, 1, use_se=False, hs_act=True, norm_layer=norm_layer) + self.c9 = c9 = InvertedResidual ( 80, 184, 80, 3, 1, use_se=False, hs_act=True, norm_layer=norm_layer) + self.c10 = c10 = InvertedResidual( 80, 184, 80, 3, 1, use_se=False, hs_act=True, norm_layer=norm_layer) + self.c11 = c11 = InvertedResidual( 80, 480, 112, 3, 1, use_se=True, hs_act=True, norm_layer=norm_layer) + self.c12 = c12 = InvertedResidual( 112, 672, 112, 3, 1, use_se=True, hs_act=True, norm_layer=norm_layer) + self.c13 = c13 = InvertedResidual( 112, 672, 160, 5, 2, use_se=True, hs_act=True, norm_layer=norm_layer) + self.c14 = c14 = InvertedResidual( 160, 960, 160, 5, 1, use_se=True, hs_act=True, norm_layer=norm_layer) + self.c15 = c15 = InvertedResidual( 160, 960, 160, 5, 1, use_se=True, hs_act=True, norm_layer=norm_layer) + + conv_out_ch = sum([c1.out_ch, c3.out_ch, c6.out_ch, c10.out_ch, c13.out_ch, c15.out_ch]) + #fc_in_ch = _make_divisible(conv_out_ch // 2, 8) + self.fc1 = nn.Linear(conv_out_ch, conv_out_ch ) + self.fc2 = nn.Linear(conv_out_ch, 4 ) + + for m in self.modules(): + if isinstance(m, nn.Conv2d): + nn.init.kaiming_normal_(m.weight, mode='fan_out') + if m.bias is not None: + nn.init.zeros_(m.bias) + elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): + nn.init.ones_(m.weight) + nn.init.zeros_(m.bias) + elif isinstance(m, nn.Linear): + nn.init.normal_(m.weight, 0, 0.01) + nn.init.zeros_(m.bias) + + def forward(self, inp): + x = inp + + x = self.c0(x) + x = x1 = self.c1(x) + x = self.c2(x) + x = x3 = self.c3(x) + x = self.c4(x) + x = self.c5(x) + x = x6 = self.c6(x) + x = self.c7(x) + x = self.c8(x) + x = self.c9(x) + x = x10 = self.c10(x) + x = self.c11(x) + x = self.c12(x) + x = x13 = self.c13(x) + x = self.c14(x) + x = x15 = self.c15(x) + + x = torch.cat( [x1.mean((-2,-1)), x3.mean((-2,-1)), x6.mean((-2,-1)), x10.mean((-2,-1)), x13.mean((-2,-1)), x15.mean((-2,-1))], -1 ) + x = self.fc1(x) + x = self.fc2(x) + + scale_t, angle_t, tx_t, ty_t = torch.split(x, 1, -1) + + aff_t = torch.cat([torch.cos(angle_t)*scale_t, -torch.sin(angle_t)*scale_t, tx_t, + torch.sin(angle_t)*scale_t, torch.cos(angle_t)*scale_t, ty_t, + ], dim=-1).view(-1,2,3) + # from xlib.console import diacon + # diacon.Diacon.stop() + # import code + # code.interact(local=dict(globals(), **locals())) + + return aff_t + + + +# class CTSOT: +# def __init__(self, device_info : TorchDeviceInfo = None, +# state_dict : Union[dict, None] = None, +# training : bool = False): +# if device_info is None: +# device_info = get_cpu_device_info() +# self.device_info = device_info + +# self._net = net = CTSOTNet() + +# if state_dict is not None: +# net.load_state_dict(state_dict) + +# if training: +# net.train() +# else: +# net.eval() + +# self.set_device(device_info) + +# def set_device(self, device_info : TorchDeviceInfo = None): +# if device_info is None or device_info.is_cpu(): +# self._net.cpu() +# else: +# self._net.cuda(device_info.get_index()) + +# def get_state_dict(self): +# return self.net.state_dict() + +# def get_net(self) -> CTSOTNet: +# return self._net diff --git a/modelhub/torch/__init__.py b/modelhub/torch/__init__.py index 723103b..7ca3ac0 100644 --- a/modelhub/torch/__init__.py +++ b/modelhub/torch/__init__.py @@ -1,3 +1,3 @@ from .CenterFace.CenterFace import CenterFace_to_onnx from .S3FD.S3FD import S3FD -#from .FaceAligner.FaceAligner import FaceAlignerNet \ No newline at end of file +from .FaceAligner.FaceAligner import FaceAlignerNet \ No newline at end of file diff --git a/xlib/path/path.py b/xlib/path/path.py index 22090bc..951c78d 100644 --- a/xlib/path/path.py +++ b/xlib/path/path.py @@ -1,5 +1,6 @@ from os import scandir from pathlib import Path +from typing import List # image_extensions = [".jpg", ".jpeg", ".png", ".tif", ".tiff"] @@ -22,7 +23,7 @@ def scantree(path): yield entry -def get_files_paths(dir_path, extensions=None, subdirs=False): +def get_files_paths(dir_path, extensions=None, subdirs=False) -> List[Path]: """ returns array of Path() of files """