diff --git a/facelib/ResNet50.py b/facelib/ResNet50.py deleted file mode 100644 index 0060382..0000000 --- a/facelib/ResNet50.py +++ /dev/null @@ -1,221 +0,0 @@ -import torch -import torch.nn as nn - -from pathlib import Path - -def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1): - """3x3 convolution with padding""" - return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, - padding=dilation, groups=groups, bias=False, dilation=dilation) - - -def conv1x1(in_planes, out_planes, stride=1): - """1x1 convolution""" - return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) - - -class BasicBlock(nn.Module): - expansion = 1 - - def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, - base_width=64, dilation=1, norm_layer=None): - super(BasicBlock, self).__init__() - if norm_layer is None: - norm_layer = nn.BatchNorm2d - if groups != 1 or base_width != 64: - raise ValueError('BasicBlock only supports groups=1 and base_width=64') - if dilation > 1: - raise NotImplementedError("Dilation > 1 not supported in BasicBlock") - # Both self.conv1 and self.downsample layers downsample the input when stride != 1 - self.conv1 = conv3x3(inplanes, planes, stride) - self.bn1 = norm_layer(planes) - self.relu = nn.ReLU(inplace=True) - self.conv2 = conv3x3(planes, planes) - self.bn2 = norm_layer(planes) - self.downsample = downsample - self.stride = stride - - def forward(self, x): - identity = x - - out = self.conv1(x) - out = self.bn1(out) - out = self.relu(out) - - out = self.conv2(out) - out = self.bn2(out) - - if self.downsample is not None: - identity = self.downsample(x) - - out += identity - out = self.relu(out) - - return out - - -class Bottleneck(nn.Module): - # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2) - # while original implementation places the stride at the first 1x1 convolution(self.conv1) - # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385. - # This variant is also known as ResNet V1.5 and improves accuracy according to - # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch. 
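For intuition, a minimal shape check of the V1.5 stride placement (a sketch, assuming the Bottleneck and conv1x1 defined in this file are importable; sizes are made up): with stride=2 the 3x3 conv (conv2) halves the spatial resolution, while the 1x1 convs only move channels.

import torch
from facelib.ResNet50 import Bottleneck, conv1x1

# 256-channel 56x56 input; conv2 does the downsampling, conv1/conv3 do the
# 256 -> 64 -> 256 channel bottleneck.
down = torch.nn.Sequential(conv1x1(256, 256, stride=2), torch.nn.BatchNorm2d(256))
block = Bottleneck(inplanes=256, planes=64, stride=2, downsample=down)
x = torch.randn(1, 256, 56, 56)
print(block(x).shape)  # torch.Size([1, 256, 28, 28])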
- - expansion = 4 - - def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, - base_width=64, dilation=1, norm_layer=None): - super(Bottleneck, self).__init__() - if norm_layer is None: - norm_layer = nn.BatchNorm2d - width = int(planes * (base_width / 64.)) * groups - # Both self.conv2 and self.downsample layers downsample the input when stride != 1 - self.conv1 = conv1x1(inplanes, width) - self.bn1 = norm_layer(width) - self.conv2 = conv3x3(width, width, stride, groups, dilation) - self.bn2 = norm_layer(width) - self.conv3 = conv1x1(width, planes * self.expansion) - self.bn3 = norm_layer(planes * self.expansion) - self.relu = nn.ReLU(inplace=True) - self.downsample = downsample - self.stride = stride - - def forward(self, x): - identity = x - - out = self.conv1(x) - out = self.bn1(out) - out = self.relu(out) - - out = self.conv2(out) - out = self.bn2(out) - out = self.relu(out) - - out = self.conv3(out) - out = self.bn3(out) - - if self.downsample is not None: - identity = self.downsample(x) - - out += identity - out = self.relu(out) - - return out - - -class ResNet(nn.Module): - - def __init__(self, block, layers, num_classes=1000, zero_init_residual=False, - groups=1, width_per_group=64, replace_stride_with_dilation=None, - norm_layer=None): - super(ResNet, self).__init__() - if norm_layer is None: - norm_layer = nn.BatchNorm2d - self._norm_layer = norm_layer - - self.inplanes = 64 - self.dilation = 1 - if replace_stride_with_dilation is None: - # each element in the tuple indicates if we should replace - # the 2x2 stride with a dilated convolution instead - replace_stride_with_dilation = [False, False, False] - if len(replace_stride_with_dilation) != 3: - raise ValueError("replace_stride_with_dilation should be None " - "or a 3-element tuple, got {}".format(replace_stride_with_dilation)) - self.groups = groups - self.base_width = width_per_group - self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, - bias=False) - self.bn1 = norm_layer(self.inplanes) - self.relu = nn.ReLU(inplace=True) - self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) - self.layer1 = self._make_layer(block, 64, layers[0]) - self.layer2 = self._make_layer(block, 128, layers[1], stride=2, - dilate=replace_stride_with_dilation[0]) - self.layer3 = self._make_layer(block, 256, layers[2], stride=2, - dilate=replace_stride_with_dilation[1]) - self.layer4 = self._make_layer(block, 512, layers[3], stride=2, - dilate=replace_stride_with_dilation[2]) - self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) - self.fc = nn.Linear(512 * block.expansion, num_classes) - - for m in self.modules(): - if isinstance(m, nn.Conv2d): - nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') - elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): - nn.init.constant_(m.weight, 1) - nn.init.constant_(m.bias, 0) - - # Zero-initialize the last BN in each residual branch, - # so that the residual branch starts with zeros, and each residual block behaves like an identity. 
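Concretely, zeroing the last BN weight silences the residual branch, so at initialization the block reduces to relu(identity). A small sketch (same Bottleneck as above; not part of the original file):

import torch
block = Bottleneck(inplanes=256, planes=64).eval()  # stride 1, no downsample
torch.nn.init.constant_(block.bn3.weight, 0)        # what zero_init_residual does below
x = torch.randn(2, 256, 14, 14)
assert torch.allclose(block(x), torch.relu(x))      # residual branch contributes 0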
- # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677 - if zero_init_residual: - for m in self.modules(): - if isinstance(m, Bottleneck): - nn.init.constant_(m.bn3.weight, 0) - elif isinstance(m, BasicBlock): - nn.init.constant_(m.bn2.weight, 0) - - def _make_layer(self, block, planes, blocks, stride=1, dilate=False): - norm_layer = self._norm_layer - downsample = None - previous_dilation = self.dilation - if dilate: - self.dilation *= stride - stride = 1 - if stride != 1 or self.inplanes != planes * block.expansion: - downsample = nn.Sequential( - conv1x1(self.inplanes, planes * block.expansion, stride), - norm_layer(planes * block.expansion), - ) - - layers = [] - layers.append(block(self.inplanes, planes, stride, downsample, self.groups, - self.base_width, previous_dilation, norm_layer)) - self.inplanes = planes * block.expansion - for _ in range(1, blocks): - layers.append(block(self.inplanes, planes, groups=self.groups, - base_width=self.base_width, dilation=self.dilation, - norm_layer=norm_layer)) - - return nn.Sequential(*layers) - - def _forward_impl(self, x): - # See note [TorchScript super()] - x = self.conv1(x) - x = self.bn1(x) - x = self.relu(x) - x = self.maxpool(x) - - x = self.layer1(x) - x = self.layer2(x) - x = self.layer3(x) - x = self.layer4(x) - - x = self.avgpool(x) - x = torch.flatten(x, 1) - x = self.fc(x) - - return x - - def forward(self, x): - return self._forward_impl(x) - - -def _resnet(arch, block, layers, pretrained, **kwargs): - model = ResNet(block, layers, **kwargs) - if pretrained: - model_path = Path(__file__).parent / "resnet50-19c8e357.pth" - state_dict = torch.load(model_path) - model.load_state_dict(state_dict) - return model - - -def resnet50(pretrained=True, **kwargs): - r"""ResNet-50 model from - `"Deep Residual Learning for Image Recognition" <https://arxiv.org/abs/1512.03385>`_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - """ - return _resnet('resnet50', Bottleneck, [3, 4, 6, 3], pretrained, - **kwargs) \ No newline at end of file diff --git a/facelib/RetinaFaceExtractor.py b/facelib/RetinaFaceExtractor.py deleted file mode 100644 index 9f02fc8..0000000 --- a/facelib/RetinaFaceExtractor.py +++ /dev/null @@ -1,343 +0,0 @@ -import operator -from pathlib import Path - -import cv2 -import numpy as np - -import torch -import torch.nn as nn -import torch.nn.functional as F - -from facelib.nn_pt import nn as nn_pt - -import torchvision.models as models -import torchvision.models.detection.backbone_utils as backbone_utils -import torchvision.models._utils as _utils -from torchvision.transforms import ToTensor - -from facelib.net import FPN as FPN -from facelib.net import SSH as SSH - -from facelib.box_utils import decode -from facelib.prior_box import PriorBox - -from facelib.ResNet50 import resnet50 - -from facelib.config import cfg_re50 -"""Ported from https://github.com/biubug6/Pytorch_Retinaface/""" - -class RetinaFaceExtractor(object): - def __init__(self, place_model_on_cpu=False): - nn_pt.initialize() - - model_path = Path(__file__).parent / "RetinaFace-Resnet50.pth" - - if not model_path.exists(): - raise Exception("Unable to load RetinaFace-Resnet50.pth") - - class ClassHead(nn.Module): - def __init__(self,inchannels=512,num_anchors=3): - super(ClassHead,self).__init__() - self.num_anchors = num_anchors - self.conv1x1 = nn.Conv2d(inchannels,self.num_anchors*2,kernel_size=(1,1),stride=1,padding=0) - - def forward(self,x): - out = self.conv1x1(x) - out = out.permute(0,2,3,1).contiguous() - - return
out.view(out.shape[0], -1, 2) - - - class BboxHead(nn.Module): - def __init__(self,inchannels=512,num_anchors=3): - super(BboxHead,self).__init__() - self.conv1x1 = nn.Conv2d(inchannels,num_anchors*4,kernel_size=(1,1),stride=1,padding=0) - - def forward(self,x): - out = self.conv1x1(x) - out = out.permute(0,2,3,1).contiguous() - - return out.view(out.shape[0], -1, 4) - - - class LandmarkHead(nn.Module): - def __init__(self,inchannels=512,num_anchors=3): - super(LandmarkHead,self).__init__() - self.conv1x1 = nn.Conv2d(inchannels,num_anchors*10,kernel_size=(1,1),stride=1,padding=0) - - def forward(self,x): - out = self.conv1x1(x) - out = out.permute(0,2,3,1).contiguous() - - return out.view(out.shape[0], -1, 10) - - class RetinaFace(nn.Module): - def __init__(self, cfg = cfg_re50): - super(RetinaFace,self).__init__() - backbone = resnet50(pretrained=cfg['pretrain']) - - self.body = _utils.IntermediateLayerGetter(backbone, cfg['return_layers']) - in_channels_stage2 = cfg['in_channel'] - in_channels_list = [in_channels_stage2 * 2, - in_channels_stage2 * 4, - in_channels_stage2 * 8,] - out_channels = cfg['out_channel'] - - self.fpn = FPN(in_channels_list,out_channels) - self.ssh1 = SSH(out_channels, out_channels) - self.ssh2 = SSH(out_channels, out_channels) - self.ssh3 = SSH(out_channels, out_channels) - - self.ClassHead = self._make_class_head(fpn_num=3, inchannels=cfg['out_channel']) - self.BboxHead = self._make_bbox_head(fpn_num=3, inchannels=cfg['out_channel']) - self.LandmarkHead = self._make_landmark_head(fpn_num=3, inchannels=cfg['out_channel']) - - def _make_class_head(self,fpn_num=3,inchannels=64,anchor_num=2): - classhead = nn.ModuleList() - for i in range(fpn_num): - classhead.append(ClassHead(inchannels,anchor_num)) - return classhead - - def _make_bbox_head(self,fpn_num=3,inchannels=64,anchor_num=2): - bboxhead = nn.ModuleList() - for i in range(fpn_num): - bboxhead.append(BboxHead(inchannels,anchor_num)) - return bboxhead - - def _make_landmark_head(self,fpn_num=3,inchannels=64,anchor_num=2): - landmarkhead = nn.ModuleList() - for i in range(fpn_num): - landmarkhead.append(LandmarkHead(inchannels,anchor_num)) - return landmarkhead - - - def forward(self,inputs): - out = self.body(inputs) - - # FPN - fpn = self.fpn(out) - - # SSH - feature1 = self.ssh1(fpn[0]) - feature2 = self.ssh2(fpn[1]) - feature3 = self.ssh3(fpn[2]) - features = [feature1, feature2, feature3] - - bbox_regressions = torch.cat([self.BboxHead[i](feature) for i, feature in enumerate(features)], dim=1) - classifications = torch.cat([self.ClassHead[i](feature) for i, feature in enumerate(features)],dim=1) - ldm_regressions = torch.cat([self.LandmarkHead[i](feature) for i, feature in enumerate(features)], dim=1) - - output = (bbox_regressions, F.softmax(classifications, dim=-1), ldm_regressions) - - return output - - def check_keys(model, pretrained_state_dict): - ckpt_keys = set(pretrained_state_dict.keys()) - model_keys = set(model.state_dict().keys()) - used_pretrained_keys = model_keys & ckpt_keys - unused_pretrained_keys = ckpt_keys - model_keys - missing_keys = model_keys - ckpt_keys - assert len(used_pretrained_keys) > 0, 'load NONE from pretrained checkpoint' - return True - - def remove_prefix(state_dict, prefix): - ''' Old style model is stored with all names of parameters sharing common prefix 'module.' 
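For example, the prefix mapping applied below behaves like this (a tiny sketch with made-up keys):

sd = {'module.fpn.weight': 0, 'fpn.bias': 1}
strip = lambda x: x.split('module.', 1)[-1] if x.startswith('module.') else x
print({strip(k): v for k, v in sd.items()})  # {'fpn.weight': 0, 'fpn.bias': 1}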
''' - f = lambda x: x.split(prefix, 1)[-1] if x.startswith(prefix) else x - return {f(key): value for key, value in state_dict.items()} - - def load_model(model, pretrained_path, load_to_cpu): - if load_to_cpu: - pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage) - else: - device = torch.cuda.current_device() - pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage.cuda(device)) - if "state_dict" in pretrained_dict.keys(): - pretrained_dict = remove_prefix(pretrained_dict['state_dict'], 'module.') - else: - pretrained_dict = remove_prefix(pretrained_dict, 'module.') - check_keys(model, pretrained_dict) - model.load_state_dict(pretrained_dict, strict=False) - return model - - try: - torch.set_grad_enabled(False) - self.model = RetinaFace(cfg=cfg_re50) - self.model = load_model(self.model, model_path, place_model_on_cpu) - self.model.eval() - - self.device = torch.device("cpu" if place_model_on_cpu else "cuda") - self.model = self.model.to(self.device) - - except Exception: - self.model = None - print("Could not load RetinaFace") - - - - - - def __enter__(self): - return self - - def __exit__(self, exc_type=None, exc_value=None, traceback=None): - return False #pass exception between __enter__ and __exit__ to outer level - - def extract (self, input_image, is_bgr=True, is_remove_intersects=False): - - cfg = cfg_re50 - - if is_bgr: - input_image = input_image[:,:,::-1] - is_bgr = False - - (h, w, ch) = input_image.shape - - d = max(w, h) - scale_to = 640 if d >= 1280 else d / 2 - scale_to = max(64, scale_to) - - input_scale = d / scale_to - input_image = cv2.resize (input_image, ( int(w/input_scale), int(h/input_scale) ), interpolation=cv2.INTER_LINEAR) - - (h, w, ch) = input_image.shape - - with torch.no_grad(): - - input_image = ToTensor()(input_image) - input_image = input_image.to(self.device) - - loc, conf, landmarks = self.model( input_image[None,...]
) - - priorbox = PriorBox(cfg, image_size=(h, w)) - priors = priorbox.forward() - priors = priors.to(self.device) - prior_data = priors.data - - boxes = decode(loc.data.squeeze(0), prior_data, cfg['variance']) - boxes = np.float32(boxes.cpu().numpy()) - scores = np.float32(conf.squeeze(0).data.cpu().numpy()[:, 1]) - - inds = np.where(scores > 0.05)[0] - boxes = boxes[inds] - scores = scores[inds] - - dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32, copy=False) - - keep = self.refine_nms(dets, 0.3) - dets = dets[keep, :] - - dets = [ x[:-1].astype(np.int32) for x in dets if x[-1] >= 0.5 ] - - - - detected_faces = [] - - for ltrb in dets: - # l,t,r,b = [ x for x in ltrb] - l,t,r,b = [ x*input_scale for x in ltrb] - bt = b-t - if min(r-l,bt) < 40: #filtering faces < 40pix by any side - continue - b += bt*0.1 #enlarging bottom line a bit for 2DFAN-4, because the default box does not cover the chin enough - detected_faces.append ( [int(x) for x in (l,t,r,b) ] ) - - #sort by largest area first - detected_faces = [ [(l,t,r,b), (r-l)*(b-t) ] for (l,t,r,b) in detected_faces ] - detected_faces = sorted(detected_faces, key=operator.itemgetter(1), reverse=True ) - detected_faces = [ x[0] for x in detected_faces] - - if is_remove_intersects: - for i in range( len(detected_faces)-1, 0, -1): - l1,t1,r1,b1 = detected_faces[i] - l0,t0,r0,b0 = detected_faces[i-1] - - dx = min(r0, r1) - max(l0, l1) - dy = min(b0, b1) - max(t0, t1) - if (dx>=0) and (dy>=0): - detected_faces.pop(i) - - return detected_faces - - - - def refine(self, olist): - bboxlist = [] - for i, ((ocls,), (oreg,)) in enumerate ( zip (olist[::2], olist[1::2]) ): - stride = 2**(i + 2) # 4,8,16,32,64,128 - s_d2 = stride / 2 - s_m4 = stride * 4 - - for hindex, windex in zip(*np.where(ocls[...,1] > 0.05)): - score = ocls[hindex, windex, 1] - loc = oreg[hindex, windex, :] - priors = np.array([windex * stride + s_d2, hindex * stride + s_d2, s_m4, s_m4]) - priors_2p = priors[2:] - box = np.concatenate((priors[:2] + loc[:2] * 0.1 * priors_2p, - priors_2p * np.exp(loc[2:] * 0.2)) ) - box[:2] -= box[2:] / 2 - box[2:] += box[:2] - - bboxlist.append([*box, score]) - - bboxlist = np.array(bboxlist) - if len(bboxlist) == 0: - bboxlist = np.zeros((1, 5)) - - bboxlist = bboxlist[self.refine_nms(bboxlist, 0.3), :] - bboxlist = [ x[:-1].astype(np.int32) for x in bboxlist if x[-1] >= 0.5] - return bboxlist - - def refine_nms2(self, dets, thresh): - keep = list() - if len(dets) == 0: - return keep - - x_1, y_1, x_2, y_2, scores = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3], dets[:, 4] - areas = (x_2 - x_1 + 1) * (y_2 - y_1 + 1) - order = scores.argsort()[::-1] - - keep = [] - while order.size > 0: - i = order[0] - keep.append(i) - xx_1, yy_1 = np.maximum(x_1[i], x_1[order[1:]]), np.maximum(y_1[i], y_1[order[1:]]) - xx_2, yy_2 = np.minimum(x_2[i], x_2[order[1:]]), np.minimum(y_2[i], y_2[order[1:]]) - - width, height = np.maximum(0.0, xx_2 - xx_1 + 1), np.maximum(0.0, yy_2 - yy_1 + 1) - ovr = width * height / (areas[i] + areas[order[1:]] - width * height) - - inds = np.where(ovr <= thresh)[0] - order = order[inds + 1] - return keep - - - def refine_nms(self, dets, thresh): - """Pure Python NMS baseline.""" - x1 = dets[:, 0] - y1 = dets[:, 1] - x2 = dets[:, 2] - y2 = dets[:, 3] - scores = dets[:, 4] - - areas = (x2 - x1 + 1) * (y2 - y1 + 1) - order = scores.argsort()[::-1] - - keep = [] - while order.size > 0: - i = order[0] - keep.append(i) - xx1 = np.maximum(x1[i], x1[order[1:]]) - yy1 = np.maximum(y1[i], y1[order[1:]]) - xx2 = np.minimum(x2[i], x2[order[1:]])
- yy2 = np.minimum(y2[i], y2[order[1:]]) - - w = np.maximum(0.0, xx2 - xx1 + 1) - h = np.maximum(0.0, yy2 - yy1 + 1) - inter = w * h - ovr = inter / (areas[i] + areas[order[1:]] - inter) - - inds = np.where(ovr <= thresh)[0] - order = order[inds + 1] - - return keep diff --git a/facelib/box_utils.py b/facelib/box_utils.py deleted file mode 100644 index daea71b..0000000 --- a/facelib/box_utils.py +++ /dev/null @@ -1,328 +0,0 @@ -import torch -import numpy as np - - -def point_form(boxes): - """ Convert prior_boxes to (xmin, ymin, xmax, ymax) - representation for comparison to point form ground truth data. - Args: - boxes: (tensor) center-size default boxes from priorbox layers. - Return: - boxes: (tensor) Converted xmin, ymin, xmax, ymax form of boxes. - """ - return torch.cat((boxes[:, :2] - boxes[:, 2:]/2, # xmin, ymin - boxes[:, :2] + boxes[:, 2:]/2), 1) # xmax, ymax - - -def center_size(boxes): - """ Convert prior_boxes to (cx, cy, w, h) - representation for comparison to center-size form ground truth data. - Args: - boxes: (tensor) point_form boxes - Return: - boxes: (tensor) Converted cx, cy, w, h form of boxes. - """ - return torch.cat(((boxes[:, 2:] + boxes[:, :2])/2, # cx, cy - boxes[:, 2:] - boxes[:, :2]), 1) # w, h - - -def intersect(box_a, box_b): - """ We resize both tensors to [A,B,2] without new malloc: - [A,2] -> [A,1,2] -> [A,B,2] - [B,2] -> [1,B,2] -> [A,B,2] - Then we compute the area of intersect between box_a and box_b. - Args: - box_a: (tensor) bounding boxes, Shape: [A,4]. - box_b: (tensor) bounding boxes, Shape: [B,4]. - Return: - (tensor) intersection area, Shape: [A,B]. - """ - A = box_a.size(0) - B = box_b.size(0) - max_xy = torch.min(box_a[:, 2:].unsqueeze(1).expand(A, B, 2), - box_b[:, 2:].unsqueeze(0).expand(A, B, 2)) - min_xy = torch.max(box_a[:, :2].unsqueeze(1).expand(A, B, 2), - box_b[:, :2].unsqueeze(0).expand(A, B, 2)) - inter = torch.clamp((max_xy - min_xy), min=0) - return inter[:, :, 0] * inter[:, :, 1] - - -def jaccard(box_a, box_b): - """Compute the jaccard overlap of two sets of boxes. The jaccard overlap - is simply the intersection over union of two boxes. Here we operate on - ground truth boxes and default boxes.
- E.g.: - A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B) - Args: - box_a: (tensor) Ground truth bounding boxes, Shape: [num_objects,4] - box_b: (tensor) Prior boxes from priorbox layers, Shape: [num_priors,4] - Return: - jaccard overlap: (tensor) Shape: [box_a.size(0), box_b.size(0)] - """ - inter = intersect(box_a, box_b) - area_a = ((box_a[:, 2]-box_a[:, 0]) * - (box_a[:, 3]-box_a[:, 1])).unsqueeze(1).expand_as(inter) # [A,B] - area_b = ((box_b[:, 2]-box_b[:, 0]) * - (box_b[:, 3]-box_b[:, 1])).unsqueeze(0).expand_as(inter) # [A,B] - union = area_a + area_b - inter - return inter / union # [A,B] - - -def matrix_iou(a, b): - """ - return iou of a and b, numpy version for data augmentation - """ - lt = np.maximum(a[:, np.newaxis, :2], b[:, :2]) - rb = np.minimum(a[:, np.newaxis, 2:], b[:, 2:]) - - area_i = np.prod(rb - lt, axis=2) * (lt < rb).all(axis=2) - area_a = np.prod(a[:, 2:] - a[:, :2], axis=1) - area_b = np.prod(b[:, 2:] - b[:, :2], axis=1) - return area_i / (area_a[:, np.newaxis] + area_b - area_i) - - -def matrix_iof(a, b): - """ - return iof of a and b, numpy version for data augmentation - """ - lt = np.maximum(a[:, np.newaxis, :2], b[:, :2]) - rb = np.minimum(a[:, np.newaxis, 2:], b[:, 2:]) - - area_i = np.prod(rb - lt, axis=2) * (lt < rb).all(axis=2) - area_a = np.prod(a[:, 2:] - a[:, :2], axis=1) - return area_i / np.maximum(area_a[:, np.newaxis], 1) - - -def match(threshold, truths, priors, variances, labels, landms, loc_t, conf_t, landm_t, idx): - """Match each prior box with the ground truth box of the highest jaccard - overlap, encode the bounding boxes, then return the matched indices - corresponding to both confidence and location preds. - Args: - threshold: (float) The overlap threshold used when matching boxes. - truths: (tensor) Ground truth boxes, Shape: [num_obj, 4]. - priors: (tensor) Prior boxes from priorbox layers, Shape: [n_priors,4]. - variances: (tensor) Variances corresponding to each prior coord, - Shape: [num_priors, 4]. - labels: (tensor) All the class labels for the image, Shape: [num_obj]. - landms: (tensor) Ground truth landms, Shape [num_obj, 10]. - loc_t: (tensor) Tensor to be filled w/ encoded location targets. - conf_t: (tensor) Tensor to be filled w/ matched indices for conf preds. - landm_t: (tensor) Tensor to be filled w/ encoded landm targets. - idx: (int) current batch index - Return: - The matched indices corresponding to 1)location 2)confidence 3)landm preds.
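To make the overlap math concrete, a hand-checkable sketch (assuming matrix_iou from this file is in scope; numbers made up):

import numpy as np
a = np.array([[0., 0., 2., 2.]])  # 2x2 box
b = np.array([[1., 1., 3., 3.]])  # overlapping 2x2 box
# intersection = 1, union = 4 + 4 - 1 = 7, so IoU = 1/7
print(matrix_iou(a, b))  # [[0.14285714]]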
- """ - # jaccard index - overlaps = jaccard( - truths, - point_form(priors) - ) - # (Bipartite Matching) - # [1,num_objects] best prior for each ground truth - best_prior_overlap, best_prior_idx = overlaps.max(1, keepdim=True) - - # ignore hard gt - valid_gt_idx = best_prior_overlap[:, 0] >= 0.2 - best_prior_idx_filter = best_prior_idx[valid_gt_idx, :] - if best_prior_idx_filter.shape[0] <= 0: - loc_t[idx] = 0 - conf_t[idx] = 0 - return - - # [1,num_priors] best ground truth for each prior - best_truth_overlap, best_truth_idx = overlaps.max(0, keepdim=True) - best_truth_idx.squeeze_(0) - best_truth_overlap.squeeze_(0) - best_prior_idx.squeeze_(1) - best_prior_idx_filter.squeeze_(1) - best_prior_overlap.squeeze_(1) - best_truth_overlap.index_fill_(0, best_prior_idx_filter, 2) # ensure best prior - # TODO refactor: index best_prior_idx with long tensor - # ensure every gt matches with its prior of max overlap - for j in range(best_prior_idx.size(0)): # 判别此anchor是预测哪一个boxes - best_truth_idx[best_prior_idx[j]] = j - matches = truths[best_truth_idx] # Shape: [num_priors,4] 此处为每一个anchor对应的bbox取出来 - conf = labels[best_truth_idx] # Shape: [num_priors] 此处为每一个anchor对应的label取出来 - conf[best_truth_overlap < threshold] = 0 # label as background overlap<0.35的全部作为负样本 - loc = encode(matches, priors, variances) - - matches_landm = landms[best_truth_idx] - landm = encode_landm(matches_landm, priors, variances) - loc_t[idx] = loc # [num_priors,4] encoded offsets to learn - conf_t[idx] = conf # [num_priors] top class label for each prior - landm_t[idx] = landm - - -def encode(matched, priors, variances): - """Encode the variances from the priorbox layers into the ground truth boxes - we have matched (based on jaccard overlap) with the prior boxes. - Args: - matched: (tensor) Coords of ground truth for each prior in point-form - Shape: [num_priors, 4]. - priors: (tensor) Prior boxes in center-offset form - Shape: [num_priors,4]. - variances: (list[float]) Variances of priorboxes - Return: - encoded boxes (tensor), Shape: [num_priors, 4] - """ - - # dist b/t match center and prior's center - g_cxcy = (matched[:, :2] + matched[:, 2:])/2 - priors[:, :2] - # encode variance - g_cxcy /= (variances[0] * priors[:, 2:]) - # match wh / prior wh - g_wh = (matched[:, 2:] - matched[:, :2]) / priors[:, 2:] - g_wh = torch.log(g_wh) / variances[1] - # return target for smooth_l1_loss - return torch.cat([g_cxcy, g_wh], 1) # [num_priors,4] - -def encode_landm(matched, priors, variances): - """Encode the variances from the priorbox layers into the ground truth boxes - we have matched (based on jaccard overlap) with the prior boxes. - Args: - matched: (tensor) Coords of ground truth for each prior in point-form - Shape: [num_priors, 10]. - priors: (tensor) Prior boxes in center-offset form - Shape: [num_priors,4]. 
- variances: (list[float]) Variances of priorboxes - Return: - encoded landm (tensor), Shape: [num_priors, 10] - """ - - # dist b/t match center and prior's center - matched = torch.reshape(matched, (matched.size(0), 5, 2)) - priors_cx = priors[:, 0].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2) - priors_cy = priors[:, 1].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2) - priors_w = priors[:, 2].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2) - priors_h = priors[:, 3].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2) - priors = torch.cat([priors_cx, priors_cy, priors_w, priors_h], dim=2) - g_cxcy = matched[:, :, :2] - priors[:, :, :2] - # encode variance - g_cxcy /= (variances[0] * priors[:, :, 2:]) - # g_cxcy /= priors[:, :, 2:] - g_cxcy = g_cxcy.reshape(g_cxcy.size(0), -1) - # return target for smooth_l1_loss - return g_cxcy - - -# Adapted from https://github.com/Hakuyume/chainer-ssd -def decode(loc, priors, variances): - """Decode locations from predictions using priors to undo - the encoding we did for offset regression at train time. - Args: - loc (tensor): location predictions for loc layers, - Shape: [num_priors,4] - priors (tensor): Prior boxes in center-offset form. - Shape: [num_priors,4]. - variances: (list[float]) Variances of priorboxes - Return: - decoded bounding box predictions - """ - - boxes = torch.cat(( - priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:], - priors[:, 2:] * torch.exp(loc[:, 2:] * variances[1])), 1) - boxes[:, :2] -= boxes[:, 2:] / 2 - boxes[:, 2:] += boxes[:, :2] - return boxes - -def decode_landm(pre, priors, variances): - """Decode landm from predictions using priors to undo - the encoding we did for offset regression at train time. - Args: - pre (tensor): landm predictions for loc layers, - Shape: [num_priors,10] - priors (tensor): Prior boxes in center-offset form. - Shape: [num_priors,4]. - variances: (list[float]) Variances of priorboxes - Return: - decoded landm predictions - """ - landms = torch.cat((priors[:, :2] + pre[:, :2] * variances[0] * priors[:, 2:], - priors[:, :2] + pre[:, 2:4] * variances[0] * priors[:, 2:], - priors[:, :2] + pre[:, 4:6] * variances[0] * priors[:, 2:], - priors[:, :2] + pre[:, 6:8] * variances[0] * priors[:, 2:], - priors[:, :2] + pre[:, 8:10] * variances[0] * priors[:, 2:], - ), dim=1) - return landms - - -def log_sum_exp(x): - """Utility function for computing log_sum_exp in a numerically stable way. - This will be used to determine the unaveraged confidence loss across - all examples in a batch. - Args: - x (Variable(tensor)): conf_preds from conf layers - """ - x_max = x.data.max() - return torch.log(torch.sum(torch.exp(x-x_max), 1, keepdim=True)) + x_max - - -# Original author: Francisco Massa: - # https://github.com/fmassa/object-detection.torch - # Ported to PyTorch by Max deGroot (02/01/2017) -def nms(boxes, scores, overlap=0.5, top_k=200): - """Apply non-maximum suppression at test time to avoid detecting too many - overlapping bounding boxes for a given object. - Args: - boxes: (tensor) The location preds for the img, Shape: [num_priors,4]. - scores: (tensor) The class prediction scores for the img, Shape:[num_priors]. - overlap: (float) The overlap thresh for suppressing unnecessary boxes. - top_k: (int) The maximum number of box preds to consider. - Return: - The indices of the kept boxes with respect to num_priors.
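As a quick round-trip check that decode undoes encode (a sketch, assuming encode and decode from this file are in scope; box values are made up, and variances [0.1, 0.2] match typical RetinaFace configs):

import torch
priors = torch.tensor([[0.50, 0.500, 0.20, 0.20]])  # cx, cy, w, h
truth  = torch.tensor([[0.45, 0.450, 0.65, 0.70]])  # xmin, ymin, xmax, ymax
loc = encode(truth, priors, [0.1, 0.2])
print(decode(loc, priors, [0.1, 0.2]))  # tensor([[0.4500, 0.4500, 0.6500, 0.7000]])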
- """ - - keep = torch.Tensor(scores.size(0)).fill_(0).long() - if boxes.numel() == 0: - return keep - x1 = boxes[:, 0] - y1 = boxes[:, 1] - x2 = boxes[:, 2] - y2 = boxes[:, 3] - area = torch.mul(x2 - x1, y2 - y1) - v, idx = scores.sort(0) # sort in ascending order - # I = I[v >= 0.01] - idx = idx[-top_k:] # indices of the top-k largest vals - xx1 = boxes.new() - yy1 = boxes.new() - xx2 = boxes.new() - yy2 = boxes.new() - w = boxes.new() - h = boxes.new() - - # keep = torch.Tensor() - count = 0 - while idx.numel() > 0: - i = idx[-1] # index of current largest val - # keep.append(i) - keep[count] = i - count += 1 - if idx.size(0) == 1: - break - idx = idx[:-1] # remove kept element from view - # load bboxes of next highest vals - torch.index_select(x1, 0, idx, out=xx1) - torch.index_select(y1, 0, idx, out=yy1) - torch.index_select(x2, 0, idx, out=xx2) - torch.index_select(y2, 0, idx, out=yy2) - # store element-wise max with next highest score - xx1 = torch.clamp(xx1, min=x1[i]) - yy1 = torch.clamp(yy1, min=y1[i]) - xx2 = torch.clamp(xx2, max=x2[i]) - yy2 = torch.clamp(yy2, max=y2[i]) - w.resize_as_(xx2) - h.resize_as_(yy2) - w = xx2 - xx1 - h = yy2 - yy1 - # check sizes of xx1 and xx2.. after each iteration - w = torch.clamp(w, min=0.0) - h = torch.clamp(h, min=0.0) - inter = w*h - # IoU = i / (area(a) + area(b) - i) - rem_areas = torch.index_select(area, 0, idx) # load remaining areas) - union = (rem_areas - inter) + area[i] - IoU = inter/union # store result in iou - # keep only elements with an IoU <= overlap - idx = idx[IoU.le(overlap)] - return keep, count \ No newline at end of file diff --git a/facelib/coord_conv.py b/facelib/coord_conv.py deleted file mode 100644 index f5c9400..0000000 --- a/facelib/coord_conv.py +++ /dev/null @@ -1,153 +0,0 @@ -import torch -import torch.nn as nn - - -class AddCoordsTh(nn.Module): - def __init__(self, x_dim=64, y_dim=64, with_r=False, with_boundary=False): - super(AddCoordsTh, self).__init__() - self.x_dim = x_dim - self.y_dim = y_dim - self.with_r = with_r - self.with_boundary = with_boundary - - def forward(self, input_tensor, heatmap=None): - """ - input_tensor: (batch, c, x_dim, y_dim) - """ - batch_size_tensor = input_tensor.shape[0] - - xx_ones = torch.ones([1, self.y_dim], dtype=torch.int32)#.cuda() - xx_ones = xx_ones.unsqueeze(-1) - - xx_range = torch.arange(self.x_dim, dtype=torch.int32).unsqueeze(0)#.cuda() - xx_range = xx_range.unsqueeze(1) - - xx_channel = torch.matmul(xx_ones.float(), xx_range.float()) - xx_channel = xx_channel.unsqueeze(-1) - - - yy_ones = torch.ones([1, self.x_dim], dtype=torch.int32)#.cuda() - yy_ones = yy_ones.unsqueeze(1) - - yy_range = torch.arange(self.y_dim, dtype=torch.int32).unsqueeze(0)#.cuda() - yy_range = yy_range.unsqueeze(-1) - - yy_channel = torch.matmul(yy_range.float(), yy_ones.float()) - yy_channel = yy_channel.unsqueeze(-1) - - xx_channel = xx_channel.permute(0, 3, 2, 1) - yy_channel = yy_channel.permute(0, 3, 2, 1) - - xx_channel = xx_channel / (self.x_dim - 1) - yy_channel = yy_channel / (self.y_dim - 1) - - xx_channel = xx_channel * 2 - 1 - yy_channel = yy_channel * 2 - 1 - - xx_channel = xx_channel.repeat(batch_size_tensor, 1, 1, 1) - yy_channel = yy_channel.repeat(batch_size_tensor, 1, 1, 1) - - if self.with_boundary and type(heatmap) != type(None): - boundary_channel = torch.clamp(heatmap[:, -1:, :, :], - 0.0, 1.0) - - zero_tensor = torch.zeros_like(xx_channel) - xx_boundary_channel = torch.where(boundary_channel>0.05, - xx_channel, zero_tensor) - yy_boundary_channel = 
torch.where(boundary_channel>0.05, - yy_channel, zero_tensor) - if self.with_boundary and type(heatmap) != type(None): - xx_boundary_channel = xx_boundary_channel#.cuda() - yy_boundary_channel = yy_boundary_channel#.cuda() - ret = torch.cat([input_tensor, xx_channel, yy_channel], dim=1) - - - if self.with_r: - rr = torch.sqrt(torch.pow(xx_channel, 2) + torch.pow(yy_channel, 2)) - rr = rr / torch.max(rr) - ret = torch.cat([ret, rr], dim=1) - - if self.with_boundary and type(heatmap) != type(None): - ret = torch.cat([ret, xx_boundary_channel, - yy_boundary_channel], dim=1) - return ret - - -class CoordConvTh(nn.Module): - """CoordConv layer as in the paper.""" - def __init__(self, x_dim, y_dim, with_r, with_boundary, - in_channels, first_one=False, *args, **kwargs): - super(CoordConvTh, self).__init__() - self.addcoords = AddCoordsTh(x_dim=x_dim, y_dim=y_dim, with_r=with_r, - with_boundary=with_boundary) - in_channels += 2 - if with_r: - in_channels += 1 - if with_boundary and not first_one: - in_channels += 2 - self.conv = nn.Conv2d(in_channels=in_channels, *args, **kwargs) - - def forward(self, input_tensor, heatmap=None): - ret = self.addcoords(input_tensor, heatmap) - last_channel = ret[:, -2:, :, :] - ret = self.conv(ret) - return ret, last_channel - - -''' -An alternative implementation for PyTorch with auto-inferring the x-y dimensions. -''' -class AddCoords(nn.Module): - - def __init__(self, with_r=False): - super().__init__() - self.with_r = with_r - - def forward(self, input_tensor): - """ - Args: - input_tensor: shape(batch, channel, x_dim, y_dim) - """ - batch_size, _, x_dim, y_dim = input_tensor.size() - - xx_channel = torch.arange(x_dim).repeat(1, y_dim, 1) - yy_channel = torch.arange(y_dim).repeat(1, x_dim, 1).transpose(1, 2) - - xx_channel = xx_channel / (x_dim - 1) - yy_channel = yy_channel / (y_dim - 1) - - xx_channel = xx_channel * 2 - 1 - yy_channel = yy_channel * 2 - 1 - - xx_channel = xx_channel.repeat(batch_size, 1, 1, 1).transpose(2, 3) - yy_channel = yy_channel.repeat(batch_size, 1, 1, 1).transpose(2, 3) - - if input_tensor.is_cuda: - xx_channel = xx_channel#.cuda() - yy_channel = yy_channel#.cuda() - - ret = torch.cat([ - input_tensor, - xx_channel.type_as(input_tensor), - yy_channel.type_as(input_tensor)], dim=1) - - if self.with_r: - rr = torch.sqrt(torch.pow(xx_channel - 0.5, 2) + torch.pow(yy_channel - 0.5, 2)) - if input_tensor.is_cuda: - rr = rr#.cuda() - ret = torch.cat([ret, rr], dim=1) - - return ret - - -class CoordConv(nn.Module): - - def __init__(self, in_channels, out_channels, with_r=False, **kwargs): - super().__init__() - self.addcoords = AddCoords(with_r=with_r) - self.conv = nn.Conv2d(in_channels + 2, out_channels, **kwargs) - - def forward(self, x): - ret = self.addcoords(x) - ret = self.conv(ret) - return ret \ No newline at end of file diff --git a/facelib/prior_box.py b/facelib/prior_box.py deleted file mode 100644 index e20e443..0000000 --- a/facelib/prior_box.py +++ /dev/null @@ -1,34 +0,0 @@ -import torch -from itertools import product as product -import numpy as np -from math import ceil - - -class PriorBox(object): - def __init__(self, cfg, image_size=None, phase='test'): - super(PriorBox, self).__init__() - self.min_sizes = cfg['min_sizes'] - self.steps = cfg['steps'] - self.clip = cfg['clip'] - self.image_size = image_size - self.feature_maps = [[ceil(self.image_size[0]/step), ceil(self.image_size[1]/step)] for step in self.steps] - self.name = "s" - - def forward(self): - anchors = [] - for k, f in enumerate(self.feature_maps): -
min_sizes = self.min_sizes[k] - for i, j in product(range(f[0]), range(f[1])): - for min_size in min_sizes: - s_kx = min_size / self.image_size[1] - s_ky = min_size / self.image_size[0] - dense_cx = [x * self.steps[k] / self.image_size[1] for x in [j + 0.5]] - dense_cy = [y * self.steps[k] / self.image_size[0] for y in [i + 0.5]] - for cy, cx in product(dense_cy, dense_cx): - anchors += [cx, cy, s_kx, s_ky] - - # back to torch land - output = torch.Tensor(anchors).view(-1, 4) - if self.clip: - output.clamp_(max=1, min=0) - return output \ No newline at end of file diff --git a/main.py b/main.py index 7686a6a..36e79b1 100644 --- a/main.py +++ b/main.py @@ -43,7 +43,7 @@ if __name__ == "__main__": ) p = subparsers.add_parser( "extract", help="Extract the faces from a pictures.") - p.add_argument('--detector', dest="detector", choices=['retinaface', 'manual'], default=None, help="Type of detector.") + p.add_argument('--detector', dest="detector", choices=['s3fd','manual'], default=None, help="Type of detector.") p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir", help="Input directory. A directory containing the files you wish to process.") p.add_argument('--output-dir', required=True, action=fixPathAction, dest="output_dir", help="Output directory. This is where the extracted files will be stored.") p.add_argument('--output-debug', action="store_true", dest="output_debug", default=None, help="Writes debug images to _debug\ directory.") diff --git a/mainscripts/Extractor.py b/mainscripts/Extractor.py index cbe38df..908e723 100644 --- a/mainscripts/Extractor.py +++ b/mainscripts/Extractor.py @@ -1,4 +1,4 @@ -import traceback +import traceback import math import multiprocessing import operator @@ -15,9 +15,9 @@ import facelib from core import imagelib from core import mathlib from facelib import FaceType, LandmarksProcessor -from facelib.nn_pt import nn from core.interact import interact as io from core.joblib import Subprocessor +from core.leras import nn from core import pathex from core.cv2ex import * from DFLIMG import * @@ -68,11 +68,12 @@ class ExtractSubprocessor(Subprocessor): self.log_info (f"Running on {client_dict['device_name'] }") if self.type == 'all' or self.type == 'rects-s3fd' or 'landmarks' in self.type: - self.rects_extractor = facelib.RetinaFaceExtractor(place_model_on_cpu=place_model_on_cpu) + self.rects_extractor = facelib.S3FDExtractor(place_model_on_cpu=place_model_on_cpu) if self.type == 'all' or 'landmarks' in self.type: # for head type, extract "3D landmarks" - self.landmarks_extractor = facelib.FANExtractor(place_model_on_cpu=place_model_on_cpu) + self.landmarks_extractor = facelib.FANExtractor(landmarks_3D=self.face_type >= FaceType.HEAD, + place_model_on_cpu=place_model_on_cpu) self.cached_image = (None, None) @@ -715,9 +716,9 @@ def main(detector=None, if detector is None: io.log_info ("Choose detector type.") - io.log_info ("[0] RetinaFace") + io.log_info ("[0] S3FD") io.log_info ("[1] manual") - detector = {0:'retinaface', 1:'manual'}[ io.input_int("", 0, [0,1]) ] + detector = {0:'s3fd', 1:'manual'}[ io.input_int("", 0, [0,1]) ] device_config = nn.DeviceConfig.GPUIndexes( force_gpu_idxs or nn.ask_choose_device_idxs(choose_only_one=detector=='manual', suggest_all_gpu=True) ) \ if not cpu_only else nn.DeviceConfig.CPU() @@ -794,4 +795,4 @@ def main(detector=None, io.log_info ('-------------------------') io.log_info ('Images found: %d' % (images_found) ) io.log_info ('Faces detected: %d' % (faces_detected) ) - io.log_info 
('-------------------------') \ No newline at end of file + io.log_info ('-------------------------')
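With this change, extraction is invoked with the S3FD detector, for example (hypothetical workspace paths; the flags are the ones defined in main.py above):

python main.py extract --detector s3fd --input-dir workspace/data_src --output-dir workspace/data_src/aligned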