refactoring

This commit is contained in:
iperov 2021-11-07 10:03:15 +04:00
commit 30ba51edf7
24 changed files with 663 additions and 459 deletions

View file

@ -9,7 +9,7 @@ class FMask:
def __init__(self, _from_pickled=False):
"""
"""
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None
self._uuid : Union[bytes, None] = uuid.uuid4().bytes if not _from_pickled else None
self._mask_type : Union[FMask.Type, None] = None
self._FImage_uuid : Union[bytes, None] = None

View file

@ -52,9 +52,25 @@ class FaceWarper:
self._rw_grid_tx = rnd_state.uniform(*rw_grid_tx) if isinstance(rw_grid_tx, Iterable) else rw_grid_tx
self._rw_grid_ty = rnd_state.uniform(*rw_grid_ty) if isinstance(rw_grid_ty, Iterable) else rw_grid_ty
self._warp_rnd_mat = Affine2DUniMat.from_transformation(0.5, 0.5, self._rw_grid_rot_deg, 1.0+self._rw_grid_scale, self._rw_grid_tx, self._rw_grid_ty)
self._align_rnd_mat = Affine2DUniMat.from_transformation(0.5, 0.5, self._align_rot_deg, 1.0+self._align_scale, self._align_tx, self._align_ty)
self._rnd_state_state = rnd_state.get_state()
self._cached = {}
def get_aligned_random_transform_mat(self) -> Affine2DUniMat:
"""
returns Affine2DUniMat that represents transformation from aligned face to randomly transformed aligned face
"""
mat1 = self._img_to_face_uni_mat
mat2 = (self._face_to_img_uni_mat * self._align_rnd_mat).invert()
pts = [ [0,0], [1,0], [1,1]]
src_pts = mat1.transform_points(pts)
dst_pts = mat2.transform_points(pts)
return Affine2DUniMat.from_3_pairs(src_pts, dst_pts)
def transform(self, img : np.ndarray, out_res : int, random_warp : bool = True) -> np.ndarray:
"""
transform an image.
@ -91,9 +107,7 @@ class FaceWarper:
face_warp_grid = FaceWarper._gen_random_warp_uni_grid_diff(out_res, self._rw_grid_cell_count, 0.12, rnd_state)
# make a randomly transformable mat to transform face_warp_grid from face to image
face_warp_grid_mat = (self._face_to_img_uni_mat *
Affine2DUniMat.from_transformation(0.5, 0.5, self._rw_grid_rot_deg, 1.0+self._rw_grid_scale, self._rw_grid_tx, self._rw_grid_ty)
)
face_warp_grid_mat = self._face_to_img_uni_mat * self._warp_rnd_mat
# warp face_warp_grid to the space of image and merge with image_grid
image_grid += cv2.warpAffine(face_warp_grid, face_warp_grid_mat.to_exact_mat(out_res,out_res, W, H), (W,H), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
@ -101,9 +115,10 @@ class FaceWarper:
# scale uniform grid from to image size
image_grid *= (H-1, W-1)
# apply random transormations for align mat
img_to_face_rnd_mat = (self._face_to_img_uni_mat * Affine2DMat.from_transformation(0.5, 0.5, self._align_rot_deg, 1.0+self._align_scale, self._align_tx, self._align_ty)
).invert().to_exact_mat(W,H,out_res,out_res)
# apply random transformations for align mat
#img_to_face_rnd_uni_mat = (self._face_to_img_uni_mat * self._align_rnd_mat).invert()
img_to_face_rnd_mat = (self._face_to_img_uni_mat * self._align_rnd_mat).invert().to_exact_mat(W,H,out_res,out_res)
# warp image_grid to face space
image_grid = cv2.warpAffine(image_grid, img_to_face_rnd_mat, (out_res,out_res), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE )

View file

@ -1,19 +1,22 @@
import pickle
import sqlite3
import uuid
from pathlib import Path
from typing import Generator, List, Union, Iterable
from typing import Generator, Iterable, List, Union
import cv2
import h5py
import numpy as np
from .. import console as lib_con
from .FMask import FMask
from .UFaceMark import UFaceMark
from .UImage import UImage
from .UPerson import UPerson
class Faceset:
def __init__(self, path = None):
def __init__(self, path = None, write_access=False, recreate=False):
"""
Faceset is a class to store and manage face related data.
@ -21,205 +24,155 @@ class Faceset:
path path to faceset .dfs file
write_access
recreate
Can be pickled.
"""
self._f = None
self._path = path = Path(path)
if path.suffix != '.dfs':
raise ValueError('Path must be a .dfs file')
self._conn = conn = sqlite3.connect(path, isolation_level=None)
self._cur = cur = conn.cursor()
if path.exists():
if write_access and recreate:
path.unlink()
elif not write_access:
raise FileNotFoundError(f'File {path} not found.')
cur = self._get_cursor()
cur.execute('BEGIN IMMEDIATE')
if not self._is_table_exists('FacesetInfo'):
self.recreate(shrink=False, _transaction=False)
cur.execute('COMMIT')
self.shrink()
else:
cur.execute('END')
self._mode = 'a' if write_access else 'r'
self._open()
def __del__(self):
self.close()
def __getstate__(self):
return {'_path' : self._path}
return {'_path' : self._path, '_mode' : self._mode}
def __setstate__(self, d):
self.__init__( d['_path'] )
self._f = None
self._path = d['_path']
self._mode = d['_mode']
self._open()
def __repr__(self): return self.__str__()
def __str__(self):
return f"Faceset. UImage:{self.get_UImage_count()} UFaceMark:{self.get_UFaceMark_count()} UPerson:{self.get_UPerson_count()}"
def _is_table_exists(self, name):
return self._cur.execute(f"SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?", [name]).fetchone()[0] != 0
def _open(self):
if self._f is None:
self._f = f = h5py.File(self._path, mode=self._mode)
self._UFaceMark_grp = f.require_group('UFaceMark')
self._UImage_grp = f.require_group('UImage')
self._UImage_image_data_grp = f.require_group('UImage_image_data')
self._UPerson_grp = f.require_group('UPerson')
def _get_cursor(self) -> sqlite3.Cursor: return self._cur
def close(self):
if self._cur is not None:
self._cur.close()
self._cur = None
if self._f is not None:
self._f.close()
self._f = None
if self._conn is not None:
self._conn.close()
self._conn = None
def shrink(self):
self._cur.execute('VACUUM')
def recreate(self, shrink=True, _transaction=True):
def optimize(self, verbose=True):
"""
delete all data and recreate Faceset structure.
recreate Faceset with optimized structure.
"""
cur = self._get_cursor()
if verbose:
print(f'Optimizing {self._path.name}...')
if _transaction:
cur.execute('BEGIN IMMEDIATE')
tmp_path = self._path.parent / (self._path.stem + '_optimizing' + self._path.suffix)
for table_name, in cur.execute("SELECT name from sqlite_master where type = 'table';").fetchall():
cur.execute(f'DROP TABLE {table_name}')
tmp_fs = Faceset(tmp_path, write_access=True, recreate=True)
self._group_copy(tmp_fs._UFaceMark_grp, self._UFaceMark_grp, verbose=verbose)
self._group_copy(tmp_fs._UPerson_grp, self._UPerson_grp, verbose=verbose)
self._group_copy(tmp_fs._UImage_grp, self._UImage_grp, verbose=verbose)
self._group_copy(tmp_fs._UImage_image_data_grp, self._UImage_image_data_grp, verbose=verbose)
tmp_fs.close()
(cur.execute('CREATE TABLE FacesetInfo (version INT)')
.execute('INSERT INTO FacesetInfo VALUES (1)')
self.close()
self._path.unlink()
tmp_path.rename(self._path)
self._open()
.execute('CREATE TABLE UImage (uuid BLOB, name TEXT, format TEXT, data BLOB)')
.execute('CREATE TABLE UPerson (uuid BLOB, data BLOB)')
.execute('CREATE TABLE UFaceMark (uuid BLOB, UImage_uuid BLOB, UPerson_uuid BLOB, data BLOB)')
)
def _group_copy(self, group_dst : h5py.Group, group_src : h5py.Group, verbose=True):
for key, value in lib_con.progress_bar_iterator(group_src.items(), desc=f'Copying {group_src.name} -> {group_dst.name}', suppress_print=not verbose):
d = group_dst.create_dataset(key, shape=value.shape, dtype=value.dtype )
d[:] = value[:]
for a_key, a_value in value.attrs.items():
d.attrs[a_key] = a_value
if _transaction:
cur.execute('COMMIT')
def _group_read_bytes(self, group : h5py.Group, key : str, check_key=True) -> Union[bytes, None]:
if check_key and key not in group:
return None
dataset = group[key]
data_bytes = bytearray(len(dataset))
dataset.read_direct(np.frombuffer(data_bytes, dtype=np.uint8))
return data_bytes
if shrink:
self.shrink()
def _group_write_bytes(self, group : h5py.Group, key : str, data : bytes, update_existing=True) -> Union[h5py.Dataset, None]:
if key in group:
if not update_existing:
return None
del group[key]
return group.create_dataset(key, data=np.frombuffer(data, dtype=np.uint8) )
###################
### UFaceMark
###################
def _UFaceMark_from_db_row(self, db_row) -> UFaceMark:
uuid, UImage_uuid, UPerson_uuid, data = db_row
ufm = UFaceMark()
ufm.restore_state(pickle.loads(data))
return ufm
def add_UFaceMark(self, ufacemark_or_list : UFaceMark):
def add_UFaceMark(self, ufacemark_or_list : UFaceMark, update_existing=True):
"""
add or update UFaceMark in DB
"""
if not isinstance(ufacemark_or_list, Iterable):
ufacemark_or_list : List[UFaceMark] = [ufacemark_or_list]
cur = self._cur
cur.execute('BEGIN IMMEDIATE')
for ufm in ufacemark_or_list:
uuid = ufm.get_uuid()
UImage_uuid = ufm.get_UImage_uuid()
UPerson_uuid = ufm.get_UPerson_uuid()
data = pickle.dumps(ufm.dump_state())
if cur.execute('SELECT COUNT(*) from UFaceMark where uuid=?', [uuid] ).fetchone()[0] != 0:
cur.execute('UPDATE UFaceMark SET UImage_uuid=?, UPerson_uuid=?, data=? WHERE uuid=?',
[UImage_uuid, UPerson_uuid, data, uuid])
else:
cur.execute('INSERT INTO UFaceMark VALUES (?, ?, ?, ?)', [uuid, UImage_uuid, UPerson_uuid, data])
cur.execute('COMMIT')
self._group_write_bytes(self._UFaceMark_grp, ufm.get_uuid().hex(), pickle.dumps(ufm.dump_state()), update_existing=update_existing )
def get_UFaceMark_count(self) -> int:
return self._cur.execute('SELECT COUNT(*) FROM UFaceMark').fetchone()[0]
return len(self._UFaceMark_grp.keys())
def get_all_UFaceMark(self) -> List[UFaceMark]:
return [ self._UFaceMark_from_db_row(db_row) for db_row in self._cur.execute('SELECT * FROM UFaceMark').fetchall() ]
return [ UFaceMark.from_state(pickle.loads(self._group_read_bytes(self._UFaceMark_grp, key, check_key=False))) for key in self._UFaceMark_grp.keys() ]
def get_all_UFaceMark_uuids(self) -> List[bytes]:
return [ uuid.UUID(key).bytes for key in self._UFaceMark_grp.keys() ]
def get_UFaceMark_by_uuid(self, uuid : bytes) -> Union[UFaceMark, None]:
c = self._cur.execute('SELECT * FROM UFaceMark WHERE uuid=?', [uuid])
db_row = c.fetchone()
if db_row is None:
data = self._group_read_bytes(self._UFaceMark_grp, uuid.hex())
if data is None:
return None
return UFaceMark.from_state(pickle.loads(data))
return self._UFaceMark_from_db_row(db_row)
def delete_UFaceMark_by_uuid(self, uuid : bytes) -> bool:
key = uuid.hex()
if key in self._UFaceMark_grp:
del self._UFaceMark_grp[key]
return True
return False
def iter_UFaceMark(self) -> Generator[UFaceMark, None, None]:
"""
returns Generator of UFaceMark
"""
for db_row in self._cur.execute('SELECT * FROM UFaceMark').fetchall():
yield self._UFaceMark_from_db_row(db_row)
for key in self._UFaceMark_grp.keys():
yield UFaceMark.from_state(pickle.loads(self._group_read_bytes(self._UFaceMark_grp, key, check_key=False)))
def delete_all_UFaceMark(self):
"""
deletes all UFaceMark from DB
"""
(self._cur.execute('BEGIN IMMEDIATE')
.execute('DELETE FROM UFaceMark')
.execute('COMMIT') )
###################
### UPerson
###################
def _UPerson_from_db_row(self, db_row) -> UPerson:
uuid, data = db_row
up = UPerson()
up.restore_state(pickle.loads(data))
return up
def add_UPerson(self, uperson_or_list : UPerson):
"""
add or update UPerson in DB
"""
if not isinstance(uperson_or_list, Iterable):
uperson_or_list : List[UPerson] = [uperson_or_list]
cur = self._cur
cur.execute('BEGIN IMMEDIATE')
for uperson in uperson_or_list:
uuid = uperson.get_uuid()
data = pickle.dumps(uperson.dump_state())
if cur.execute('SELECT COUNT(*) from UPerson where uuid=?', [uuid]).fetchone()[0] != 0:
cur.execute('UPDATE UPerson SET data=? WHERE uuid=?', [data])
else:
cur.execute('INSERT INTO UPerson VALUES (?, ?)', [uuid, data])
cur.execute('COMMIT')
def get_UPerson_count(self) -> int:
return self._cur.execute('SELECT COUNT(*) FROM UPerson').fetchone()[0]
def get_all_UPerson(self) -> List[UPerson]:
return [ self._UPerson_from_db_row(db_row) for db_row in self._cur.execute('SELECT * FROM UPerson').fetchall() ]
def iter_UPerson(self) -> Generator[UPerson, None, None]:
"""
iterator of all UPerson's
"""
for db_row in self._cur.execute('SELECT * FROM UPerson').fetchall():
yield self._UPerson_from_db_row(db_row)
def delete_all_UPerson(self):
"""
deletes all UPerson from DB
"""
(self._cur.execute('BEGIN IMMEDIATE')
.execute('DELETE FROM UPerson')
.execute('COMMIT') )
for key in self._UFaceMark_grp.keys():
del self._UFaceMark_grp[key]
###################
### UImage
###################
def _UImage_from_db_row(self, db_row) -> UImage:
uuid, name, format, data_bytes = db_row
img = cv2.imdecode(np.frombuffer(data_bytes, dtype=np.uint8), flags=cv2.IMREAD_UNCHANGED)
uimg = UImage()
uimg.set_uuid(uuid)
uimg.set_name(name)
uimg.assign_image(img)
return uimg
def add_UImage(self, uimage_or_list : UImage, format : str = 'webp', quality : int = 100):
def add_UImage(self, uimage_or_list : UImage, format : str = 'png', quality : int = 100, update_existing=True):
"""
add or update UImage in DB
@ -239,9 +192,8 @@ class Faceset:
raise ValueError('quality must be in range [0..100]')
if not isinstance(uimage_or_list, Iterable):
uimage_or_list = [uimage_or_list]
uimage_or_list : List[UImage] = [uimage_or_list]
uimage_datas = []
for uimage in uimage_or_list:
if format == 'webp':
imencode_args = [int(cv2.IMWRITE_WEBP_QUALITY), quality]
@ -251,44 +203,112 @@ class Faceset:
imencode_args = [int(cv2.IMWRITE_JPEG2000_COMPRESSION_X1000), quality*10]
else:
imencode_args = []
ret, data_bytes = cv2.imencode( f'.{format}', uimage.get_image(), imencode_args)
if not ret:
raise Exception(f'Unable to encode image format {format}')
uimage_datas.append(data_bytes.data)
cur = self._cur
cur.execute('BEGIN IMMEDIATE')
for uimage, data in zip(uimage_or_list, uimage_datas):
uuid = uimage.get_uuid()
if cur.execute('SELECT COUNT(*) from UImage where uuid=?', [uuid] ).fetchone()[0] != 0:
cur.execute('UPDATE UImage SET name=?, format=?, data=? WHERE uuid=?', [uimage.get_name(), format, data, uuid])
else:
cur.execute('INSERT INTO UImage VALUES (?, ?, ?, ?)', [uuid, uimage.get_name(), format, data])
cur.execute('COMMIT')
key = uimage.get_uuid().hex()
def get_UImage_count(self) -> int: return self._cur.execute('SELECT COUNT(*) FROM UImage').fetchone()[0]
def get_UImage_by_uuid(self, uuid : Union[bytes, None]) -> Union[UImage, None]:
"""
"""
if uuid is None:
self._group_write_bytes(self._UImage_grp, key, pickle.dumps(uimage.dump_state(exclude_image=True)), update_existing=update_existing )
d = self._group_write_bytes(self._UImage_image_data_grp, key, data_bytes.data, update_existing=update_existing )
d.attrs['format'] = format
d.attrs['quality'] = quality
def get_UImage_count(self) -> int:
return len(self._UImage_grp.keys())
def get_all_UImage(self) -> List[UImage]:
return [ self._get_UImage_by_key(key) for key in self._UImage_grp.keys() ]
def get_all_UImage_uuids(self) -> List[bytes]:
return [ uuid.UUID(key).bytes for key in self._UImage_grp.keys() ]
def _get_UImage_by_key(self, key, check_key=True) -> Union[UImage, None]:
data = self._group_read_bytes(self._UImage_grp, key, check_key=check_key)
if data is None:
return None
uimg = UImage.from_state(pickle.loads(data))
db_row = self._cur.execute('SELECT * FROM UImage where uuid=?', [uuid]).fetchone()
if db_row is None:
return None
return self._UImage_from_db_row(db_row)
image_data = self._group_read_bytes(self._UImage_image_data_grp, key, check_key=check_key)
if image_data is not None:
uimg.assign_image (cv2.imdecode(np.frombuffer(image_data, dtype=np.uint8), flags=cv2.IMREAD_UNCHANGED))
def iter_UImage(self) -> Generator[UImage, None, None]:
return uimg
def get_UImage_by_uuid(self, uuid : bytes) -> Union[UImage, None]:
return self._get_UImage_by_key(uuid.hex())
def delete_UImage_by_uuid(self, uuid : bytes):
key = uuid.hex()
if key in self._UImage_grp:
del self._UImage_grp[key]
if key in self._UImage_image_data_grp:
del self._UImage_image_data_grp[key]
def iter_UImage(self, include_key=False) -> Generator[UImage, None, None]:
"""
iterator of all UImage's
returns Generator of UImage
"""
for db_row in self._cur.execute('SELECT * FROM UImage').fetchall():
yield self._UImage_from_db_row(db_row)
for key in self._UImage_grp.keys():
uimg = self._get_UImage_by_key(key, check_key=False)
yield (uimg, key) if include_key else uimg
def delete_all_UImage(self):
"""
deletes all UImage from DB
"""
(self._cur.execute('BEGIN IMMEDIATE')
.execute('DELETE FROM UImage')
.execute('COMMIT') )
for key in self._UImage_grp.keys():
del self._UImage_grp[key]
for key in self._UImage_image_data_grp.keys():
del self._UImage_image_data_grp[key]
###################
### UPerson
###################
def add_UPerson(self, uperson_or_list : UPerson, update_existing=True):
"""
add or update UPerson in DB
"""
if not isinstance(uperson_or_list, Iterable):
uperson_or_list : List[UPerson] = [uperson_or_list]
for uperson in uperson_or_list:
self._group_write_bytes(self._UPerson_grp, uperson.get_uuid().hex(), pickle.dumps(uperson.dump_state()), update_existing=update_existing )
def get_UPerson_count(self) -> int:
return len(self._UPerson_grp.keys())
def get_all_UPerson(self) -> List[UPerson]:
return [ UPerson.from_state(pickle.loads(self._group_read_bytes(self._UPerson_grp, key, check_key=False))) for key in self._UPerson_grp.keys() ]
def get_all_UPerson_uuids(self) -> List[bytes]:
return [ uuid.UUID(key).bytes for key in self._UPerson_grp.keys() ]
def get_UPerson_by_uuid(self, uuid : bytes) -> Union[UPerson, None]:
data = self._group_read_bytes(self._UPerson_grp, uuid.hex())
if data is None:
return None
return UPerson.from_state(pickle.loads(data))
def delete_UPerson_by_uuid(self, uuid : bytes) -> bool:
key = uuid.hex()
if key in self._UPerson_grp:
del self._UPerson_grp[key]
return True
return False
def iter_UPerson(self) -> Generator[UPerson, None, None]:
"""
returns Generator of UPerson
"""
for key in self._UPerson_grp.keys():
yield UPerson.from_state(pickle.loads(self._group_read_bytes(self._UPerson_grp, key, check_key=False)))
def delete_all_UPerson(self):
"""
deletes all UPerson from DB
"""
for key in self._UPerson_grp.keys():
del self._UPerson_grp[key]

View file

@ -25,7 +25,13 @@ class UFaceMark(IState):
def __repr__(self): return self.__str__()
def __str__(self):
return f"UFaceMark UUID:[...{self.get_uuid()[-4:].hex()}]"
@staticmethod
def from_state(state : dict) -> 'UFaceMark':
ufm = UFaceMark()
ufm.restore_state(state)
return ufm
def restore_state(self, state : dict):
self._uuid = state.get('_uuid', None)
self._UImage_uuid = state.get('_UImage_uuid', None)
@ -45,7 +51,7 @@ class UFaceMark(IState):
def get_uuid(self) -> Union[bytes, None]:
if self._uuid is None:
self._uuid = uuid.uuid4().bytes_le
self._uuid = uuid.uuid4().bytes
return self._uuid
def set_uuid(self, uuid : Union[bytes, None]):
@ -72,6 +78,16 @@ class UFaceMark(IState):
self._FRect = face_urect
def get_all_FLandmarks2D(self) -> List[FLandmarks2D]: return self._FLandmarks2D_list
def get_FLandmarks2D_best(self) -> Union[FLandmarks2D, None]:
"""get best available FLandmarks2D """
lmrks = self.get_FLandmarks2D_by_type(ELandmarks2D.L468)
if lmrks is None:
lmrks = self.get_FLandmarks2D_by_type(ELandmarks2D.L68)
if lmrks is None:
lmrks = self.get_FLandmarks2D_by_type(ELandmarks2D.L5)
return lmrks
def get_FLandmarks2D_by_type(self, type : ELandmarks2D) -> Union[FLandmarks2D, None]:
"""get FLandmarks2D from list by type"""
if not isinstance(type, ELandmarks2D):

View file

@ -18,10 +18,31 @@ class UImage(IState):
def __str__(self): return f"UImage UUID:[...{self.get_uuid()[-4:].hex()}] name:[{self._name}] image:[{ (self._image.shape, self._image.dtype) if self._image is not None else None}]"
def __repr__(self): return self.__str__()
@staticmethod
def from_state(state : dict) -> 'UImage':
ufm = UImage()
ufm.restore_state(state)
return ufm
def restore_state(self, state : dict):
self._uuid = state.get('_uuid', None)
self._name = state.get('_name', None)
self._image = state.get('_image', None)
def dump_state(self, exclude_image=False) -> dict:
d = {'_uuid' : self._uuid,
'_name' : self._name,
}
if not exclude_image:
d['_image'] = self._image
return d
def get_uuid(self) -> Union[bytes, None]:
if self._uuid is None:
self._uuid = uuid.uuid4().bytes_le
self._uuid = uuid.uuid4().bytes
return self._uuid
def set_uuid(self, uuid : Union[bytes, None]):

View file

@ -15,6 +15,12 @@ class UPerson(IState):
def __str__(self): return f"UPerson UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] age:[{self._age}]"
def __repr__(self): return self.__str__()
@staticmethod
def from_state(state : dict) -> 'UPerson':
ufm = UPerson()
ufm.restore_state(state)
return ufm
def restore_state(self, state : dict):
self._uuid = state.get('_uuid', None)
self._name = state.get('_name', None)
@ -28,7 +34,7 @@ class UPerson(IState):
def get_uuid(self) -> Union[bytes, None]:
if self._uuid is None:
self._uuid = uuid.uuid4().bytes_le
self._uuid = uuid.uuid4().bytes
return self._uuid
def set_uuid(self, uuid : Union[bytes, None]):