From 63adc2995e288919b69dbe314882e0f2d186da96 Mon Sep 17 00:00:00 2001 From: iperov Date: Mon, 18 Oct 2021 16:56:04 +0400 Subject: [PATCH] +xlib.facemeta and refactoring --- apps/DeepFaceLive/backend/BackendBase.py | 91 +++--- apps/DeepFaceLive/backend/CameraSource.py | 2 +- apps/DeepFaceLive/backend/FaceAligner.py | 41 +-- apps/DeepFaceLive/backend/FaceDetector.py | 32 +- apps/DeepFaceLive/backend/FaceMarker.py | 31 +- apps/DeepFaceLive/backend/FaceMerger.py | 57 ++-- apps/DeepFaceLive/backend/FaceSwapper.py | 61 ++-- apps/DeepFaceLive/backend/FileSource.py | 2 +- apps/DeepFaceLive/backend/FrameAdjuster.py | 6 +- apps/DeepFaceLive/backend/StreamOutput.py | 25 +- apps/DeepFaceLive/ui/QFaceDetector.py | 14 +- .../ui/widgets/QBCFaceAlignViewer.py | 41 +-- .../ui/widgets/QBCFaceSwapViewer.py | 18 +- .../ui/widgets/QBCFinalFrameViewer.py | 16 +- .../DeepFaceLive/ui/widgets/QBCFrameViewer.py | 9 +- scripts/dev.py | 205 ++++++++++++- xlib/facemeta/ELandmarks2D.py | 6 + xlib/facemeta/EMaskType.py | 4 + .../{FaceULandmarks.py => FLandmarks2D.py} | 62 ++-- xlib/facemeta/FMask.py | 40 +++ xlib/facemeta/{FacePose.py => FPose.py} | 6 +- xlib/facemeta/{FaceURect.py => FRect.py} | 25 +- xlib/facemeta/Faceset.py | 286 +++++++++++++----- xlib/facemeta/UFaceMark.py | 99 ++++++ xlib/facemeta/UImage.py | 59 ++++ xlib/facemeta/UPerson.py | 39 +++ xlib/facemeta/__init__.py | 87 +++--- xlib/facemeta/face.py | 211 ------------- 28 files changed, 962 insertions(+), 613 deletions(-) create mode 100644 xlib/facemeta/ELandmarks2D.py create mode 100644 xlib/facemeta/EMaskType.py rename xlib/facemeta/{FaceULandmarks.py => FLandmarks2D.py} (97%) create mode 100644 xlib/facemeta/FMask.py rename xlib/facemeta/{FacePose.py => FPose.py} (92%) rename xlib/facemeta/{FaceURect.py => FRect.py} (89%) create mode 100644 xlib/facemeta/UFaceMark.py create mode 100644 xlib/facemeta/UImage.py create mode 100644 xlib/facemeta/UPerson.py delete mode 100644 xlib/facemeta/face.py diff --git a/apps/DeepFaceLive/backend/BackendBase.py b/apps/DeepFaceLive/backend/BackendBase.py index fc7a6bd..fe79507 100644 --- a/apps/DeepFaceLive/backend/BackendBase.py +++ b/apps/DeepFaceLive/backend/BackendBase.py @@ -5,10 +5,32 @@ from typing import List, Union, Tuple import numpy as np from xlib import mp as lib_mp from xlib import time as lib_time -from xlib.facemeta import FaceMark from xlib.mp import csw as lib_csw from xlib.python.EventListener import EventListener +from xlib.facemeta import FRect, FLandmarks2D, FPose + +class BackendFaceSwapInfo: + def __init__(self): + self.image_name = None + self.face_urect : FRect = None + self.face_pose : FPose = None + self.face_ulmrks : FLandmarks2D = None + + self.face_align_image_name : str = None + self.face_align_mask_name : str = None + self.face_swap_image_name : str = None + self.face_swap_mask_name : str = None + + self.image_to_align_uni_mat = None + self.face_align_ulmrks : FLandmarks2D = None + + def __getstate__(self): + return self.__dict__.copy() + + def __setstate__(self, d): + self.__init__() + self.__dict__.update(d) class BackendConnectionData: """ @@ -27,13 +49,14 @@ class BackendConnectionData: self._uid = uid self._is_frame_reemitted = None - self._frame_name = None + self._frame_image_name = None self._frame_count = None self._frame_num = None self._frame_fps = None self._frame_timestamp = None - self._merged_frame_name = None - self._face_mark_list = [] + self._merged_image_name = None + + self._face_swap_info_list = [] def __getstate__(self, ): d = self.__dict__.copy() @@ -43,43 +66,43 @@ class BackendConnectionData: def assign_weak_heap(self, weak_heap : lib_mp.MPWeakHeap): self._weak_heap = weak_heap - def set_file(self, name : str, data : Union[bytes, bytearray, memoryview]): - self._weak_heap_refs[name] = self._weak_heap.add_data(data) + def set_file(self, key, data : Union[bytes, bytearray, memoryview]): + self._weak_heap_refs[key] = self._weak_heap.add_data(data) - def get_file(self, name : str) -> Union[bytes, None]: - ref = self._weak_heap_refs.get(name, None) + def get_file(self, key) -> Union[bytes, None]: + ref = self._weak_heap_refs.get(key, None) if ref is not None: return self._weak_heap.get_data(ref) return None - def set_image(self, name : str, image : np.ndarray): + def set_image(self, key, image : np.ndarray): """ store image to weak heap - name str + key image np.ndarray """ - self.set_file(name, image.data) - self._weak_heap_image_infos[name] = (image.shape, image.dtype) - - def get_image_shape_dtype(self, name:str) -> Union[None, Tuple[List, np.dtype]]: + self.set_file(key, image.data) + self._weak_heap_image_infos[key] = (image.shape, image.dtype) + + def get_image_shape_dtype(self, key) -> Union[None, Tuple[List, np.dtype]]: """ returns (image shape, dtype) or (None, None) if file does not exist """ - if name is None: - return None - image_info = self._weak_heap_image_infos.get(name, None) + if key is None: + return (None, None) + image_info = self._weak_heap_image_infos.get(key, None) if image_info is not None: shape, dtype = image_info return shape, dtype return (None, None) - - def get_image(self, name : str) -> Union[np.ndarray, None]: - if name is None: + + def get_image(self, key) -> Union[np.ndarray, None]: + if key is None: return None - image_info = self._weak_heap_image_infos.get(name, None) - buffer = self.get_file(name) + image_info = self._weak_heap_image_infos.get(key, None) + buffer = self.get_file(key) if image_info is not None and buffer is not None: shape, dtype = image_info @@ -90,10 +113,6 @@ class BackendConnectionData: def get_is_frame_reemitted(self) -> Union[bool, None]: return self._is_frame_reemitted def set_is_frame_reemitted(self, is_frame_reemitted : bool): self._is_frame_reemitted = is_frame_reemitted - - def get_frame_name(self) -> Union[str, None]: return self._frame_name - def set_frame_name(self, frame_name : str): self._frame_name = frame_name - def get_frame_count(self) -> Union[int, None]: return self._frame_count def set_frame_count(self, frame_count : int): self._frame_count = frame_count def get_frame_num(self) -> Union[int, None]: return self._frame_num @@ -103,14 +122,17 @@ class BackendConnectionData: def get_frame_timestamp(self) -> Union[float, None]: return self._frame_timestamp def set_frame_timestamp(self, frame_timestamp : float): self._frame_timestamp = frame_timestamp - def get_merged_frame_name(self) -> Union[str, None]: return self._merged_frame_name - def set_merged_frame_name(self, merged_frame_name : str): self._merged_frame_name = merged_frame_name + def get_frame_image_name(self) -> Union[str, None]: return self._frame_image_name + def set_frame_image_name(self, frame_image_name : str): self._frame_image_name = frame_image_name + def get_merged_image_name(self) -> Union[str, None]: return self._merged_image_name + def set_merged_image_name(self, merged_frame_name : str): self._merged_image_name = merged_frame_name + + def get_face_swap_info_list(self) -> List[BackendFaceSwapInfo]: return self._face_swap_info_list + def add_face_swap_info(self, fsi : BackendFaceSwapInfo): + if not isinstance(fsi, BackendFaceSwapInfo): + raise ValueError(f'fsi must be an instance of BackendFaceSwapInfo') + self._face_swap_info_list.append(fsi) - def get_face_mark_list(self) -> List[FaceMark]: return self._face_mark_list - def add_face_mark(self, face_mark : FaceMark): - if not isinstance(face_mark, FaceMark): - raise ValueError(f'face_mark must be an instance of FaceMark') - self._face_mark_list.append(face_mark) class BackendConnection: @@ -144,7 +166,7 @@ class BackendConnection: def is_full_read(self, buffer_size=0) -> bool: """ if fully readed by receiver side minus buffer_size - """ + """ return self._rd.get_read_id() >= (self._rd.get_write_id() - buffer_size) @@ -205,3 +227,4 @@ class BackendWorker(lib_csw.Worker): def stop_profile_timing(self): self.send_msg('_profile_timing', self._profile_timing_measurer.stop() ) + diff --git a/apps/DeepFaceLive/backend/CameraSource.py b/apps/DeepFaceLive/backend/CameraSource.py index 604d546..db3cd6a 100644 --- a/apps/DeepFaceLive/backend/CameraSource.py +++ b/apps/DeepFaceLive/backend/CameraSource.py @@ -253,7 +253,7 @@ class CameraSourceWorker(BackendWorker): bcd.assign_weak_heap(self.weak_heap) frame_name = f'Camera_{state.device_idx}_{bcd_uid:06}' - bcd.set_frame_name(frame_name) + bcd.set_frame_image_name(frame_name) bcd.set_frame_num(bcd_uid) bcd.set_frame_timestamp(timestamp) bcd.set_image(frame_name, img) diff --git a/apps/DeepFaceLive/backend/FaceAligner.py b/apps/DeepFaceLive/backend/FaceAligner.py index e3a9d7d..1567465 100644 --- a/apps/DeepFaceLive/backend/FaceAligner.py +++ b/apps/DeepFaceLive/backend/FaceAligner.py @@ -2,7 +2,6 @@ import time import numpy as np from xlib import os as lib_os -from xlib.facemeta import FaceAlign, FaceULandmarks from xlib.mp import csw as lib_csw from xlib.python import all_is_not_None @@ -51,7 +50,7 @@ class FaceAlignerWorker(BackendWorker): cs.exclude_moving_parts.enable() cs.exclude_moving_parts.set_flag(state.exclude_moving_parts if state.exclude_moving_parts is not None else True) - + cs.head_mode.enable() cs.head_mode.set_flag(state.head_mode if state.head_mode is not None else False) @@ -91,7 +90,7 @@ class FaceAlignerWorker(BackendWorker): state.head_mode = head_mode self.save_state() self.reemit_frame_signal.send() - + def on_cs_x_offset(self, x_offset): state, cs = self.get_state(), self.get_control_sheet() cfg = cs.x_offset.get_config() @@ -118,21 +117,17 @@ class FaceAlignerWorker(BackendWorker): if bcd is not None: bcd.assign_weak_heap(self.weak_heap) - frame_name = bcd.get_frame_name() - frame_image = bcd.get_image(frame_name) + frame_image_name = bcd.get_frame_image_name() + frame_image = bcd.get_image(frame_image_name) - if all_is_not_None(state.face_coverage, state.resolution, frame_name, frame_image): - for face_id,face_mark in enumerate( bcd.get_face_mark_list() ): - face_ulmrks = face_mark.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_468) - if face_ulmrks is None: - face_ulmrks = face_mark.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_68) - + if all_is_not_None(state.face_coverage, state.resolution, frame_image): + for face_id, fsi in enumerate( bcd.get_face_swap_info_list() ): head_yaw = None if state.head_mode: - face_pose = face_mark.get_face_pose() - if face_pose is not None: - head_yaw = face_pose.as_radians()[1] - + if fsi.face_pose is not None: + head_yaw = fsi.face_pose.as_radians()[1] + + face_ulmrks = fsi.face_ulmrks if face_ulmrks is not None: face_image, uni_mat = face_ulmrks.cut(frame_image, state.face_coverage, state.resolution, exclude_moving_parts=state.exclude_moving_parts, @@ -140,19 +135,11 @@ class FaceAlignerWorker(BackendWorker): x_offset=state.x_offset, y_offset=state.y_offset) - face_align_image_name = f'{frame_name}_{face_id}_aligned' + fsi.face_align_image_name = f'{frame_image_name}_{face_id}_aligned' + fsi.image_to_align_uni_mat = uni_mat + fsi.face_align_ulmrks = face_ulmrks.transform(uni_mat) - face_align = FaceAlign() - face_align.set_image_name(face_align_image_name) - face_align.set_coverage(state.face_coverage) - face_align.set_source_face_ulandmarks_type(face_ulmrks.get_type()) - face_align.set_source_to_aligned_uni_mat(uni_mat) - - for face_ulmrks in face_mark.get_face_ulandmarks_list(): - face_align.add_face_ulandmarks( face_ulmrks.transform(uni_mat) ) - face_mark.set_face_align(face_align) - - bcd.set_image(face_align_image_name, face_image) + bcd.set_image(fsi.face_align_image_name, face_image) self.stop_profile_timing() self.pending_bcd = bcd diff --git a/apps/DeepFaceLive/backend/FaceDetector.py b/apps/DeepFaceLive/backend/FaceDetector.py index a498804..9a3e36c 100644 --- a/apps/DeepFaceLive/backend/FaceDetector.py +++ b/apps/DeepFaceLive/backend/FaceDetector.py @@ -4,14 +4,14 @@ from enum import IntEnum import numpy as np from modelhub import onnx as onnx_models from xlib import os as lib_os -from xlib.facemeta import FaceMark, FaceURect +from xlib.facemeta import FRect from xlib.image import ImageProcessor from xlib.mp import csw as lib_csw from xlib.python import all_is_not_None from .BackendBase import (BackendConnection, BackendDB, BackendHost, BackendSignal, BackendWeakHeap, BackendWorker, - BackendWorkerState) + BackendWorkerState, BackendFaceSwapInfo) class DetectorType(IntEnum): @@ -213,10 +213,10 @@ class FaceDetectorWorker(BackendWorker): detector_state = state.get_detector_state() - frame_name = bcd.get_frame_name() - frame_image = bcd.get_image(frame_name) + frame_image_name = bcd.get_frame_image_name() + frame_image = bcd.get_image(frame_image_name) - if all_is_not_None(frame_image, frame_name): + if frame_image is not None: _,H,W,_ = ImageProcessor(frame_image).get_dims() rects = [] @@ -228,13 +228,13 @@ class FaceDetectorWorker(BackendWorker): rects = self.YoloV5Face.extract (frame_image, threshold=detector_state.threshold, fixed_window=detector_state.fixed_window_size)[0] # to list of FaceURect - rects = [ FaceURect.from_ltrb( (l/W, t/H, r/W, b/H) ) for l,t,r,b in rects ] + rects = [ FRect.from_ltrb( (l/W, t/H, r/W, b/H) ) for l,t,r,b in rects ] # sort if detector_state.sort_by == FaceSortBy.LARGEST: - rects = FaceURect.sort_by_area_size(rects) + rects = FRect.sort_by_area_size(rects) elif detector_state.sort_by == FaceSortBy.DIST_FROM_CENTER: - rects = FaceURect.sort_by_dist_from_center(rects) + rects = FRect.sort_by_dist_from_center(rects) if len(rects) != 0: max_faces = detector_state.max_faces @@ -245,20 +245,20 @@ class FaceDetectorWorker(BackendWorker): if len(self.temporal_rects) != len(rects): self.temporal_rects = [ [] for _ in range(len(rects)) ] - for face_id, face_rect in enumerate(rects): + for face_id, face_urect in enumerate(rects): if detector_state.temporal_smoothing != 1: if not is_frame_reemitted or len(self.temporal_rects[face_id]) == 0: - self.temporal_rects[face_id].append( face_rect.as_4pts() ) + self.temporal_rects[face_id].append( face_urect.as_4pts() ) self.temporal_rects[face_id] = self.temporal_rects[face_id][-detector_state.temporal_smoothing:] - face_rect = FaceURect.from_4pts ( np.mean(self.temporal_rects[face_id],0 ) ) + face_urect = FRect.from_4pts ( np.mean(self.temporal_rects[face_id],0 ) ) - if face_rect.get_area() != 0: - face_mark = FaceMark() - face_mark.set_image_name(frame_name) - face_mark.set_face_urect ( face_rect ) - bcd.add_face_mark(face_mark) + if face_urect.get_area() != 0: + fsi = BackendFaceSwapInfo() + fsi.image_name = frame_image_name + fsi.face_urect = face_urect + bcd.add_face_swap_info(fsi) self.stop_profile_timing() self.pending_bcd = bcd diff --git a/apps/DeepFaceLive/backend/FaceMarker.py b/apps/DeepFaceLive/backend/FaceMarker.py index 5479c82..974b9bd 100644 --- a/apps/DeepFaceLive/backend/FaceMarker.py +++ b/apps/DeepFaceLive/backend/FaceMarker.py @@ -5,7 +5,7 @@ from modelhub import onnx as onnx_models from modelhub import cv as cv_models from xlib import os as lib_os -from xlib.facemeta import FaceULandmarks, FacePose +from xlib.facemeta import ELandmarks2D, FLandmarks2D, FPose from xlib.image import ImageProcessor from xlib.mp import csw as lib_csw from xlib.python import all_is_not_None @@ -153,20 +153,18 @@ class FaceMarkerWorker(BackendWorker): is_google_facemesh = marker_type == MarkerType.GOOGLE_FACEMESH and self.google_facemesh is not None if marker_type is not None: - frame_name = bcd.get_frame_name() - frame_image = bcd.get_image(frame_name) + frame_image = bcd.get_image(bcd.get_frame_image_name()) - if all_is_not_None(frame_image) and (is_opencv_lbf or is_google_facemesh): - face_mark_list = bcd.get_face_mark_list() + if frame_image is not None and (is_opencv_lbf or is_google_facemesh): + fsi_list = bcd.get_face_swap_info_list() if marker_state.temporal_smoothing != 1 and \ - len(self.temporal_lmrks) != len(face_mark_list): - self.temporal_lmrks = [ [] for _ in range(len(face_mark_list)) ] + len(self.temporal_lmrks) != len(fsi_list): + self.temporal_lmrks = [ [] for _ in range(len(fsi_list)) ] - for face_id, face_mark in enumerate(face_mark_list): - face_mark_rect = face_mark.get_face_urect() - if face_mark_rect is not None: + for face_id, fsi in enumerate(fsi_list): + if fsi.face_urect is not None: # Cut the face to feed to the face marker - face_image, face_uni_mat = face_mark_rect.cut(frame_image, marker_state.marker_coverage, 256 if is_opencv_lbf else \ + face_image, face_uni_mat = fsi.face_urect.cut(frame_image, marker_state.marker_coverage, 256 if is_opencv_lbf else \ 192 if is_google_facemesh else 0 ) _,H,W,_ = ImageProcessor(face_image).get_dims() @@ -183,20 +181,17 @@ class FaceMarkerWorker(BackendWorker): lmrks = np.mean(self.temporal_lmrks[face_id],0 ) if is_google_facemesh: - face_mark.set_face_pose(FacePose.from_3D_468_landmarks(lmrks)) + fsi.face_pose = FPose.from_3D_468_landmarks(lmrks) if is_opencv_lbf: lmrks /= (W,H) elif is_google_facemesh: lmrks = lmrks[...,0:2] / (W,H) - face_ulmrks = FaceULandmarks.create (FaceULandmarks.Type.LANDMARKS_68 if is_opencv_lbf else \ - FaceULandmarks.Type.LANDMARKS_468 if is_google_facemesh else None, lmrks) - + face_ulmrks = FLandmarks2D.create (ELandmarks2D.L68 if is_opencv_lbf else \ + ELandmarks2D.L468 if is_google_facemesh else None, lmrks) face_ulmrks = face_ulmrks.transform(face_uni_mat, invert=True) - face_mark.add_face_ulandmarks (face_ulmrks) - - + fsi.face_ulmrks = face_ulmrks self.stop_profile_timing() self.pending_bcd = bcd diff --git a/apps/DeepFaceLive/backend/FaceMerger.py b/apps/DeepFaceLive/backend/FaceMerger.py index 2e77bee..002c7a9 100644 --- a/apps/DeepFaceLive/backend/FaceMerger.py +++ b/apps/DeepFaceLive/backend/FaceMerger.py @@ -255,44 +255,35 @@ class FaceMergerWorker(BackendWorker): if bcd is not None: bcd.assign_weak_heap(self.weak_heap) - frame_name = bcd.get_frame_name() - frame_image = bcd.get_image(frame_name) + frame_image_name = bcd.get_frame_image_name() + frame_image = bcd.get_image(frame_image_name) if frame_image is not None: - for face_mark in bcd.get_face_mark_list(): - face_align = face_mark.get_face_align() - if face_align is not None: - face_swap = face_align.get_face_swap() - face_align_mask = face_align.get_face_mask() + for fsi in bcd.get_face_swap_info_list(): + face_align_img_shape, _ = bcd.get_image_shape_dtype(fsi.face_align_image_name) + face_align_mask_img = bcd.get_image(fsi.face_align_mask_name) + face_swap_img = bcd.get_image(fsi.face_swap_image_name) + face_swap_mask_img = bcd.get_image(fsi.face_swap_mask_name) + image_to_align_uni_mat = fsi.image_to_align_uni_mat - if face_swap is not None: - face_swap_mask = face_swap.get_face_mask() - if face_swap_mask is not None: + if all_is_not_None(face_align_img_shape, face_align_mask_img, face_swap_img, face_swap_mask_img, image_to_align_uni_mat): + face_height, face_width = face_align_img_shape[:2] + frame_height, frame_width = frame_image.shape[:2] + aligned_to_source_uni_mat = image_to_align_uni_mat.invert() + aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_translated(-state.face_x_offset, -state.face_y_offset) + aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_scaled_around_center(state.face_scale,state.face_scale) + aligned_to_source_uni_mat = aligned_to_source_uni_mat.to_exact_mat (face_width, face_height, frame_width, frame_height) - face_align_img_shape, _ = bcd.get_image_shape_dtype(face_align.get_image_name()) - face_align_mask_img = bcd.get_image(face_align_mask.get_image_name()) - face_swap_img = bcd.get_image(face_swap.get_image_name()) - face_swap_mask_img = bcd.get_image(face_swap_mask.get_image_name()) - source_to_aligned_uni_mat = face_align.get_source_to_aligned_uni_mat() + if state.device == 'CPU': + merged_frame = self._merge_on_cpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height ) + else: + merged_frame = self._merge_on_gpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height ) + # keep image in float32 in order not to extra load FaceMerger - if all_is_not_None(face_align_img_shape, face_align_mask_img, face_swap_img, face_swap_mask_img): - face_height, face_width = face_align_img_shape[:2] - frame_height, frame_width = frame_image.shape[:2] - aligned_to_source_uni_mat = source_to_aligned_uni_mat.invert() - aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_translated(-state.face_x_offset, -state.face_y_offset) - aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_scaled_around_center(state.face_scale,state.face_scale) - aligned_to_source_uni_mat = aligned_to_source_uni_mat.to_exact_mat (face_width, face_height, frame_width, frame_height) - - if state.device == 'CPU': - merged_frame = self._merge_on_cpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height ) - else: - merged_frame = self._merge_on_gpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height ) - # keep image in float32 in order not to extra load FaceMerger - - merged_frame_name = f'{frame_name}_merged' - bcd.set_merged_frame_name(merged_frame_name) - bcd.set_image(merged_frame_name, merged_frame) - break + merged_image_name = f'{frame_image_name}_merged' + bcd.set_merged_image_name(merged_image_name) + bcd.set_image(merged_image_name, merged_frame) + break self.stop_profile_timing() self.pending_bcd = bcd diff --git a/apps/DeepFaceLive/backend/FaceSwapper.py b/apps/DeepFaceLive/backend/FaceSwapper.py index daad718..cfef9dc 100644 --- a/apps/DeepFaceLive/backend/FaceSwapper.py +++ b/apps/DeepFaceLive/backend/FaceSwapper.py @@ -4,7 +4,6 @@ from pathlib import Path import numpy as np from modelhub import DFLive from xlib import os as lib_os -from xlib.facemeta import FaceMask, FaceSwap from xlib.image.ImageProcessor import ImageProcessor from xlib.mp import csw as lib_csw from xlib.python import all_is_not_None @@ -171,7 +170,7 @@ class FaceSwapperWorker(BackendWorker): cs.model_dl_progress.enable() cs.model_dl_progress.set_config( lib_csw.Progress.Config(title='@FaceSwapper.downloading_model') ) cs.model_dl_progress.set_progress(0) - + elif events.new_status_initialized: self.dfm_model = events.dfm_model self.dfm_model_initializer = None @@ -239,47 +238,39 @@ class FaceSwapperWorker(BackendWorker): if all_is_not_None(dfm_model, model_state): face_id = model_state.face_id if face_id is not None: - for i, face_mark in enumerate(bcd.get_face_mark_list()): - if face_id == i: - face_align = face_mark.get_face_align() - if face_align is not None: - face_align_image_name = face_align.get_image_name() - face_align_image = bcd.get_image(face_align_image_name) - if face_align_image is not None: + for i, fsi in enumerate(bcd.get_face_swap_info_list()): + if face_id != i: + continue - pre_gamma_red = model_state.pre_gamma_red - pre_gamma_green = model_state.pre_gamma_green - pre_gamma_blue = model_state.pre_gamma_blue + face_align_image = bcd.get_image(fsi.face_align_image_name) + if face_align_image is not None: - fai_ip = ImageProcessor(face_align_image) - if model_state.presharpen_amount != 0: - fai_ip.sharpen(factor=model_state.presharpen_amount) + pre_gamma_red = model_state.pre_gamma_red + pre_gamma_green = model_state.pre_gamma_green + pre_gamma_blue = model_state.pre_gamma_blue - if pre_gamma_red != 1.0 or pre_gamma_green != 1.0 or pre_gamma_blue != 1.0: - fai_ip.adjust_gamma(pre_gamma_red, pre_gamma_green, pre_gamma_blue) - face_align_image = fai_ip.get_image('HWC') + fai_ip = ImageProcessor(face_align_image) + if model_state.presharpen_amount != 0: + fai_ip.sharpen(factor=model_state.presharpen_amount) - celeb_face, celeb_face_mask_img, face_align_mask_img = dfm_model.convert(face_align_image, morph_factor=model_state.morph_factor) - celeb_face, celeb_face_mask_img, face_align_mask_img = celeb_face[0], celeb_face_mask_img[0], face_align_mask_img[0] + if pre_gamma_red != 1.0 or pre_gamma_green != 1.0 or pre_gamma_blue != 1.0: + fai_ip.adjust_gamma(pre_gamma_red, pre_gamma_green, pre_gamma_blue) + face_align_image = fai_ip.get_image('HWC') - if model_state.two_pass: - celeb_face, celeb_face_mask_img, _ = dfm_model.convert(celeb_face, morph_factor=model_state.morph_factor) - celeb_face, celeb_face_mask_img = celeb_face[0], celeb_face_mask_img[0] + celeb_face, celeb_face_mask_img, face_align_mask_img = dfm_model.convert(face_align_image, morph_factor=model_state.morph_factor) + celeb_face, celeb_face_mask_img, face_align_mask_img = celeb_face[0], celeb_face_mask_img[0], face_align_mask_img[0] - face_align_mask = FaceMask() - face_align_mask.set_image_name(f'{face_align_image_name}_mask') - face_align.set_face_mask(face_align_mask) - bcd.set_image(face_align_mask.get_image_name(), face_align_mask_img) + if model_state.two_pass: + celeb_face, celeb_face_mask_img, _ = dfm_model.convert(celeb_face, morph_factor=model_state.morph_factor) + celeb_face, celeb_face_mask_img = celeb_face[0], celeb_face_mask_img[0] - face_swap = FaceSwap() - face_swap.set_image_name (f"{face_align_image_name}_swapped") - face_align.set_face_swap(face_swap) - bcd.set_image(face_swap.get_image_name(), celeb_face) + fsi.face_align_mask_name = f'{fsi.face_align_image_name}_mask' + fsi.face_swap_image_name = f'{fsi.face_align_image_name}_swapped' + fsi.face_swap_mask_name = f'{fsi.face_swap_image_name}_mask' - face_swap_mask = FaceMask() - face_swap_mask.set_image_name(f'{face_swap.get_image_name()}_mask') - face_swap.set_face_mask(face_swap_mask) - bcd.set_image(face_swap_mask.get_image_name(), celeb_face_mask_img) + bcd.set_image(fsi.face_align_mask_name, face_align_mask_img) + bcd.set_image(fsi.face_swap_image_name, celeb_face) + bcd.set_image(fsi.face_swap_mask_name, celeb_face_mask_img) self.stop_profile_timing() self.pending_bcd = bcd diff --git a/apps/DeepFaceLive/backend/FileSource.py b/apps/DeepFaceLive/backend/FileSource.py index f6c116d..614bd58 100644 --- a/apps/DeepFaceLive/backend/FileSource.py +++ b/apps/DeepFaceLive/backend/FileSource.py @@ -287,7 +287,7 @@ class FileSourceWorker(BackendWorker): bcd.set_frame_num(p_frame.frame_num) bcd.set_frame_fps(p_frame.fps) bcd.set_frame_timestamp(p_frame.timestamp) - bcd.set_frame_name(p_frame.name) + bcd.set_frame_image_name(p_frame.name) image = ImageProcessor(p_frame.image).to_uint8().get_image('HWC') bcd.set_image(p_frame.name, image) diff --git a/apps/DeepFaceLive/backend/FrameAdjuster.py b/apps/DeepFaceLive/backend/FrameAdjuster.py index 9b1b08d..2edebc9 100644 --- a/apps/DeepFaceLive/backend/FrameAdjuster.py +++ b/apps/DeepFaceLive/backend/FrameAdjuster.py @@ -72,8 +72,8 @@ class FrameAdjusterWorker(BackendWorker): if bcd is not None: bcd.assign_weak_heap(self.weak_heap) - frame_name = bcd.get_frame_name() - frame_image = bcd.get_image(frame_name) + frame_image_name = bcd.get_frame_image_name() + frame_image = bcd.get_image(frame_image_name) if frame_image is not None: frame_image_ip = ImageProcessor(frame_image) @@ -81,7 +81,7 @@ class FrameAdjusterWorker(BackendWorker): frame_image_ip.degrade_resize( state.degrade_bicubic_per / 100.0, interpolation=ImageProcessor.Interpolation.CUBIC) frame_image = frame_image_ip.get_image('HWC') - bcd.set_image(frame_name, frame_image) + bcd.set_image(frame_image_name, frame_image) self.stop_profile_timing() self.pending_bcd = bcd diff --git a/apps/DeepFaceLive/backend/StreamOutput.py b/apps/DeepFaceLive/backend/StreamOutput.py index 744cb7f..cf45b8e 100644 --- a/apps/DeepFaceLive/backend/StreamOutput.py +++ b/apps/DeepFaceLive/backend/StreamOutput.py @@ -202,31 +202,26 @@ class StreamOutputWorker(BackendWorker): view_image = None if source_type == SourceType.SOURCE_FRAME: - view_image = bcd.get_image(bcd.get_frame_name()) + view_image = bcd.get_image(bcd.get_frame_image_name()) elif source_type == SourceType.MERGED_FRAME: - view_image = bcd.get_image(bcd.get_merged_frame_name()) + view_image = bcd.get_image(bcd.get_merged_image_name()) elif source_type == SourceType.ALIGNED_FACE: aligned_face_id = state.aligned_face_id - for i, face_mark in enumerate(bcd.get_face_mark_list()): + for i, fsi in enumerate(bcd.get_face_swap_info_list()): if aligned_face_id == i: - face_align = face_mark.get_face_align() - if face_align is not None: - view_image = bcd.get_image(face_align.get_image_name()) + view_image = bcd.get_image(fsi.face_align_image_name) break elif source_type == SourceType.SWAPPED_FACE: - for face_mark in bcd.get_face_mark_list(): - face_align = face_mark.get_face_align() - if face_align is not None: - face_swap = face_align.get_face_swap() - if face_swap is not None: - view_image = bcd.get_image(face_swap.get_image_name()) - break + for fsi in bcd.get_face_swap_info_list(): + view_image = bcd.get_image(fsi.face_swap_image_name) + if view_image is not None: + break elif source_type == SourceType.SOURCE_N_MERGED_FRAME: - source_frame = bcd.get_image(bcd.get_frame_name()) - merged_frame = bcd.get_image(bcd.get_merged_frame_name()) + source_frame = bcd.get_image(bcd.get_frame_image_name()) + merged_frame = bcd.get_image(bcd.get_merged_image_name()) if source_frame is not None and merged_frame is not None: source_frame = ImageProcessor(source_frame).to_ufloat32().get_image('HWC') view_image = np.concatenate( (source_frame, merged_frame), 1 ) diff --git a/apps/DeepFaceLive/ui/QFaceDetector.py b/apps/DeepFaceLive/ui/QFaceDetector.py index 330e326..030b6f7 100644 --- a/apps/DeepFaceLive/ui/QFaceDetector.py +++ b/apps/DeepFaceLive/ui/QFaceDetector.py @@ -92,20 +92,18 @@ class QFaceDetector(QBackendPanel): if bcd is not None: bcd.assign_weak_heap(self._weak_heap) - frame_name = bcd.get_frame_name() - frame_image = bcd.get_image(frame_name) + frame_image = bcd.get_image(bcd.get_frame_image_name()) frame_image_w_h = None if frame_image is not None: h,w = frame_image.shape[0:2] frame_image_w_h = (w,h) info = [] - for face_num,face_mark in enumerate(bcd.get_face_mark_list()): - info_str = f'{face_num}: ' - - rect = face_mark.get_face_urect() - if rect is not None: - l,t,r,b = rect.as_ltrb_bbox(frame_image_w_h).astype(np.int) + for face_id, fsi in enumerate(bcd.get_face_swap_info_list()): + info_str = f'{face_id}: ' + + if fsi.face_urect is not None: + l,t,r,b = fsi.face_urect.as_ltrb_bbox(frame_image_w_h).astype(np.int) info_str += f'[{l},{t},{r},{b}]' info.append(info_str) diff --git a/apps/DeepFaceLive/ui/widgets/QBCFaceAlignViewer.py b/apps/DeepFaceLive/ui/widgets/QBCFaceAlignViewer.py index 6b59298..0323a47 100644 --- a/apps/DeepFaceLive/ui/widgets/QBCFaceAlignViewer.py +++ b/apps/DeepFaceLive/ui/widgets/QBCFaceAlignViewer.py @@ -5,8 +5,6 @@ from PyQt6.QtGui import * from PyQt6.QtWidgets import * from resources.fonts import QXFontDB from xlib import qt as lib_qt -from xlib.facemeta import FaceULandmarks -from xlib.python import all_is_not_None from ... import backend @@ -47,36 +45,25 @@ class QBCFaceAlignViewer(lib_qt.QXCollapsibleSection): self._layered_images.clear_images() - for face_mark in bcd.get_face_mark_list(): - face_align = face_mark.get_face_align() - if face_align is not None: - face_image = bcd.get_image (face_align.get_image_name()) - if face_image is not None: - source_to_aligned_uni_mat = face_align.get_source_to_aligned_uni_mat() + for fsi in bcd.get_face_swap_info_list(): + face_image = bcd.get_image (fsi.face_align_image_name) + if face_image is not None: + h,w = face_image.shape[:2] + self._layered_images.add_image(face_image) - h,w = face_image.shape[:2] - self._layered_images.add_image(face_image) + if fsi.face_align_ulmrks is not None: + lmrks_layer = np.zeros( (self._preview_width, self._preview_width, 4), dtype=np.uint8) - face_ulmrks = face_align.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_468) - if face_ulmrks is None: - face_ulmrks = face_align.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_68) + fsi.face_align_ulmrks.draw(lmrks_layer, (0,255,0,255)) - if face_ulmrks is not None: - lmrks_layer = np.zeros( (self._preview_width, self._preview_width, 4), dtype=np.uint8) + if fsi.face_urect is not None and fsi.image_to_align_uni_mat is not None: + aligned_uni_rect = fsi.face_urect.transform(fsi.image_to_align_uni_mat) + aligned_uni_rect.draw(lmrks_layer, (0,0,255,255) ) - face_ulmrks.draw(lmrks_layer, (0,255,0,255)) + self._layered_images.add_image(lmrks_layer) - face_mark_rect = face_mark.get_face_urect() - if face_mark_rect is not None: - aligned_uni_rect = face_mark_rect.transform(source_to_aligned_uni_mat) - aligned_uni_rect.draw(lmrks_layer, (0,0,255,255) ) - - self._layered_images.add_image(lmrks_layer) - - - self._info_label.setText(f'{w}x{h}') - - return + self._info_label.setText(f'{w}x{h}') + return def clear(self): diff --git a/apps/DeepFaceLive/ui/widgets/QBCFaceSwapViewer.py b/apps/DeepFaceLive/ui/widgets/QBCFaceSwapViewer.py index f7cdec4..096904b 100644 --- a/apps/DeepFaceLive/ui/widgets/QBCFaceSwapViewer.py +++ b/apps/DeepFaceLive/ui/widgets/QBCFaceSwapViewer.py @@ -45,17 +45,13 @@ class QBCFaceSwapViewer(lib_qt.QXCollapsibleSection): self._layered_images.clear_images() - for face_mark in bcd.get_face_mark_list(): - face_align = face_mark.get_face_align() - if face_align is not None: - face_swap = face_align.get_face_swap() - if face_swap is not None: - face_swap_image = bcd.get_image(face_swap.get_image_name()) - if face_swap_image is not None: - self._layered_images.add_image(face_swap_image) - h,w = face_swap_image.shape[0:2] - self._info_label.setText(f'{w}x{h}') - return + for fsi in bcd.get_face_swap_info_list(): + face_swap_image = bcd.get_image(fsi.face_swap_image_name) + if face_swap_image is not None: + self._layered_images.add_image(face_swap_image) + h,w = face_swap_image.shape[0:2] + self._info_label.setText(f'{w}x{h}') + return def clear(self): self._layered_images.clear_images() diff --git a/apps/DeepFaceLive/ui/widgets/QBCFinalFrameViewer.py b/apps/DeepFaceLive/ui/widgets/QBCFinalFrameViewer.py index bf4606b..0e8765d 100644 --- a/apps/DeepFaceLive/ui/widgets/QBCFinalFrameViewer.py +++ b/apps/DeepFaceLive/ui/widgets/QBCFinalFrameViewer.py @@ -43,16 +43,16 @@ class QBCFinalFrameViewer(lib_qt.QXCollapsibleSection): self._layered_images.clear_images() - merged_frame_name = bcd.get_merged_frame_name() - merged_frame_image = bcd.get_image(merged_frame_name) + merged_image_name = bcd.get_merged_image_name() + merged_image = bcd.get_image(merged_image_name) - if merged_frame_image is not None: - if merged_frame_image.dtype != np.uint8: - merged_frame_image = lib_image.ImageProcessor(merged_frame_image).to_uint8().get_image('HWC') + if merged_image is not None: + if merged_image.dtype != np.uint8: + merged_image = lib_image.ImageProcessor(merged_image).to_uint8().get_image('HWC') - self._layered_images.add_image(merged_frame_image) - h,w = merged_frame_image.shape[0:2] - self._info_label.setText(f'{merged_frame_name} {w}x{h}') + self._layered_images.add_image(merged_image) + h,w = merged_image.shape[0:2] + self._info_label.setText(f'{merged_image_name} {w}x{h}') def clear(self): diff --git a/apps/DeepFaceLive/ui/widgets/QBCFrameViewer.py b/apps/DeepFaceLive/ui/widgets/QBCFrameViewer.py index 847f0d5..4357496 100644 --- a/apps/DeepFaceLive/ui/widgets/QBCFrameViewer.py +++ b/apps/DeepFaceLive/ui/widgets/QBCFrameViewer.py @@ -42,16 +42,13 @@ class QBCFrameViewer(lib_qt.QXCollapsibleSection): self._layered_images.clear_images() - frame_name = bcd.get_frame_name() - frame_image = bcd.get_image(frame_name) + frame_image_name = bcd.get_frame_image_name() + frame_image = bcd.get_image(frame_image_name) if frame_image is not None: self._layered_images.add_image (frame_image) h,w = frame_image.shape[:2] - if frame_name is not None: - self._info_label.setText(f'{frame_name} {w}x{h}') - else: - self._info_label.setText(f'{w}x{h}') + self._info_label.setText(f'{frame_image_name} {w}x{h}') def clear(self): diff --git a/scripts/dev.py b/scripts/dev.py index f34255e..4b18163 100644 --- a/scripts/dev.py +++ b/scripts/dev.py @@ -1,20 +1,213 @@ from pathlib import Path + +import numpy as np +from xlib import console as lib_con +from xlib import facemeta as lib_fm +from xlib import path as lib_path from xlib.file import SplittedFile +from xlib import cv as lib_cv +import cv2 repo_root = Path(__file__).parent.parent -large_files_list = [ (repo_root / 'modelhub' / 'onnx' / 'S3FD' / 'S3FD.onnx', 48*1024*1024), - (repo_root / 'modelhub' / 'torch' / 'S3FD' / 'S3FD.pth', 48*1024*1024), - (repo_root / 'modelhub' / 'cv' / 'FaceMarkerLBF' / 'lbfmodel.yaml', 34*1024*1024), +large_files_list = [ (repo_root / 'modelhub' / 'onnx' / 'S3FD' / 'S3FD.onnx', 48*1024*1024), + (repo_root / 'modelhub' / 'torch' / 'S3FD' / 'S3FD.pth', 48*1024*1024), + (repo_root / 'modelhub' / 'cv' / 'FaceMarkerLBF' / 'lbfmodel.yaml', 34*1024*1024), ] - + def merge_large_files(delete_parts=False): for filepath, _ in large_files_list: print(f'Merging {filepath}...') SplittedFile.merge(filepath, delete_parts=delete_parts) print('Done') - + def split_large_files(delete_original=False): for filepath, part_size in large_files_list: print(f'Splitting {filepath}...') SplittedFile.split(filepath, part_size=part_size, delete_original=delete_original) - print('Done') \ No newline at end of file + print('Done') + +def extract_facesynthetics_dataset(input_dir): + """ + extract FaceSynthetics dataset https://github.com/microsoft/FaceSynthetics + + BACKGROUND = 0 + SKIN = 1 + NOSE = 2 + RIGHT_EYE = 3 + LEFT_EYE = 4 + RIGHT_BROW = 5 + LEFT_BROW = 6 + RIGHT_EAR = 7 + LEFT_EAR = 8 + MOUTH_INTERIOR = 9 + TOP_LIP = 10 + BOTTOM_LIP = 11 + NECK = 12 + HAIR = 13 + BEARD = 14 + CLOTHING = 15 + GLASSES = 16 + HEADWEAR = 17 + FACEWEAR = 18 + IGNORE = 255 + """ + input_path = Path(input_dir) + faceset_path = input_path.parent / f'{input_path.name}.dfs' + + # fs = lib_fm.Faceset(output_dbpath) + # for ufm in fs.iter_UFaceMark(): + # uimg = fs.get_UImage_by_uuid( ufm.get_UImage_uuid() ) + # img = uimg.get_image() + + # cv2.imshow('', img) + # cv2.waitKey(0) + + filepaths = lib_path.get_files_paths(input_path)[:100] #TODO + + fs = lib_fm.Faceset(faceset_path) + fs.clear_db() + + + for filepath in lib_con.progress_bar_iterator(filepaths, 'Processing'): + if filepath.suffix == '.txt': + + image_filepath = filepath.parent / f'{filepath.name.split("_")[0]}.png' + if not image_filepath.exists(): + print(f'{image_filepath} does not exist, skipping') + + + img = lib_cv.imread(image_filepath) + H,W,C = img.shape + + lmrks = [] + for lmrk_line in filepath.read_text().split('\n'): + if len(lmrk_line) == 0: + continue + + x, y = lmrk_line.split(' ') + x, y = float(x), float(y) + + lmrks.append( (x,y) ) + + lmrks = np.array(lmrks[:68], np.float32) / (H,W) + + flmrks = lib_fm.FLandmarks2D.create(lib_fm.ELandmarks2D.L68, lmrks) + + uimg = lib_fm.UImage() + uimg.assign_image(img) + uimg.set_name(image_filepath.stem) + + ufm = lib_fm.UFaceMark() + ufm.set_UImage_uuid(uimg.get_uuid()) + ufm.set_FRect(flmrks.get_FRect()) + ufm.add_FLandmarks2D(flmrks) + + fs.add_UImage(uimg, format='png') + fs.add_UFaceMark(ufm) + + fs.shrink() + fs.close() + + import code + code.interact(local=dict(globals(), **locals())) + +# seg_filepath = input_path / ( Path(image_filepath).stem + '_seg.png') +# if not seg_filepath.exists(): +# raise ValueError(f'{seg_filepath} does not exist') + +# erode_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3,3)) +# img_seg = lib_cv.imread(seg_filepath) +# if img_seg.ndim == 2: +# img_seg = img_seg[...,None] +# if img_seg.shape[-1] != 1: +# raise Exception(f'{seg_filepath} wrong mask file. Must be 1 channel.') + +# seg_hair = img_seg.copy() + +# seg_hair_inds = np.isin(img_seg, [13]) +# seg_hair[~seg_hair_inds] = 0 +# seg_hair[seg_hair_inds] = 255 + +# cv2.imshow('', img) +# cv2.waitKey(0) + +# cv2.imshow('', seg_hair) +# cv2.waitKey(0) + +# # fix 1-3 pix hair holes +# seg_hair = cv2.dilate(seg_hair, erode_kernel, iterations=3) +# seg_hair = cv2.erode(seg_hair, erode_kernel, iterations=3) + +# cv2.imshow('', seg_hair) +# cv2.waitKey(0) +# img_seg_inds = np.isin(img_seg, [1,2,3,4,5,6,9,10,11,14]) +# img_seg[~img_seg_inds] = 0 +# img_seg[img_seg_inds] = 255 +# import numpy as np +# import cv2 +# +# from xlib import math as lib_math + +# fs1 = lib_fm.Faceset(r'D:\\1.dfs') +# fs1.clear_db() + +# uimg = lib_fm.UImage() +# uimg.assign_image( np.random.uniform(0, 255, size=(128,128,1) ).astype(np.uint8) ) + +# fs1.add_UImage(uimg, format='jp2', quality=30) + +# uimg.assign_image( np.ones( shape=(128,128,1) ).astype(np.uint8) ) + +# #fs1.add_UImage(uimg, format='jp2', quality=30) + + +# up = lib_fm.UPerson() +# up.set_name('Man') +# up.set_age(13) + +# fs1.add_UPerson(up) + +# ufm = lib_fm.UFaceMark() +# ufm.set_UPerson_uuid(up.get_uuid()) +# ufm.set_UImage_uuid(uimg.get_uuid()) +# ufm.add_mask_info( lib_fm.EMaskType.UNDEFINED, uimg.get_uuid(), lib_math.Affine2DUniMat.identity() ) + +# fs1.add_UFaceMark(ufm) + +# fs1.close() + + +# fs = lib_fm.Faceset(r'D:\\1.dfs') +# for uperson in fs.iter_UPerson(): +# print(uperson) + +# for ufm in fs.iter_UFaceMark(): +# print(ufm) + +# for uimg in fs.iter_UImage(): +# cv2.imshow('', uimg.get_image()) +# cv2.waitKey(0) +# print(uimg) + +# import code +# code.interact(local=dict(globals(), **locals())) + + +# uimg2 = lib_fm.UImage() +# uimg2.assign_image( np.random.uniform(0, 255, size=(128,128,1) ).astype(np.uint8) ) + +# ufm = lib_fm.UFaceMark() +# ufm.set_UImage_uuid(uimg.get_uuid()) + + +# fs.add_UFaceMark(ufm) + +# fs.add_UImage(uimg, format='jp2', quality=30) +# fs.add_UImage(uimg2, format='jpg', quality=10) + +# #print(fs.get_fimage_count()) + +# print( fs.get_UFaceMark_count() ) +# import code +# code.interact(local=dict(globals(), **locals())) + diff --git a/xlib/facemeta/ELandmarks2D.py b/xlib/facemeta/ELandmarks2D.py new file mode 100644 index 0000000..3f749ed --- /dev/null +++ b/xlib/facemeta/ELandmarks2D.py @@ -0,0 +1,6 @@ +from enum import IntEnum + +class ELandmarks2D(IntEnum): + L5 = 0 + L68 = 1 + L468 = 2 \ No newline at end of file diff --git a/xlib/facemeta/EMaskType.py b/xlib/facemeta/EMaskType.py new file mode 100644 index 0000000..b0f120a --- /dev/null +++ b/xlib/facemeta/EMaskType.py @@ -0,0 +1,4 @@ +from enum import IntEnum + +class EMaskType(IntEnum): + UNDEFINED = 0 \ No newline at end of file diff --git a/xlib/facemeta/FaceULandmarks.py b/xlib/facemeta/FLandmarks2D.py similarity index 97% rename from xlib/facemeta/FaceULandmarks.py rename to xlib/facemeta/FLandmarks2D.py index fccb929..1c6d9ca 100644 --- a/xlib/facemeta/FaceULandmarks.py +++ b/xlib/facemeta/FLandmarks2D.py @@ -1,24 +1,20 @@ -from enum import IntEnum from typing import Tuple import cv2 import numpy as np import numpy.linalg as npla + from ..math import Affine2DMat, Affine2DUniMat +from .ELandmarks2D import ELandmarks2D +from .FRect import FRect -class FaceULandmarks: - """ - Describes 2D face landmarks in uniform coordinates - """ - - class Type(IntEnum): - LANDMARKS_5 = 0 - LANDMARKS_68 = 1 - LANDMARKS_468 = 2 - +class FLandmarks2D: def __init__(self): - self._type : FaceULandmarks.Type = None + """ + Describes 2D face landmarks in uniform float coordinates + """ + self._type : ELandmarks2D = None self._ulmrks : np.ndarray = None def __getstate__(self): @@ -29,14 +25,13 @@ class FaceULandmarks: self.__dict__.update(d) @staticmethod - def create( type : 'FaceULandmarks.Type', ulmrks : np.ndarray): + def create( type : ELandmarks2D, ulmrks : np.ndarray): """ - ulmrks np.ndarray (*,2|3) """ - if not isinstance(type, FaceULandmarks.Type): - raise ValueError('type must be an FaceULandmarks.Type') + if not isinstance(type, ELandmarks2D): + raise ValueError('type must be ELandmarks2D') ulmrks = np.float32(ulmrks) if len(ulmrks.shape) != 2: @@ -46,22 +41,22 @@ class FaceULandmarks: raise ValueError('ulmrks dim must be == 2') ulmrks_count = ulmrks.shape[0] - if type == FaceULandmarks.Type.LANDMARKS_5: + if type == ELandmarks2D.L5: if ulmrks_count != 5: raise ValueError('ulmrks_count must be == 5') - elif type == FaceULandmarks.Type.LANDMARKS_68: + elif type == ELandmarks2D.L68: if ulmrks_count != 68: raise ValueError('ulmrks_count must be == 68') - elif type == FaceULandmarks.Type.LANDMARKS_468: + elif type == ELandmarks2D.L468: if ulmrks_count != 468: raise ValueError('ulmrks_count must be == 468') - face_ulmrks = FaceULandmarks() + face_ulmrks = FLandmarks2D() face_ulmrks._type = type face_ulmrks._ulmrks = ulmrks return face_ulmrks - def get_type(self) -> 'FaceULandmarks.Type': return self._type + def get_type(self) -> ELandmarks2D: return self._type def get_count(self) -> int: return self._ulmrks.shape[0] def as_numpy(self, w_h = None): @@ -76,9 +71,9 @@ class FaceULandmarks: return ulmrks - def transform(self, mat, invert=False) -> 'FaceULandmarks': + def transform(self, mat, invert=False) -> 'FLandmarks2D': """ - Tranforms FaceULandmarks using affine mat and returns new FaceULandmarks() + Tranforms FLandmarks2D using affine mat and returns new FLandmarks2D() mat : np.ndarray """ @@ -93,9 +88,20 @@ class FaceULandmarks: ulmrks = np.expand_dims(ulmrks, axis=1) ulmrks = cv2.transform(ulmrks, mat, ulmrks.shape).squeeze() - return FaceULandmarks.create(type=self._type, ulmrks=ulmrks) - + return FLandmarks2D.create(type=self._type, ulmrks=ulmrks) + def get_FRect(self, coverage=1.6) -> FRect: + """ + create FRect from landmarks with given coverage + """ + _, uni_mat = self.calc_cut( (1,1), coverage, 1, exclude_moving_parts=False) + xlt, xlb, xrb, xrt = uni_mat.invert().transform_points([[0,0], [0,1], [1,1], [1,0]]) + l = min(xlt[0], xlb[0]) + t = min(xlt[1], xrt[1]) + r = max(xrt[0], xrb[0]) + b = max(xlb[1], xrb[1]) + return FRect.from_ltrb( (l,t,r,b) ) + def calc_cut(self, w_h, coverage : float, output_size : int, exclude_moving_parts : bool, head_yaw : float = None, @@ -112,9 +118,9 @@ class FaceULandmarks: lmrks = (self._ulmrks * w_h).astype(np.float32) # estimate landmarks transform from global space to local aligned space with bounds [0..1] - if type == FaceULandmarks.Type.LANDMARKS_68: + if type == ELandmarks2D.L68: mat = Affine2DMat.umeyama( np.concatenate ([ lmrks[17:49] , lmrks[54:55] ]), uni_landmarks_68) - elif type == FaceULandmarks.Type.LANDMARKS_468: + elif type == ELandmarks2D.L468: src_lmrks = lmrks dst_lmrks = uni_landmarks_468 if exclude_moving_parts: @@ -1297,4 +1303,4 @@ uni_landmarks_468 = np.array( # # time.sleep(1.0) # import code -# code.interact(local=dict(globals(), **locals())) \ No newline at end of file +# code.interact(local=dict(globals(), **locals())) diff --git a/xlib/facemeta/FMask.py b/xlib/facemeta/FMask.py new file mode 100644 index 0000000..5d673f9 --- /dev/null +++ b/xlib/facemeta/FMask.py @@ -0,0 +1,40 @@ +import uuid +from typing import Union +from enum import IntEnum + +class FMask: + class Type(IntEnum): + WHOLE_FACE = 0 + + def __init__(self, _from_pickled=False): + """ + """ + self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None + self._mask_type : Union[FMask.Type, None] = None + self._FImage_uuid : Union[bytes, None] = None + + def __getstate__(self): + return self.__dict__.copy() + + def __setstate__(self, d): + self.__init__(_from_pickled=True) + self.__dict__.update(d) + + def get_uuid(self) -> Union[bytes, None]: return self._uuid + def set_uuid(self, uuid : Union[bytes, None]): + if uuid is not None and not isinstance(uuid, bytes): + raise ValueError(f'uuid must be an instance of bytes or None') + self._uuid = uuid + + def get_mask_type(self) -> Union['FMask.Type', None]: return self._mask_type + def set_mask_type(self, mask_type : Union['FMask.Type', None]): + if mask_type is not None and not isinstance(mask_type, 'FMask.Type'): + raise ValueError(f'mask_type must be an instance of FMask.Type or None') + self._mask_type = mask_type + + def get_FImage_uuid(self) -> Union[bytes, None]: return self._FImage_uuid + def set_FImage_uuid(self, FImage_uuid : Union[bytes, None]): + if FImage_uuid is not None and not isinstance(FImage_uuid, bytes): + raise ValueError(f'FImage_uuid must be an instance of bytes or None') + self._FImage_uuid = FImage_uuid + \ No newline at end of file diff --git a/xlib/facemeta/FacePose.py b/xlib/facemeta/FPose.py similarity index 92% rename from xlib/facemeta/FacePose.py rename to xlib/facemeta/FPose.py index 780b1c8..7b76c43 100644 --- a/xlib/facemeta/FacePose.py +++ b/xlib/facemeta/FPose.py @@ -3,7 +3,7 @@ import numpy as np from .. import math as lib_math -class FacePose: +class FPose: """ Describes face pitch/yaw/roll """ @@ -33,7 +33,7 @@ class FacePose: def from_radians(pitch, yaw, roll): """ """ - face_rect = FacePose() + face_rect = FPose() face_rect._pyr = np.array([pitch, yaw, roll], np.float32) return face_rect @@ -47,4 +47,4 @@ class FacePose: mat[2,:] = np.cross(mat[0, :], mat[1, :]) pitch, yaw, roll = lib_math.rotation_matrix_to_euler(mat) - return FacePose.from_radians(pitch, yaw, roll) + return FPose.from_radians(pitch, yaw, roll) diff --git a/xlib/facemeta/FaceURect.py b/xlib/facemeta/FRect.py similarity index 89% rename from xlib/facemeta/FaceURect.py rename to xlib/facemeta/FRect.py index 1d966cb..ccc4c60 100644 --- a/xlib/facemeta/FaceURect.py +++ b/xlib/facemeta/FRect.py @@ -8,8 +8,7 @@ import numpy.linalg as npla from .. import math as lib_math from ..math import Affine2DMat, Affine2DUniMat - -class FaceURect: +class FRect: """ Describes face rectangle in uniform float coordinates """ @@ -24,9 +23,9 @@ class FaceURect: self.__dict__.update(d) @staticmethod - def sort_by_area_size(rects : List['FaceURect']): + def sort_by_area_size(rects : List['FRect']): """ - sort list of FaceURect by largest area descend + sort list of FRect by largest area descend """ rects = [ (rect, rect.get_area()) for rect in rects ] rects = sorted(rects, key=operator.itemgetter(1), reverse=True ) @@ -34,9 +33,9 @@ class FaceURect: return rects @staticmethod - def sort_by_dist_from_center(rects : List['FaceURect']): + def sort_by_dist_from_center(rects : List['FRect']): """ - sort list of FaceURect by nearest distance from center to center of rects descent + sort list of FRect by nearest distance from center to center of rects descent """ c = np.float32([0.5,0.5]) @@ -48,7 +47,7 @@ class FaceURect: @staticmethod def from_4pts(pts : Iterable): """ - Construct FaceURect from 4 pts + Construct FRect from 4 pts 0--3 | | 1--2 @@ -60,14 +59,14 @@ class FaceURect: if pts.shape != (4,2): raise ValueError('pts must have (4,2) shape') - face_rect = FaceURect() + face_rect = FRect() face_rect._pts = pts return face_rect @staticmethod def from_ltrb(ltrb : Iterable): """ - Construct FaceURect from l,t,r,b list of float values + Construct FRect from l,t,r,b list of float values t l-|-r b @@ -76,7 +75,7 @@ class FaceURect: raise ValueError('ltrb must be Iterable') l,t,r,b = ltrb - return FaceURect.from_4pts([ [l,t], [l,b], [r,b], [r,t] ]) + return FRect.from_4pts([ [l,t], [l,b], [r,b], [r,t] ]) def get_area(self, w_h = None) -> float: @@ -124,9 +123,9 @@ class FaceURect: return self._pts * w_h return self._pts.copy() - def transform(self, mat, invert=False) -> 'FaceURect': + def transform(self, mat, invert=False) -> 'FRect': """ - Tranforms FaceURect using affine mat and returns new FaceURect() + Tranforms FRect using affine mat and returns new FRect() mat : np.ndarray should be uniform affine mat """ @@ -141,7 +140,7 @@ class FaceURect: pts = np.expand_dims(pts, axis=1) pts = cv2.transform(pts, mat, pts.shape).squeeze() - return FaceURect.from_4pts(pts) + return FRect.from_4pts(pts) def cut(self, img : np.ndarray, coverage : float, output_size : int) -> Tuple[Affine2DMat, Affine2DUniMat]: """ diff --git a/xlib/facemeta/Faceset.py b/xlib/facemeta/Faceset.py index 0fda472..365303b 100644 --- a/xlib/facemeta/Faceset.py +++ b/xlib/facemeta/Faceset.py @@ -1,95 +1,237 @@ +import pickle +import sqlite3 from pathlib import Path -from typing import Tuple +from typing import Generator, List, Union import cv2 -from .. import cv as lib_cv -from .. import path as lib_path -from ..io import FormattedFileIO +import numpy as np -from .face import FaceMark +from .FMask import FMask +from .UFaceMark import UFaceMark +from .UImage import UImage +from .UPerson import UPerson class Faceset: - """ - Faceset is a class to store and manage multiple FaceMark()'s - - - arguments: - - path path to directory - - raises - - Exception, ValueError - """ def __init__(self, path): + """ + Faceset is a class to store and manage face related data. + + arguments: + + path path to faceset .dfs file + """ + self._path = path = Path(path) - if not path.is_dir(): - raise ValueError('Path must be a directory.') - self.reload() + if path.suffix != '.dfs': + raise ValueError('Path must be a .dfs file') - def reload(self): + self._conn = conn = sqlite3.connect(path, isolation_level=None) + self._cur = cur = conn.cursor() + + cur.execute('BEGIN IMMEDIATE') + if not self._is_table_exists('FacesetInfo'): + self.clear_db(transaction=False) + cur.execute('COMMIT') + + def close(self): + self._cur.close() + self._cur = None + self._conn.close() + self._conn = None + + def _is_table_exists(self, name): + return self._cur.execute(f"SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?", [name]).fetchone()[0] != 0 + + def shrink(self): + self._cur.execute('VACUUM') + + def clear_db(self, transaction=True): """ - reload the faceset - - raises - - Exception + delete all data and recreate DB """ - self._is_packed = False + cur = self._cur - filepaths = lib_path.get_files_paths(self._path) + if transaction: + cur.execute('BEGIN IMMEDIATE') - face_filepaths = self._face_filepaths = set() + for table_name, in cur.execute("SELECT name from sqlite_master where type = 'table';").fetchall(): + cur.execute(f'DROP TABLE {table_name}') - for filepath in filepaths: - suffix = filepath.suffix - if suffix == '.face': - if self._is_packed: - raise Exception(f'{self._path} contains .faceset and .face but only one type is allowed.') + (cur.execute('CREATE TABLE FacesetInfo (version INT)') + .execute('INSERT INTO FacesetInfo VALUES (1)') - face_filepaths.add(filepath.name) - elif suffix == '.faceset': - if self._is_packed: - raise Exception(f'{self._path} contains more than one .faceset.') + .execute('CREATE TABLE UImage (uuid BLOB, name TEXT, format TEXT, data BLOB)') + .execute('CREATE TABLE UPerson (uuid BLOB, name TEXT, age NUMERIC)') + .execute('CREATE TABLE UFaceMark (uuid BLOB, UImage_uuid BLOB, UPerson_uuid BLOB, pickled_bytes BLOB)') + ) - if len(face_filepaths) != 0: - raise Exception(f'{self._path} contains .faceset and .face but only one type is allowed.') - self._is_packed = True - - def pack(self): + if transaction: + cur.execute('COMMIT') + + ################### + ### UFaceMark + ################### + def _UFaceMark_from_db_row(self, db_row) -> UFaceMark: + uuid, UImage_uuid, UPerson_uuid, pickled_bytes = db_row + return pickle.loads(pickled_bytes) + + def add_UFaceMark(self, fm : UFaceMark): """ - Pack faceset. + add or update UFaceMark in DB """ + pickled_bytes = pickle.dumps(fm) + uuid = fm.get_uuid() + UImage_uuid = fm.get_UImage_uuid() + UPerson_uuid = fm.get_UPerson_uuid() - def unpack(self): - """ - unpack faceset. - """ - - def is_packed(self) -> bool: return self._is_packed - - def get_face_count(self): return len(self._face_filepaths) - - def get_faces(self) -> Tuple[FaceMark]: - """ - returns a tuple of FaceMark()'s - """ - - def save_face(self, face : FaceMark): - filepath = self._path / (face.get_name() + '.face') - - with FormattedFileIO(filepath, 'w+') as f: - f.write_pickled(face) - - self._face_filepaths.add(filepath.name) - - def save_image(self, name, img, type='jpg'): - filepath = self._path / (name + f'.{type}') - - if type == 'jpg': - lib_cv.imwrite(filepath, img, [int(cv2.IMWRITE_JPEG_QUALITY), 100] ) + cur = self._cur + cur.execute('BEGIN IMMEDIATE') + if cur.execute('SELECT COUNT(*) from UFaceMark where uuid=?', [uuid] ).fetchone()[0] != 0: + cur.execute('UPDATE UFaceMark SET UImage_uuid=?, UPerson_uuid=?, pickled_bytes=? WHERE uuid=?', + [UImage_uuid, UPerson_uuid, pickled_bytes, uuid]) else: - raise ValueError(f'Unknown type {type}') + cur.execute('INSERT INTO UFaceMark VALUES (?, ?, ?, ?)', [uuid, UImage_uuid, UPerson_uuid, pickled_bytes]) + cur.execute('COMMIT') + + def get_UFaceMark_count(self) -> int: + return self._cur.execute('SELECT COUNT(*) FROM UFaceMark').fetchone()[0] + + def get_all_UFaceMark(self) -> List[UFaceMark]: + return [ pickle.loads(pickled_bytes) for pickled_bytes, in self._cur.execute('SELECT pickled_bytes FROM UFaceMark').fetchall() ] + + def iter_UFaceMark(self) -> Generator[UFaceMark, None, None]: + """ + returns Generator of UFaceMark + """ + for db_row in self._cur.execute('SELECT * FROM UFaceMark').fetchall(): + yield self._UFaceMark_from_db_row(db_row) + + def delete_all_UFaceMark(self): + """ + deletes all UFaceMark from DB + """ + (self._cur.execute('BEGIN IMMEDIATE') + .execute('DELETE FROM UFaceMark') + .execute('COMMIT') ) + + ################### + ### UPerson + ################### + def add_UPerson(self, uperson : UPerson): + """ + add or update UPerson in DB + """ + uuid = uperson.get_uuid() + name = uperson.get_name() + age = uperson.get_age() + + cur = self._conn.cursor() + cur.execute('BEGIN IMMEDIATE') + if cur.execute('SELECT COUNT(*) from UPerson where uuid=?', [uuid]).fetchone()[0] != 0: + cur.execute('UPDATE UPerson SET name=?, age=? WHERE uuid=?', [name, age, uuid]) + else: + cur.execute('INSERT INTO UPerson VALUES (?, ?, ?)', [uuid, name, age]) + cur.execute('COMMIT') + cur.close() + + def iter_UPerson(self) -> Generator[UPerson, None, None]: + """ + iterator of all UPerson's + """ + for uuid, name, age in self._cur.execute('SELECT * FROM UPerson').fetchall(): + uperson = UPerson() + uperson.set_uuid(uuid) + uperson.set_name(name) + uperson.set_age(age) + yield uperson + + def delete_all_UPerson(self): + """ + deletes all UPerson from DB + """ + (self._cur.execute('BEGIN IMMEDIATE') + .execute('DELETE FROM UPerson') + .execute('COMMIT') ) + + ################### + ### UImage + ################### + def _UImage_from_db_row(self, db_row) -> UImage: + uuid, name, format, data_bytes = db_row + img = cv2.imdecode(np.frombuffer(data_bytes, dtype=np.uint8), flags=cv2.IMREAD_UNCHANGED) + + uimg = UImage() + uimg.set_uuid(uuid) + uimg.set_name(name) + uimg.assign_image(img) + return uimg + + def add_UImage(self, uimage : UImage, format : str = 'webp', quality : int = 100): + """ + add or update UImage in DB + + uimage UImage object + + format('png') webp ( does not support lossless on 100 quality ! ) + png ( lossless ) + jpg + jp2 ( jpeg2000 ) + + quality(100) 0-100 for formats jpg,jp2,webp + """ + if format not in ['webp','png', 'jpg', 'jp2']: + raise ValueError(f'format {format} is unsupported') + + if format in ['jpg','jp2'] and quality < 0 or quality > 100: + raise ValueError('quality must be in range [0..100]') + + img = uimage.get_image() + uuid = uimage.get_uuid() + + if format == 'webp': + imencode_args = [int(cv2.IMWRITE_WEBP_QUALITY), quality] + elif format == 'jpg': + imencode_args = [int(cv2.IMWRITE_JPEG_QUALITY), quality] + elif format == 'jp2': + imencode_args = [int(cv2.IMWRITE_JPEG2000_COMPRESSION_X1000), quality*10] + else: + imencode_args = [] + + ret, data_bytes = cv2.imencode( f'.{format}', img, imencode_args) + if not ret: + raise Exception(f'Unable to encode image format {format}') + + cur = self._cur + cur.execute('BEGIN IMMEDIATE') + if cur.execute('SELECT COUNT(*) from UImage where uuid=?', [uuid] ).fetchone()[0] != 0: + cur.execute('UPDATE UImage SET name=?, format=?, data=? WHERE uuid=?', [uimage.get_name(), format, data_bytes.data, uuid]) + else: + cur.execute('INSERT INTO UImage VALUES (?, ?, ?, ?)', [uuid, uimage.get_name(), format, data_bytes.data]) + cur.execute('COMMIT') + + def get_UImage_count(self) -> int: return self._cur.execute('SELECT COUNT(*) FROM UImage').fetchone()[0] + def get_UImage_by_uuid(self, uuid : bytes) -> Union[UImage, None]: + """ + """ + db_row = self._cur.execute('SELECT * FROM UImage where uuid=?', [uuid]).fetchone() + if db_row is None: + return None + return self._UImage_from_db_row(db_row) + + def iter_UImage(self) -> Generator[UImage, None, None]: + """ + iterator of all UImage's + """ + for db_row in self._cur.execute('SELECT * FROM UImage').fetchall(): + yield self._UImage_from_db_row(db_row) + + def delete_all_UImage(self): + """ + deletes all UImage from DB + """ + (self._cur.execute('BEGIN IMMEDIATE') + .execute('DELETE FROM UImage') + .execute('COMMIT') ) diff --git a/xlib/facemeta/UFaceMark.py b/xlib/facemeta/UFaceMark.py new file mode 100644 index 0000000..97514d3 --- /dev/null +++ b/xlib/facemeta/UFaceMark.py @@ -0,0 +1,99 @@ +import uuid +from typing import List, Tuple, Union + +from ..math import Affine2DMat +from .ELandmarks2D import ELandmarks2D +from .EMaskType import EMaskType +from .FLandmarks2D import FLandmarks2D +from .FPose import FPose +from .FRect import FRect + + +class UFaceMark: + def __init__(self, _from_pickled=False): + """ + Describes single face in the image. + """ + self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None + self._UImage_uuid : Union[bytes, None] = None + self._UPerson_uuid : Union[bytes, None] = None + self._FRect : Union[FRect, None] = None + self._FLandmarks2D_list : List[FLandmarks2D] = [] + self._FPose : Union[FPose, None] = None + self._mask_info_list : List = [] + + def __getstate__(self): + return self.__dict__.copy() + + def __setstate__(self, d): + self.__init__(_from_pickled=True) + self.__dict__.update(d) + + def __str__(self): + s = "Masks: " + return f"UFaceMark UUID:[...{self._uuid[-4:].hex()}]" + + def __repr__(self): return self.__str__() + + def get_uuid(self) -> Union[bytes, None]: return self._uuid + def set_uuid(self, uuid : Union[bytes, None]): + if uuid is not None and not isinstance(uuid, bytes): + raise ValueError(f'uuid must be an instance of bytes or None') + self._uuid = uuid + + def get_UImage_uuid(self) -> Union[bytes, None]: return self._UImage_uuid + def set_UImage_uuid(self, UImage_uuid : Union[bytes, None]): + if UImage_uuid is not None and not isinstance(UImage_uuid, bytes): + raise ValueError(f'UImage_uuid must be an instance of bytes or None') + self._UImage_uuid = UImage_uuid + + def get_UPerson_uuid(self) -> Union[bytes, None]: return self._UPerson_uuid + def set_UPerson_uuid(self, UPerson_uuid : Union[bytes, None]): + if UPerson_uuid is not None and not isinstance(UPerson_uuid, bytes): + raise ValueError(f'UPerson_uuid must be an instance of bytes or None') + self._UPerson_uuid = UPerson_uuid + + def get_FRect(self) -> Union['FRect', None]: return self._FRect + def set_FRect(self, face_urect : Union['FRect', None]): + if face_urect is not None and not isinstance(face_urect, FRect): + raise ValueError(f'face_urect must be an instance of FRect or None') + self._FRect = face_urect + + def get_all_FLandmarks2D(self) -> List[FLandmarks2D]: return self._FLandmarks2D_list + def get_FLandmarks2D_by_type(self, type : ELandmarks2D) -> Union[FLandmarks2D, None]: + """get FLandmarks2D from list by type""" + if not isinstance(type, ELandmarks2D): + raise ValueError(f'type must be an instance of ELandmarks2D') + + for ulmrks in self._FLandmarks2D_list: + if ulmrks.get_type() == type: + return ulmrks + return None + + def add_FLandmarks2D(self, flmrks : FLandmarks2D): + if not isinstance(flmrks, FLandmarks2D): + raise ValueError('flmrks must be an instance of FLandmarks2D') + + if self.get_FLandmarks2D_by_type(flmrks.get_type()) is not None: + raise Exception(f'_FLandmarks2D_list already contains type {flmrks.get_type()}.') + + self._FLandmarks2D_list.append(flmrks) + + def get_FPose(self) -> Union[FPose, None]: return self._FPose + def set_FPose(self, face_pose : FPose): + if not isinstance(face_pose, FPose): + raise ValueError('face_pose must be an instance of FPose') + self._FPose = face_pose + + def get_mask_info_list(self) -> List[Tuple[EMaskType, bytes, Affine2DMat]]: + return self._mask_info_list + + def add_mask_info(self, mask_type : EMaskType, UImage_uuid : bytes, mask_to_mark_uni_mat : Affine2DMat): + if not isinstance(mask_type, EMaskType): + raise ValueError('mask_type must be an instance of EMaskType') + if not isinstance(UImage_uuid, bytes): + raise ValueError('UImage_uuid must be an instance of bytes') + if not isinstance(mask_to_mark_uni_mat, Affine2DMat): + raise ValueError('mask_to_mark_uni_mat must be an instance of Affine2DMat') + + self._mask_info_list.append( (mask_type, UImage_uuid, mask_to_mark_uni_mat) ) diff --git a/xlib/facemeta/UImage.py b/xlib/facemeta/UImage.py new file mode 100644 index 0000000..76edb75 --- /dev/null +++ b/xlib/facemeta/UImage.py @@ -0,0 +1,59 @@ +import uuid +from typing import Union + +import numpy as np + +class UImage: + + def __init__(self): + """ + represents uncompressed image uint8 HWC ( 1/3/4 channels ) + """ + self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le + self._name : Union[str, None] = None + self._image : np.ndarray = None + + def __getstate__(self): + return self.__dict__.copy() + + def __setstate__(self, d): + self.__init__() + self.__dict__.update(d) + + def __str__(self): return f"UImage UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] image:[{ (self._image.shape, self._image.dtype) if self._image is not None else None}]" + def __repr__(self): return self.__str__() + + def get_uuid(self) -> Union[bytes, None]: return self._uuid + def set_uuid(self, uuid : Union[bytes, None]): + if uuid is not None and not isinstance(uuid, bytes): + raise ValueError(f'uuid must be an instance of bytes or None') + self._uuid = uuid + + def get_name(self) -> Union[str, None]: return self._name + def set_name(self, name : Union[str, None]): + if name is not None and not isinstance(name, str): + raise ValueError(f'name must be an instance of str or None') + self._name = name + + def get_image(self) -> Union[np.ndarray, None]: return self._image + def assign_image(self, image : Union[np.ndarray, None]): + """ + assign np.ndarray image , or remove(None) + + It's mean you should not to change provided image nd.array after assigning, or do the copy before. + + Image must be uint8 and HWC 1/3/4 channels. + """ + if image is not None: + if image.ndim == 2: + image = image[...,None] + + if image.ndim != 3: + raise ValueError('image must have ndim == 3') + _,_,C = image.shape + if C not in [1,3,4]: + raise ValueError('image channels must be 1,3,4') + if image.dtype != np.uint8: + raise ValueError('image dtype must be uint8') + self._image = image + diff --git a/xlib/facemeta/UPerson.py b/xlib/facemeta/UPerson.py new file mode 100644 index 0000000..37bbf25 --- /dev/null +++ b/xlib/facemeta/UPerson.py @@ -0,0 +1,39 @@ +import uuid +from typing import Union + + +class UPerson: + def __init__(self, _from_pickled=False): + """ + """ + self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None + self._name : Union[str, None] = None + self._age : Union[int, None] = None + + def __getstate__(self): + return self.__dict__.copy() + + def __setstate__(self, d): + self.__init__(_from_pickled=True) + self.__dict__.update(d) + + def __str__(self): return f"UPerson UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] age:[{self._age}]" + def __repr__(self): return self.__str__() + + def get_uuid(self) -> Union[bytes, None]: return self._uuid + def set_uuid(self, uuid : Union[bytes, None]): + if uuid is not None and not isinstance(uuid, bytes): + raise ValueError(f'uuid must be an instance of bytes or None') + self._uuid = uuid + + def get_name(self) -> Union[str, None]: return self._name + def set_name(self, name : Union[str, None]): + if name is not None and not isinstance(name, str): + raise ValueError(f'name must be an instance of str or None') + self._name = name + + def get_age(self) -> Union[str, None]: return self._age + def set_age(self, age : Union[int, None]): + if age is not None and not isinstance(age, int): + raise ValueError(f'age must be an instance of int or None') + self._age = age \ No newline at end of file diff --git a/xlib/facemeta/__init__.py b/xlib/facemeta/__init__.py index 3baefcc..f73c9a9 100644 --- a/xlib/facemeta/__init__.py +++ b/xlib/facemeta/__init__.py @@ -1,48 +1,61 @@ """ -facemeta +facemeta library -Trying to standartize face description. +Contains classes for effectively storing, manage, and transfering all face related data. -It is not perfect structure, but enough for current tasks. +All classes are picklable and expandable. +All classes have noneable members accessed via get/set. No properties. -FaceURect and FaceULandmarks mean uniform coordinates in order to apply them to any resolution. +E-classes are enums. +U-classes are unique, have uuid and can be saved in Faceset. -Overall structure: +ELandmarks2D L5 + L68 + L468 + +EMaskType UNDEFINED, ..., ... -FaceMark - (mean single face data referencing any image) - .image_name - image reference - .person_name - optional name of person - .FaceURect - a rectangle of the face in source image space - .list[FaceULandmarks] - a list of unique types of landmarks of the face in source image space - types: - LANDMARKS_5 - LANDMARKS_68 - LANDMARKS_468 - - .FaceAlign - an aligned face from FaceMark - - .image_name - image reference - .person_name - optional name of person - .coverage - coverage value used to align - - .source_source_face_ulandmarks_type - type of FaceULandmarks from which this FaceAlign was produced - .source_to_aligned_uni_mat - uniform AffineMat to FaceMark image space to FaceAlign image space - - .FaceURect - a rectangle of the face in aligned image space - .list[FaceULandmarks] - a list of unique types of landmarks of the face in aligned image space - - .FaceMask - grayscale image to mask the face in FaceAlign image space - .image_name - image reference - - .FaceSwap - face image of other person in the same as FaceAlign image space - .image_name - image reference - .person_name - optional name of person - - .FaceMask - grayscale image to mask the swapped face in FaceSwap image space - .image_name +FRect rectangle of the face in uniform float coordinates + +FLandmarks2D 2D landmarks of the face in uniform float coordinates + +FPose pitch/yaw/roll values + +UPerson - person info + .uuid + .name + .age + +UImage - image + .uuid + .name + .data (H,W,C 1/3/4 ) of uint8[0..255] +UFaceMark - face mark info referencing UImage from which the face was detected + .uuid + .UImage_uuid - reference to FImage + .UPerson_uuid - reference to FPerson + .FRect + .List[FLandmarks2D] + .FPose + + .List[ (EMaskType, FImage_uuid, uni_mat) ] - list of FMask and AffineMat to transform mask image space to UFaceMark image space + + +Faceset + .List[UImage] + .List[UFaceMark] + .List[UPerson] """ -from .face import FaceMark, FaceAlign, FaceSwap, FaceMask, FaceURect, FaceULandmarks, FacePose +from .ELandmarks2D import ELandmarks2D +from .EMaskType import EMaskType from .Faceset import Faceset +from .UImage import UImage +from .FLandmarks2D import FLandmarks2D +from .UFaceMark import UFaceMark +from .FMask import FMask +from .UPerson import UPerson +from .FPose import FPose +from .FRect import FRect diff --git a/xlib/facemeta/face.py b/xlib/facemeta/face.py deleted file mode 100644 index f43e3f2..0000000 --- a/xlib/facemeta/face.py +++ /dev/null @@ -1,211 +0,0 @@ -from typing import List, Union - -from .. import math as lib_math - -from .FaceULandmarks import FaceULandmarks -from .FaceURect import FaceURect -from .FacePose import FacePose - -class _part_picklable_expandable: - def __getstate__(self): - return self.__dict__.copy() - - def __setstate__(self, d): - self.__init__() - self.__dict__.update(d) - -class _part_image_name: - def __init__(self): - self._image_name : Union[str, None] = None - - def get_image_name(self) -> Union[str, None]: return self._image_name - def set_image_name(self, image_name : Union[str, None]): - if image_name is not None and not isinstance(image_name, str): - raise ValueError(f'image_name must be an instance of str or None') - self._image_name = image_name - -class _part_person_name: - def __init__(self): - self._person_name : Union[str, None] = None - - def get_person_name(self) -> Union[str, None]: return self._person_name - def set_person_name(self, person_name : Union[str, None]): - if person_name is not None and not isinstance(person_name, str): - raise ValueError(f'person_name must be an instance of str or None') - self._person_name = person_name - -class _part_face_align: - def __init__(self): - self._face_align : Union['FaceAlign', None] = None - - def get_face_align(self) -> Union['FaceAlign', None]: return self._face_align - def set_face_align(self, face_align : 'FaceAlign'): - """add FaceAlign""" - if not isinstance(face_align, FaceAlign): - raise ValueError('face_align must be an instance of FaceAlign') - self._face_align = face_align - -class _part_face_urect: - def __init__(self): - self._face_urect : Union[FaceURect, None] = None - - def get_face_urect(self) -> Union['FaceURect', None]: - """get uniform FaceURect""" - return self._face_urect - - def set_face_urect(self, face_urect : Union['FaceURect', None]): - if face_urect is not None and not isinstance(face_urect, FaceURect): - raise ValueError(f'face_urect must be an instance of FaceURect or None') - self._face_urect = face_urect - -class _part_face_ulandmarks_list: - def __init__(self): - self._face_ulmrks_list : List[FaceULandmarks] = [] - - def get_face_ulandmarks_list(self) -> List['FaceULandmarks']: return self._face_ulmrks_list - def get_face_ulandmarks_by_type(self, type : 'FaceULandmarks.Type') -> Union['FaceULandmarks', None]: - """get FaceULandmarks from list by type""" - if not isinstance(type, FaceULandmarks.Type): - raise ValueError(f'type must be an instance of FaceULandmarks.Type') - - for ulmrks in self._face_ulmrks_list: - if ulmrks.get_type() == type: - return ulmrks - return None - - - def add_face_ulandmarks(self, face_ulmrks : 'FaceULandmarks'): - if not isinstance(face_ulmrks, FaceULandmarks): - raise ValueError('face_ulmrks must be an instance of FaceULandmarks') - - if self.get_face_ulandmarks_by_type(face_ulmrks.get_type()) is not None: - raise Exception(f'_face_ulmrks_list already contains type {face_ulmrks.get_type()}.') - - self._face_ulmrks_list.append(face_ulmrks) - -class _part_source_source_face_ulandmarks_type: - def __init__(self): - self._source_face_ulandmarks_type : Union[FaceULandmarks.Type, None] = None - - def get_source_face_ulandmarks_type(self) -> Union[FaceULandmarks.Type, None]: return self._source_face_ulandmarks_type - def set_source_face_ulandmarks_type(self, source_face_ulandmarks_type : Union[FaceULandmarks.Type, None]): - if source_face_ulandmarks_type is not None and not isinstance(source_face_ulandmarks_type, FaceULandmarks.Type): - raise ValueError('source_face_ulandmarks_type must be an instance of FaceULandmarks.Type') - self._source_face_ulandmarks_type = source_face_ulandmarks_type - - -class _part_coverage: - def __init__(self): - self._coverage : Union[float, None] = None - - def get_coverage(self) -> Union[float, None]: return self._coverage - def set_coverage(self, coverage : Union[float, None]): - if coverage is not None and not isinstance(coverage, float): - raise ValueError('coverage must be an instance of float') - self._coverage = coverage - -class _part_source_to_aligned_uni_mat: - def __init__(self): - self._source_to_aligned_uni_mat : Union[lib_math.Affine2DUniMat, None] = None - - def get_source_to_aligned_uni_mat(self) -> Union[lib_math.Affine2DUniMat, None]: return self._source_to_aligned_uni_mat - def set_source_to_aligned_uni_mat(self, source_to_aligned_uni_mat : Union[lib_math.Affine2DUniMat, None]): - if source_to_aligned_uni_mat is not None and not isinstance(source_to_aligned_uni_mat, lib_math.Affine2DUniMat): - raise ValueError('source_to_aligned_uni_mat must be an instance of lib_math.Affine2DUniMat') - self._source_to_aligned_uni_mat = source_to_aligned_uni_mat - - -class _part_face_mask: - def __init__(self): - self._face_mask : Union['FaceMask', None] = None - - def get_face_mask(self) -> Union['FaceMask', None]: return self._face_mask - def set_face_mask(self, face_mask : 'FaceMask'): - if not isinstance(face_mask, FaceMask): - raise ValueError('face_mask must be an instance of FaceMask') - self._face_mask = face_mask - -class _part_face_swap: - def __init__(self): - self._face_swap : Union['FaceSwap', None] = None - - def get_face_swap(self) -> Union['FaceSwap', None]: return self._face_swap - def set_face_swap(self, face_swap : 'FaceSwap'): - if not isinstance(face_swap, FaceSwap): - raise ValueError('face_swap must be an instance of FaceSwap') - self._face_swap = face_swap - -class _part_face_pose: - def __init__(self): - self._face_pose : Union['FacePose', None] = None - - def get_face_pose(self) -> Union['FacePose', None]: return self._face_pose - def set_face_pose(self, face_pose : 'FacePose'): - if not isinstance(face_pose, FacePose): - raise ValueError('face_pose must be an instance of FacePose') - self._face_pose = face_pose - - -class FaceMark(_part_picklable_expandable, - _part_image_name, - _part_person_name, - _part_face_urect, - _part_face_ulandmarks_list, - _part_face_align, - _part_face_pose, - ): - """ - Describes meta data of single face. - - must not contain any images or large buffers, only references or filenames of them. - """ - def __init__(self): - _part_image_name.__init__(self) - _part_person_name.__init__(self) - _part_face_urect.__init__(self) - _part_face_ulandmarks_list.__init__(self) - _part_face_align.__init__(self) - _part_face_pose.__init__(self) - -class FaceAlign(_part_picklable_expandable, - _part_image_name, - _part_person_name, - _part_face_urect, - _part_face_ulandmarks_list, - _part_coverage, - _part_source_source_face_ulandmarks_type, - _part_source_to_aligned_uni_mat, - _part_face_mask, - _part_face_swap, - ): - def __init__(self): - _part_image_name.__init__(self) - _part_person_name.__init__(self) - _part_coverage.__init__(self) - _part_source_source_face_ulandmarks_type.__init__(self) - _part_source_to_aligned_uni_mat.__init__(self) - _part_face_urect.__init__(self) - _part_face_ulandmarks_list.__init__(self) - _part_face_mask.__init__(self) - _part_face_swap.__init__(self) - - -class FaceSwap(_part_picklable_expandable, - _part_image_name, - _part_person_name, - _part_face_mask, - ): - - def __init__(self): - _part_image_name.__init__(self) - _part_person_name.__init__(self) - _part_face_mask.__init__(self) - - -class FaceMask(_part_picklable_expandable, - _part_image_name, - ): - def __init__(self): - _part_image_name.__init__(self) - -