diff --git a/apps/DeepFaceLive/backend/FaceMerger.py b/apps/DeepFaceLive/backend/FaceMerger.py index 847def3..a1dae9a 100644 --- a/apps/DeepFaceLive/backend/FaceMerger.py +++ b/apps/DeepFaceLive/backend/FaceMerger.py @@ -234,13 +234,12 @@ class FaceMergerWorker(BackendWorker): face_mask = ImageProcessor(face_mask).erode_blur(state.face_mask_erode, state.face_mask_blur, fade_to_border=True).get_image('HWC') frame_face_mask = ImageProcessor(face_mask).warp_affine(aligned_to_source_uni_mat, frame_width, frame_height).clip2( (1.0/255.0), 0.0, 1.0, 1.0).get_image('HWC') - face_swap_img = ImageProcessor(face_swap_img).to_ufloat32().get_image('HWC') + face_swap_ip = ImageProcessor(face_swap_img).to_ufloat32() - if state.color_transfer == 'rct': - face_align_img = ImageProcessor(face_align_img).to_ufloat32().get_image('HWC') - face_swap_img = lib_ct.rct(face_swap_img, face_align_img, target_mask=face_mask, source_mask=face_mask) + if state.color_transfer == 'rct': + face_swap_img = face_swap_ip.rct(like=face_align_img, mask=face_mask, like_mask=face_mask) - frame_face_swap_img = ImageProcessor(face_swap_img).warp_affine(aligned_to_source_uni_mat, frame_width, frame_height, interpolation=interpolation).get_image('HWC') + frame_face_swap_img = face_swap_ip.warp_affine(aligned_to_source_uni_mat, frame_width, frame_height, interpolation=interpolation).get_image('HWC') # Combine final frame opacity = np.float32(state.face_opacity) diff --git a/xlib/image/ImageProcessor.py b/xlib/image/ImageProcessor.py index 8e8a1ea..513b8cf 100644 --- a/xlib/image/ImageProcessor.py +++ b/xlib/image/ImageProcessor.py @@ -499,7 +499,37 @@ class ImageProcessor: self._img = img self.to_dtype(dtype) return self - + + def to_lab(self) -> 'ImageProcessor': + """ + """ + img = self._img + N,H,W,C = img.shape + if C != 3: + raise Exception('Image channels must be == 3') + + img = img.reshape( (N*H,W,C) ) + img = cv2.cvtColor(img, cv2.COLOR_BGR2LAB) + img = img.reshape( (N,H,W,C) ) + + self._img = img + return self + + def from_lab(self) -> 'ImageProcessor': + """ + """ + img = self._img + N,H,W,C = img.shape + if C != 3: + raise Exception('Image channels must be == 3') + + img = img.reshape( (N*H,W,C) ) + img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR) + img = img.reshape( (N,H,W,C) ) + + self._img = img + return self + def jpeg_recompress(self, quality : int, mask = None ) -> 'ImageProcessor': """ quality 0-100 @@ -532,6 +562,88 @@ class ImageProcessor: self.to_dtype(dtype) return self + + def patch_to_batch(self, patch_size : int) -> 'ImageProcessor': + img = self._img + + N,H,W,C = img.shape + OH, OW = H // patch_size, W // patch_size + + img = img.reshape(N,OH,patch_size,OW,patch_size,C) + img = img.transpose(0,2,4,1,3,5) + img = img.reshape(N*patch_size*patch_size,OH,OW,C) + self._img = img + + return self + + def patch_from_batch(self, patch_size : int) -> 'ImageProcessor': + img = self._img + + N,H,W,C = img.shape + ON = N//(patch_size*patch_size) + img = img.reshape(ON,patch_size,patch_size,H,W,C ) + img = img.transpose(0,3,1,4,2,5) + img = img.reshape(ON,H*patch_size,W*patch_size,C ) + self._img = img + + return self + + def rct(self, like : np.ndarray, mask : np.ndarray = None, like_mask : np.ndarray = None, mask_cutoff=0.5) -> 'ImageProcessor': + """ + Transfer color using rct method. + + like np.ndarray [N][HW][3C] np.uint8/np.float32 + + mask(None) np.ndarray [N][HW][1C] np.uint8/np.float32 + like_mask(None) np.ndarray [N][HW][1C] np.uint8/np.float32 + + mask_cutoff(0.5) float + + masks are used to limit the space where color statistics will be computed to adjust the image + + reference: Color Transfer between Images https://www.cs.tau.ac.il/~turkel/imagepapers/ColorTransfer.pdf + """ + dtype = self.get_dtype() + + self.to_ufloat32() + self.to_lab() + + like_for_stat = ImageProcessor(like).to_ufloat32().to_lab().get_image('NHWC') + if like_mask is not None: + like_mask = ImageProcessor(like_mask).to_ufloat32().ch(1).get_image('NHW') + like_for_stat = like_for_stat.copy() + like_for_stat[like_mask < mask_cutoff] = [0,0,0] + + img_for_stat = img = self._img + if mask is not None: + mask = ImageProcessor(mask).to_ufloat32().ch(1).get_image('NHW') + img_for_stat = img_for_stat.copy() + img_for_stat[mask < mask_cutoff] = [0,0,0] + + source_l_mean, source_l_std, source_a_mean, source_a_std, source_b_mean, source_b_std, \ + = img_for_stat[...,0].mean((1,2), keepdims=True), img_for_stat[...,0].std((1,2), keepdims=True), img_for_stat[...,1].mean((1,2), keepdims=True), img_for_stat[...,1].std((1,2), keepdims=True), img_for_stat[...,2].mean((1,2), keepdims=True), img_for_stat[...,2].std((1,2), keepdims=True) + + like_l_mean, like_l_std, like_a_mean, like_a_std, like_b_mean, like_b_std, \ + = like_for_stat[...,0].mean((1,2), keepdims=True), like_for_stat[...,0].std((1,2), keepdims=True), like_for_stat[...,1].mean((1,2), keepdims=True), like_for_stat[...,1].std((1,2), keepdims=True), like_for_stat[...,2].mean((1,2), keepdims=True), like_for_stat[...,2].std((1,2), keepdims=True) + + # not as in the paper: scale by the standard deviations using reciprocal of paper proposed factor + source_l = img[...,0] + source_l = ne.evaluate('(source_l - source_l_mean) * like_l_std / source_l_std + like_l_mean') + + source_a = img[...,1] + source_a = ne.evaluate('(source_a - source_a_mean) * like_a_std / source_a_std + like_a_mean') + + source_b = img[...,2] + source_b = ne.evaluate('(source_b - source_b_mean) * like_b_std / source_b_std + like_b_mean') + + np.clip(source_l, 0, 100, out=source_l) + np.clip(source_a, -127, 127, out=source_a) + np.clip(source_b, -127, 127, out=source_b) + + self._img = np.stack([source_l,source_a,source_b], -1) + self.from_lab() + self.to_dtype(dtype) + return self def rotate90(self) -> 'ImageProcessor': self._img = np.rot90(self._img, k=1, axes=(1,2) )