+xlib.facemeta and refactoring

This commit is contained in:
iperov 2021-10-18 16:56:04 +04:00
parent 081dde23c7
commit 63adc2995e
28 changed files with 962 additions and 613 deletions

View file

@ -5,10 +5,32 @@ from typing import List, Union, Tuple
import numpy as np import numpy as np
from xlib import mp as lib_mp from xlib import mp as lib_mp
from xlib import time as lib_time from xlib import time as lib_time
from xlib.facemeta import FaceMark
from xlib.mp import csw as lib_csw from xlib.mp import csw as lib_csw
from xlib.python.EventListener import EventListener from xlib.python.EventListener import EventListener
from xlib.facemeta import FRect, FLandmarks2D, FPose
class BackendFaceSwapInfo:
def __init__(self):
self.image_name = None
self.face_urect : FRect = None
self.face_pose : FPose = None
self.face_ulmrks : FLandmarks2D = None
self.face_align_image_name : str = None
self.face_align_mask_name : str = None
self.face_swap_image_name : str = None
self.face_swap_mask_name : str = None
self.image_to_align_uni_mat = None
self.face_align_ulmrks : FLandmarks2D = None
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, d):
self.__init__()
self.__dict__.update(d)
class BackendConnectionData: class BackendConnectionData:
""" """
@ -27,13 +49,14 @@ class BackendConnectionData:
self._uid = uid self._uid = uid
self._is_frame_reemitted = None self._is_frame_reemitted = None
self._frame_name = None self._frame_image_name = None
self._frame_count = None self._frame_count = None
self._frame_num = None self._frame_num = None
self._frame_fps = None self._frame_fps = None
self._frame_timestamp = None self._frame_timestamp = None
self._merged_frame_name = None self._merged_image_name = None
self._face_mark_list = []
self._face_swap_info_list = []
def __getstate__(self, ): def __getstate__(self, ):
d = self.__dict__.copy() d = self.__dict__.copy()
@ -43,43 +66,43 @@ class BackendConnectionData:
def assign_weak_heap(self, weak_heap : lib_mp.MPWeakHeap): def assign_weak_heap(self, weak_heap : lib_mp.MPWeakHeap):
self._weak_heap = weak_heap self._weak_heap = weak_heap
def set_file(self, name : str, data : Union[bytes, bytearray, memoryview]): def set_file(self, key, data : Union[bytes, bytearray, memoryview]):
self._weak_heap_refs[name] = self._weak_heap.add_data(data) self._weak_heap_refs[key] = self._weak_heap.add_data(data)
def get_file(self, name : str) -> Union[bytes, None]: def get_file(self, key) -> Union[bytes, None]:
ref = self._weak_heap_refs.get(name, None) ref = self._weak_heap_refs.get(key, None)
if ref is not None: if ref is not None:
return self._weak_heap.get_data(ref) return self._weak_heap.get_data(ref)
return None return None
def set_image(self, name : str, image : np.ndarray): def set_image(self, key, image : np.ndarray):
""" """
store image to weak heap store image to weak heap
name str key
image np.ndarray image np.ndarray
""" """
self.set_file(name, image.data) self.set_file(key, image.data)
self._weak_heap_image_infos[name] = (image.shape, image.dtype) self._weak_heap_image_infos[key] = (image.shape, image.dtype)
def get_image_shape_dtype(self, name:str) -> Union[None, Tuple[List, np.dtype]]: def get_image_shape_dtype(self, key) -> Union[None, Tuple[List, np.dtype]]:
""" """
returns (image shape, dtype) or (None, None) if file does not exist returns (image shape, dtype) or (None, None) if file does not exist
""" """
if name is None: if key is None:
return None return (None, None)
image_info = self._weak_heap_image_infos.get(name, None) image_info = self._weak_heap_image_infos.get(key, None)
if image_info is not None: if image_info is not None:
shape, dtype = image_info shape, dtype = image_info
return shape, dtype return shape, dtype
return (None, None) return (None, None)
def get_image(self, name : str) -> Union[np.ndarray, None]: def get_image(self, key) -> Union[np.ndarray, None]:
if name is None: if key is None:
return None return None
image_info = self._weak_heap_image_infos.get(name, None) image_info = self._weak_heap_image_infos.get(key, None)
buffer = self.get_file(name) buffer = self.get_file(key)
if image_info is not None and buffer is not None: if image_info is not None and buffer is not None:
shape, dtype = image_info shape, dtype = image_info
@ -90,10 +113,6 @@ class BackendConnectionData:
def get_is_frame_reemitted(self) -> Union[bool, None]: return self._is_frame_reemitted def get_is_frame_reemitted(self) -> Union[bool, None]: return self._is_frame_reemitted
def set_is_frame_reemitted(self, is_frame_reemitted : bool): self._is_frame_reemitted = is_frame_reemitted def set_is_frame_reemitted(self, is_frame_reemitted : bool): self._is_frame_reemitted = is_frame_reemitted
def get_frame_name(self) -> Union[str, None]: return self._frame_name
def set_frame_name(self, frame_name : str): self._frame_name = frame_name
def get_frame_count(self) -> Union[int, None]: return self._frame_count def get_frame_count(self) -> Union[int, None]: return self._frame_count
def set_frame_count(self, frame_count : int): self._frame_count = frame_count def set_frame_count(self, frame_count : int): self._frame_count = frame_count
def get_frame_num(self) -> Union[int, None]: return self._frame_num def get_frame_num(self) -> Union[int, None]: return self._frame_num
@ -103,14 +122,17 @@ class BackendConnectionData:
def get_frame_timestamp(self) -> Union[float, None]: return self._frame_timestamp def get_frame_timestamp(self) -> Union[float, None]: return self._frame_timestamp
def set_frame_timestamp(self, frame_timestamp : float): self._frame_timestamp = frame_timestamp def set_frame_timestamp(self, frame_timestamp : float): self._frame_timestamp = frame_timestamp
def get_merged_frame_name(self) -> Union[str, None]: return self._merged_frame_name def get_frame_image_name(self) -> Union[str, None]: return self._frame_image_name
def set_merged_frame_name(self, merged_frame_name : str): self._merged_frame_name = merged_frame_name def set_frame_image_name(self, frame_image_name : str): self._frame_image_name = frame_image_name
def get_merged_image_name(self) -> Union[str, None]: return self._merged_image_name
def set_merged_image_name(self, merged_frame_name : str): self._merged_image_name = merged_frame_name
def get_face_swap_info_list(self) -> List[BackendFaceSwapInfo]: return self._face_swap_info_list
def add_face_swap_info(self, fsi : BackendFaceSwapInfo):
if not isinstance(fsi, BackendFaceSwapInfo):
raise ValueError(f'fsi must be an instance of BackendFaceSwapInfo')
self._face_swap_info_list.append(fsi)
def get_face_mark_list(self) -> List[FaceMark]: return self._face_mark_list
def add_face_mark(self, face_mark : FaceMark):
if not isinstance(face_mark, FaceMark):
raise ValueError(f'face_mark must be an instance of FaceMark')
self._face_mark_list.append(face_mark)
class BackendConnection: class BackendConnection:
@ -144,7 +166,7 @@ class BackendConnection:
def is_full_read(self, buffer_size=0) -> bool: def is_full_read(self, buffer_size=0) -> bool:
""" """
if fully readed by receiver side minus buffer_size if fully readed by receiver side minus buffer_size
""" """
return self._rd.get_read_id() >= (self._rd.get_write_id() - buffer_size) return self._rd.get_read_id() >= (self._rd.get_write_id() - buffer_size)
@ -205,3 +227,4 @@ class BackendWorker(lib_csw.Worker):
def stop_profile_timing(self): def stop_profile_timing(self):
self.send_msg('_profile_timing', self._profile_timing_measurer.stop() ) self.send_msg('_profile_timing', self._profile_timing_measurer.stop() )

View file

@ -253,7 +253,7 @@ class CameraSourceWorker(BackendWorker):
bcd.assign_weak_heap(self.weak_heap) bcd.assign_weak_heap(self.weak_heap)
frame_name = f'Camera_{state.device_idx}_{bcd_uid:06}' frame_name = f'Camera_{state.device_idx}_{bcd_uid:06}'
bcd.set_frame_name(frame_name) bcd.set_frame_image_name(frame_name)
bcd.set_frame_num(bcd_uid) bcd.set_frame_num(bcd_uid)
bcd.set_frame_timestamp(timestamp) bcd.set_frame_timestamp(timestamp)
bcd.set_image(frame_name, img) bcd.set_image(frame_name, img)

View file

@ -2,7 +2,6 @@ import time
import numpy as np import numpy as np
from xlib import os as lib_os from xlib import os as lib_os
from xlib.facemeta import FaceAlign, FaceULandmarks
from xlib.mp import csw as lib_csw from xlib.mp import csw as lib_csw
from xlib.python import all_is_not_None from xlib.python import all_is_not_None
@ -51,7 +50,7 @@ class FaceAlignerWorker(BackendWorker):
cs.exclude_moving_parts.enable() cs.exclude_moving_parts.enable()
cs.exclude_moving_parts.set_flag(state.exclude_moving_parts if state.exclude_moving_parts is not None else True) cs.exclude_moving_parts.set_flag(state.exclude_moving_parts if state.exclude_moving_parts is not None else True)
cs.head_mode.enable() cs.head_mode.enable()
cs.head_mode.set_flag(state.head_mode if state.head_mode is not None else False) cs.head_mode.set_flag(state.head_mode if state.head_mode is not None else False)
@ -91,7 +90,7 @@ class FaceAlignerWorker(BackendWorker):
state.head_mode = head_mode state.head_mode = head_mode
self.save_state() self.save_state()
self.reemit_frame_signal.send() self.reemit_frame_signal.send()
def on_cs_x_offset(self, x_offset): def on_cs_x_offset(self, x_offset):
state, cs = self.get_state(), self.get_control_sheet() state, cs = self.get_state(), self.get_control_sheet()
cfg = cs.x_offset.get_config() cfg = cs.x_offset.get_config()
@ -118,21 +117,17 @@ class FaceAlignerWorker(BackendWorker):
if bcd is not None: if bcd is not None:
bcd.assign_weak_heap(self.weak_heap) bcd.assign_weak_heap(self.weak_heap)
frame_name = bcd.get_frame_name() frame_image_name = bcd.get_frame_image_name()
frame_image = bcd.get_image(frame_name) frame_image = bcd.get_image(frame_image_name)
if all_is_not_None(state.face_coverage, state.resolution, frame_name, frame_image): if all_is_not_None(state.face_coverage, state.resolution, frame_image):
for face_id,face_mark in enumerate( bcd.get_face_mark_list() ): for face_id, fsi in enumerate( bcd.get_face_swap_info_list() ):
face_ulmrks = face_mark.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_468)
if face_ulmrks is None:
face_ulmrks = face_mark.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_68)
head_yaw = None head_yaw = None
if state.head_mode: if state.head_mode:
face_pose = face_mark.get_face_pose() if fsi.face_pose is not None:
if face_pose is not None: head_yaw = fsi.face_pose.as_radians()[1]
head_yaw = face_pose.as_radians()[1]
face_ulmrks = fsi.face_ulmrks
if face_ulmrks is not None: if face_ulmrks is not None:
face_image, uni_mat = face_ulmrks.cut(frame_image, state.face_coverage, state.resolution, face_image, uni_mat = face_ulmrks.cut(frame_image, state.face_coverage, state.resolution,
exclude_moving_parts=state.exclude_moving_parts, exclude_moving_parts=state.exclude_moving_parts,
@ -140,19 +135,11 @@ class FaceAlignerWorker(BackendWorker):
x_offset=state.x_offset, x_offset=state.x_offset,
y_offset=state.y_offset) y_offset=state.y_offset)
face_align_image_name = f'{frame_name}_{face_id}_aligned' fsi.face_align_image_name = f'{frame_image_name}_{face_id}_aligned'
fsi.image_to_align_uni_mat = uni_mat
fsi.face_align_ulmrks = face_ulmrks.transform(uni_mat)
face_align = FaceAlign() bcd.set_image(fsi.face_align_image_name, face_image)
face_align.set_image_name(face_align_image_name)
face_align.set_coverage(state.face_coverage)
face_align.set_source_face_ulandmarks_type(face_ulmrks.get_type())
face_align.set_source_to_aligned_uni_mat(uni_mat)
for face_ulmrks in face_mark.get_face_ulandmarks_list():
face_align.add_face_ulandmarks( face_ulmrks.transform(uni_mat) )
face_mark.set_face_align(face_align)
bcd.set_image(face_align_image_name, face_image)
self.stop_profile_timing() self.stop_profile_timing()
self.pending_bcd = bcd self.pending_bcd = bcd

View file

@ -4,14 +4,14 @@ from enum import IntEnum
import numpy as np import numpy as np
from modelhub import onnx as onnx_models from modelhub import onnx as onnx_models
from xlib import os as lib_os from xlib import os as lib_os
from xlib.facemeta import FaceMark, FaceURect from xlib.facemeta import FRect
from xlib.image import ImageProcessor from xlib.image import ImageProcessor
from xlib.mp import csw as lib_csw from xlib.mp import csw as lib_csw
from xlib.python import all_is_not_None from xlib.python import all_is_not_None
from .BackendBase import (BackendConnection, BackendDB, BackendHost, from .BackendBase import (BackendConnection, BackendDB, BackendHost,
BackendSignal, BackendWeakHeap, BackendWorker, BackendSignal, BackendWeakHeap, BackendWorker,
BackendWorkerState) BackendWorkerState, BackendFaceSwapInfo)
class DetectorType(IntEnum): class DetectorType(IntEnum):
@ -213,10 +213,10 @@ class FaceDetectorWorker(BackendWorker):
detector_state = state.get_detector_state() detector_state = state.get_detector_state()
frame_name = bcd.get_frame_name() frame_image_name = bcd.get_frame_image_name()
frame_image = bcd.get_image(frame_name) frame_image = bcd.get_image(frame_image_name)
if all_is_not_None(frame_image, frame_name): if frame_image is not None:
_,H,W,_ = ImageProcessor(frame_image).get_dims() _,H,W,_ = ImageProcessor(frame_image).get_dims()
rects = [] rects = []
@ -228,13 +228,13 @@ class FaceDetectorWorker(BackendWorker):
rects = self.YoloV5Face.extract (frame_image, threshold=detector_state.threshold, fixed_window=detector_state.fixed_window_size)[0] rects = self.YoloV5Face.extract (frame_image, threshold=detector_state.threshold, fixed_window=detector_state.fixed_window_size)[0]
# to list of FaceURect # to list of FaceURect
rects = [ FaceURect.from_ltrb( (l/W, t/H, r/W, b/H) ) for l,t,r,b in rects ] rects = [ FRect.from_ltrb( (l/W, t/H, r/W, b/H) ) for l,t,r,b in rects ]
# sort # sort
if detector_state.sort_by == FaceSortBy.LARGEST: if detector_state.sort_by == FaceSortBy.LARGEST:
rects = FaceURect.sort_by_area_size(rects) rects = FRect.sort_by_area_size(rects)
elif detector_state.sort_by == FaceSortBy.DIST_FROM_CENTER: elif detector_state.sort_by == FaceSortBy.DIST_FROM_CENTER:
rects = FaceURect.sort_by_dist_from_center(rects) rects = FRect.sort_by_dist_from_center(rects)
if len(rects) != 0: if len(rects) != 0:
max_faces = detector_state.max_faces max_faces = detector_state.max_faces
@ -245,20 +245,20 @@ class FaceDetectorWorker(BackendWorker):
if len(self.temporal_rects) != len(rects): if len(self.temporal_rects) != len(rects):
self.temporal_rects = [ [] for _ in range(len(rects)) ] self.temporal_rects = [ [] for _ in range(len(rects)) ]
for face_id, face_rect in enumerate(rects): for face_id, face_urect in enumerate(rects):
if detector_state.temporal_smoothing != 1: if detector_state.temporal_smoothing != 1:
if not is_frame_reemitted or len(self.temporal_rects[face_id]) == 0: if not is_frame_reemitted or len(self.temporal_rects[face_id]) == 0:
self.temporal_rects[face_id].append( face_rect.as_4pts() ) self.temporal_rects[face_id].append( face_urect.as_4pts() )
self.temporal_rects[face_id] = self.temporal_rects[face_id][-detector_state.temporal_smoothing:] self.temporal_rects[face_id] = self.temporal_rects[face_id][-detector_state.temporal_smoothing:]
face_rect = FaceURect.from_4pts ( np.mean(self.temporal_rects[face_id],0 ) ) face_urect = FRect.from_4pts ( np.mean(self.temporal_rects[face_id],0 ) )
if face_rect.get_area() != 0: if face_urect.get_area() != 0:
face_mark = FaceMark() fsi = BackendFaceSwapInfo()
face_mark.set_image_name(frame_name) fsi.image_name = frame_image_name
face_mark.set_face_urect ( face_rect ) fsi.face_urect = face_urect
bcd.add_face_mark(face_mark) bcd.add_face_swap_info(fsi)
self.stop_profile_timing() self.stop_profile_timing()
self.pending_bcd = bcd self.pending_bcd = bcd

View file

@ -5,7 +5,7 @@ from modelhub import onnx as onnx_models
from modelhub import cv as cv_models from modelhub import cv as cv_models
from xlib import os as lib_os from xlib import os as lib_os
from xlib.facemeta import FaceULandmarks, FacePose from xlib.facemeta import ELandmarks2D, FLandmarks2D, FPose
from xlib.image import ImageProcessor from xlib.image import ImageProcessor
from xlib.mp import csw as lib_csw from xlib.mp import csw as lib_csw
from xlib.python import all_is_not_None from xlib.python import all_is_not_None
@ -153,20 +153,18 @@ class FaceMarkerWorker(BackendWorker):
is_google_facemesh = marker_type == MarkerType.GOOGLE_FACEMESH and self.google_facemesh is not None is_google_facemesh = marker_type == MarkerType.GOOGLE_FACEMESH and self.google_facemesh is not None
if marker_type is not None: if marker_type is not None:
frame_name = bcd.get_frame_name() frame_image = bcd.get_image(bcd.get_frame_image_name())
frame_image = bcd.get_image(frame_name)
if all_is_not_None(frame_image) and (is_opencv_lbf or is_google_facemesh): if frame_image is not None and (is_opencv_lbf or is_google_facemesh):
face_mark_list = bcd.get_face_mark_list() fsi_list = bcd.get_face_swap_info_list()
if marker_state.temporal_smoothing != 1 and \ if marker_state.temporal_smoothing != 1 and \
len(self.temporal_lmrks) != len(face_mark_list): len(self.temporal_lmrks) != len(fsi_list):
self.temporal_lmrks = [ [] for _ in range(len(face_mark_list)) ] self.temporal_lmrks = [ [] for _ in range(len(fsi_list)) ]
for face_id, face_mark in enumerate(face_mark_list): for face_id, fsi in enumerate(fsi_list):
face_mark_rect = face_mark.get_face_urect() if fsi.face_urect is not None:
if face_mark_rect is not None:
# Cut the face to feed to the face marker # Cut the face to feed to the face marker
face_image, face_uni_mat = face_mark_rect.cut(frame_image, marker_state.marker_coverage, 256 if is_opencv_lbf else \ face_image, face_uni_mat = fsi.face_urect.cut(frame_image, marker_state.marker_coverage, 256 if is_opencv_lbf else \
192 if is_google_facemesh else 0 ) 192 if is_google_facemesh else 0 )
_,H,W,_ = ImageProcessor(face_image).get_dims() _,H,W,_ = ImageProcessor(face_image).get_dims()
@ -183,20 +181,17 @@ class FaceMarkerWorker(BackendWorker):
lmrks = np.mean(self.temporal_lmrks[face_id],0 ) lmrks = np.mean(self.temporal_lmrks[face_id],0 )
if is_google_facemesh: if is_google_facemesh:
face_mark.set_face_pose(FacePose.from_3D_468_landmarks(lmrks)) fsi.face_pose = FPose.from_3D_468_landmarks(lmrks)
if is_opencv_lbf: if is_opencv_lbf:
lmrks /= (W,H) lmrks /= (W,H)
elif is_google_facemesh: elif is_google_facemesh:
lmrks = lmrks[...,0:2] / (W,H) lmrks = lmrks[...,0:2] / (W,H)
face_ulmrks = FaceULandmarks.create (FaceULandmarks.Type.LANDMARKS_68 if is_opencv_lbf else \ face_ulmrks = FLandmarks2D.create (ELandmarks2D.L68 if is_opencv_lbf else \
FaceULandmarks.Type.LANDMARKS_468 if is_google_facemesh else None, lmrks) ELandmarks2D.L468 if is_google_facemesh else None, lmrks)
face_ulmrks = face_ulmrks.transform(face_uni_mat, invert=True) face_ulmrks = face_ulmrks.transform(face_uni_mat, invert=True)
face_mark.add_face_ulandmarks (face_ulmrks) fsi.face_ulmrks = face_ulmrks
self.stop_profile_timing() self.stop_profile_timing()
self.pending_bcd = bcd self.pending_bcd = bcd

View file

@ -255,44 +255,35 @@ class FaceMergerWorker(BackendWorker):
if bcd is not None: if bcd is not None:
bcd.assign_weak_heap(self.weak_heap) bcd.assign_weak_heap(self.weak_heap)
frame_name = bcd.get_frame_name() frame_image_name = bcd.get_frame_image_name()
frame_image = bcd.get_image(frame_name) frame_image = bcd.get_image(frame_image_name)
if frame_image is not None: if frame_image is not None:
for face_mark in bcd.get_face_mark_list(): for fsi in bcd.get_face_swap_info_list():
face_align = face_mark.get_face_align() face_align_img_shape, _ = bcd.get_image_shape_dtype(fsi.face_align_image_name)
if face_align is not None: face_align_mask_img = bcd.get_image(fsi.face_align_mask_name)
face_swap = face_align.get_face_swap() face_swap_img = bcd.get_image(fsi.face_swap_image_name)
face_align_mask = face_align.get_face_mask() face_swap_mask_img = bcd.get_image(fsi.face_swap_mask_name)
image_to_align_uni_mat = fsi.image_to_align_uni_mat
if face_swap is not None: if all_is_not_None(face_align_img_shape, face_align_mask_img, face_swap_img, face_swap_mask_img, image_to_align_uni_mat):
face_swap_mask = face_swap.get_face_mask() face_height, face_width = face_align_img_shape[:2]
if face_swap_mask is not None: frame_height, frame_width = frame_image.shape[:2]
aligned_to_source_uni_mat = image_to_align_uni_mat.invert()
aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_translated(-state.face_x_offset, -state.face_y_offset)
aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_scaled_around_center(state.face_scale,state.face_scale)
aligned_to_source_uni_mat = aligned_to_source_uni_mat.to_exact_mat (face_width, face_height, frame_width, frame_height)
face_align_img_shape, _ = bcd.get_image_shape_dtype(face_align.get_image_name()) if state.device == 'CPU':
face_align_mask_img = bcd.get_image(face_align_mask.get_image_name()) merged_frame = self._merge_on_cpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height )
face_swap_img = bcd.get_image(face_swap.get_image_name()) else:
face_swap_mask_img = bcd.get_image(face_swap_mask.get_image_name()) merged_frame = self._merge_on_gpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height )
source_to_aligned_uni_mat = face_align.get_source_to_aligned_uni_mat() # keep image in float32 in order not to extra load FaceMerger
if all_is_not_None(face_align_img_shape, face_align_mask_img, face_swap_img, face_swap_mask_img): merged_image_name = f'{frame_image_name}_merged'
face_height, face_width = face_align_img_shape[:2] bcd.set_merged_image_name(merged_image_name)
frame_height, frame_width = frame_image.shape[:2] bcd.set_image(merged_image_name, merged_frame)
aligned_to_source_uni_mat = source_to_aligned_uni_mat.invert() break
aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_translated(-state.face_x_offset, -state.face_y_offset)
aligned_to_source_uni_mat = aligned_to_source_uni_mat.source_scaled_around_center(state.face_scale,state.face_scale)
aligned_to_source_uni_mat = aligned_to_source_uni_mat.to_exact_mat (face_width, face_height, frame_width, frame_height)
if state.device == 'CPU':
merged_frame = self._merge_on_cpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height )
else:
merged_frame = self._merge_on_gpu(frame_image, face_align_mask_img, face_swap_img, face_swap_mask_img, aligned_to_source_uni_mat, frame_width, frame_height )
# keep image in float32 in order not to extra load FaceMerger
merged_frame_name = f'{frame_name}_merged'
bcd.set_merged_frame_name(merged_frame_name)
bcd.set_image(merged_frame_name, merged_frame)
break
self.stop_profile_timing() self.stop_profile_timing()
self.pending_bcd = bcd self.pending_bcd = bcd

View file

@ -4,7 +4,6 @@ from pathlib import Path
import numpy as np import numpy as np
from modelhub import DFLive from modelhub import DFLive
from xlib import os as lib_os from xlib import os as lib_os
from xlib.facemeta import FaceMask, FaceSwap
from xlib.image.ImageProcessor import ImageProcessor from xlib.image.ImageProcessor import ImageProcessor
from xlib.mp import csw as lib_csw from xlib.mp import csw as lib_csw
from xlib.python import all_is_not_None from xlib.python import all_is_not_None
@ -171,7 +170,7 @@ class FaceSwapperWorker(BackendWorker):
cs.model_dl_progress.enable() cs.model_dl_progress.enable()
cs.model_dl_progress.set_config( lib_csw.Progress.Config(title='@FaceSwapper.downloading_model') ) cs.model_dl_progress.set_config( lib_csw.Progress.Config(title='@FaceSwapper.downloading_model') )
cs.model_dl_progress.set_progress(0) cs.model_dl_progress.set_progress(0)
elif events.new_status_initialized: elif events.new_status_initialized:
self.dfm_model = events.dfm_model self.dfm_model = events.dfm_model
self.dfm_model_initializer = None self.dfm_model_initializer = None
@ -239,47 +238,39 @@ class FaceSwapperWorker(BackendWorker):
if all_is_not_None(dfm_model, model_state): if all_is_not_None(dfm_model, model_state):
face_id = model_state.face_id face_id = model_state.face_id
if face_id is not None: if face_id is not None:
for i, face_mark in enumerate(bcd.get_face_mark_list()): for i, fsi in enumerate(bcd.get_face_swap_info_list()):
if face_id == i: if face_id != i:
face_align = face_mark.get_face_align() continue
if face_align is not None:
face_align_image_name = face_align.get_image_name()
face_align_image = bcd.get_image(face_align_image_name)
if face_align_image is not None:
pre_gamma_red = model_state.pre_gamma_red face_align_image = bcd.get_image(fsi.face_align_image_name)
pre_gamma_green = model_state.pre_gamma_green if face_align_image is not None:
pre_gamma_blue = model_state.pre_gamma_blue
fai_ip = ImageProcessor(face_align_image) pre_gamma_red = model_state.pre_gamma_red
if model_state.presharpen_amount != 0: pre_gamma_green = model_state.pre_gamma_green
fai_ip.sharpen(factor=model_state.presharpen_amount) pre_gamma_blue = model_state.pre_gamma_blue
if pre_gamma_red != 1.0 or pre_gamma_green != 1.0 or pre_gamma_blue != 1.0: fai_ip = ImageProcessor(face_align_image)
fai_ip.adjust_gamma(pre_gamma_red, pre_gamma_green, pre_gamma_blue) if model_state.presharpen_amount != 0:
face_align_image = fai_ip.get_image('HWC') fai_ip.sharpen(factor=model_state.presharpen_amount)
celeb_face, celeb_face_mask_img, face_align_mask_img = dfm_model.convert(face_align_image, morph_factor=model_state.morph_factor) if pre_gamma_red != 1.0 or pre_gamma_green != 1.0 or pre_gamma_blue != 1.0:
celeb_face, celeb_face_mask_img, face_align_mask_img = celeb_face[0], celeb_face_mask_img[0], face_align_mask_img[0] fai_ip.adjust_gamma(pre_gamma_red, pre_gamma_green, pre_gamma_blue)
face_align_image = fai_ip.get_image('HWC')
if model_state.two_pass: celeb_face, celeb_face_mask_img, face_align_mask_img = dfm_model.convert(face_align_image, morph_factor=model_state.morph_factor)
celeb_face, celeb_face_mask_img, _ = dfm_model.convert(celeb_face, morph_factor=model_state.morph_factor) celeb_face, celeb_face_mask_img, face_align_mask_img = celeb_face[0], celeb_face_mask_img[0], face_align_mask_img[0]
celeb_face, celeb_face_mask_img = celeb_face[0], celeb_face_mask_img[0]
face_align_mask = FaceMask() if model_state.two_pass:
face_align_mask.set_image_name(f'{face_align_image_name}_mask') celeb_face, celeb_face_mask_img, _ = dfm_model.convert(celeb_face, morph_factor=model_state.morph_factor)
face_align.set_face_mask(face_align_mask) celeb_face, celeb_face_mask_img = celeb_face[0], celeb_face_mask_img[0]
bcd.set_image(face_align_mask.get_image_name(), face_align_mask_img)
face_swap = FaceSwap() fsi.face_align_mask_name = f'{fsi.face_align_image_name}_mask'
face_swap.set_image_name (f"{face_align_image_name}_swapped") fsi.face_swap_image_name = f'{fsi.face_align_image_name}_swapped'
face_align.set_face_swap(face_swap) fsi.face_swap_mask_name = f'{fsi.face_swap_image_name}_mask'
bcd.set_image(face_swap.get_image_name(), celeb_face)
face_swap_mask = FaceMask() bcd.set_image(fsi.face_align_mask_name, face_align_mask_img)
face_swap_mask.set_image_name(f'{face_swap.get_image_name()}_mask') bcd.set_image(fsi.face_swap_image_name, celeb_face)
face_swap.set_face_mask(face_swap_mask) bcd.set_image(fsi.face_swap_mask_name, celeb_face_mask_img)
bcd.set_image(face_swap_mask.get_image_name(), celeb_face_mask_img)
self.stop_profile_timing() self.stop_profile_timing()
self.pending_bcd = bcd self.pending_bcd = bcd

View file

@ -287,7 +287,7 @@ class FileSourceWorker(BackendWorker):
bcd.set_frame_num(p_frame.frame_num) bcd.set_frame_num(p_frame.frame_num)
bcd.set_frame_fps(p_frame.fps) bcd.set_frame_fps(p_frame.fps)
bcd.set_frame_timestamp(p_frame.timestamp) bcd.set_frame_timestamp(p_frame.timestamp)
bcd.set_frame_name(p_frame.name) bcd.set_frame_image_name(p_frame.name)
image = ImageProcessor(p_frame.image).to_uint8().get_image('HWC') image = ImageProcessor(p_frame.image).to_uint8().get_image('HWC')
bcd.set_image(p_frame.name, image) bcd.set_image(p_frame.name, image)

View file

@ -72,8 +72,8 @@ class FrameAdjusterWorker(BackendWorker):
if bcd is not None: if bcd is not None:
bcd.assign_weak_heap(self.weak_heap) bcd.assign_weak_heap(self.weak_heap)
frame_name = bcd.get_frame_name() frame_image_name = bcd.get_frame_image_name()
frame_image = bcd.get_image(frame_name) frame_image = bcd.get_image(frame_image_name)
if frame_image is not None: if frame_image is not None:
frame_image_ip = ImageProcessor(frame_image) frame_image_ip = ImageProcessor(frame_image)
@ -81,7 +81,7 @@ class FrameAdjusterWorker(BackendWorker):
frame_image_ip.degrade_resize( state.degrade_bicubic_per / 100.0, interpolation=ImageProcessor.Interpolation.CUBIC) frame_image_ip.degrade_resize( state.degrade_bicubic_per / 100.0, interpolation=ImageProcessor.Interpolation.CUBIC)
frame_image = frame_image_ip.get_image('HWC') frame_image = frame_image_ip.get_image('HWC')
bcd.set_image(frame_name, frame_image) bcd.set_image(frame_image_name, frame_image)
self.stop_profile_timing() self.stop_profile_timing()
self.pending_bcd = bcd self.pending_bcd = bcd

View file

@ -202,31 +202,26 @@ class StreamOutputWorker(BackendWorker):
view_image = None view_image = None
if source_type == SourceType.SOURCE_FRAME: if source_type == SourceType.SOURCE_FRAME:
view_image = bcd.get_image(bcd.get_frame_name()) view_image = bcd.get_image(bcd.get_frame_image_name())
elif source_type == SourceType.MERGED_FRAME: elif source_type == SourceType.MERGED_FRAME:
view_image = bcd.get_image(bcd.get_merged_frame_name()) view_image = bcd.get_image(bcd.get_merged_image_name())
elif source_type == SourceType.ALIGNED_FACE: elif source_type == SourceType.ALIGNED_FACE:
aligned_face_id = state.aligned_face_id aligned_face_id = state.aligned_face_id
for i, face_mark in enumerate(bcd.get_face_mark_list()): for i, fsi in enumerate(bcd.get_face_swap_info_list()):
if aligned_face_id == i: if aligned_face_id == i:
face_align = face_mark.get_face_align() view_image = bcd.get_image(fsi.face_align_image_name)
if face_align is not None:
view_image = bcd.get_image(face_align.get_image_name())
break break
elif source_type == SourceType.SWAPPED_FACE: elif source_type == SourceType.SWAPPED_FACE:
for face_mark in bcd.get_face_mark_list(): for fsi in bcd.get_face_swap_info_list():
face_align = face_mark.get_face_align() view_image = bcd.get_image(fsi.face_swap_image_name)
if face_align is not None: if view_image is not None:
face_swap = face_align.get_face_swap() break
if face_swap is not None:
view_image = bcd.get_image(face_swap.get_image_name())
break
elif source_type == SourceType.SOURCE_N_MERGED_FRAME: elif source_type == SourceType.SOURCE_N_MERGED_FRAME:
source_frame = bcd.get_image(bcd.get_frame_name()) source_frame = bcd.get_image(bcd.get_frame_image_name())
merged_frame = bcd.get_image(bcd.get_merged_frame_name()) merged_frame = bcd.get_image(bcd.get_merged_image_name())
if source_frame is not None and merged_frame is not None: if source_frame is not None and merged_frame is not None:
source_frame = ImageProcessor(source_frame).to_ufloat32().get_image('HWC') source_frame = ImageProcessor(source_frame).to_ufloat32().get_image('HWC')
view_image = np.concatenate( (source_frame, merged_frame), 1 ) view_image = np.concatenate( (source_frame, merged_frame), 1 )

View file

@ -92,20 +92,18 @@ class QFaceDetector(QBackendPanel):
if bcd is not None: if bcd is not None:
bcd.assign_weak_heap(self._weak_heap) bcd.assign_weak_heap(self._weak_heap)
frame_name = bcd.get_frame_name() frame_image = bcd.get_image(bcd.get_frame_image_name())
frame_image = bcd.get_image(frame_name)
frame_image_w_h = None frame_image_w_h = None
if frame_image is not None: if frame_image is not None:
h,w = frame_image.shape[0:2] h,w = frame_image.shape[0:2]
frame_image_w_h = (w,h) frame_image_w_h = (w,h)
info = [] info = []
for face_num,face_mark in enumerate(bcd.get_face_mark_list()): for face_id, fsi in enumerate(bcd.get_face_swap_info_list()):
info_str = f'{face_num}: ' info_str = f'{face_id}: '
rect = face_mark.get_face_urect() if fsi.face_urect is not None:
if rect is not None: l,t,r,b = fsi.face_urect.as_ltrb_bbox(frame_image_w_h).astype(np.int)
l,t,r,b = rect.as_ltrb_bbox(frame_image_w_h).astype(np.int)
info_str += f'[{l},{t},{r},{b}]' info_str += f'[{l},{t},{r},{b}]'
info.append(info_str) info.append(info_str)

View file

@ -5,8 +5,6 @@ from PyQt6.QtGui import *
from PyQt6.QtWidgets import * from PyQt6.QtWidgets import *
from resources.fonts import QXFontDB from resources.fonts import QXFontDB
from xlib import qt as lib_qt from xlib import qt as lib_qt
from xlib.facemeta import FaceULandmarks
from xlib.python import all_is_not_None
from ... import backend from ... import backend
@ -47,36 +45,25 @@ class QBCFaceAlignViewer(lib_qt.QXCollapsibleSection):
self._layered_images.clear_images() self._layered_images.clear_images()
for face_mark in bcd.get_face_mark_list(): for fsi in bcd.get_face_swap_info_list():
face_align = face_mark.get_face_align() face_image = bcd.get_image (fsi.face_align_image_name)
if face_align is not None: if face_image is not None:
face_image = bcd.get_image (face_align.get_image_name()) h,w = face_image.shape[:2]
if face_image is not None: self._layered_images.add_image(face_image)
source_to_aligned_uni_mat = face_align.get_source_to_aligned_uni_mat()
h,w = face_image.shape[:2] if fsi.face_align_ulmrks is not None:
self._layered_images.add_image(face_image) lmrks_layer = np.zeros( (self._preview_width, self._preview_width, 4), dtype=np.uint8)
face_ulmrks = face_align.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_468) fsi.face_align_ulmrks.draw(lmrks_layer, (0,255,0,255))
if face_ulmrks is None:
face_ulmrks = face_align.get_face_ulandmarks_by_type(FaceULandmarks.Type.LANDMARKS_68)
if face_ulmrks is not None: if fsi.face_urect is not None and fsi.image_to_align_uni_mat is not None:
lmrks_layer = np.zeros( (self._preview_width, self._preview_width, 4), dtype=np.uint8) aligned_uni_rect = fsi.face_urect.transform(fsi.image_to_align_uni_mat)
aligned_uni_rect.draw(lmrks_layer, (0,0,255,255) )
face_ulmrks.draw(lmrks_layer, (0,255,0,255)) self._layered_images.add_image(lmrks_layer)
face_mark_rect = face_mark.get_face_urect() self._info_label.setText(f'{w}x{h}')
if face_mark_rect is not None: return
aligned_uni_rect = face_mark_rect.transform(source_to_aligned_uni_mat)
aligned_uni_rect.draw(lmrks_layer, (0,0,255,255) )
self._layered_images.add_image(lmrks_layer)
self._info_label.setText(f'{w}x{h}')
return
def clear(self): def clear(self):

View file

@ -45,17 +45,13 @@ class QBCFaceSwapViewer(lib_qt.QXCollapsibleSection):
self._layered_images.clear_images() self._layered_images.clear_images()
for face_mark in bcd.get_face_mark_list(): for fsi in bcd.get_face_swap_info_list():
face_align = face_mark.get_face_align() face_swap_image = bcd.get_image(fsi.face_swap_image_name)
if face_align is not None: if face_swap_image is not None:
face_swap = face_align.get_face_swap() self._layered_images.add_image(face_swap_image)
if face_swap is not None: h,w = face_swap_image.shape[0:2]
face_swap_image = bcd.get_image(face_swap.get_image_name()) self._info_label.setText(f'{w}x{h}')
if face_swap_image is not None: return
self._layered_images.add_image(face_swap_image)
h,w = face_swap_image.shape[0:2]
self._info_label.setText(f'{w}x{h}')
return
def clear(self): def clear(self):
self._layered_images.clear_images() self._layered_images.clear_images()

View file

@ -43,16 +43,16 @@ class QBCFinalFrameViewer(lib_qt.QXCollapsibleSection):
self._layered_images.clear_images() self._layered_images.clear_images()
merged_frame_name = bcd.get_merged_frame_name() merged_image_name = bcd.get_merged_image_name()
merged_frame_image = bcd.get_image(merged_frame_name) merged_image = bcd.get_image(merged_image_name)
if merged_frame_image is not None: if merged_image is not None:
if merged_frame_image.dtype != np.uint8: if merged_image.dtype != np.uint8:
merged_frame_image = lib_image.ImageProcessor(merged_frame_image).to_uint8().get_image('HWC') merged_image = lib_image.ImageProcessor(merged_image).to_uint8().get_image('HWC')
self._layered_images.add_image(merged_frame_image) self._layered_images.add_image(merged_image)
h,w = merged_frame_image.shape[0:2] h,w = merged_image.shape[0:2]
self._info_label.setText(f'{merged_frame_name} {w}x{h}') self._info_label.setText(f'{merged_image_name} {w}x{h}')
def clear(self): def clear(self):

View file

@ -42,16 +42,13 @@ class QBCFrameViewer(lib_qt.QXCollapsibleSection):
self._layered_images.clear_images() self._layered_images.clear_images()
frame_name = bcd.get_frame_name() frame_image_name = bcd.get_frame_image_name()
frame_image = bcd.get_image(frame_name) frame_image = bcd.get_image(frame_image_name)
if frame_image is not None: if frame_image is not None:
self._layered_images.add_image (frame_image) self._layered_images.add_image (frame_image)
h,w = frame_image.shape[:2] h,w = frame_image.shape[:2]
if frame_name is not None: self._info_label.setText(f'{frame_image_name} {w}x{h}')
self._info_label.setText(f'{frame_name} {w}x{h}')
else:
self._info_label.setText(f'{w}x{h}')
def clear(self): def clear(self):

View file

@ -1,20 +1,213 @@
from pathlib import Path from pathlib import Path
import numpy as np
from xlib import console as lib_con
from xlib import facemeta as lib_fm
from xlib import path as lib_path
from xlib.file import SplittedFile from xlib.file import SplittedFile
from xlib import cv as lib_cv
import cv2
repo_root = Path(__file__).parent.parent repo_root = Path(__file__).parent.parent
large_files_list = [ (repo_root / 'modelhub' / 'onnx' / 'S3FD' / 'S3FD.onnx', 48*1024*1024), large_files_list = [ (repo_root / 'modelhub' / 'onnx' / 'S3FD' / 'S3FD.onnx', 48*1024*1024),
(repo_root / 'modelhub' / 'torch' / 'S3FD' / 'S3FD.pth', 48*1024*1024), (repo_root / 'modelhub' / 'torch' / 'S3FD' / 'S3FD.pth', 48*1024*1024),
(repo_root / 'modelhub' / 'cv' / 'FaceMarkerLBF' / 'lbfmodel.yaml', 34*1024*1024), (repo_root / 'modelhub' / 'cv' / 'FaceMarkerLBF' / 'lbfmodel.yaml', 34*1024*1024),
] ]
def merge_large_files(delete_parts=False): def merge_large_files(delete_parts=False):
for filepath, _ in large_files_list: for filepath, _ in large_files_list:
print(f'Merging {filepath}...') print(f'Merging {filepath}...')
SplittedFile.merge(filepath, delete_parts=delete_parts) SplittedFile.merge(filepath, delete_parts=delete_parts)
print('Done') print('Done')
def split_large_files(delete_original=False): def split_large_files(delete_original=False):
for filepath, part_size in large_files_list: for filepath, part_size in large_files_list:
print(f'Splitting {filepath}...') print(f'Splitting {filepath}...')
SplittedFile.split(filepath, part_size=part_size, delete_original=delete_original) SplittedFile.split(filepath, part_size=part_size, delete_original=delete_original)
print('Done') print('Done')
def extract_facesynthetics_dataset(input_dir):
"""
extract FaceSynthetics dataset https://github.com/microsoft/FaceSynthetics
BACKGROUND = 0
SKIN = 1
NOSE = 2
RIGHT_EYE = 3
LEFT_EYE = 4
RIGHT_BROW = 5
LEFT_BROW = 6
RIGHT_EAR = 7
LEFT_EAR = 8
MOUTH_INTERIOR = 9
TOP_LIP = 10
BOTTOM_LIP = 11
NECK = 12
HAIR = 13
BEARD = 14
CLOTHING = 15
GLASSES = 16
HEADWEAR = 17
FACEWEAR = 18
IGNORE = 255
"""
input_path = Path(input_dir)
faceset_path = input_path.parent / f'{input_path.name}.dfs'
# fs = lib_fm.Faceset(output_dbpath)
# for ufm in fs.iter_UFaceMark():
# uimg = fs.get_UImage_by_uuid( ufm.get_UImage_uuid() )
# img = uimg.get_image()
# cv2.imshow('', img)
# cv2.waitKey(0)
filepaths = lib_path.get_files_paths(input_path)[:100] #TODO
fs = lib_fm.Faceset(faceset_path)
fs.clear_db()
for filepath in lib_con.progress_bar_iterator(filepaths, 'Processing'):
if filepath.suffix == '.txt':
image_filepath = filepath.parent / f'{filepath.name.split("_")[0]}.png'
if not image_filepath.exists():
print(f'{image_filepath} does not exist, skipping')
img = lib_cv.imread(image_filepath)
H,W,C = img.shape
lmrks = []
for lmrk_line in filepath.read_text().split('\n'):
if len(lmrk_line) == 0:
continue
x, y = lmrk_line.split(' ')
x, y = float(x), float(y)
lmrks.append( (x,y) )
lmrks = np.array(lmrks[:68], np.float32) / (H,W)
flmrks = lib_fm.FLandmarks2D.create(lib_fm.ELandmarks2D.L68, lmrks)
uimg = lib_fm.UImage()
uimg.assign_image(img)
uimg.set_name(image_filepath.stem)
ufm = lib_fm.UFaceMark()
ufm.set_UImage_uuid(uimg.get_uuid())
ufm.set_FRect(flmrks.get_FRect())
ufm.add_FLandmarks2D(flmrks)
fs.add_UImage(uimg, format='png')
fs.add_UFaceMark(ufm)
fs.shrink()
fs.close()
import code
code.interact(local=dict(globals(), **locals()))
# seg_filepath = input_path / ( Path(image_filepath).stem + '_seg.png')
# if not seg_filepath.exists():
# raise ValueError(f'{seg_filepath} does not exist')
# erode_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3,3))
# img_seg = lib_cv.imread(seg_filepath)
# if img_seg.ndim == 2:
# img_seg = img_seg[...,None]
# if img_seg.shape[-1] != 1:
# raise Exception(f'{seg_filepath} wrong mask file. Must be 1 channel.')
# seg_hair = img_seg.copy()
# seg_hair_inds = np.isin(img_seg, [13])
# seg_hair[~seg_hair_inds] = 0
# seg_hair[seg_hair_inds] = 255
# cv2.imshow('', img)
# cv2.waitKey(0)
# cv2.imshow('', seg_hair)
# cv2.waitKey(0)
# # fix 1-3 pix hair holes
# seg_hair = cv2.dilate(seg_hair, erode_kernel, iterations=3)
# seg_hair = cv2.erode(seg_hair, erode_kernel, iterations=3)
# cv2.imshow('', seg_hair)
# cv2.waitKey(0)
# img_seg_inds = np.isin(img_seg, [1,2,3,4,5,6,9,10,11,14])
# img_seg[~img_seg_inds] = 0
# img_seg[img_seg_inds] = 255
# import numpy as np
# import cv2
#
# from xlib import math as lib_math
# fs1 = lib_fm.Faceset(r'D:\\1.dfs')
# fs1.clear_db()
# uimg = lib_fm.UImage()
# uimg.assign_image( np.random.uniform(0, 255, size=(128,128,1) ).astype(np.uint8) )
# fs1.add_UImage(uimg, format='jp2', quality=30)
# uimg.assign_image( np.ones( shape=(128,128,1) ).astype(np.uint8) )
# #fs1.add_UImage(uimg, format='jp2', quality=30)
# up = lib_fm.UPerson()
# up.set_name('Man')
# up.set_age(13)
# fs1.add_UPerson(up)
# ufm = lib_fm.UFaceMark()
# ufm.set_UPerson_uuid(up.get_uuid())
# ufm.set_UImage_uuid(uimg.get_uuid())
# ufm.add_mask_info( lib_fm.EMaskType.UNDEFINED, uimg.get_uuid(), lib_math.Affine2DUniMat.identity() )
# fs1.add_UFaceMark(ufm)
# fs1.close()
# fs = lib_fm.Faceset(r'D:\\1.dfs')
# for uperson in fs.iter_UPerson():
# print(uperson)
# for ufm in fs.iter_UFaceMark():
# print(ufm)
# for uimg in fs.iter_UImage():
# cv2.imshow('', uimg.get_image())
# cv2.waitKey(0)
# print(uimg)
# import code
# code.interact(local=dict(globals(), **locals()))
# uimg2 = lib_fm.UImage()
# uimg2.assign_image( np.random.uniform(0, 255, size=(128,128,1) ).astype(np.uint8) )
# ufm = lib_fm.UFaceMark()
# ufm.set_UImage_uuid(uimg.get_uuid())
# fs.add_UFaceMark(ufm)
# fs.add_UImage(uimg, format='jp2', quality=30)
# fs.add_UImage(uimg2, format='jpg', quality=10)
# #print(fs.get_fimage_count())
# print( fs.get_UFaceMark_count() )
# import code
# code.interact(local=dict(globals(), **locals()))

View file

@ -0,0 +1,6 @@
from enum import IntEnum
class ELandmarks2D(IntEnum):
L5 = 0
L68 = 1
L468 = 2

View file

@ -0,0 +1,4 @@
from enum import IntEnum
class EMaskType(IntEnum):
UNDEFINED = 0

View file

@ -1,24 +1,20 @@
from enum import IntEnum
from typing import Tuple from typing import Tuple
import cv2 import cv2
import numpy as np import numpy as np
import numpy.linalg as npla import numpy.linalg as npla
from ..math import Affine2DMat, Affine2DUniMat from ..math import Affine2DMat, Affine2DUniMat
from .ELandmarks2D import ELandmarks2D
from .FRect import FRect
class FaceULandmarks: class FLandmarks2D:
"""
Describes 2D face landmarks in uniform coordinates
"""
class Type(IntEnum):
LANDMARKS_5 = 0
LANDMARKS_68 = 1
LANDMARKS_468 = 2
def __init__(self): def __init__(self):
self._type : FaceULandmarks.Type = None """
Describes 2D face landmarks in uniform float coordinates
"""
self._type : ELandmarks2D = None
self._ulmrks : np.ndarray = None self._ulmrks : np.ndarray = None
def __getstate__(self): def __getstate__(self):
@ -29,14 +25,13 @@ class FaceULandmarks:
self.__dict__.update(d) self.__dict__.update(d)
@staticmethod @staticmethod
def create( type : 'FaceULandmarks.Type', ulmrks : np.ndarray): def create( type : ELandmarks2D, ulmrks : np.ndarray):
""" """
ulmrks np.ndarray (*,2|3) ulmrks np.ndarray (*,2|3)
""" """
if not isinstance(type, FaceULandmarks.Type): if not isinstance(type, ELandmarks2D):
raise ValueError('type must be an FaceULandmarks.Type') raise ValueError('type must be ELandmarks2D')
ulmrks = np.float32(ulmrks) ulmrks = np.float32(ulmrks)
if len(ulmrks.shape) != 2: if len(ulmrks.shape) != 2:
@ -46,22 +41,22 @@ class FaceULandmarks:
raise ValueError('ulmrks dim must be == 2') raise ValueError('ulmrks dim must be == 2')
ulmrks_count = ulmrks.shape[0] ulmrks_count = ulmrks.shape[0]
if type == FaceULandmarks.Type.LANDMARKS_5: if type == ELandmarks2D.L5:
if ulmrks_count != 5: if ulmrks_count != 5:
raise ValueError('ulmrks_count must be == 5') raise ValueError('ulmrks_count must be == 5')
elif type == FaceULandmarks.Type.LANDMARKS_68: elif type == ELandmarks2D.L68:
if ulmrks_count != 68: if ulmrks_count != 68:
raise ValueError('ulmrks_count must be == 68') raise ValueError('ulmrks_count must be == 68')
elif type == FaceULandmarks.Type.LANDMARKS_468: elif type == ELandmarks2D.L468:
if ulmrks_count != 468: if ulmrks_count != 468:
raise ValueError('ulmrks_count must be == 468') raise ValueError('ulmrks_count must be == 468')
face_ulmrks = FaceULandmarks() face_ulmrks = FLandmarks2D()
face_ulmrks._type = type face_ulmrks._type = type
face_ulmrks._ulmrks = ulmrks face_ulmrks._ulmrks = ulmrks
return face_ulmrks return face_ulmrks
def get_type(self) -> 'FaceULandmarks.Type': return self._type def get_type(self) -> ELandmarks2D: return self._type
def get_count(self) -> int: return self._ulmrks.shape[0] def get_count(self) -> int: return self._ulmrks.shape[0]
def as_numpy(self, w_h = None): def as_numpy(self, w_h = None):
@ -76,9 +71,9 @@ class FaceULandmarks:
return ulmrks return ulmrks
def transform(self, mat, invert=False) -> 'FaceULandmarks': def transform(self, mat, invert=False) -> 'FLandmarks2D':
""" """
Tranforms FaceULandmarks using affine mat and returns new FaceULandmarks() Tranforms FLandmarks2D using affine mat and returns new FLandmarks2D()
mat : np.ndarray mat : np.ndarray
""" """
@ -93,9 +88,20 @@ class FaceULandmarks:
ulmrks = np.expand_dims(ulmrks, axis=1) ulmrks = np.expand_dims(ulmrks, axis=1)
ulmrks = cv2.transform(ulmrks, mat, ulmrks.shape).squeeze() ulmrks = cv2.transform(ulmrks, mat, ulmrks.shape).squeeze()
return FaceULandmarks.create(type=self._type, ulmrks=ulmrks) return FLandmarks2D.create(type=self._type, ulmrks=ulmrks)
def get_FRect(self, coverage=1.6) -> FRect:
"""
create FRect from landmarks with given coverage
"""
_, uni_mat = self.calc_cut( (1,1), coverage, 1, exclude_moving_parts=False)
xlt, xlb, xrb, xrt = uni_mat.invert().transform_points([[0,0], [0,1], [1,1], [1,0]])
l = min(xlt[0], xlb[0])
t = min(xlt[1], xrt[1])
r = max(xrt[0], xrb[0])
b = max(xlb[1], xrb[1])
return FRect.from_ltrb( (l,t,r,b) )
def calc_cut(self, w_h, coverage : float, output_size : int, def calc_cut(self, w_h, coverage : float, output_size : int,
exclude_moving_parts : bool, exclude_moving_parts : bool,
head_yaw : float = None, head_yaw : float = None,
@ -112,9 +118,9 @@ class FaceULandmarks:
lmrks = (self._ulmrks * w_h).astype(np.float32) lmrks = (self._ulmrks * w_h).astype(np.float32)
# estimate landmarks transform from global space to local aligned space with bounds [0..1] # estimate landmarks transform from global space to local aligned space with bounds [0..1]
if type == FaceULandmarks.Type.LANDMARKS_68: if type == ELandmarks2D.L68:
mat = Affine2DMat.umeyama( np.concatenate ([ lmrks[17:49] , lmrks[54:55] ]), uni_landmarks_68) mat = Affine2DMat.umeyama( np.concatenate ([ lmrks[17:49] , lmrks[54:55] ]), uni_landmarks_68)
elif type == FaceULandmarks.Type.LANDMARKS_468: elif type == ELandmarks2D.L468:
src_lmrks = lmrks src_lmrks = lmrks
dst_lmrks = uni_landmarks_468 dst_lmrks = uni_landmarks_468
if exclude_moving_parts: if exclude_moving_parts:
@ -1297,4 +1303,4 @@ uni_landmarks_468 = np.array(
# # time.sleep(1.0) # # time.sleep(1.0)
# import code # import code
# code.interact(local=dict(globals(), **locals())) # code.interact(local=dict(globals(), **locals()))

40
xlib/facemeta/FMask.py Normal file
View file

@ -0,0 +1,40 @@
import uuid
from typing import Union
from enum import IntEnum
class FMask:
class Type(IntEnum):
WHOLE_FACE = 0
def __init__(self, _from_pickled=False):
"""
"""
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None
self._mask_type : Union[FMask.Type, None] = None
self._FImage_uuid : Union[bytes, None] = None
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, d):
self.__init__(_from_pickled=True)
self.__dict__.update(d)
def get_uuid(self) -> Union[bytes, None]: return self._uuid
def set_uuid(self, uuid : Union[bytes, None]):
if uuid is not None and not isinstance(uuid, bytes):
raise ValueError(f'uuid must be an instance of bytes or None')
self._uuid = uuid
def get_mask_type(self) -> Union['FMask.Type', None]: return self._mask_type
def set_mask_type(self, mask_type : Union['FMask.Type', None]):
if mask_type is not None and not isinstance(mask_type, 'FMask.Type'):
raise ValueError(f'mask_type must be an instance of FMask.Type or None')
self._mask_type = mask_type
def get_FImage_uuid(self) -> Union[bytes, None]: return self._FImage_uuid
def set_FImage_uuid(self, FImage_uuid : Union[bytes, None]):
if FImage_uuid is not None and not isinstance(FImage_uuid, bytes):
raise ValueError(f'FImage_uuid must be an instance of bytes or None')
self._FImage_uuid = FImage_uuid

View file

@ -3,7 +3,7 @@ import numpy as np
from .. import math as lib_math from .. import math as lib_math
class FacePose: class FPose:
""" """
Describes face pitch/yaw/roll Describes face pitch/yaw/roll
""" """
@ -33,7 +33,7 @@ class FacePose:
def from_radians(pitch, yaw, roll): def from_radians(pitch, yaw, roll):
""" """
""" """
face_rect = FacePose() face_rect = FPose()
face_rect._pyr = np.array([pitch, yaw, roll], np.float32) face_rect._pyr = np.array([pitch, yaw, roll], np.float32)
return face_rect return face_rect
@ -47,4 +47,4 @@ class FacePose:
mat[2,:] = np.cross(mat[0, :], mat[1, :]) mat[2,:] = np.cross(mat[0, :], mat[1, :])
pitch, yaw, roll = lib_math.rotation_matrix_to_euler(mat) pitch, yaw, roll = lib_math.rotation_matrix_to_euler(mat)
return FacePose.from_radians(pitch, yaw, roll) return FPose.from_radians(pitch, yaw, roll)

View file

@ -8,8 +8,7 @@ import numpy.linalg as npla
from .. import math as lib_math from .. import math as lib_math
from ..math import Affine2DMat, Affine2DUniMat from ..math import Affine2DMat, Affine2DUniMat
class FRect:
class FaceURect:
""" """
Describes face rectangle in uniform float coordinates Describes face rectangle in uniform float coordinates
""" """
@ -24,9 +23,9 @@ class FaceURect:
self.__dict__.update(d) self.__dict__.update(d)
@staticmethod @staticmethod
def sort_by_area_size(rects : List['FaceURect']): def sort_by_area_size(rects : List['FRect']):
""" """
sort list of FaceURect by largest area descend sort list of FRect by largest area descend
""" """
rects = [ (rect, rect.get_area()) for rect in rects ] rects = [ (rect, rect.get_area()) for rect in rects ]
rects = sorted(rects, key=operator.itemgetter(1), reverse=True ) rects = sorted(rects, key=operator.itemgetter(1), reverse=True )
@ -34,9 +33,9 @@ class FaceURect:
return rects return rects
@staticmethod @staticmethod
def sort_by_dist_from_center(rects : List['FaceURect']): def sort_by_dist_from_center(rects : List['FRect']):
""" """
sort list of FaceURect by nearest distance from center to center of rects descent sort list of FRect by nearest distance from center to center of rects descent
""" """
c = np.float32([0.5,0.5]) c = np.float32([0.5,0.5])
@ -48,7 +47,7 @@ class FaceURect:
@staticmethod @staticmethod
def from_4pts(pts : Iterable): def from_4pts(pts : Iterable):
""" """
Construct FaceURect from 4 pts Construct FRect from 4 pts
0--3 0--3
| | | |
1--2 1--2
@ -60,14 +59,14 @@ class FaceURect:
if pts.shape != (4,2): if pts.shape != (4,2):
raise ValueError('pts must have (4,2) shape') raise ValueError('pts must have (4,2) shape')
face_rect = FaceURect() face_rect = FRect()
face_rect._pts = pts face_rect._pts = pts
return face_rect return face_rect
@staticmethod @staticmethod
def from_ltrb(ltrb : Iterable): def from_ltrb(ltrb : Iterable):
""" """
Construct FaceURect from l,t,r,b list of float values Construct FRect from l,t,r,b list of float values
t t
l-|-r l-|-r
b b
@ -76,7 +75,7 @@ class FaceURect:
raise ValueError('ltrb must be Iterable') raise ValueError('ltrb must be Iterable')
l,t,r,b = ltrb l,t,r,b = ltrb
return FaceURect.from_4pts([ [l,t], [l,b], [r,b], [r,t] ]) return FRect.from_4pts([ [l,t], [l,b], [r,b], [r,t] ])
def get_area(self, w_h = None) -> float: def get_area(self, w_h = None) -> float:
@ -124,9 +123,9 @@ class FaceURect:
return self._pts * w_h return self._pts * w_h
return self._pts.copy() return self._pts.copy()
def transform(self, mat, invert=False) -> 'FaceURect': def transform(self, mat, invert=False) -> 'FRect':
""" """
Tranforms FaceURect using affine mat and returns new FaceURect() Tranforms FRect using affine mat and returns new FRect()
mat : np.ndarray should be uniform affine mat mat : np.ndarray should be uniform affine mat
""" """
@ -141,7 +140,7 @@ class FaceURect:
pts = np.expand_dims(pts, axis=1) pts = np.expand_dims(pts, axis=1)
pts = cv2.transform(pts, mat, pts.shape).squeeze() pts = cv2.transform(pts, mat, pts.shape).squeeze()
return FaceURect.from_4pts(pts) return FRect.from_4pts(pts)
def cut(self, img : np.ndarray, coverage : float, output_size : int) -> Tuple[Affine2DMat, Affine2DUniMat]: def cut(self, img : np.ndarray, coverage : float, output_size : int) -> Tuple[Affine2DMat, Affine2DUniMat]:
""" """

View file

@ -1,95 +1,237 @@
import pickle
import sqlite3
from pathlib import Path from pathlib import Path
from typing import Tuple from typing import Generator, List, Union
import cv2 import cv2
from .. import cv as lib_cv import numpy as np
from .. import path as lib_path
from ..io import FormattedFileIO
from .face import FaceMark from .FMask import FMask
from .UFaceMark import UFaceMark
from .UImage import UImage
from .UPerson import UPerson
class Faceset: class Faceset:
"""
Faceset is a class to store and manage multiple FaceMark()'s
arguments:
path path to directory
raises
Exception, ValueError
"""
def __init__(self, path): def __init__(self, path):
"""
Faceset is a class to store and manage face related data.
arguments:
path path to faceset .dfs file
"""
self._path = path = Path(path) self._path = path = Path(path)
if not path.is_dir():
raise ValueError('Path must be a directory.')
self.reload() if path.suffix != '.dfs':
raise ValueError('Path must be a .dfs file')
def reload(self): self._conn = conn = sqlite3.connect(path, isolation_level=None)
self._cur = cur = conn.cursor()
cur.execute('BEGIN IMMEDIATE')
if not self._is_table_exists('FacesetInfo'):
self.clear_db(transaction=False)
cur.execute('COMMIT')
def close(self):
self._cur.close()
self._cur = None
self._conn.close()
self._conn = None
def _is_table_exists(self, name):
return self._cur.execute(f"SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?", [name]).fetchone()[0] != 0
def shrink(self):
self._cur.execute('VACUUM')
def clear_db(self, transaction=True):
""" """
reload the faceset delete all data and recreate DB
raises
Exception
""" """
self._is_packed = False cur = self._cur
filepaths = lib_path.get_files_paths(self._path) if transaction:
cur.execute('BEGIN IMMEDIATE')
face_filepaths = self._face_filepaths = set() for table_name, in cur.execute("SELECT name from sqlite_master where type = 'table';").fetchall():
cur.execute(f'DROP TABLE {table_name}')
for filepath in filepaths: (cur.execute('CREATE TABLE FacesetInfo (version INT)')
suffix = filepath.suffix .execute('INSERT INTO FacesetInfo VALUES (1)')
if suffix == '.face':
if self._is_packed:
raise Exception(f'{self._path} contains .faceset and .face but only one type is allowed.')
face_filepaths.add(filepath.name) .execute('CREATE TABLE UImage (uuid BLOB, name TEXT, format TEXT, data BLOB)')
elif suffix == '.faceset': .execute('CREATE TABLE UPerson (uuid BLOB, name TEXT, age NUMERIC)')
if self._is_packed: .execute('CREATE TABLE UFaceMark (uuid BLOB, UImage_uuid BLOB, UPerson_uuid BLOB, pickled_bytes BLOB)')
raise Exception(f'{self._path} contains more than one .faceset.') )
if len(face_filepaths) != 0: if transaction:
raise Exception(f'{self._path} contains .faceset and .face but only one type is allowed.') cur.execute('COMMIT')
self._is_packed = True
###################
def pack(self): ### UFaceMark
###################
def _UFaceMark_from_db_row(self, db_row) -> UFaceMark:
uuid, UImage_uuid, UPerson_uuid, pickled_bytes = db_row
return pickle.loads(pickled_bytes)
def add_UFaceMark(self, fm : UFaceMark):
""" """
Pack faceset. add or update UFaceMark in DB
""" """
pickled_bytes = pickle.dumps(fm)
uuid = fm.get_uuid()
UImage_uuid = fm.get_UImage_uuid()
UPerson_uuid = fm.get_UPerson_uuid()
def unpack(self): cur = self._cur
""" cur.execute('BEGIN IMMEDIATE')
unpack faceset. if cur.execute('SELECT COUNT(*) from UFaceMark where uuid=?', [uuid] ).fetchone()[0] != 0:
""" cur.execute('UPDATE UFaceMark SET UImage_uuid=?, UPerson_uuid=?, pickled_bytes=? WHERE uuid=?',
[UImage_uuid, UPerson_uuid, pickled_bytes, uuid])
def is_packed(self) -> bool: return self._is_packed
def get_face_count(self): return len(self._face_filepaths)
def get_faces(self) -> Tuple[FaceMark]:
"""
returns a tuple of FaceMark()'s
"""
def save_face(self, face : FaceMark):
filepath = self._path / (face.get_name() + '.face')
with FormattedFileIO(filepath, 'w+') as f:
f.write_pickled(face)
self._face_filepaths.add(filepath.name)
def save_image(self, name, img, type='jpg'):
filepath = self._path / (name + f'.{type}')
if type == 'jpg':
lib_cv.imwrite(filepath, img, [int(cv2.IMWRITE_JPEG_QUALITY), 100] )
else: else:
raise ValueError(f'Unknown type {type}') cur.execute('INSERT INTO UFaceMark VALUES (?, ?, ?, ?)', [uuid, UImage_uuid, UPerson_uuid, pickled_bytes])
cur.execute('COMMIT')
def get_UFaceMark_count(self) -> int:
return self._cur.execute('SELECT COUNT(*) FROM UFaceMark').fetchone()[0]
def get_all_UFaceMark(self) -> List[UFaceMark]:
return [ pickle.loads(pickled_bytes) for pickled_bytes, in self._cur.execute('SELECT pickled_bytes FROM UFaceMark').fetchall() ]
def iter_UFaceMark(self) -> Generator[UFaceMark, None, None]:
"""
returns Generator of UFaceMark
"""
for db_row in self._cur.execute('SELECT * FROM UFaceMark').fetchall():
yield self._UFaceMark_from_db_row(db_row)
def delete_all_UFaceMark(self):
"""
deletes all UFaceMark from DB
"""
(self._cur.execute('BEGIN IMMEDIATE')
.execute('DELETE FROM UFaceMark')
.execute('COMMIT') )
###################
### UPerson
###################
def add_UPerson(self, uperson : UPerson):
"""
add or update UPerson in DB
"""
uuid = uperson.get_uuid()
name = uperson.get_name()
age = uperson.get_age()
cur = self._conn.cursor()
cur.execute('BEGIN IMMEDIATE')
if cur.execute('SELECT COUNT(*) from UPerson where uuid=?', [uuid]).fetchone()[0] != 0:
cur.execute('UPDATE UPerson SET name=?, age=? WHERE uuid=?', [name, age, uuid])
else:
cur.execute('INSERT INTO UPerson VALUES (?, ?, ?)', [uuid, name, age])
cur.execute('COMMIT')
cur.close()
def iter_UPerson(self) -> Generator[UPerson, None, None]:
"""
iterator of all UPerson's
"""
for uuid, name, age in self._cur.execute('SELECT * FROM UPerson').fetchall():
uperson = UPerson()
uperson.set_uuid(uuid)
uperson.set_name(name)
uperson.set_age(age)
yield uperson
def delete_all_UPerson(self):
"""
deletes all UPerson from DB
"""
(self._cur.execute('BEGIN IMMEDIATE')
.execute('DELETE FROM UPerson')
.execute('COMMIT') )
###################
### UImage
###################
def _UImage_from_db_row(self, db_row) -> UImage:
uuid, name, format, data_bytes = db_row
img = cv2.imdecode(np.frombuffer(data_bytes, dtype=np.uint8), flags=cv2.IMREAD_UNCHANGED)
uimg = UImage()
uimg.set_uuid(uuid)
uimg.set_name(name)
uimg.assign_image(img)
return uimg
def add_UImage(self, uimage : UImage, format : str = 'webp', quality : int = 100):
"""
add or update UImage in DB
uimage UImage object
format('png') webp ( does not support lossless on 100 quality ! )
png ( lossless )
jpg
jp2 ( jpeg2000 )
quality(100) 0-100 for formats jpg,jp2,webp
"""
if format not in ['webp','png', 'jpg', 'jp2']:
raise ValueError(f'format {format} is unsupported')
if format in ['jpg','jp2'] and quality < 0 or quality > 100:
raise ValueError('quality must be in range [0..100]')
img = uimage.get_image()
uuid = uimage.get_uuid()
if format == 'webp':
imencode_args = [int(cv2.IMWRITE_WEBP_QUALITY), quality]
elif format == 'jpg':
imencode_args = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
elif format == 'jp2':
imencode_args = [int(cv2.IMWRITE_JPEG2000_COMPRESSION_X1000), quality*10]
else:
imencode_args = []
ret, data_bytes = cv2.imencode( f'.{format}', img, imencode_args)
if not ret:
raise Exception(f'Unable to encode image format {format}')
cur = self._cur
cur.execute('BEGIN IMMEDIATE')
if cur.execute('SELECT COUNT(*) from UImage where uuid=?', [uuid] ).fetchone()[0] != 0:
cur.execute('UPDATE UImage SET name=?, format=?, data=? WHERE uuid=?', [uimage.get_name(), format, data_bytes.data, uuid])
else:
cur.execute('INSERT INTO UImage VALUES (?, ?, ?, ?)', [uuid, uimage.get_name(), format, data_bytes.data])
cur.execute('COMMIT')
def get_UImage_count(self) -> int: return self._cur.execute('SELECT COUNT(*) FROM UImage').fetchone()[0]
def get_UImage_by_uuid(self, uuid : bytes) -> Union[UImage, None]:
"""
"""
db_row = self._cur.execute('SELECT * FROM UImage where uuid=?', [uuid]).fetchone()
if db_row is None:
return None
return self._UImage_from_db_row(db_row)
def iter_UImage(self) -> Generator[UImage, None, None]:
"""
iterator of all UImage's
"""
for db_row in self._cur.execute('SELECT * FROM UImage').fetchall():
yield self._UImage_from_db_row(db_row)
def delete_all_UImage(self):
"""
deletes all UImage from DB
"""
(self._cur.execute('BEGIN IMMEDIATE')
.execute('DELETE FROM UImage')
.execute('COMMIT') )

View file

@ -0,0 +1,99 @@
import uuid
from typing import List, Tuple, Union
from ..math import Affine2DMat
from .ELandmarks2D import ELandmarks2D
from .EMaskType import EMaskType
from .FLandmarks2D import FLandmarks2D
from .FPose import FPose
from .FRect import FRect
class UFaceMark:
def __init__(self, _from_pickled=False):
"""
Describes single face in the image.
"""
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None
self._UImage_uuid : Union[bytes, None] = None
self._UPerson_uuid : Union[bytes, None] = None
self._FRect : Union[FRect, None] = None
self._FLandmarks2D_list : List[FLandmarks2D] = []
self._FPose : Union[FPose, None] = None
self._mask_info_list : List = []
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, d):
self.__init__(_from_pickled=True)
self.__dict__.update(d)
def __str__(self):
s = "Masks: "
return f"UFaceMark UUID:[...{self._uuid[-4:].hex()}]"
def __repr__(self): return self.__str__()
def get_uuid(self) -> Union[bytes, None]: return self._uuid
def set_uuid(self, uuid : Union[bytes, None]):
if uuid is not None and not isinstance(uuid, bytes):
raise ValueError(f'uuid must be an instance of bytes or None')
self._uuid = uuid
def get_UImage_uuid(self) -> Union[bytes, None]: return self._UImage_uuid
def set_UImage_uuid(self, UImage_uuid : Union[bytes, None]):
if UImage_uuid is not None and not isinstance(UImage_uuid, bytes):
raise ValueError(f'UImage_uuid must be an instance of bytes or None')
self._UImage_uuid = UImage_uuid
def get_UPerson_uuid(self) -> Union[bytes, None]: return self._UPerson_uuid
def set_UPerson_uuid(self, UPerson_uuid : Union[bytes, None]):
if UPerson_uuid is not None and not isinstance(UPerson_uuid, bytes):
raise ValueError(f'UPerson_uuid must be an instance of bytes or None')
self._UPerson_uuid = UPerson_uuid
def get_FRect(self) -> Union['FRect', None]: return self._FRect
def set_FRect(self, face_urect : Union['FRect', None]):
if face_urect is not None and not isinstance(face_urect, FRect):
raise ValueError(f'face_urect must be an instance of FRect or None')
self._FRect = face_urect
def get_all_FLandmarks2D(self) -> List[FLandmarks2D]: return self._FLandmarks2D_list
def get_FLandmarks2D_by_type(self, type : ELandmarks2D) -> Union[FLandmarks2D, None]:
"""get FLandmarks2D from list by type"""
if not isinstance(type, ELandmarks2D):
raise ValueError(f'type must be an instance of ELandmarks2D')
for ulmrks in self._FLandmarks2D_list:
if ulmrks.get_type() == type:
return ulmrks
return None
def add_FLandmarks2D(self, flmrks : FLandmarks2D):
if not isinstance(flmrks, FLandmarks2D):
raise ValueError('flmrks must be an instance of FLandmarks2D')
if self.get_FLandmarks2D_by_type(flmrks.get_type()) is not None:
raise Exception(f'_FLandmarks2D_list already contains type {flmrks.get_type()}.')
self._FLandmarks2D_list.append(flmrks)
def get_FPose(self) -> Union[FPose, None]: return self._FPose
def set_FPose(self, face_pose : FPose):
if not isinstance(face_pose, FPose):
raise ValueError('face_pose must be an instance of FPose')
self._FPose = face_pose
def get_mask_info_list(self) -> List[Tuple[EMaskType, bytes, Affine2DMat]]:
return self._mask_info_list
def add_mask_info(self, mask_type : EMaskType, UImage_uuid : bytes, mask_to_mark_uni_mat : Affine2DMat):
if not isinstance(mask_type, EMaskType):
raise ValueError('mask_type must be an instance of EMaskType')
if not isinstance(UImage_uuid, bytes):
raise ValueError('UImage_uuid must be an instance of bytes')
if not isinstance(mask_to_mark_uni_mat, Affine2DMat):
raise ValueError('mask_to_mark_uni_mat must be an instance of Affine2DMat')
self._mask_info_list.append( (mask_type, UImage_uuid, mask_to_mark_uni_mat) )

59
xlib/facemeta/UImage.py Normal file
View file

@ -0,0 +1,59 @@
import uuid
from typing import Union
import numpy as np
class UImage:
def __init__(self):
"""
represents uncompressed image uint8 HWC ( 1/3/4 channels )
"""
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le
self._name : Union[str, None] = None
self._image : np.ndarray = None
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, d):
self.__init__()
self.__dict__.update(d)
def __str__(self): return f"UImage UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] image:[{ (self._image.shape, self._image.dtype) if self._image is not None else None}]"
def __repr__(self): return self.__str__()
def get_uuid(self) -> Union[bytes, None]: return self._uuid
def set_uuid(self, uuid : Union[bytes, None]):
if uuid is not None and not isinstance(uuid, bytes):
raise ValueError(f'uuid must be an instance of bytes or None')
self._uuid = uuid
def get_name(self) -> Union[str, None]: return self._name
def set_name(self, name : Union[str, None]):
if name is not None and not isinstance(name, str):
raise ValueError(f'name must be an instance of str or None')
self._name = name
def get_image(self) -> Union[np.ndarray, None]: return self._image
def assign_image(self, image : Union[np.ndarray, None]):
"""
assign np.ndarray image , or remove(None)
It's mean you should not to change provided image nd.array after assigning, or do the copy before.
Image must be uint8 and HWC 1/3/4 channels.
"""
if image is not None:
if image.ndim == 2:
image = image[...,None]
if image.ndim != 3:
raise ValueError('image must have ndim == 3')
_,_,C = image.shape
if C not in [1,3,4]:
raise ValueError('image channels must be 1,3,4')
if image.dtype != np.uint8:
raise ValueError('image dtype must be uint8')
self._image = image

39
xlib/facemeta/UPerson.py Normal file
View file

@ -0,0 +1,39 @@
import uuid
from typing import Union
class UPerson:
def __init__(self, _from_pickled=False):
"""
"""
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None
self._name : Union[str, None] = None
self._age : Union[int, None] = None
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, d):
self.__init__(_from_pickled=True)
self.__dict__.update(d)
def __str__(self): return f"UPerson UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] age:[{self._age}]"
def __repr__(self): return self.__str__()
def get_uuid(self) -> Union[bytes, None]: return self._uuid
def set_uuid(self, uuid : Union[bytes, None]):
if uuid is not None and not isinstance(uuid, bytes):
raise ValueError(f'uuid must be an instance of bytes or None')
self._uuid = uuid
def get_name(self) -> Union[str, None]: return self._name
def set_name(self, name : Union[str, None]):
if name is not None and not isinstance(name, str):
raise ValueError(f'name must be an instance of str or None')
self._name = name
def get_age(self) -> Union[str, None]: return self._age
def set_age(self, age : Union[int, None]):
if age is not None and not isinstance(age, int):
raise ValueError(f'age must be an instance of int or None')
self._age = age

View file

@ -1,48 +1,61 @@
""" """
facemeta facemeta library
Trying to standartize face description. Contains classes for effectively storing, manage, and transfering all face related data.
It is not perfect structure, but enough for current tasks. All classes are picklable and expandable.
All classes have noneable members accessed via get/set. No properties.
FaceURect and FaceULandmarks mean uniform coordinates in order to apply them to any resolution. E-classes are enums.
U-classes are unique, have uuid and can be saved in Faceset.
Overall structure: ELandmarks2D L5
L68
L468
EMaskType UNDEFINED, ..., ...
FaceMark - (mean single face data referencing any image) FRect rectangle of the face in uniform float coordinates
.image_name - image reference
.person_name - optional name of person FLandmarks2D 2D landmarks of the face in uniform float coordinates
.FaceURect - a rectangle of the face in source image space
.list[FaceULandmarks] - a list of unique types of landmarks of the face in source image space FPose pitch/yaw/roll values
types:
LANDMARKS_5 UPerson - person info
LANDMARKS_68 .uuid
LANDMARKS_468 .name
.age
.FaceAlign - an aligned face from FaceMark
UImage - image
.image_name - image reference .uuid
.person_name - optional name of person .name
.coverage - coverage value used to align .data (H,W,C 1/3/4 ) of uint8[0..255]
.source_source_face_ulandmarks_type - type of FaceULandmarks from which this FaceAlign was produced
.source_to_aligned_uni_mat - uniform AffineMat to FaceMark image space to FaceAlign image space
.FaceURect - a rectangle of the face in aligned image space
.list[FaceULandmarks] - a list of unique types of landmarks of the face in aligned image space
.FaceMask - grayscale image to mask the face in FaceAlign image space
.image_name - image reference
.FaceSwap - face image of other person in the same as FaceAlign image space
.image_name - image reference
.person_name - optional name of person
.FaceMask - grayscale image to mask the swapped face in FaceSwap image space
.image_name
UFaceMark - face mark info referencing UImage from which the face was detected
.uuid
.UImage_uuid - reference to FImage
.UPerson_uuid - reference to FPerson
.FRect
.List[FLandmarks2D]
.FPose
.List[ (EMaskType, FImage_uuid, uni_mat) ] - list of FMask and AffineMat to transform mask image space to UFaceMark image space
Faceset
.List[UImage]
.List[UFaceMark]
.List[UPerson]
""" """
from .face import FaceMark, FaceAlign, FaceSwap, FaceMask, FaceURect, FaceULandmarks, FacePose from .ELandmarks2D import ELandmarks2D
from .EMaskType import EMaskType
from .Faceset import Faceset from .Faceset import Faceset
from .UImage import UImage
from .FLandmarks2D import FLandmarks2D
from .UFaceMark import UFaceMark
from .FMask import FMask
from .UPerson import UPerson
from .FPose import FPose
from .FRect import FRect

View file

@ -1,211 +0,0 @@
from typing import List, Union
from .. import math as lib_math
from .FaceULandmarks import FaceULandmarks
from .FaceURect import FaceURect
from .FacePose import FacePose
class _part_picklable_expandable:
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, d):
self.__init__()
self.__dict__.update(d)
class _part_image_name:
def __init__(self):
self._image_name : Union[str, None] = None
def get_image_name(self) -> Union[str, None]: return self._image_name
def set_image_name(self, image_name : Union[str, None]):
if image_name is not None and not isinstance(image_name, str):
raise ValueError(f'image_name must be an instance of str or None')
self._image_name = image_name
class _part_person_name:
def __init__(self):
self._person_name : Union[str, None] = None
def get_person_name(self) -> Union[str, None]: return self._person_name
def set_person_name(self, person_name : Union[str, None]):
if person_name is not None and not isinstance(person_name, str):
raise ValueError(f'person_name must be an instance of str or None')
self._person_name = person_name
class _part_face_align:
def __init__(self):
self._face_align : Union['FaceAlign', None] = None
def get_face_align(self) -> Union['FaceAlign', None]: return self._face_align
def set_face_align(self, face_align : 'FaceAlign'):
"""add FaceAlign"""
if not isinstance(face_align, FaceAlign):
raise ValueError('face_align must be an instance of FaceAlign')
self._face_align = face_align
class _part_face_urect:
def __init__(self):
self._face_urect : Union[FaceURect, None] = None
def get_face_urect(self) -> Union['FaceURect', None]:
"""get uniform FaceURect"""
return self._face_urect
def set_face_urect(self, face_urect : Union['FaceURect', None]):
if face_urect is not None and not isinstance(face_urect, FaceURect):
raise ValueError(f'face_urect must be an instance of FaceURect or None')
self._face_urect = face_urect
class _part_face_ulandmarks_list:
def __init__(self):
self._face_ulmrks_list : List[FaceULandmarks] = []
def get_face_ulandmarks_list(self) -> List['FaceULandmarks']: return self._face_ulmrks_list
def get_face_ulandmarks_by_type(self, type : 'FaceULandmarks.Type') -> Union['FaceULandmarks', None]:
"""get FaceULandmarks from list by type"""
if not isinstance(type, FaceULandmarks.Type):
raise ValueError(f'type must be an instance of FaceULandmarks.Type')
for ulmrks in self._face_ulmrks_list:
if ulmrks.get_type() == type:
return ulmrks
return None
def add_face_ulandmarks(self, face_ulmrks : 'FaceULandmarks'):
if not isinstance(face_ulmrks, FaceULandmarks):
raise ValueError('face_ulmrks must be an instance of FaceULandmarks')
if self.get_face_ulandmarks_by_type(face_ulmrks.get_type()) is not None:
raise Exception(f'_face_ulmrks_list already contains type {face_ulmrks.get_type()}.')
self._face_ulmrks_list.append(face_ulmrks)
class _part_source_source_face_ulandmarks_type:
def __init__(self):
self._source_face_ulandmarks_type : Union[FaceULandmarks.Type, None] = None
def get_source_face_ulandmarks_type(self) -> Union[FaceULandmarks.Type, None]: return self._source_face_ulandmarks_type
def set_source_face_ulandmarks_type(self, source_face_ulandmarks_type : Union[FaceULandmarks.Type, None]):
if source_face_ulandmarks_type is not None and not isinstance(source_face_ulandmarks_type, FaceULandmarks.Type):
raise ValueError('source_face_ulandmarks_type must be an instance of FaceULandmarks.Type')
self._source_face_ulandmarks_type = source_face_ulandmarks_type
class _part_coverage:
def __init__(self):
self._coverage : Union[float, None] = None
def get_coverage(self) -> Union[float, None]: return self._coverage
def set_coverage(self, coverage : Union[float, None]):
if coverage is not None and not isinstance(coverage, float):
raise ValueError('coverage must be an instance of float')
self._coverage = coverage
class _part_source_to_aligned_uni_mat:
def __init__(self):
self._source_to_aligned_uni_mat : Union[lib_math.Affine2DUniMat, None] = None
def get_source_to_aligned_uni_mat(self) -> Union[lib_math.Affine2DUniMat, None]: return self._source_to_aligned_uni_mat
def set_source_to_aligned_uni_mat(self, source_to_aligned_uni_mat : Union[lib_math.Affine2DUniMat, None]):
if source_to_aligned_uni_mat is not None and not isinstance(source_to_aligned_uni_mat, lib_math.Affine2DUniMat):
raise ValueError('source_to_aligned_uni_mat must be an instance of lib_math.Affine2DUniMat')
self._source_to_aligned_uni_mat = source_to_aligned_uni_mat
class _part_face_mask:
def __init__(self):
self._face_mask : Union['FaceMask', None] = None
def get_face_mask(self) -> Union['FaceMask', None]: return self._face_mask
def set_face_mask(self, face_mask : 'FaceMask'):
if not isinstance(face_mask, FaceMask):
raise ValueError('face_mask must be an instance of FaceMask')
self._face_mask = face_mask
class _part_face_swap:
def __init__(self):
self._face_swap : Union['FaceSwap', None] = None
def get_face_swap(self) -> Union['FaceSwap', None]: return self._face_swap
def set_face_swap(self, face_swap : 'FaceSwap'):
if not isinstance(face_swap, FaceSwap):
raise ValueError('face_swap must be an instance of FaceSwap')
self._face_swap = face_swap
class _part_face_pose:
def __init__(self):
self._face_pose : Union['FacePose', None] = None
def get_face_pose(self) -> Union['FacePose', None]: return self._face_pose
def set_face_pose(self, face_pose : 'FacePose'):
if not isinstance(face_pose, FacePose):
raise ValueError('face_pose must be an instance of FacePose')
self._face_pose = face_pose
class FaceMark(_part_picklable_expandable,
_part_image_name,
_part_person_name,
_part_face_urect,
_part_face_ulandmarks_list,
_part_face_align,
_part_face_pose,
):
"""
Describes meta data of single face.
must not contain any images or large buffers, only references or filenames of them.
"""
def __init__(self):
_part_image_name.__init__(self)
_part_person_name.__init__(self)
_part_face_urect.__init__(self)
_part_face_ulandmarks_list.__init__(self)
_part_face_align.__init__(self)
_part_face_pose.__init__(self)
class FaceAlign(_part_picklable_expandable,
_part_image_name,
_part_person_name,
_part_face_urect,
_part_face_ulandmarks_list,
_part_coverage,
_part_source_source_face_ulandmarks_type,
_part_source_to_aligned_uni_mat,
_part_face_mask,
_part_face_swap,
):
def __init__(self):
_part_image_name.__init__(self)
_part_person_name.__init__(self)
_part_coverage.__init__(self)
_part_source_source_face_ulandmarks_type.__init__(self)
_part_source_to_aligned_uni_mat.__init__(self)
_part_face_urect.__init__(self)
_part_face_ulandmarks_list.__init__(self)
_part_face_mask.__init__(self)
_part_face_swap.__init__(self)
class FaceSwap(_part_picklable_expandable,
_part_image_name,
_part_person_name,
_part_face_mask,
):
def __init__(self):
_part_image_name.__init__(self)
_part_person_name.__init__(self)
_part_face_mask.__init__(self)
class FaceMask(_part_picklable_expandable,
_part_image_name,
):
def __init__(self):
_part_image_name.__init__(self)