DFLIMG refactoring

This commit is contained in:
Colombo 2020-03-21 01:18:15 +04:00
parent a9b23e9851
commit efe3b56683
12 changed files with 175 additions and 547 deletions

View file

@ -10,7 +10,8 @@ from facelib import FaceType
class DFLJPG(object):
def __init__(self):
def __init__(self, filename):
self.filename = filename
self.data = b""
self.length = 0
self.chunks = []
@ -29,7 +30,7 @@ class DFLJPG(object):
raise FileNotFoundError(filename)
try:
inst = DFLJPG()
inst = DFLJPG(filename)
inst.data = data
inst.length = len(data)
inst_length = inst.length
@ -123,7 +124,7 @@ class DFLJPG(object):
def load(filename, loader_func=None):
try:
inst = DFLJPG.load_raw (filename, loader_func=loader_func)
inst.dfl_dict = None
inst.dfl_dict = {}
for chunk in inst.chunks:
if chunk['name'] == 'APP0':
@ -145,19 +146,6 @@ class DFLJPG(object):
if type(chunk['data']) == bytes:
inst.dfl_dict = pickle.loads(chunk['data'])
if (inst.dfl_dict is not None):
if 'face_type' not in inst.dfl_dict:
inst.dfl_dict['face_type'] = FaceType.toString (FaceType.FULL)
if 'fanseg_mask' in inst.dfl_dict:
fanseg_mask = inst.dfl_dict['fanseg_mask']
if fanseg_mask is not None:
numpyarray = np.asarray( inst.dfl_dict['fanseg_mask'], dtype=np.uint8)
inst.dfl_dict['fanseg_mask'] = cv2.imdecode(numpyarray, cv2.IMREAD_UNCHANGED)
if inst.dfl_dict == None:
return None
return inst
except Exception as e:
print (e)
@ -166,7 +154,7 @@ class DFLJPG(object):
@staticmethod
def embed_dfldict(filename, dfl_dict):
inst = DFLJPG.load_raw (filename)
inst.setDFLDictData (dfl_dict)
inst.set_dict (dfl_dict)
try:
with open(filename, "wb") as f:
@ -174,104 +162,42 @@ class DFLJPG(object):
except:
raise Exception( 'cannot save %s' % (filename) )
@staticmethod
def embed_data(filename, face_type=None,
landmarks=None,
ie_polys=None,
seg_ie_polys=None,
source_filename=None,
source_rect=None,
source_landmarks=None,
image_to_face_mat=None,
fanseg_mask=None,
eyebrows_expand_mod=None,
relighted=None,
**kwargs
):
def has_data(self):
return len(self.dfl_dict.keys()) != 0
if fanseg_mask is not None:
fanseg_mask = np.clip ( (fanseg_mask*255).astype(np.uint8), 0, 255 )
ret, buf = cv2.imencode( '.jpg', fanseg_mask, [int(cv2.IMWRITE_JPEG_QUALITY), 85] )
if ret and len(buf) < 60000:
fanseg_mask = buf
else:
io.log_err("Unable to encode fanseg_mask for %s" % (filename) )
fanseg_mask = None
if ie_polys is not None:
if not isinstance(ie_polys, list):
ie_polys = ie_polys.dump()
if seg_ie_polys is not None:
if not isinstance(seg_ie_polys, list):
seg_ie_polys = seg_ie_polys.dump()
DFLJPG.embed_dfldict (filename, {'face_type': face_type,
'landmarks': landmarks,
'ie_polys' : ie_polys,
'seg_ie_polys' : seg_ie_polys,
'source_filename': source_filename,
'source_rect': source_rect,
'source_landmarks': source_landmarks,
'image_to_face_mat': image_to_face_mat,
'fanseg_mask' : fanseg_mask,
'eyebrows_expand_mod' : eyebrows_expand_mod,
'relighted' : relighted
})
def embed_and_set(self, filename, face_type=None,
landmarks=None,
ie_polys=None,
seg_ie_polys=None,
source_filename=None,
source_rect=None,
source_landmarks=None,
image_to_face_mat=None,
fanseg_mask=None,
eyebrows_expand_mod=None,
relighted=None,
**kwargs
):
if face_type is None: face_type = self.get_face_type()
if landmarks is None: landmarks = self.get_landmarks()
if ie_polys is None: ie_polys = self.get_ie_polys()
if seg_ie_polys is None: seg_ie_polys = self.get_seg_ie_polys()
if source_filename is None: source_filename = self.get_source_filename()
if source_rect is None: source_rect = self.get_source_rect()
if source_landmarks is None: source_landmarks = self.get_source_landmarks()
if image_to_face_mat is None: image_to_face_mat = self.get_image_to_face_mat()
if fanseg_mask is None: fanseg_mask = self.get_fanseg_mask()
if eyebrows_expand_mod is None: eyebrows_expand_mod = self.get_eyebrows_expand_mod()
if relighted is None: relighted = self.get_relighted()
DFLJPG.embed_data (filename, face_type=face_type,
landmarks=landmarks,
ie_polys=ie_polys,
seg_ie_polys=seg_ie_polys,
source_filename=source_filename,
source_rect=source_rect,
source_landmarks=source_landmarks,
image_to_face_mat=image_to_face_mat,
fanseg_mask=fanseg_mask,
eyebrows_expand_mod=eyebrows_expand_mod,
relighted=relighted)
def remove_ie_polys(self):
self.dfl_dict['ie_polys'] = None
def remove_seg_ie_polys(self):
self.dfl_dict['seg_ie_polys'] = None
def remove_fanseg_mask(self):
self.dfl_dict['fanseg_mask'] = None
def remove_source_filename(self):
self.dfl_dict['source_filename'] = None
def save(self):
try:
with open(self.filename, "wb") as f:
f.write ( self.dump() )
except:
raise Exception( f'cannot save {self.filename}' )
def dump(self):
data = b""
dict_data = self.dfl_dict
for key in list(dict_data.keys()):
if dict_data[key] is None:
dict_data.pop(key)
for chunk in self.chunks:
if chunk['name'] == 'APP15':
self.chunks.remove(chunk)
break
last_app_chunk = 0
for i, chunk in enumerate (self.chunks):
if chunk['m_h'] & 0xF0 == 0xE0:
last_app_chunk = i
dflchunk = {'name' : 'APP15',
'm_h' : 0xEF,
'data' : pickle.dumps(dict_data),
'ex_data' : None,
}
self.chunks.insert (last_app_chunk+1, dflchunk)
for chunk in self.chunks:
data += struct.pack ("BB", 0xFF, chunk['m_h'] )
chunk_data = chunk['data']
@ -294,47 +220,55 @@ class DFLJPG(object):
return chunk.height
return 0
def getDFLDictData(self):
def get_dict(self):
return self.dfl_dict
def setDFLDictData (self, dict_data=None):
def set_dict (self, dict_data=None):
self.dfl_dict = dict_data
for chunk in self.chunks:
if chunk['name'] == 'APP15':
self.chunks.remove(chunk)
break
def get_face_type(self): return self.dfl_dict.get('face_type', FaceType.toString (FaceType.FULL) )
def set_face_type(self, face_type): self.dfl_dict['face_type'] = face_type
last_app_chunk = 0
for i, chunk in enumerate (self.chunks):
if chunk['m_h'] & 0xF0 == 0xE0:
last_app_chunk = i
def get_landmarks(self): return np.array ( self.dfl_dict['landmarks'] )
def set_landmarks(self, landmarks): self.dfl_dict['landmarks'] = landmarks
dflchunk = {'name' : 'APP15',
'm_h' : 0xEF,
'data' : pickle.dumps(dict_data),
'ex_data' : None,
}
self.chunks.insert (last_app_chunk+1, dflchunk)
def get_eyebrows_expand_mod(self): return self.dfl_dict.get ('eyebrows_expand_mod', 1.0)
def set_eyebrows_expand_mod(self, eyebrows_expand_mod): self.dfl_dict['eyebrows_expand_mod'] = eyebrows_expand_mod
def get_source_filename(self): return self.dfl_dict.get ('source_filename', None)
def set_source_filename(self, source_filename): self.dfl_dict['source_filename'] = source_filename
def get_source_rect(self): return self.dfl_dict.get ('source_rect', None)
def set_source_rect(self, source_rect): self.dfl_dict['source_rect'] = source_rect
def get_source_landmarks(self): return np.array ( self.dfl_dict.get('source_landmarks', None) )
def set_source_landmarks(self, source_landmarks): self.dfl_dict['source_landmarks'] = source_landmarks
def get_face_type(self): return self.dfl_dict['face_type']
def get_landmarks(self): return np.array ( self.dfl_dict['landmarks'] )
def get_ie_polys(self): return self.dfl_dict.get('ie_polys',None)
def get_seg_ie_polys(self): return self.dfl_dict.get('seg_ie_polys',None)
def get_source_filename(self): return self.dfl_dict['source_filename']
def get_source_rect(self): return self.dfl_dict['source_rect']
def get_source_landmarks(self): return np.array ( self.dfl_dict['source_landmarks'] )
def get_image_to_face_mat(self):
mat = self.dfl_dict.get ('image_to_face_mat', None)
if mat is not None:
return np.array (mat)
return None
def get_fanseg_mask(self):
fanseg_mask = self.dfl_dict.get ('fanseg_mask', None)
if fanseg_mask is not None:
return np.clip ( np.array (fanseg_mask) / 255.0, 0.0, 1.0 )[...,np.newaxis]
return None
def get_eyebrows_expand_mod(self):
return self.dfl_dict.get ('eyebrows_expand_mod', None)
def get_relighted(self):
return self.dfl_dict.get ('relighted', False)
def set_image_to_face_mat(self, image_to_face_mat): self.dfl_dict['image_to_face_mat'] = image_to_face_mat
def get_ie_polys(self): return self.dfl_dict.get('ie_polys',None)
def set_ie_polys(self, ie_polys):
if ie_polys is not None and \
not isinstance(ie_polys, list):
ie_polys = ie_polys.dump()
self.dfl_dict['ie_polys'] = ie_polys
def get_seg_ie_polys(self): return self.dfl_dict.get('seg_ie_polys',None)
def set_seg_ie_polys(self, seg_ie_polys):
if seg_ie_polys is not None and \
not isinstance(seg_ie_polys, dict):
seg_ie_polys = seg_ie_polys.dump()
self.dfl_dict['seg_ie_polys'] = seg_ie_polys

View file

@ -75,12 +75,6 @@ if __name__ == "__main__":
if arguments.recover_original_aligned_filename:
Util.recover_original_aligned_filename (input_path=arguments.input_dir)
#if arguments.remove_fanseg:
# Util.remove_fanseg_folder (input_path=arguments.input_dir)
if arguments.remove_ie_polys:
Util.remove_ie_polys_folder (input_path=arguments.input_dir)
if arguments.save_faceset_metadata:
Util.save_faceset_metadata_folder (input_path=arguments.input_dir)
@ -101,8 +95,6 @@ if __name__ == "__main__":
p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir", help="Input directory. A directory containing the files you wish to process.")
p.add_argument('--add-landmarks-debug-images', action="store_true", dest="add_landmarks_debug_images", default=False, help="Add landmarks debug image for aligned faces.")
p.add_argument('--recover-original-aligned-filename', action="store_true", dest="recover_original_aligned_filename", default=False, help="Recover original aligned filename.")
#p.add_argument('--remove-fanseg', action="store_true", dest="remove_fanseg", default=False, help="Remove fanseg mask from aligned faces.")
p.add_argument('--remove-ie-polys', action="store_true", dest="remove_ie_polys", default=False, help="Remove ie_polys from aligned faces.")
p.add_argument('--save-faceset-metadata', action="store_true", dest="save_faceset_metadata", default=False, help="Save faceset metadata to file.")
p.add_argument('--restore-faceset-metadata', action="store_true", dest="restore_faceset_metadata", default=False, help="Restore faceset metadata to file. Image filenames must be the same as used with save.")
p.add_argument('--pack-faceset', action="store_true", dest="pack_faceset", default=False, help="")

View file

@ -92,7 +92,9 @@ class ExtractSubprocessor(Subprocessor):
self.cached_image = ( filepath, image )
h, w, c = image.shape
extract_from_dflimg = (h == w and DFLIMG.load (filepath) is not None)
dflimg = DFLIMG.load (filepath)
extract_from_dflimg = (h == w and (dflimg is not None and dflimg.has_data()) )
if 'rects' in self.type or self.type == 'all':
data = ExtractSubprocessor.Cli.rects_stage (data=data,
@ -263,13 +265,14 @@ class ExtractSubprocessor(Subprocessor):
output_filepath = output_path / f"{filepath.stem}_{face_idx}.jpg"
cv2_imwrite(output_filepath, face_image, [int(cv2.IMWRITE_JPEG_QUALITY), 90] )
DFLJPG.embed_data(output_filepath, face_type=FaceType.toString(face_type),
landmarks=face_image_landmarks.tolist(),
source_filename=filepath.name,
source_rect=rect,
source_landmarks=image_landmarks.tolist(),
image_to_face_mat=image_to_face_mat
)
dflimg = DFLJPG.load(output_filepath)
dflimg.set_face_type(FaceType.toString(face_type))
dflimg.set_landmarks(face_image_landmarks.tolist())
dflimg.set_source_filename(filepath.name)
dflimg.set_source_rect(rect)
dflimg.set_source_landmarks(image_landmarks.tolist())
dflimg.set_image_to_face_mat(image_to_face_mat)
dflimg.save()
data.final_output_files.append (output_filepath)
face_idx += 1

View file

@ -99,19 +99,23 @@ class FacesetEnhancerSubprocessor(Subprocessor):
def process_data(self, filepath):
try:
dflimg = DFLIMG.load (filepath)
if dflimg is None:
self.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
self.log_err (f"{filepath.name} is not a dfl image file")
else:
dfl_dict = dflimg.get_dict()
img = cv2_imread(filepath).astype(np.float32) / 255.0
img = self.fe.enhance(img)
img = np.clip (img*255, 0, 255).astype(np.uint8)
output_filepath = self.output_dirpath / filepath.name
cv2_imwrite ( str(output_filepath), img, [int(cv2.IMWRITE_JPEG_QUALITY), 100] )
dflimg.embed_and_set ( str(output_filepath) )
dflimg = DFLIMG.load (output_filepath)
dflimg.set_dict(dfl_dict)
dflimg.save()
return (1, filepath, output_filepath)
except:
self.log_err (f"Exception occured while processing file {filepath}. Error: {traceback.format_exc()}")

View file

@ -398,26 +398,23 @@ def mask_editor_main(input_dir, confirmed_dir=None, skipped_dir=None, no_default
if filepath is not None:
dflimg = DFLIMG.load (filepath)
if dflimg is None:
if dflimg is None or not dflimg.has_data():
io.log_err ("%s is not a dfl image file" % (filepath.name) )
continue
else:
lmrks = dflimg.get_landmarks()
ie_polys = IEPolys.load(dflimg.get_ie_polys())
fanseg_mask = dflimg.get_fanseg_mask()
if filepath.name in cached_images:
img = cached_images[filepath.name]
else:
img = cached_images[filepath.name] = cv2_imread(str(filepath)) / 255.0
if fanseg_mask is not None:
mask = fanseg_mask
if no_default_mask:
mask = np.zeros ( (target_wh,target_wh,3) )
else:
if no_default_mask:
mask = np.zeros ( (target_wh,target_wh,3) )
else:
mask = LandmarksProcessor.get_image_hull_mask( img.shape, lmrks, eyebrows_expand_mod=eyebrows_expand_mod)
mask = LandmarksProcessor.get_image_hull_mask( img.shape, lmrks, eyebrows_expand_mod=eyebrows_expand_mod)
else:
img = np.zeros ( (target_wh,target_wh,3) )
mask = np.ones ( (target_wh,target_wh,3) )
@ -521,7 +518,9 @@ def mask_editor_main(input_dir, confirmed_dir=None, skipped_dir=None, no_default
do_save_move_count -= 1
ed.mask_finish()
dflimg.embed_and_set (str(filepath), ie_polys=ed.get_ie_polys(), eyebrows_expand_mod=eyebrows_expand_mod )
dflimg.set_ie_polys(ed.get_ie_polys())
dflimg.set_eyebrows_expand_mod(eyebrows_expand_mod)
dflimg.save()
done_paths += [ confirmed_path / filepath.name ]
done_images_types[filepath.name] = 2
@ -532,7 +531,9 @@ def mask_editor_main(input_dir, confirmed_dir=None, skipped_dir=None, no_default
do_save_count -= 1
ed.mask_finish()
dflimg.embed_and_set (str(filepath), ie_polys=ed.get_ie_polys(), eyebrows_expand_mod=eyebrows_expand_mod )
dflimg.set_ie_polys(ed.get_ie_polys())
dflimg.set_eyebrows_expand_mod(eyebrows_expand_mod)
dflimg.save()
done_paths += [ filepath ]
done_images_types[filepath.name] = 2

View file

@ -101,14 +101,14 @@ def main (model_class_name=None,
def generator():
for filepath in io.progress_bar_generator( pathex.get_image_paths(aligned_path), "Collecting alignments"):
filepath = Path(filepath)
yield filepath, DFLIMG.load(filepath)
yield filepath, DFLIMG.load(filepath)
alignments = {}
multiple_faces_detected = False
for filepath, dflimg in generator():
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
io.log_err (f"{filepath.name} is not a dfl image file")
continue
source_filename = dflimg.get_source_filename()
@ -221,7 +221,7 @@ filesdata = []
for filepath in io.progress_bar_generator(input_path_image_paths, "Collecting info"):
filepath = Path(filepath)
dflimg = DFLIMG.load(filepath)
dflimg = DFLIMG.x(filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
continue

View file

@ -29,12 +29,13 @@ class BlurEstimatorSubprocessor(Subprocessor):
filepath = Path( data[0] )
dflimg = DFLIMG.load (filepath)
if dflimg is not None:
if dflimg is None or not dflimg.has_data():
self.log_err (f"{filepath.name} is not a dfl image file")
return [ str(filepath), 0 ]
else:
image = cv2_imread( str(filepath) )
return [ str(filepath), estimate_sharpness(image) ]
else:
self.log_err ("%s is not a dfl image file" % (filepath.name) )
return [ str(filepath), 0 ]
#override
def get_data_name (self, data):
@ -109,8 +110,8 @@ def sort_by_face_yaw(input_path):
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
io.log_err (f"{filepath.name} is not a dfl image file")
trash_img_list.append ( [str(filepath)] )
continue
@ -132,8 +133,8 @@ def sort_by_face_pitch(input_path):
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
io.log_err (f"{filepath.name} is not a dfl image file")
trash_img_list.append ( [str(filepath)] )
continue
@ -155,8 +156,8 @@ def sort_by_face_source_rect_size(input_path):
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
io.log_err (f"{filepath.name} is not a dfl image file")
trash_img_list.append ( [str(filepath)] )
continue
@ -341,7 +342,7 @@ def sort_by_hist_dissim(input_path):
image = cv2_imread(str(filepath))
if dflimg is not None:
if dflimg is not None and dflimg.has_data():
face_mask = LandmarksProcessor.get_image_hull_mask (image.shape, dflimg.get_landmarks())
image = (image*face_mask).astype(np.uint8)
@ -391,8 +392,8 @@ def sort_by_origname(input_path):
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
io.log_err (f"{filepath.name} is not a dfl image file")
trash_img_list.append( [str(filepath)] )
continue
@ -434,8 +435,8 @@ class FinalLoaderSubprocessor(Subprocessor):
try:
dflimg = DFLIMG.load (filepath)
if dflimg is None:
self.log_err("%s is not a dfl image file" % (filepath.name))
if dflimg is None or not dflimg.has_data():
self.log_err (f"{filepath.name} is not a dfl image file")
return [ 1, [str(filepath)] ]
bgr = cv2_imread(str(filepath))

View file

@ -22,8 +22,11 @@ def save_faceset_metadata_folder(input_path):
for filepath in io.progress_bar_generator( pathex.get_image_paths(input_path), "Processing"):
filepath = Path(filepath)
dflimg = DFLIMG.load (filepath)
if dflimg is None or not dflimg.has_data():
io.log_info(f"{filepath} is not a dfl image file")
continue
dfl_dict = dflimg.getDFLDictData()
dfl_dict = dflimg.get_dict()
d[filepath.name] = ( dflimg.get_shape(), dfl_dict )
try:
@ -52,70 +55,29 @@ def restore_faceset_metadata_folder(input_path):
except:
raise FileNotFoundError(filename)
for filepath in io.progress_bar_generator( pathex.get_image_paths(input_path), "Processing"):
filepath = Path(filepath)
for filepath in io.progress_bar_generator( pathex.get_image_paths(input_path, image_extensions=['.jpg'], return_Path_class=True), "Processing"):
saved_data = d.get(filepath.name, None)
if saved_data is None:
io.log_info(f"No saved metadata for {filepath}")
continue
shape, dfl_dict = d.get(filepath.name, None)
shape, dfl_dict = saved_data
img = cv2_imread (str(filepath))
img = cv2_imread (filepath)
if img.shape != shape:
img = cv2.resize (img, (shape[1], shape[0]), cv2.INTER_LANCZOS4 )
if filepath.suffix == '.png':
cv2_imwrite (str(filepath), img)
elif filepath.suffix == '.jpg':
cv2_imwrite (str(filepath), img, [int(cv2.IMWRITE_JPEG_QUALITY), 100] )
cv2_imwrite (str(filepath), img, [int(cv2.IMWRITE_JPEG_QUALITY), 100] )
if filepath.suffix == '.jpg':
DFLJPG.embed_dfldict( str(filepath), dfl_dict )
dflimg = DFLJPG.load(filepath)
dflimg.set_dict(dfl_dict)
dflimg.save()
else:
continue
metadata_filepath.unlink()
def remove_ie_polys_file (filepath):
filepath = Path(filepath)
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
return
dflimg.remove_ie_polys()
dflimg.embed_and_set( str(filepath) )
def remove_ie_polys_folder(input_path):
input_path = Path(input_path)
io.log_info ("Removing ie_polys...\r\n")
for filepath in io.progress_bar_generator( pathex.get_image_paths(input_path), "Removing"):
filepath = Path(filepath)
remove_ie_polys_file(filepath)
def remove_fanseg_file (filepath):
filepath = Path(filepath)
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
return
dflimg.remove_fanseg_mask()
dflimg.embed_and_set( str(filepath) )
def remove_fanseg_folder(input_path):
input_path = Path(input_path)
io.log_info ("Removing fanseg mask...\r\n")
for filepath in io.progress_bar_generator( pathex.get_image_paths(input_path), "Removing"):
filepath = Path(filepath)
remove_fanseg_file(filepath)
def add_landmarks_debug_images(input_path):
io.log_info ("Adding landmarks debug images...")
@ -126,8 +88,8 @@ def add_landmarks_debug_images(input_path):
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
io.log_err (f"{filepath.name} is not a dfl image file")
continue
if img is not None:
@ -154,8 +116,8 @@ def recover_original_aligned_filename(input_path):
dflimg = DFLIMG.load (filepath)
if dflimg is None:
io.log_err ("%s is not a dfl image file" % (filepath.name) )
if dflimg is None or not dflimg.has_data():
io.log_err (f"{filepath.name} is not a dfl image file")
continue
files += [ [filepath, None, dflimg.get_source_filename(), False] ]
@ -212,13 +174,13 @@ def convert_png_to_jpg_file (filepath):
io.log_err ("%s is not a dfl png image file" % (filepath.name) )
return
dfl_dict = dflpng.getDFLDictData()
dfl_dict = dflpng.get_dict()
img = cv2_imread (str(filepath))
new_filepath = str(filepath.parent / (filepath.stem + '.jpg'))
cv2_imwrite ( new_filepath, img, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
DFLJPG.embed_data( new_filepath,
DFLJPG.x( new_filepath,
face_type=dfl_dict.get('face_type', None),
landmarks=dfl_dict.get('landmarks', None),
ie_polys=dfl_dict.get('ie_polys', None),

View file

@ -21,7 +21,7 @@ def merge(input_dir):
json_filepath = filepath.parent / (filepath.stem+'.json')
if json_filepath.exists():
dflimg = DFLIMG.load(filepath)
if dflimg is not None:
if dflimg is not None and dflimg.has_data():
try:
json_dict = json.loads(json_filepath.read_text())
@ -52,7 +52,8 @@ def merge(input_dir):
io.log_info(f"No points found in {json_filepath}, skipping.")
continue
dflimg.embed_and_set (filepath, seg_ie_polys=seg_ie_polys)
dflimg.set_seg_ie_polys (seg_ie_polys)
dflimg.save()
json_filepath.unlink()
@ -76,7 +77,7 @@ def split(input_dir ):
dflimg = DFLIMG.load(filepath)
if dflimg is not None:
if dflimg is not None and dflimg.has_data():
try:
seg_ie_polys = dflimg.get_seg_ie_polys()
if seg_ie_polys is not None:
@ -98,8 +99,8 @@ def split(input_dir ):
json_filepath.write_text( json.dumps (json_dict,indent=4) )
dflimg.remove_seg_ie_polys()
dflimg.embed_and_set (filepath)
dflimg.set_seg_ie_polys(None)
dflimg.save()
images_processed += 1
except:
io.log_err(f"err {filepath}, {traceback.format_exc()}")

View file

@ -219,182 +219,6 @@ def extract_vggface2_dataset(input_dir, device_args={} ):
"""
class CelebAMASKHQSubprocessor(Subprocessor):
class Cli(Subprocessor.Cli):
#override
def on_initialize(self, client_dict):
self.masks_files_paths = client_dict['masks_files_paths']
return None
#override
def process_data(self, data):
filename = data[0]
dflimg = DFLIMG.load(Path(filename))
image_to_face_mat = dflimg.get_image_to_face_mat()
src_filename = dflimg.get_source_filename()
img = cv2_imread(filename)
h,w,c = img.shape
fanseg_mask = LandmarksProcessor.get_image_hull_mask(img.shape, dflimg.get_landmarks() )
idx_name = '%.5d' % int(src_filename.split('.')[0])
idx_files = [ x for x in self.masks_files_paths if idx_name in x ]
skin_files = [ x for x in idx_files if 'skin' in x ]
eye_glass_files = [ x for x in idx_files if 'eye_g' in x ]
for files, is_invert in [ (skin_files,False),
(eye_glass_files,True) ]:
if len(files) > 0:
mask = cv2_imread(files[0])
mask = mask[...,0]
mask[mask == 255] = 1
mask = mask.astype(np.float32)
mask = cv2.resize(mask, (1024,1024) )
mask = cv2.warpAffine(mask, image_to_face_mat, (w, h), cv2.INTER_LANCZOS4)
if not is_invert:
fanseg_mask *= mask[...,None]
else:
fanseg_mask *= (1-mask[...,None])
dflimg.embed_and_set (filename, fanseg_mask=fanseg_mask)
return 1
#override
def get_data_name (self, data):
#return string identificator of your data
return data[0]
#override
def __init__(self, image_paths, masks_files_paths ):
self.image_paths = image_paths
self.masks_files_paths = masks_files_paths
self.result = []
super().__init__('CelebAMASKHQSubprocessor', CelebAMASKHQSubprocessor.Cli, 60)
#override
def process_info_generator(self):
for i in range(min(multiprocessing.cpu_count(), 8)):
yield 'CPU%d' % (i), {}, {'masks_files_paths' : self.masks_files_paths }
#override
def on_clients_initialized(self):
io.progress_bar ("Processing", len (self.image_paths))
#override
def on_clients_finalized(self):
io.progress_bar_close()
#override
def get_data(self, host_dict):
if len (self.image_paths) > 0:
return [self.image_paths.pop(0)]
return None
#override
def on_data_return (self, host_dict, data):
self.image_paths.insert(0, data[0])
#override
def on_result (self, host_dict, data, result):
io.progress_bar_inc(1)
#override
def get_result(self):
return self.result
#unused in end user workflow
def apply_celebamaskhq(input_dir ):
input_path = Path(input_dir)
img_path = input_path / 'aligned'
mask_path = input_path / 'mask'
if not img_path.exists():
raise ValueError(f'{str(img_path)} directory not found. Please ensure it exists.')
CelebAMASKHQSubprocessor(pathex.get_image_paths(img_path),
pathex.get_image_paths(mask_path, subdirs=True) ).run()
return
paths_to_extract = []
for filename in io.progress_bar_generator(pathex.get_image_paths(img_path), desc="Processing"):
filepath = Path(filename)
dflimg = DFLIMG.load(filepath)
if dflimg is not None:
paths_to_extract.append (filepath)
image_to_face_mat = dflimg.get_image_to_face_mat()
src_filename = dflimg.get_source_filename()
#img = cv2_imread(filename)
h,w,c = dflimg.get_shape()
fanseg_mask = LandmarksProcessor.get_image_hull_mask( (h,w,c), dflimg.get_landmarks() )
idx_name = '%.5d' % int(src_filename.split('.')[0])
idx_files = [ x for x in masks_files if idx_name in x ]
skin_files = [ x for x in idx_files if 'skin' in x ]
eye_glass_files = [ x for x in idx_files if 'eye_g' in x ]
for files, is_invert in [ (skin_files,False),
(eye_glass_files,True) ]:
if len(files) > 0:
mask = cv2_imread(files[0])
mask = mask[...,0]
mask[mask == 255] = 1
mask = mask.astype(np.float32)
mask = cv2.resize(mask, (1024,1024) )
mask = cv2.warpAffine(mask, image_to_face_mat, (w, h), cv2.INTER_LANCZOS4)
if not is_invert:
fanseg_mask *= mask[...,None]
else:
fanseg_mask *= (1-mask[...,None])
#cv2.imshow("", (fanseg_mask*255).astype(np.uint8) )
#cv2.waitKey(0)
dflimg.embed_and_set (filename, fanseg_mask=fanseg_mask)
#import code
#code.interact(local=dict(globals(), **locals()))
#unused in end user workflow
def extract_fanseg(input_dir, device_args={} ):
multi_gpu = device_args.get('multi_gpu', False)
cpu_only = device_args.get('cpu_only', False)
input_path = Path(input_dir)
if not input_path.exists():
raise ValueError('Input directory not found. Please ensure it exists.')
paths_to_extract = []
for filename in pathex.get_image_paths(input_path) :
filepath = Path(filename)
dflimg = DFLIMG.load ( filepath )
if dflimg is not None:
paths_to_extract.append (filepath)
paths_to_extract_len = len(paths_to_extract)
if paths_to_extract_len > 0:
io.log_info ("Performing extract fanseg for %d files..." % (paths_to_extract_len) )
data = ExtractSubprocessor ([ ExtractSubprocessor.Data(filename) for filename in paths_to_extract ], 'fanseg', multi_gpu=multi_gpu, cpu_only=cpu_only).run()
#unused in end user workflow
def dev_test_68(input_dir ):
# process 68 landmarks dataset with .pts files
@ -452,12 +276,13 @@ def dev_test_68(input_dir ):
img = imagelib.normalize_channels(img, 3)
cv2_imwrite(output_filepath, img, [int(cv2.IMWRITE_JPEG_QUALITY), 95] )
DFLJPG.embed_data(output_filepath, face_type=FaceType.toString(FaceType.MARK_ONLY),
landmarks=lmrks,
source_filename=filepath.name,
source_rect=rect,
source_landmarks=lmrks
)
raise Exception("unimplemented")
#DFLJPG.x(output_filepath, face_type=FaceType.toString(FaceType.MARK_ONLY),
# landmarks=lmrks,
# source_filename=filepath.name,
# source_rect=rect,
# source_landmarks=lmrks
# )
io.log_info("Done.")
@ -544,11 +369,11 @@ def dev_test1(input_dir):
for filename in img_paths:
filepath = Path(filename)
dflimg = DFLIMG.load (filepath)
dflimg = DFLIMG.x (filepath)
if dflimg is None:
raise ValueError
dflimg.embed_and_set(filename, person_name=dir_name)
#dflimg.x(filename, person_name=dir_name)
#import code
#code.interact(local=dict(globals(), **locals()))
@ -588,98 +413,6 @@ def dev_segmented_trash(input_dir):
io.log_info ('fail to trashing %s' % (src.name) )
def dev_segmented_extract(input_dir, output_dir ):
# extract and merge .json labelme files within the faces
device_config = nn.DeviceConfig.GPUIndexes( nn.ask_choose_device_idxs(suggest_all_gpu=True) )
input_path = Path(input_dir)
if not input_path.exists():
raise ValueError('input_dir not found. Please ensure it exists.')
output_path = Path(output_dir)
io.log_info("Performing extract segmented faces.")
io.log_info(f'Output dir is {output_path}')
if output_path.exists():
output_images_paths = pathex.get_image_paths(output_path, subdirs=True)
if len(output_images_paths) > 0:
io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False )
for filename in output_images_paths:
Path(filename).unlink()
shutil.rmtree(str(output_path))
else:
output_path.mkdir(parents=True, exist_ok=True)
images_paths = pathex.get_image_paths(input_path, subdirs=True, return_Path_class=True)
extract_data = []
images_jsons = {}
images_processed = 0
for filepath in io.progress_bar_generator(images_paths, "Processing"):
json_filepath = filepath.parent / (filepath.stem+'.json')
if json_filepath.exists():
try:
json_dict = json.loads(json_filepath.read_text())
images_jsons[filepath] = json_dict
total_points = [ [x,y] for shape in json_dict['shapes'] for x,y in shape['points'] ]
total_points = np.array(total_points)
if len(total_points) == 0:
io.log_info(f"No points found in {json_filepath}, skipping.")
continue
l,r = int(total_points[:,0].min()), int(total_points[:,0].max())
t,b = int(total_points[:,1].min()), int(total_points[:,1].max())
force_output_path=output_path / filepath.relative_to(input_path).parent
force_output_path.mkdir(exist_ok=True, parents=True)
extract_data.append ( ExtractSubprocessor.Data(filepath,
rects=[ [l,t,r,b] ],
force_output_path=force_output_path ) )
images_processed += 1
except:
io.log_err(f"err {filepath}, {traceback.format_exc()}")
return
else:
io.log_info(f"No .json file for {filepath.relative_to(input_path)}, skipping.")
continue
image_size = 1024
face_type = FaceType.HEAD
extract_data = ExtractSubprocessor (extract_data, 'landmarks', image_size, face_type, device_config=device_config).run()
extract_data = ExtractSubprocessor (extract_data, 'final', image_size, face_type, device_config=device_config).run()
for data in extract_data:
filepath = data.force_output_path / (data.filepath.stem+'_0.jpg')
dflimg = DFLIMG.load(filepath)
image_to_face_mat = dflimg.get_image_to_face_mat()
json_dict = images_jsons[data.filepath]
ie_polys = IEPolys()
for shape in json_dict['shapes']:
ie_poly = ie_polys.add(1)
points = np.array( [ [x,y] for x,y in shape['points'] ] )
points = LandmarksProcessor.transform_points(points, image_to_face_mat)
for x,y in points:
ie_poly.add( int(x), int(y) )
dflimg.embed_and_set (filepath, ie_polys=ie_polys)
io.log_info(f"Images found: {len(images_paths)}")
io.log_info(f"Images processed: {images_processed}")
"""
#mark only
for data in extract_data:
@ -699,7 +432,7 @@ for data in extract_data:
ie_poly.add( int(x), int(y) )
DFLJPG.embed_data(output_filepath, face_type=FaceType.toString(FaceType.MARK_ONLY),
DFLJPG.x(output_filepath, face_type=FaceType.toString(FaceType.MARK_ONLY),
landmarks=data.landmarks[0],
ie_polys=ie_polys,
source_filename=filepath.name,

View file

@ -171,7 +171,7 @@ class FaceSamplesLoaderSubprocessor(Subprocessor):
idx, filename = data
dflimg = DFLIMG.load (Path(filename))
if dflimg is None:
if dflimg is None or not dflimg.has_data():
self.log_err (f"FaceSamplesLoader: {filename} is not a dfl image file.")
data = None
else:

View file

@ -57,10 +57,7 @@ class SampleProcessor(object):
h,w,c = sample_bgr.shape
def get_full_face_mask():
if sample.eyebrows_expand_mod is not None:
full_face_mask = LandmarksProcessor.get_image_hull_mask (sample_bgr.shape, sample_landmarks, eyebrows_expand_mod=sample.eyebrows_expand_mod )
else:
full_face_mask = LandmarksProcessor.get_image_hull_mask (sample_bgr.shape, sample_landmarks)
full_face_mask = LandmarksProcessor.get_image_hull_mask (sample_bgr.shape, sample_landmarks, eyebrows_expand_mod=sample.eyebrows_expand_mod )
return np.clip(full_face_mask, 0, 1)
def get_eyes_mask():