From fbe70767b260570c37b3fb490d10171ede422921 Mon Sep 17 00:00:00 2001 From: Nicholas Scott Date: Tue, 2 Jun 2020 02:43:46 -0500 Subject: [PATCH] upload --- facelib/ResNet50.py | 221 +++++++++++++++++++++ facelib/RetinaFaceExtractor.py | 343 +++++++++++++++++++++++++++++++++ facelib/box_utils.py | 328 +++++++++++++++++++++++++++++++ facelib/coord_conv.py | 153 +++++++++++++++ facelib/prior_box.py | 34 ++++ 5 files changed, 1079 insertions(+) create mode 100644 facelib/ResNet50.py create mode 100644 facelib/RetinaFaceExtractor.py create mode 100644 facelib/box_utils.py create mode 100644 facelib/coord_conv.py create mode 100644 facelib/prior_box.py diff --git a/facelib/ResNet50.py b/facelib/ResNet50.py new file mode 100644 index 0000000..0060382 --- /dev/null +++ b/facelib/ResNet50.py @@ -0,0 +1,221 @@ +import torch +import torch.nn as nn + +from pathlib import Path + +def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1): + """3x3 convolution with padding""" + return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, + padding=dilation, groups=groups, bias=False, dilation=dilation) + + +def conv1x1(in_planes, out_planes, stride=1): + """1x1 convolution""" + return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) + + +class BasicBlock(nn.Module): + expansion = 1 + + def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, + base_width=64, dilation=1, norm_layer=None): + super(BasicBlock, self).__init__() + if norm_layer is None: + norm_layer = nn.BatchNorm2d + if groups != 1 or base_width != 64: + raise ValueError('BasicBlock only supports groups=1 and base_width=64') + if dilation > 1: + raise NotImplementedError("Dilation > 1 not supported in BasicBlock") + # Both self.conv1 and self.downsample layers downsample the input when stride != 1 + self.conv1 = conv3x3(inplanes, planes, stride) + self.bn1 = norm_layer(planes) + self.relu = nn.ReLU(inplace=True) + self.conv2 = conv3x3(planes, planes) + self.bn2 = norm_layer(planes) + self.downsample = downsample + self.stride = stride + + def forward(self, x): + identity = x + + out = self.conv1(x) + out = self.bn1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.bn2(out) + + if self.downsample is not None: + identity = self.downsample(x) + + out += identity + out = self.relu(out) + + return out + + +class Bottleneck(nn.Module): + # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2) + # while original implementation places the stride at the first 1x1 convolution(self.conv1) + # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385. + # This variant is also known as ResNet V1.5 and improves accuracy according to + # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch. 
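+    #
+    # Illustration (added note, not from the upstream torchvision source):
+    # for a stride-2 bottleneck this V1.5 variant applies the stride as
+    #     conv1x1(stride=1) -> conv3x3(stride=2) -> conv1x1(stride=1)
+    # while the original V1 design uses
+    #     conv1x1(stride=2) -> conv3x3(stride=1) -> conv1x1(stride=1).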
+ + expansion = 4 + + def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, + base_width=64, dilation=1, norm_layer=None): + super(Bottleneck, self).__init__() + if norm_layer is None: + norm_layer = nn.BatchNorm2d + width = int(planes * (base_width / 64.)) * groups + # Both self.conv2 and self.downsample layers downsample the input when stride != 1 + self.conv1 = conv1x1(inplanes, width) + self.bn1 = norm_layer(width) + self.conv2 = conv3x3(width, width, stride, groups, dilation) + self.bn2 = norm_layer(width) + self.conv3 = conv1x1(width, planes * self.expansion) + self.bn3 = norm_layer(planes * self.expansion) + self.relu = nn.ReLU(inplace=True) + self.downsample = downsample + self.stride = stride + + def forward(self, x): + identity = x + + out = self.conv1(x) + out = self.bn1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.bn2(out) + out = self.relu(out) + + out = self.conv3(out) + out = self.bn3(out) + + if self.downsample is not None: + identity = self.downsample(x) + + out += identity + out = self.relu(out) + + return out + + +class ResNet(nn.Module): + + def __init__(self, block, layers, num_classes=1000, zero_init_residual=False, + groups=1, width_per_group=64, replace_stride_with_dilation=None, + norm_layer=None): + super(ResNet, self).__init__() + if norm_layer is None: + norm_layer = nn.BatchNorm2d + self._norm_layer = norm_layer + + self.inplanes = 64 + self.dilation = 1 + if replace_stride_with_dilation is None: + # each element in the tuple indicates if we should replace + # the 2x2 stride with a dilated convolution instead + replace_stride_with_dilation = [False, False, False] + if len(replace_stride_with_dilation) != 3: + raise ValueError("replace_stride_with_dilation should be None " + "or a 3-element tuple, got {}".format(replace_stride_with_dilation)) + self.groups = groups + self.base_width = width_per_group + self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, + bias=False) + self.bn1 = norm_layer(self.inplanes) + self.relu = nn.ReLU(inplace=True) + self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) + self.layer1 = self._make_layer(block, 64, layers[0]) + self.layer2 = self._make_layer(block, 128, layers[1], stride=2, + dilate=replace_stride_with_dilation[0]) + self.layer3 = self._make_layer(block, 256, layers[2], stride=2, + dilate=replace_stride_with_dilation[1]) + self.layer4 = self._make_layer(block, 512, layers[3], stride=2, + dilate=replace_stride_with_dilation[2]) + self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) + self.fc = nn.Linear(512 * block.expansion, num_classes) + + for m in self.modules(): + if isinstance(m, nn.Conv2d): + nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') + elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): + nn.init.constant_(m.weight, 1) + nn.init.constant_(m.bias, 0) + + # Zero-initialize the last BN in each residual branch, + # so that the residual branch starts with zeros, and each residual block behaves like an identity. 
+ # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677 + if zero_init_residual: + for m in self.modules(): + if isinstance(m, Bottleneck): + nn.init.constant_(m.bn3.weight, 0) + elif isinstance(m, BasicBlock): + nn.init.constant_(m.bn2.weight, 0) + + def _make_layer(self, block, planes, blocks, stride=1, dilate=False): + norm_layer = self._norm_layer + downsample = None + previous_dilation = self.dilation + if dilate: + self.dilation *= stride + stride = 1 + if stride != 1 or self.inplanes != planes * block.expansion: + downsample = nn.Sequential( + conv1x1(self.inplanes, planes * block.expansion, stride), + norm_layer(planes * block.expansion), + ) + + layers = [] + layers.append(block(self.inplanes, planes, stride, downsample, self.groups, + self.base_width, previous_dilation, norm_layer)) + self.inplanes = planes * block.expansion + for _ in range(1, blocks): + layers.append(block(self.inplanes, planes, groups=self.groups, + base_width=self.base_width, dilation=self.dilation, + norm_layer=norm_layer)) + + return nn.Sequential(*layers) + + def _forward_impl(self, x): + # See note [TorchScript super()] + x = self.conv1(x) + x = self.bn1(x) + x = self.relu(x) + x = self.maxpool(x) + + x = self.layer1(x) + x = self.layer2(x) + x = self.layer3(x) + x = self.layer4(x) + + x = self.avgpool(x) + x = torch.flatten(x, 1) + x = self.fc(x) + + return x + + def forward(self, x): + return self._forward_impl(x) + + +def _resnet(arch, block, layers, pretrained, **kwargs): + model = ResNet(block, layers, **kwargs) + if pretrained: + model_path = Path(__file__).parent / "resnet50-19c8e357.pth" + state_dict = torch.load(model_path) + model.load_state_dict(state_dict) + return model + + +def resnet50(pretrained=True, **kwargs): + r"""ResNet-50 model from + `"Deep Residual Learning for Image Recognition" `_ + Args: + pretrained (bool): If True, returns a model pre-trained on ImageNet + """ + return _resnet('resnet50', Bottleneck, [3, 4, 6, 3], pretrained, + **kwargs) \ No newline at end of file diff --git a/facelib/RetinaFaceExtractor.py b/facelib/RetinaFaceExtractor.py new file mode 100644 index 0000000..9f02fc8 --- /dev/null +++ b/facelib/RetinaFaceExtractor.py @@ -0,0 +1,343 @@ +import operator +from pathlib import Path + +import cv2 +import numpy as np + +import torch +import torch.nn as nn +import torch.nn.functional as F + +from facelib.nn_pt import nn as nn_pt + +import torchvision.models as models +import torchvision.models.detection.backbone_utils as backbone_utils +import torchvision.models._utils as _utils +from torchvision.transforms import ToTensor + +from facelib.net import FPN as FPN +from facelib.net import SSH as SSH + +from facelib.box_utils import decode +from facelib.prior_box import PriorBox + +from facelib.ResNet50 import resnet50 + +from facelib.config import cfg_re50 +"""Ported from https://github.com/biubug6/Pytorch_Retinaface/""" + +class RetinaFaceExtractor(object): + def __init__(self, place_model_on_cpu=False): + nn_pt.initialize() + + model_path = Path(__file__).parent / "RetinaFace-Resnet50.pth" + + if not model_path.exists(): + raise Exception("Unable to load RetinaFace-Resnet50.pth") + + class ClassHead(nn.Module): + def __init__(self,inchannels=512,num_anchors=3): + super(ClassHead,self).__init__() + self.num_anchors = num_anchors + self.conv1x1 = nn.Conv2d(inchannels,self.num_anchors*2,kernel_size=(1,1),stride=1,padding=0) + + def forward(self,x): + out = self.conv1x1(x) + out = out.permute(0,2,3,1).contiguous() + + return 
out.view(out.shape[0], -1, 2) + + + class BboxHead(nn.Module): + def __init__(self,inchannels=512,num_anchors=3): + super(BboxHead,self).__init__() + self.conv1x1 = nn.Conv2d(inchannels,num_anchors*4,kernel_size=(1,1),stride=1,padding=0) + + def forward(self,x): + out = self.conv1x1(x) + out = out.permute(0,2,3,1).contiguous() + + return out.view(out.shape[0], -1, 4) + + + class LandmarkHead(nn.Module): + def __init__(self,inchannels=512,num_anchors=3): + super(LandmarkHead,self).__init__() + self.conv1x1 = nn.Conv2d(inchannels,num_anchors*10,kernel_size=(1,1),stride=1,padding=0) + + def forward(self,x): + out = self.conv1x1(x) + out = out.permute(0,2,3,1).contiguous() + + return out.view(out.shape[0], -1, 10) + + class RetinaFace(nn.Module): + def __init__(self, cfg = cfg_re50): + super(RetinaFace,self).__init__() + backbone = resnet50(pretrained=cfg['pretrain']) + + self.body = _utils.IntermediateLayerGetter(backbone, cfg['return_layers']) + in_channels_stage2 = cfg['in_channel'] + in_channels_list = [in_channels_stage2 * 2, + in_channels_stage2 * 4, + in_channels_stage2 * 8,] + out_channels = cfg['out_channel'] + + self.fpn = FPN(in_channels_list,out_channels) + self.ssh1 = SSH(out_channels, out_channels) + self.ssh2 = SSH(out_channels, out_channels) + self.ssh3 = SSH(out_channels, out_channels) + + self.ClassHead = self._make_class_head(fpn_num=3, inchannels=cfg['out_channel']) + self.BboxHead = self._make_bbox_head(fpn_num=3, inchannels=cfg['out_channel']) + self.LandmarkHead = self._make_landmark_head(fpn_num=3, inchannels=cfg['out_channel']) + + def _make_class_head(self,fpn_num=3,inchannels=64,anchor_num=2): + classhead = nn.ModuleList() + for i in range(fpn_num): + classhead.append(ClassHead(inchannels,anchor_num)) + return classhead + + def _make_bbox_head(self,fpn_num=3,inchannels=64,anchor_num=2): + bboxhead = nn.ModuleList() + for i in range(fpn_num): + bboxhead.append(BboxHead(inchannels,anchor_num)) + return bboxhead + + def _make_landmark_head(self,fpn_num=3,inchannels=64,anchor_num=2): + landmarkhead = nn.ModuleList() + for i in range(fpn_num): + landmarkhead.append(LandmarkHead(inchannels,anchor_num)) + return landmarkhead + + + def forward(self,inputs): + out = self.body(inputs) + + # FPN + fpn = self.fpn(out) + + # SSH + feature1 = self.ssh1(fpn[0]) + feature2 = self.ssh2(fpn[1]) + feature3 = self.ssh3(fpn[2]) + features = [feature1, feature2, feature3] + + bbox_regressions = torch.cat([self.BboxHead[i](feature) for i, feature in enumerate(features)], dim=1) + classifications = torch.cat([self.ClassHead[i](feature) for i, feature in enumerate(features)],dim=1) + ldm_regressions = torch.cat([self.LandmarkHead[i](feature) for i, feature in enumerate(features)], dim=1) + + output = (bbox_regressions, F.softmax(classifications, dim=-1), ldm_regressions) + + return output + + def check_keys(model, pretrained_state_dict): + ckpt_keys = set(pretrained_state_dict.keys()) + model_keys = set(model.state_dict().keys()) + used_pretrained_keys = model_keys & ckpt_keys + unused_pretrained_keys = ckpt_keys - model_keys + missing_keys = model_keys - ckpt_keys + assert len(used_pretrained_keys) > 0, 'load NONE from pretrained checkpoint' + return True + + def remove_prefix(state_dict, prefix): + ''' Old style model is stored with all names of parameters sharing common prefix 'module.' 
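+            Example (illustrative):
+                >>> remove_prefix({'module.bn1.weight': 0}, 'module.')
+                {'bn1.weight': 0}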
+            '''
+            f = lambda x: x.split(prefix, 1)[-1] if x.startswith(prefix) else x
+            return {f(key): value for key, value in state_dict.items()}
+
+        def load_model(model, pretrained_path, load_to_cpu):
+            if load_to_cpu:
+                pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage)
+            else:
+                device = torch.cuda.current_device()
+                pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage.cuda(device))
+            if "state_dict" in pretrained_dict.keys():
+                pretrained_dict = remove_prefix(pretrained_dict['state_dict'], 'module.')
+            else:
+                pretrained_dict = remove_prefix(pretrained_dict, 'module.')
+            check_keys(model, pretrained_dict)
+            model.load_state_dict(pretrained_dict, strict=False)
+            return model
+
+        try:
+            torch.set_grad_enabled(False)
+            self.model = RetinaFace(cfg=cfg_re50)
+            self.model = load_model(self.model, model_path, place_model_on_cpu)
+            self.model.eval()
+
+            self.device = torch.device("cpu" if place_model_on_cpu else "cuda")
+            self.model = self.model.to(self.device)
+
+        except Exception as e:
+            self.model = None
+            print(f"Could not load RetinaFace: {e}")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        return False #pass exception between __enter__ and __exit__ to outer level
+
+    def extract (self, input_image, is_bgr=True, is_remove_intersects=False):
+
+        cfg = cfg_re50
+
+        if is_bgr:
+            input_image = input_image[:,:,::-1]
+            is_bgr = False
+
+        (h, w, ch) = input_image.shape
+
+        d = max(w, h)
+        scale_to = 640 if d >= 1280 else d / 2
+        scale_to = max(64, scale_to)
+
+        input_scale = d / scale_to
+        input_image = cv2.resize (input_image, ( int(w/input_scale), int(h/input_scale) ), interpolation=cv2.INTER_LINEAR)
+
+        (h, w, ch) = input_image.shape
+
+        with torch.no_grad():
+
+            input_image = ToTensor()(input_image)
+            input_image = input_image.to(self.device)
+
+            loc, conf, landmarks = self.model( input_image[None,...] )
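+            # Shapes, per the three heads above (batch of 1):
+            #   loc:       [1, num_priors, 4]   box regression offsets
+            #   conf:      [1, num_priors, 2]   softmaxed (background, face) scores
+            #   landmarks: [1, num_priors, 10]  5-point landmark offsets (unused below)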
+
+            priorbox = PriorBox(cfg, image_size=(h, w))
+            priors = priorbox.forward()
+            priors = priors.to(self.device)
+            prior_data = priors.data
+
+            boxes = decode(loc.data.squeeze(0), prior_data, cfg['variance'])
+            # decode() works in normalized [0,1] coordinates; scale back to
+            # pixels of the resized input (as in upstream Pytorch_Retinaface)
+            # before thresholding.
+            boxes = np.float32(boxes.cpu().numpy()) * np.float32([w, h, w, h])
+            scores = np.float32(conf.squeeze(0).data.cpu().numpy()[:, 1])
+
+            inds = np.where(scores > 0.05)[0]
+            boxes = boxes[inds]
+            scores = scores[inds]
+
+            dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32, copy=False)
+
+            keep = self.refine_nms(dets, 0.3)
+            dets = dets[keep, :]
+
+            # keep dets as an ndarray: .astype() is not defined on plain lists,
+            # and np.int is deprecated in favor of a concrete dtype
+            dets = [ x[:-1].astype(np.int32) for x in dets if x[-1] >= 0.5 ]
+
+        detected_faces = []
+
+        for ltrb in dets:
+            # l,t,r,b = [ x for x in ltrb]
+            l,t,r,b = [ x*input_scale for x in ltrb]
+            bt = b-t
+            if min(r-l,bt) < 40: #filtering faces < 40pix by any side
+                continue
+            b += bt*0.1 #enlarging bottom line a bit for 2DFAN-4, because default is not enough covering a chin
+            detected_faces.append ( [int(x) for x in (l,t,r,b) ] )
+
+        #sort by largest area first
+        detected_faces = [ [(l,t,r,b), (r-l)*(b-t) ] for (l,t,r,b) in detected_faces ]
+        detected_faces = sorted(detected_faces, key=operator.itemgetter(1), reverse=True )
+        detected_faces = [ x[0] for x in detected_faces]
+
+        if is_remove_intersects:
+            for i in range( len(detected_faces)-1, 0, -1):
+                l1,t1,r1,b1 = detected_faces[i]
+                l0,t0,r0,b0 = detected_faces[i-1]
+
+                dx = min(r0, r1) - max(l0, l1)
+                dy = min(b0, b1) - max(t0, t1)
+                if (dx>=0) and (dy>=0):
+                    detected_faces.pop(i)
+
+        return detected_faces
+
+    def refine(self, olist):
+        # NOTE: refine() is not called by extract(); the iterable in the loop
+        # below was missing in the uploaded code, so the (cls, reg) pairing is
+        # an assumed S3FD-style reconstruction of alternating classifier and
+        # regressor outputs.
+        bboxlist = []
+        for i, ((ocls,), (oreg,)) in enumerate ( zip(olist[::2], olist[1::2]) ):
+            stride = 2**(i + 2)    # 4,8,16,32,64,128
+            s_d2 = stride / 2
+            s_m4 = stride * 4
+
+            for hindex, windex in zip(*np.where(ocls[...,1] > 0.05)):
+                score = ocls[hindex, windex, 1]
+                loc = oreg[hindex, windex, :]
+                priors = np.array([windex * stride + s_d2, hindex * stride + s_d2, s_m4, s_m4])
+                priors_2p = priors[2:]
+                box = np.concatenate((priors[:2] + loc[:2] * 0.1 * priors_2p,
+                                      priors_2p * np.exp(loc[2:] * 0.2)) )
+                box[:2] -= box[2:] / 2
+                box[2:] += box[:2]
+
+                bboxlist.append([*box, score])
+
+        bboxlist = np.array(bboxlist)
+        if len(bboxlist) == 0:
+            bboxlist = np.zeros((1, 5))
+
+        bboxlist = bboxlist[self.refine_nms(bboxlist, 0.3), :]
+        bboxlist = [ x[:-1].astype(np.int32) for x in bboxlist if x[-1] >= 0.5]
+        return bboxlist
+
+    def refine_nms2(self, dets, thresh):
+        # same greedy IoU suppression as refine_nms below, kept as uploaded
+        keep = list()
+        if len(dets) == 0:
+            return keep
+
+        x_1, y_1, x_2, y_2, scores = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3], dets[:, 4]
+        areas = (x_2 - x_1 + 1) * (y_2 - y_1 + 1)
+        order = scores.argsort()[::-1]
+
+        keep = []
+        while order.size > 0:
+            i = order[0]
+            keep.append(i)
+            xx_1, yy_1 = np.maximum(x_1[i], x_1[order[1:]]), np.maximum(y_1[i], y_1[order[1:]])
+            xx_2, yy_2 = np.minimum(x_2[i], x_2[order[1:]]), np.minimum(y_2[i], y_2[order[1:]])
+
+            width, height = np.maximum(0.0, xx_2 - xx_1 + 1), np.maximum(0.0, yy_2 - yy_1 + 1)
+            ovr = width * height / (areas[i] + areas[order[1:]] - width * height)
+
+            inds = np.where(ovr <= thresh)[0]
+            order = order[inds + 1]
+        return keep
+
+    def refine_nms(self, dets, thresh):
+        """Pure Python NMS baseline."""
+        x1 = dets[:, 0]
+        y1 = dets[:, 1]
+        x2 = dets[:, 2]
+        y2 = dets[:, 3]
+        scores = dets[:, 4]
+
+        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
+        order = scores.argsort()[::-1]
+
+        keep = []
+        while order.size > 0:
+            i = order[0]
+            keep.append(i)
+            xx1 = np.maximum(x1[i], x1[order[1:]])
+            yy1 = np.maximum(y1[i], y1[order[1:]])
+            xx2 = np.minimum(x2[i], x2[order[1:]])
+            yy2 = np.minimum(y2[i], y2[order[1:]])
+
+            w = np.maximum(0.0, xx2 - xx1 + 1)
+            h = np.maximum(0.0, yy2 - yy1 + 1)
+            inter = w * h
+            ovr = inter / (areas[i] + areas[order[1:]] - inter)
+
+            inds = np.where(ovr <= thresh)[0]
+            order = order[inds + 1]
+
+        return keep
diff --git a/facelib/box_utils.py b/facelib/box_utils.py
new file mode 100644
index 0000000..daea71b
--- /dev/null
+++ b/facelib/box_utils.py
@@ -0,0 +1,328 @@
+import torch
+import numpy as np
+
+
+def point_form(boxes):
+    """ Convert prior_boxes to (xmin, ymin, xmax, ymax)
+    representation for comparison to point form ground truth data.
+    Args:
+        boxes: (tensor) center-size default boxes from priorbox layers.
+    Return:
+        boxes: (tensor) Converted xmin, ymin, xmax, ymax form of boxes.
+    """
+    return torch.cat((boxes[:, :2] - boxes[:, 2:]/2,     # xmin, ymin
+                     boxes[:, :2] + boxes[:, 2:]/2), 1)  # xmax, ymax
+
+
+def center_size(boxes):
+    """ Convert point_form boxes to (cx, cy, w, h)
+    representation for comparison to center-size form ground truth data.
+    Args:
+        boxes: (tensor) point_form boxes
+    Return:
+        boxes: (tensor) Converted cx, cy, w, h form of boxes.
+    """
+    # both halves must go into a single tuple argument of torch.cat
+    return torch.cat(((boxes[:, 2:] + boxes[:, :2])/2,  # cx, cy
+                      boxes[:, 2:] - boxes[:, :2]), 1)  # w, h
+
+
+def intersect(box_a, box_b):
+    """ We resize both tensors to [A,B,2] without new malloc:
+    [A,2] -> [A,1,2] -> [A,B,2]
+    [B,2] -> [1,B,2] -> [A,B,2]
+    Then we compute the area of intersect between box_a and box_b.
+    Args:
+      box_a: (tensor) bounding boxes, Shape: [A,4].
+      box_b: (tensor) bounding boxes, Shape: [B,4].
+    Return:
+      (tensor) intersection area, Shape: [A,B].
+    """
+    A = box_a.size(0)
+    B = box_b.size(0)
+    max_xy = torch.min(box_a[:, 2:].unsqueeze(1).expand(A, B, 2),
+                       box_b[:, 2:].unsqueeze(0).expand(A, B, 2))
+    min_xy = torch.max(box_a[:, :2].unsqueeze(1).expand(A, B, 2),
+                       box_b[:, :2].unsqueeze(0).expand(A, B, 2))
+    inter = torch.clamp((max_xy - min_xy), min=0)
+    return inter[:, :, 0] * inter[:, :, 1]
+
+
+def jaccard(box_a, box_b):
+    """Compute the jaccard overlap of two sets of boxes. The jaccard overlap
+    is simply the intersection over union of two boxes. Here we operate on
+    ground truth boxes and default boxes.
+    E.g.:
+        A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B)
+    Args:
+        box_a: (tensor) Ground truth bounding boxes, Shape: [num_objects,4]
+        box_b: (tensor) Prior boxes from priorbox layers, Shape: [num_priors,4]
+    Return:
+        jaccard overlap: (tensor) Shape: [box_a.size(0), box_b.size(0)]
+    """
+    inter = intersect(box_a, box_b)
+    area_a = ((box_a[:, 2]-box_a[:, 0]) *
+              (box_a[:, 3]-box_a[:, 1])).unsqueeze(1).expand_as(inter)  # [A,B]
+    area_b = ((box_b[:, 2]-box_b[:, 0]) *
+              (box_b[:, 3]-box_b[:, 1])).unsqueeze(0).expand_as(inter)  # [A,B]
+    union = area_a + area_b - inter
+    return inter / union  # [A,B]
+
+
+def matrix_iou(a, b):
+    """
+    return iou of a and b, numpy version for data augmentation
+    """
+    lt = np.maximum(a[:, np.newaxis, :2], b[:, :2])
+    rb = np.minimum(a[:, np.newaxis, 2:], b[:, 2:])
+
+    area_i = np.prod(rb - lt, axis=2) * (lt < rb).all(axis=2)
+    area_a = np.prod(a[:, 2:] - a[:, :2], axis=1)
+    area_b = np.prod(b[:, 2:] - b[:, :2], axis=1)
+    return area_i / (area_a[:, np.newaxis] + area_b - area_i)
+
+
+def matrix_iof(a, b):
+    """
+    return iof of a and b, numpy version for data augmentation
+    """
+    lt = np.maximum(a[:, np.newaxis, :2], b[:, :2])
+    rb = np.minimum(a[:, np.newaxis, 2:], b[:, 2:])
+
+    area_i = np.prod(rb - lt, axis=2) * (lt < rb).all(axis=2)
+    area_a = np.prod(a[:, 2:] - a[:, :2], axis=1)
+    return area_i / np.maximum(area_a[:, np.newaxis], 1)
+
+
+def match(threshold, truths, priors, variances, labels, landms, loc_t, conf_t, landm_t, idx):
+    """Match each prior box with the ground truth box of the highest jaccard
+    overlap, encode the bounding boxes, then fill the output tensors with the
+    targets and matched indices for confidence, location and landmark preds.
+    Args:
+        threshold: (float) The overlap threshold used when matching boxes.
+        truths: (tensor) Ground truth boxes, Shape: [num_obj, 4].
+        priors: (tensor) Prior boxes from priorbox layers, Shape: [n_priors,4].
+        variances: (tensor) Variances corresponding to each prior coord,
+            Shape: [num_priors, 4].
+        labels: (tensor) All the class labels for the image, Shape: [num_obj].
+        landms: (tensor) Ground truth landms, Shape [num_obj, 10].
+        loc_t: (tensor) Tensor to be filled w/ encoded location targets.
+        conf_t: (tensor) Tensor to be filled w/ matched indices for conf preds.
+        landm_t: (tensor) Tensor to be filled w/ encoded landm targets.
+        idx: (int) current batch index
+    Return:
+        None; the 1) location, 2) confidence and 3) landm targets are written
+        into loc_t, conf_t and landm_t in place.
+ """ + # jaccard index + overlaps = jaccard( + truths, + point_form(priors) + ) + # (Bipartite Matching) + # [1,num_objects] best prior for each ground truth + best_prior_overlap, best_prior_idx = overlaps.max(1, keepdim=True) + + # ignore hard gt + valid_gt_idx = best_prior_overlap[:, 0] >= 0.2 + best_prior_idx_filter = best_prior_idx[valid_gt_idx, :] + if best_prior_idx_filter.shape[0] <= 0: + loc_t[idx] = 0 + conf_t[idx] = 0 + return + + # [1,num_priors] best ground truth for each prior + best_truth_overlap, best_truth_idx = overlaps.max(0, keepdim=True) + best_truth_idx.squeeze_(0) + best_truth_overlap.squeeze_(0) + best_prior_idx.squeeze_(1) + best_prior_idx_filter.squeeze_(1) + best_prior_overlap.squeeze_(1) + best_truth_overlap.index_fill_(0, best_prior_idx_filter, 2) # ensure best prior + # TODO refactor: index best_prior_idx with long tensor + # ensure every gt matches with its prior of max overlap + for j in range(best_prior_idx.size(0)): # 判别此anchor是预测哪一个boxes + best_truth_idx[best_prior_idx[j]] = j + matches = truths[best_truth_idx] # Shape: [num_priors,4] 此处为每一个anchor对应的bbox取出来 + conf = labels[best_truth_idx] # Shape: [num_priors] 此处为每一个anchor对应的label取出来 + conf[best_truth_overlap < threshold] = 0 # label as background overlap<0.35的全部作为负样本 + loc = encode(matches, priors, variances) + + matches_landm = landms[best_truth_idx] + landm = encode_landm(matches_landm, priors, variances) + loc_t[idx] = loc # [num_priors,4] encoded offsets to learn + conf_t[idx] = conf # [num_priors] top class label for each prior + landm_t[idx] = landm + + +def encode(matched, priors, variances): + """Encode the variances from the priorbox layers into the ground truth boxes + we have matched (based on jaccard overlap) with the prior boxes. + Args: + matched: (tensor) Coords of ground truth for each prior in point-form + Shape: [num_priors, 4]. + priors: (tensor) Prior boxes in center-offset form + Shape: [num_priors,4]. + variances: (list[float]) Variances of priorboxes + Return: + encoded boxes (tensor), Shape: [num_priors, 4] + """ + + # dist b/t match center and prior's center + g_cxcy = (matched[:, :2] + matched[:, 2:])/2 - priors[:, :2] + # encode variance + g_cxcy /= (variances[0] * priors[:, 2:]) + # match wh / prior wh + g_wh = (matched[:, 2:] - matched[:, :2]) / priors[:, 2:] + g_wh = torch.log(g_wh) / variances[1] + # return target for smooth_l1_loss + return torch.cat([g_cxcy, g_wh], 1) # [num_priors,4] + +def encode_landm(matched, priors, variances): + """Encode the variances from the priorbox layers into the ground truth boxes + we have matched (based on jaccard overlap) with the prior boxes. + Args: + matched: (tensor) Coords of ground truth for each prior in point-form + Shape: [num_priors, 10]. + priors: (tensor) Prior boxes in center-offset form + Shape: [num_priors,4]. 
+        variances: (list[float]) Variances of priorboxes
+    Return:
+        encoded landm (tensor), Shape: [num_priors, 10]
+    """
+
+    # dist b/t match center and prior's center
+    matched = torch.reshape(matched, (matched.size(0), 5, 2))
+    priors_cx = priors[:, 0].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2)
+    priors_cy = priors[:, 1].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2)
+    priors_w = priors[:, 2].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2)
+    priors_h = priors[:, 3].unsqueeze(1).expand(matched.size(0), 5).unsqueeze(2)
+    priors = torch.cat([priors_cx, priors_cy, priors_w, priors_h], dim=2)
+    g_cxcy = matched[:, :, :2] - priors[:, :, :2]
+    # encode variance
+    g_cxcy /= (variances[0] * priors[:, :, 2:])
+    # g_cxcy /= priors[:, :, 2:]
+    g_cxcy = g_cxcy.reshape(g_cxcy.size(0), -1)
+    # return target for smooth_l1_loss
+    return g_cxcy
+
+
+# Adapted from https://github.com/Hakuyume/chainer-ssd
+def decode(loc, priors, variances):
+    """Decode locations from predictions using priors to undo
+    the encoding we did for offset regression at train time.
+    Args:
+        loc (tensor): location predictions for loc layers,
+            Shape: [num_priors,4]
+        priors (tensor): Prior boxes in center-offset form.
+            Shape: [num_priors,4].
+        variances: (list[float]) Variances of priorboxes
+    Return:
+        decoded bounding box predictions
+    """
+
+    boxes = torch.cat((
+        priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:],
+        priors[:, 2:] * torch.exp(loc[:, 2:] * variances[1])), 1)
+    boxes[:, :2] -= boxes[:, 2:] / 2
+    boxes[:, 2:] += boxes[:, :2]
+    return boxes
+
+
+def decode_landm(pre, priors, variances):
+    """Decode landm from predictions using priors to undo
+    the encoding we did for offset regression at train time.
+    Args:
+        pre (tensor): landm predictions for loc layers,
+            Shape: [num_priors,10]
+        priors (tensor): Prior boxes in center-offset form.
+            Shape: [num_priors,4].
+        variances: (list[float]) Variances of priorboxes
+    Return:
+        decoded landm predictions
+    """
+    landms = torch.cat((priors[:, :2] + pre[:, :2] * variances[0] * priors[:, 2:],
+                        priors[:, :2] + pre[:, 2:4] * variances[0] * priors[:, 2:],
+                        priors[:, :2] + pre[:, 4:6] * variances[0] * priors[:, 2:],
+                        priors[:, :2] + pre[:, 6:8] * variances[0] * priors[:, 2:],
+                        priors[:, :2] + pre[:, 8:10] * variances[0] * priors[:, 2:],
+                        ), dim=1)
+    return landms
+
+
+def log_sum_exp(x):
+    """Utility function for computing log_sum_exp in a numerically stable way.
+    This will be used to determine unaveraged confidence loss across
+    all examples in a batch.
+    Args:
+        x (Variable(tensor)): conf_preds from conf layers
+    """
+    x_max = x.data.max()
+    return torch.log(torch.sum(torch.exp(x-x_max), 1, keepdim=True)) + x_max
+
+
+# Original author: Francisco Massa:
+# https://github.com/fmassa/object-detection.torch
+# Ported to PyTorch by Max deGroot (02/01/2017)
+def nms(boxes, scores, overlap=0.5, top_k=200):
+    """Apply non-maximum suppression at test time to avoid detecting too many
+    overlapping bounding boxes for a given object.
+    Args:
+        boxes: (tensor) The location preds for the img, Shape: [num_priors,4].
+        scores: (tensor) The class pred scores for the img, Shape: [num_priors].
+        overlap: (float) The overlap thresh for suppressing unnecessary boxes.
+        top_k: (int) The maximum number of box preds to consider.
+    Return:
+        The indices of the kept boxes with respect to num_priors.
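+    Example (illustrative):
+        >>> boxes = torch.tensor([[ 0.,  0., 10., 10.],
+        ...                       [ 1.,  1., 11., 11.],
+        ...                       [20., 20., 30., 30.]])
+        >>> scores = torch.tensor([0.9, 0.8, 0.7])
+        >>> keep, count = nms(boxes, scores, overlap=0.5)
+        >>> keep[:count]  # box 1 is suppressed by box 0 (IoU ~ 0.68)
+        tensor([0, 2])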
+ """ + + keep = torch.Tensor(scores.size(0)).fill_(0).long() + if boxes.numel() == 0: + return keep + x1 = boxes[:, 0] + y1 = boxes[:, 1] + x2 = boxes[:, 2] + y2 = boxes[:, 3] + area = torch.mul(x2 - x1, y2 - y1) + v, idx = scores.sort(0) # sort in ascending order + # I = I[v >= 0.01] + idx = idx[-top_k:] # indices of the top-k largest vals + xx1 = boxes.new() + yy1 = boxes.new() + xx2 = boxes.new() + yy2 = boxes.new() + w = boxes.new() + h = boxes.new() + + # keep = torch.Tensor() + count = 0 + while idx.numel() > 0: + i = idx[-1] # index of current largest val + # keep.append(i) + keep[count] = i + count += 1 + if idx.size(0) == 1: + break + idx = idx[:-1] # remove kept element from view + # load bboxes of next highest vals + torch.index_select(x1, 0, idx, out=xx1) + torch.index_select(y1, 0, idx, out=yy1) + torch.index_select(x2, 0, idx, out=xx2) + torch.index_select(y2, 0, idx, out=yy2) + # store element-wise max with next highest score + xx1 = torch.clamp(xx1, min=x1[i]) + yy1 = torch.clamp(yy1, min=y1[i]) + xx2 = torch.clamp(xx2, max=x2[i]) + yy2 = torch.clamp(yy2, max=y2[i]) + w.resize_as_(xx2) + h.resize_as_(yy2) + w = xx2 - xx1 + h = yy2 - yy1 + # check sizes of xx1 and xx2.. after each iteration + w = torch.clamp(w, min=0.0) + h = torch.clamp(h, min=0.0) + inter = w*h + # IoU = i / (area(a) + area(b) - i) + rem_areas = torch.index_select(area, 0, idx) # load remaining areas) + union = (rem_areas - inter) + area[i] + IoU = inter/union # store result in iou + # keep only elements with an IoU <= overlap + idx = idx[IoU.le(overlap)] + return keep, count \ No newline at end of file diff --git a/facelib/coord_conv.py b/facelib/coord_conv.py new file mode 100644 index 0000000..f5c9400 --- /dev/null +++ b/facelib/coord_conv.py @@ -0,0 +1,153 @@ +import torch +import torch.nn as nn + + +class AddCoordsTh(nn.Module): + def __init__(self, x_dim=64, y_dim=64, with_r=False, with_boundary=False): + super(AddCoordsTh, self).__init__() + self.x_dim = x_dim + self.y_dim = y_dim + self.with_r = with_r + self.with_boundary = with_boundary + + def forward(self, input_tensor, heatmap=None): + """ + input_tensor: (batch, c, x_dim, y_dim) + """ + batch_size_tensor = input_tensor.shape[0] + + xx_ones = torch.ones([1, self.y_dim], dtype=torch.int32)#.cuda() + xx_ones = xx_ones.unsqueeze(-1) + + xx_range = torch.arange(self.x_dim, dtype=torch.int32).unsqueeze(0)#.cuda() + xx_range = xx_range.unsqueeze(1) + + xx_channel = torch.matmul(xx_ones.float(), xx_range.float()) + xx_channel = xx_channel.unsqueeze(-1) + + + yy_ones = torch.ones([1, self.x_dim], dtype=torch.int32)#.cuda() + yy_ones = yy_ones.unsqueeze(1) + + yy_range = torch.arange(self.y_dim, dtype=torch.int32).unsqueeze(0)#.cuda() + yy_range = yy_range.unsqueeze(-1) + + yy_channel = torch.matmul(yy_range.float(), yy_ones.float()) + yy_channel = yy_channel.unsqueeze(-1) + + xx_channel = xx_channel.permute(0, 3, 2, 1) + yy_channel = yy_channel.permute(0, 3, 2, 1) + + xx_channel = xx_channel / (self.x_dim - 1) + yy_channel = yy_channel / (self.y_dim - 1) + + xx_channel = xx_channel * 2 - 1 + yy_channel = yy_channel * 2 - 1 + + xx_channel = xx_channel.repeat(batch_size_tensor, 1, 1, 1) + yy_channel = yy_channel.repeat(batch_size_tensor, 1, 1, 1) + + if self.with_boundary and type(heatmap) != type(None): + boundary_channel = torch.clamp(heatmap[:, -1:, :, :], + 0.0, 1.0) + + zero_tensor = torch.zeros_like(xx_channel) + xx_boundary_channel = torch.where(boundary_channel>0.05, + xx_channel, zero_tensor) + yy_boundary_channel = 
torch.where(boundary_channel>0.05, + yy_channel, zero_tensor) + if self.with_boundary and type(heatmap) != type(None): + xx_boundary_channel = xx_boundary_channel#.cuda() + yy_boundary_channel = yy_boundary_channel#.cuda() + ret = torch.cat([input_tensor, xx_channel, yy_channel], dim=1) + + + if self.with_r: + rr = torch.sqrt(torch.pow(xx_channel, 2) + torch.pow(yy_channel, 2)) + rr = rr / torch.max(rr) + ret = torch.cat([ret, rr], dim=1) + + if self.with_boundary and type(heatmap) != type(None): + ret = torch.cat([ret, xx_boundary_channel, + yy_boundary_channel], dim=1) + return ret + + +class CoordConvTh(nn.Module): + """CoordConv layer as in the paper.""" + def __init__(self, x_dim, y_dim, with_r, with_boundary, + in_channels, first_one=False, *args, **kwargs): + super(CoordConvTh, self).__init__() + self.addcoords = AddCoordsTh(x_dim=x_dim, y_dim=y_dim, with_r=with_r, + with_boundary=with_boundary) + in_channels += 2 + if with_r: + in_channels += 1 + if with_boundary and not first_one: + in_channels += 2 + self.conv = nn.Conv2d(in_channels=in_channels, *args, **kwargs) + + def forward(self, input_tensor, heatmap=None): + ret = self.addcoords(input_tensor, heatmap) + last_channel = ret[:, -2:, :, :] + ret = self.conv(ret) + return ret, last_channel + + +''' +An alternative implementation for PyTorch with auto-infering the x-y dimensions. +''' +class AddCoords(nn.Module): + + def __init__(self, with_r=False): + super().__init__() + self.with_r = with_r + + def forward(self, input_tensor): + """ + Args: + input_tensor: shape(batch, channel, x_dim, y_dim) + """ + batch_size, _, x_dim, y_dim = input_tensor.size() + + xx_channel = torch.arange(x_dim).repeat(1, y_dim, 1) + yy_channel = torch.arange(y_dim).repeat(1, x_dim, 1).transpose(1, 2) + + xx_channel = xx_channel / (x_dim - 1) + yy_channel = yy_channel / (y_dim - 1) + + xx_channel = xx_channel * 2 - 1 + yy_channel = yy_channel * 2 - 1 + + xx_channel = xx_channel.repeat(batch_size, 1, 1, 1).transpose(2, 3) + yy_channel = yy_channel.repeat(batch_size, 1, 1, 1).transpose(2, 3) + + if input_tensor.is_cuda: + xx_channel = xx_channel#.cuda() + yy_channel = yy_channel#.cuda() + + ret = torch.cat([ + input_tensor, + xx_channel.type_as(input_tensor), + yy_channel.type_as(input_tensor)], dim=1) + + if self.with_r: + rr = torch.sqrt(torch.pow(xx_channel - 0.5, 2) + torch.pow(yy_channel - 0.5, 2)) + if input_tensor.is_cuda: + rr = rr#.cuda() + ret = torch.cat([ret, rr], dim=1) + + return ret + + +class CoordConv(nn.Module): + + def __init__(self, in_channels, out_channels, with_r=False, **kwargs): + super().__init__() + self.addcoords = AddCoords(with_r=with_r) + self.conv = nn.Conv2d(in_channels + 2, out_channels, **kwargs) + + def forward(self, x): + ret = self.addcoords(x) + ret = self.conv(ret) + return ret \ No newline at end of file diff --git a/facelib/prior_box.py b/facelib/prior_box.py new file mode 100644 index 0000000..e20e443 --- /dev/null +++ b/facelib/prior_box.py @@ -0,0 +1,34 @@ +import torch +from itertools import product as product +import numpy as np +from math import ceil + + +class PriorBox(object): + def __init__(self, cfg, image_size=None, phase='test'): + super(PriorBox, self).__init__() + self.min_sizes = cfg['min_sizes'] + self.steps = cfg['steps'] + self.clip = cfg['clip'] + self.image_size = image_size + self.feature_maps = [[ceil(self.image_size[0]/step), ceil(self.image_size[1]/step)] for step in self.steps] + self.name = "s" + + def forward(self): + anchors = [] + for k, f in enumerate(self.feature_maps): + min_sizes = 
self.min_sizes[k]
+            for i, j in product(range(f[0]), range(f[1])):
+                for min_size in min_sizes:
+                    s_kx = min_size / self.image_size[1]
+                    s_ky = min_size / self.image_size[0]
+                    dense_cx = [x * self.steps[k] / self.image_size[1] for x in [j + 0.5]]
+                    dense_cy = [y * self.steps[k] / self.image_size[0] for y in [i + 0.5]]
+                    for cy, cx in product(dense_cy, dense_cx):
+                        anchors += [cx, cy, s_kx, s_ky]
+
+        # back to torch land
+        output = torch.Tensor(anchors).view(-1, 4)
+        if self.clip:
+            output.clamp_(max=1, min=0)
+        return output
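+
+
+if __name__ == "__main__":
+    # Quick smoke test (illustrative only). The real settings live in
+    # facelib/config.cfg_re50, which this patch does not include; the values
+    # below are the commonly used RetinaFace-ResNet50 ones and are an assumption.
+    cfg = {'min_sizes': [[16, 32], [64, 128], [256, 512]],
+           'steps': [8, 16, 32],
+           'clip': False}
+    priors = PriorBox(cfg, image_size=(480, 640)).forward()
+    # one normalized (cx, cy, w, h) anchor per (feature-map cell, min_size)
+    print(priors.shape)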