mirror of
https://github.com/iperov/DeepFaceLive
synced 2025-07-06 13:02:16 -07:00
refactoring
This commit is contained in:
parent
d90ec2d024
commit
8b385f6d80
11 changed files with 263 additions and 137 deletions
17
main.py
17
main.py
|
@ -52,6 +52,14 @@ def main():
|
||||||
p.add_argument('--delete-parts', action="store_true", default=False)
|
p.add_argument('--delete-parts', action="store_true", default=False)
|
||||||
p.set_defaults(func=run_merge_large_files)
|
p.set_defaults(func=run_merge_large_files)
|
||||||
|
|
||||||
|
def run_extract_FaceSynthetics(args):
|
||||||
|
from scripts import dev
|
||||||
|
dev.extract_FaceSynthetics(input_dir=args.input_dir)
|
||||||
|
|
||||||
|
p = dev_subparsers.add_parser('extract_FaceSynthetics')
|
||||||
|
p.add_argument('--input-dir', default=None, action=fixPathAction, help="FaceSynthetics directory.")
|
||||||
|
p.set_defaults(func=run_extract_FaceSynthetics)
|
||||||
|
|
||||||
train_parser = subparsers.add_parser( "train", help="Train neural network.")
|
train_parser = subparsers.add_parser( "train", help="Train neural network.")
|
||||||
train_parsers = train_parser.add_subparsers()
|
train_parsers = train_parser.add_subparsers()
|
||||||
|
|
||||||
|
@ -64,6 +72,15 @@ def main():
|
||||||
p = train_parsers.add_parser('FaceAligner')
|
p = train_parsers.add_parser('FaceAligner')
|
||||||
p.add_argument('--faceset-path', default=None, action=fixPathAction, help=".dfs path")
|
p.add_argument('--faceset-path', default=None, action=fixPathAction, help=".dfs path")
|
||||||
p.set_defaults(func=train_FaceAligner)
|
p.set_defaults(func=train_FaceAligner)
|
||||||
|
|
||||||
|
def train_CTSOT(args):
|
||||||
|
from apps.trainers.CTSOT.CTSOTTrainerApp import run_app
|
||||||
|
run_app(userdata_path=Path(args.userdata_dir), faceset_path=Path(args.faceset_path))
|
||||||
|
|
||||||
|
p = train_parsers.add_parser('CTSOT')
|
||||||
|
p.add_argument('--userdata-dir', default=None, action=fixPathAction, help="Directory to save app data.")
|
||||||
|
p.add_argument('--faceset-path', default=None, action=fixPathAction, help=".dfs path")
|
||||||
|
p.set_defaults(func=train_CTSOT)
|
||||||
|
|
||||||
def bad_args(arguments):
|
def bad_args(arguments):
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
|
|
|
@ -26,7 +26,7 @@ def split_large_files(delete_original=False):
|
||||||
SplittedFile.split(filepath, part_size=part_size, delete_original=delete_original)
|
SplittedFile.split(filepath, part_size=part_size, delete_original=delete_original)
|
||||||
print('Done')
|
print('Done')
|
||||||
|
|
||||||
def extract_facesynthetics_dataset(input_dir):
|
def extract_FaceSynthetics(input_dir):
|
||||||
"""
|
"""
|
||||||
extract FaceSynthetics dataset https://github.com/microsoft/FaceSynthetics
|
extract FaceSynthetics dataset https://github.com/microsoft/FaceSynthetics
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ def extract_facesynthetics_dataset(input_dir):
|
||||||
"""
|
"""
|
||||||
input_path = Path(input_dir)
|
input_path = Path(input_dir)
|
||||||
faceset_path = input_path.parent / f'{input_path.name}.dfs'
|
faceset_path = input_path.parent / f'{input_path.name}.dfs'
|
||||||
|
|
||||||
# fs = lib_face.Faceset(output_dbpath)
|
# fs = lib_face.Faceset(output_dbpath)
|
||||||
# for ufm in fs.iter_UFaceMark():
|
# for ufm in fs.iter_UFaceMark():
|
||||||
# uimg = fs.get_UImage_by_uuid( ufm.get_UImage_uuid() )
|
# uimg = fs.get_UImage_by_uuid( ufm.get_UImage_uuid() )
|
||||||
|
@ -65,7 +65,7 @@ def extract_facesynthetics_dataset(input_dir):
|
||||||
filepaths = lib_path.get_files_paths(input_path)[:100] #TODO
|
filepaths = lib_path.get_files_paths(input_path)[:100] #TODO
|
||||||
|
|
||||||
fs = lib_face.Faceset(faceset_path)
|
fs = lib_face.Faceset(faceset_path)
|
||||||
fs.clear_db()
|
fs.recreate()
|
||||||
|
|
||||||
|
|
||||||
for filepath in lib_con.progress_bar_iterator(filepaths, 'Processing'):
|
for filepath in lib_con.progress_bar_iterator(filepaths, 'Processing'):
|
||||||
|
@ -104,6 +104,7 @@ def extract_facesynthetics_dataset(input_dir):
|
||||||
|
|
||||||
fs.add_UImage(uimg, format='png')
|
fs.add_UImage(uimg, format='png')
|
||||||
fs.add_UFaceMark(ufm)
|
fs.add_UFaceMark(ufm)
|
||||||
|
|
||||||
|
|
||||||
fs.shrink()
|
fs.shrink()
|
||||||
fs.close()
|
fs.close()
|
||||||
|
|
|
@ -7,9 +7,9 @@ import numpy.linalg as npla
|
||||||
from ..math import Affine2DMat, Affine2DUniMat
|
from ..math import Affine2DMat, Affine2DUniMat
|
||||||
from .ELandmarks2D import ELandmarks2D
|
from .ELandmarks2D import ELandmarks2D
|
||||||
from .FRect import FRect
|
from .FRect import FRect
|
||||||
|
from .IState import IState
|
||||||
|
|
||||||
|
class FLandmarks2D(IState):
|
||||||
class FLandmarks2D:
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"""
|
"""
|
||||||
Describes 2D face landmarks in uniform float coordinates
|
Describes 2D face landmarks in uniform float coordinates
|
||||||
|
@ -17,19 +17,20 @@ class FLandmarks2D:
|
||||||
self._type : ELandmarks2D = None
|
self._type : ELandmarks2D = None
|
||||||
self._ulmrks : np.ndarray = None
|
self._ulmrks : np.ndarray = None
|
||||||
|
|
||||||
def __getstate__(self):
|
def restore_state(self, state : dict):
|
||||||
return self.__dict__.copy()
|
self._type = IState._restore_enum(ELandmarks2D, state.get('_type', None))
|
||||||
|
self._ulmrks = IState._restore_np_array(state.get('_ulmrks', None))
|
||||||
|
|
||||||
def __setstate__(self, d):
|
def dump_state(self) -> dict:
|
||||||
self.__init__()
|
return {'_type' : IState._dump_enum(self._type),
|
||||||
self.__dict__.update(d)
|
'_ulmrks' : IState._dump_np_array(self._ulmrks),
|
||||||
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create( type : ELandmarks2D, ulmrks : np.ndarray):
|
def create( type : ELandmarks2D, ulmrks : np.ndarray):
|
||||||
"""
|
"""
|
||||||
ulmrks np.ndarray (*,2|3)
|
ulmrks np.ndarray (*,2|3)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not isinstance(type, ELandmarks2D):
|
if not isinstance(type, ELandmarks2D):
|
||||||
raise ValueError('type must be ELandmarks2D')
|
raise ValueError('type must be ELandmarks2D')
|
||||||
|
|
||||||
|
|
|
@ -1,21 +1,23 @@
|
||||||
from typing import Tuple
|
from typing import Tuple
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from .. import math as lib_math
|
from .. import math as lib_math
|
||||||
|
from .IState import IState
|
||||||
|
|
||||||
|
|
||||||
class FPose:
|
class FPose(IState):
|
||||||
"""
|
"""
|
||||||
Describes face pitch/yaw/roll
|
Describes face pitch/yaw/roll
|
||||||
"""
|
"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._pyr : np.ndarray = None
|
self._pyr : np.ndarray = None
|
||||||
|
|
||||||
def __getstate__(self):
|
def restore_state(self, state : dict):
|
||||||
return self.__dict__.copy()
|
self._pyr = IState._restore_np_array(state.get('_pyr', None))
|
||||||
|
|
||||||
def __setstate__(self, d):
|
def dump_state(self) -> dict:
|
||||||
self.__init__()
|
return {'_pyr' : IState._dump_np_array(self._pyr)}
|
||||||
self.__dict__.update(d)
|
|
||||||
|
|
||||||
def as_radians(self) -> Tuple[float, float, float]:
|
def as_radians(self) -> Tuple[float, float, float]:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -5,22 +5,24 @@ from typing import List, Tuple
|
||||||
import cv2
|
import cv2
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import numpy.linalg as npla
|
import numpy.linalg as npla
|
||||||
|
|
||||||
from .. import math as lib_math
|
from .. import math as lib_math
|
||||||
from ..math import Affine2DMat, Affine2DUniMat
|
from ..math import Affine2DMat, Affine2DUniMat
|
||||||
|
from .IState import IState
|
||||||
|
|
||||||
class FRect:
|
|
||||||
|
class FRect(IState):
|
||||||
"""
|
"""
|
||||||
Describes face rectangle in uniform float coordinates
|
Describes face rectangle in uniform float coordinates
|
||||||
"""
|
"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._pts = None
|
self._pts : np.ndarray = None
|
||||||
|
|
||||||
def __getstate__(self):
|
def restore_state(self, state : dict):
|
||||||
return self.__dict__.copy()
|
self._pts = IState._restore_np_array( state.get('_pts', None) )
|
||||||
|
|
||||||
def __setstate__(self, d):
|
def dump_state(self) -> dict:
|
||||||
self.__init__()
|
return {'_pts' : IState._dump_np_array(self._pts) }
|
||||||
self.__dict__.update(d)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def sort_by_area_size(rects : List['FRect']):
|
def sort_by_area_size(rects : List['FRect']):
|
||||||
|
|
|
@ -39,29 +39,31 @@ class FaceWarper:
|
||||||
self._img_to_face_uni_mat = img_to_face_uni_mat
|
self._img_to_face_uni_mat = img_to_face_uni_mat
|
||||||
self._face_to_img_uni_mat = img_to_face_uni_mat.invert()
|
self._face_to_img_uni_mat = img_to_face_uni_mat.invert()
|
||||||
|
|
||||||
if rnd_state is None:
|
rnd_state = np.random.RandomState()
|
||||||
rnd_state = np.random
|
rnd_state.set_state(rnd_state.get_state() if rnd_state is not None else np.random.RandomState().get_state())
|
||||||
self._rnd_state_state = rnd_state.get_state()
|
|
||||||
|
|
||||||
self._align_rot_deg = rnd_state.uniform(*align_rot_deg) if isinstance(align_rot_deg, Iterable) else align_rot_deg
|
self._align_rot_deg = rnd_state.uniform(*align_rot_deg) if isinstance(align_rot_deg, Iterable) else align_rot_deg
|
||||||
self._align_scale = rnd_state.uniform(*align_scale) if isinstance(align_scale, Iterable) else align_scale
|
self._align_scale = rnd_state.uniform(*align_scale) if isinstance(align_scale, Iterable) else align_scale
|
||||||
self._align_tx = rnd_state.uniform(*align_tx) if isinstance(align_tx, Iterable) else align_tx
|
self._align_tx = rnd_state.uniform(*align_tx) if isinstance(align_tx, Iterable) else align_tx
|
||||||
self._align_ty = rnd_state.uniform(*align_ty) if isinstance(align_ty, Iterable) else align_ty
|
self._align_ty = rnd_state.uniform(*align_ty) if isinstance(align_ty, Iterable) else align_ty
|
||||||
self._rw_grid_cell_count = rnd_state.randint(*rw_grid_cell_count) if isinstance(rw_grid_cell_count, Iterable) else rw_grid_cell_count
|
self._rw_grid_cell_count = rnd_state.randint(*rw_grid_cell_count) if isinstance(rw_grid_cell_count, Iterable) else rw_grid_cell_count
|
||||||
self._rw_grid_rot_deg = rnd_state.uniform(*rw_grid_rot_deg) if isinstance(rw_grid_rot_deg, Iterable) else rw_grid_rot_deg
|
self._rw_grid_rot_deg = rnd_state.uniform(*rw_grid_rot_deg) if isinstance(rw_grid_rot_deg, Iterable) else rw_grid_rot_deg
|
||||||
self._rw_grid_scale = rnd_state.uniform(*rw_grid_scale) if isinstance(rw_grid_scale, Iterable) else rw_grid_scale
|
self._rw_grid_scale = rnd_state.uniform(*rw_grid_scale) if isinstance(rw_grid_scale, Iterable) else rw_grid_scale
|
||||||
self._rw_grid_tx = rnd_state.uniform(*rw_grid_tx) if isinstance(rw_grid_tx, Iterable) else rw_grid_tx
|
self._rw_grid_tx = rnd_state.uniform(*rw_grid_tx) if isinstance(rw_grid_tx, Iterable) else rw_grid_tx
|
||||||
self._rw_grid_ty = rnd_state.uniform(*rw_grid_ty) if isinstance(rw_grid_ty, Iterable) else rw_grid_ty
|
self._rw_grid_ty = rnd_state.uniform(*rw_grid_ty) if isinstance(rw_grid_ty, Iterable) else rw_grid_ty
|
||||||
|
|
||||||
|
self._rnd_state_state = rnd_state.get_state()
|
||||||
self._cached = {}
|
self._cached = {}
|
||||||
|
|
||||||
def transform(self, img : np.ndarray, out_res : int, random_warp : bool = True) -> np.ndarray:
|
def transform(self, img : np.ndarray, out_res : int, random_warp : bool = True) -> np.ndarray:
|
||||||
"""
|
"""
|
||||||
transform an image. Subsequent calls will output the same result for any img shape and out_res.
|
transform an image.
|
||||||
|
|
||||||
img np.ndarray (HWC)
|
Subsequent calls will output the same result for any img shape and out_res.
|
||||||
|
|
||||||
|
img np.ndarray (HWC)
|
||||||
|
|
||||||
out_res int
|
out_res int
|
||||||
|
|
||||||
random_warp(True) bool
|
random_warp(True) bool
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -38,7 +38,7 @@ class Faceset:
|
||||||
self.shrink()
|
self.shrink()
|
||||||
else:
|
else:
|
||||||
cur.execute('END')
|
cur.execute('END')
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ class Faceset:
|
||||||
|
|
||||||
def __repr__(self): return self.__str__()
|
def __repr__(self): return self.__str__()
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f"Faceset. UImage:{self.get_UImage_count()} UFaceMark:{self.get_UFaceMark_count()}"
|
return f"Faceset. UImage:{self.get_UImage_count()} UFaceMark:{self.get_UFaceMark_count()} UPerson:{self.get_UPerson_count()}"
|
||||||
|
|
||||||
def _is_table_exists(self, name):
|
def _is_table_exists(self, name):
|
||||||
return self._cur.execute(f"SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?", [name]).fetchone()[0] != 0
|
return self._cur.execute(f"SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?", [name]).fetchone()[0] != 0
|
||||||
|
@ -85,8 +85,8 @@ class Faceset:
|
||||||
.execute('INSERT INTO FacesetInfo VALUES (1)')
|
.execute('INSERT INTO FacesetInfo VALUES (1)')
|
||||||
|
|
||||||
.execute('CREATE TABLE UImage (uuid BLOB, name TEXT, format TEXT, data BLOB)')
|
.execute('CREATE TABLE UImage (uuid BLOB, name TEXT, format TEXT, data BLOB)')
|
||||||
.execute('CREATE TABLE UPerson (uuid BLOB, name TEXT, age NUMERIC)')
|
.execute('CREATE TABLE UPerson (uuid BLOB, data BLOB)')
|
||||||
.execute('CREATE TABLE UFaceMark (uuid BLOB, UImage_uuid BLOB, UPerson_uuid BLOB, pickled_bytes BLOB)')
|
.execute('CREATE TABLE UFaceMark (uuid BLOB, UImage_uuid BLOB, UPerson_uuid BLOB, data BLOB)')
|
||||||
)
|
)
|
||||||
|
|
||||||
if _transaction:
|
if _transaction:
|
||||||
|
@ -99,46 +99,49 @@ class Faceset:
|
||||||
### UFaceMark
|
### UFaceMark
|
||||||
###################
|
###################
|
||||||
def _UFaceMark_from_db_row(self, db_row) -> UFaceMark:
|
def _UFaceMark_from_db_row(self, db_row) -> UFaceMark:
|
||||||
uuid, UImage_uuid, UPerson_uuid, pickled_bytes = db_row
|
uuid, UImage_uuid, UPerson_uuid, data = db_row
|
||||||
return pickle.loads(pickled_bytes)
|
|
||||||
|
ufm = UFaceMark()
|
||||||
|
ufm.restore_state(pickle.loads(data))
|
||||||
|
return ufm
|
||||||
|
|
||||||
def add_UFaceMark(self, ufacemark_or_list : UFaceMark):
|
def add_UFaceMark(self, ufacemark_or_list : UFaceMark):
|
||||||
"""
|
"""
|
||||||
add or update UFaceMark in DB
|
add or update UFaceMark in DB
|
||||||
"""
|
"""
|
||||||
if not isinstance(ufacemark_or_list, Iterable):
|
if not isinstance(ufacemark_or_list, Iterable):
|
||||||
ufacemark_or_list = [ufacemark_or_list]
|
ufacemark_or_list : List[UFaceMark] = [ufacemark_or_list]
|
||||||
|
|
||||||
cur = self._cur
|
cur = self._cur
|
||||||
cur.execute('BEGIN IMMEDIATE')
|
cur.execute('BEGIN IMMEDIATE')
|
||||||
for ufacemark in ufacemark_or_list:
|
for ufm in ufacemark_or_list:
|
||||||
pickled_bytes = pickle.dumps(ufacemark)
|
uuid = ufm.get_uuid()
|
||||||
uuid = ufacemark.get_uuid()
|
UImage_uuid = ufm.get_UImage_uuid()
|
||||||
UImage_uuid = ufacemark.get_UImage_uuid()
|
UPerson_uuid = ufm.get_UPerson_uuid()
|
||||||
UPerson_uuid = ufacemark.get_UPerson_uuid()
|
|
||||||
|
data = pickle.dumps(ufm.dump_state())
|
||||||
|
|
||||||
if cur.execute('SELECT COUNT(*) from UFaceMark where uuid=?', [uuid] ).fetchone()[0] != 0:
|
if cur.execute('SELECT COUNT(*) from UFaceMark where uuid=?', [uuid] ).fetchone()[0] != 0:
|
||||||
cur.execute('UPDATE UFaceMark SET UImage_uuid=?, UPerson_uuid=?, pickled_bytes=? WHERE uuid=?',
|
cur.execute('UPDATE UFaceMark SET UImage_uuid=?, UPerson_uuid=?, data=? WHERE uuid=?',
|
||||||
[UImage_uuid, UPerson_uuid, pickled_bytes, uuid])
|
[UImage_uuid, UPerson_uuid, data, uuid])
|
||||||
else:
|
else:
|
||||||
cur.execute('INSERT INTO UFaceMark VALUES (?, ?, ?, ?)', [uuid, UImage_uuid, UPerson_uuid, pickled_bytes])
|
cur.execute('INSERT INTO UFaceMark VALUES (?, ?, ?, ?)', [uuid, UImage_uuid, UPerson_uuid, data])
|
||||||
cur.execute('COMMIT')
|
cur.execute('COMMIT')
|
||||||
|
|
||||||
|
|
||||||
def get_UFaceMark_count(self) -> int:
|
def get_UFaceMark_count(self) -> int:
|
||||||
return self._cur.execute('SELECT COUNT(*) FROM UFaceMark').fetchone()[0]
|
return self._cur.execute('SELECT COUNT(*) FROM UFaceMark').fetchone()[0]
|
||||||
|
|
||||||
def get_all_UFaceMark(self) -> List[UFaceMark]:
|
def get_all_UFaceMark(self) -> List[UFaceMark]:
|
||||||
return [ pickle.loads(pickled_bytes) for pickled_bytes, in self._cur.execute('SELECT pickled_bytes FROM UFaceMark').fetchall() ]
|
return [ self._UFaceMark_from_db_row(db_row) for db_row in self._cur.execute('SELECT * FROM UFaceMark').fetchall() ]
|
||||||
|
|
||||||
def get_UFaceMark_by_uuid(self, uuid : bytes) -> Union[UFaceMark, None]:
|
def get_UFaceMark_by_uuid(self, uuid : bytes) -> Union[UFaceMark, None]:
|
||||||
c = self._cur.execute('SELECT * FROM UFaceMark WHERE uuid=?', [uuid])
|
c = self._cur.execute('SELECT * FROM UFaceMark WHERE uuid=?', [uuid])
|
||||||
db_row = c.fetchone()
|
db_row = c.fetchone()
|
||||||
if db_row is None:
|
if db_row is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return self._UFaceMark_from_db_row(db_row)
|
return self._UFaceMark_from_db_row(db_row)
|
||||||
|
|
||||||
def iter_UFaceMark(self) -> Generator[UFaceMark, None, None]:
|
def iter_UFaceMark(self) -> Generator[UFaceMark, None, None]:
|
||||||
"""
|
"""
|
||||||
returns Generator of UFaceMark
|
returns Generator of UFaceMark
|
||||||
|
@ -156,35 +159,44 @@ class Faceset:
|
||||||
###################
|
###################
|
||||||
### UPerson
|
### UPerson
|
||||||
###################
|
###################
|
||||||
|
def _UPerson_from_db_row(self, db_row) -> UPerson:
|
||||||
|
uuid, data = db_row
|
||||||
|
up = UPerson()
|
||||||
|
up.restore_state(pickle.loads(data))
|
||||||
|
return up
|
||||||
|
|
||||||
def add_UPerson(self, uperson_or_list : UPerson):
|
def add_UPerson(self, uperson_or_list : UPerson):
|
||||||
"""
|
"""
|
||||||
add or update UPerson in DB
|
add or update UPerson in DB
|
||||||
"""
|
"""
|
||||||
if not isinstance(uperson_or_list, Iterable):
|
if not isinstance(uperson_or_list, Iterable):
|
||||||
uperson_or_list = [uperson_or_list]
|
uperson_or_list : List[UPerson] = [uperson_or_list]
|
||||||
|
|
||||||
cur = self._cur
|
cur = self._cur
|
||||||
cur.execute('BEGIN IMMEDIATE')
|
cur.execute('BEGIN IMMEDIATE')
|
||||||
for uperson in uperson_or_list:
|
for uperson in uperson_or_list:
|
||||||
uuid = uperson.get_uuid()
|
uuid = uperson.get_uuid()
|
||||||
name = uperson.get_name()
|
|
||||||
age = uperson.get_age()
|
data = pickle.dumps(uperson.dump_state())
|
||||||
|
|
||||||
if cur.execute('SELECT COUNT(*) from UPerson where uuid=?', [uuid]).fetchone()[0] != 0:
|
if cur.execute('SELECT COUNT(*) from UPerson where uuid=?', [uuid]).fetchone()[0] != 0:
|
||||||
cur.execute('UPDATE UPerson SET name=?, age=? WHERE uuid=?', [name, age, uuid])
|
cur.execute('UPDATE UPerson SET data=? WHERE uuid=?', [data])
|
||||||
else:
|
else:
|
||||||
cur.execute('INSERT INTO UPerson VALUES (?, ?, ?)', [uuid, name, age])
|
cur.execute('INSERT INTO UPerson VALUES (?, ?)', [uuid, data])
|
||||||
cur.execute('COMMIT')
|
cur.execute('COMMIT')
|
||||||
|
|
||||||
|
def get_UPerson_count(self) -> int:
|
||||||
|
return self._cur.execute('SELECT COUNT(*) FROM UPerson').fetchone()[0]
|
||||||
|
|
||||||
|
def get_all_UPerson(self) -> List[UPerson]:
|
||||||
|
return [ self._UPerson_from_db_row(db_row) for db_row in self._cur.execute('SELECT * FROM UPerson').fetchall() ]
|
||||||
|
|
||||||
def iter_UPerson(self) -> Generator[UPerson, None, None]:
|
def iter_UPerson(self) -> Generator[UPerson, None, None]:
|
||||||
"""
|
"""
|
||||||
iterator of all UPerson's
|
iterator of all UPerson's
|
||||||
"""
|
"""
|
||||||
for uuid, name, age in self._cur.execute('SELECT * FROM UPerson').fetchall():
|
for db_row in self._cur.execute('SELECT * FROM UPerson').fetchall():
|
||||||
uperson = UPerson()
|
yield self._UPerson_from_db_row(db_row)
|
||||||
uperson.set_uuid(uuid)
|
|
||||||
uperson.set_name(name)
|
|
||||||
uperson.set_age(age)
|
|
||||||
yield uperson
|
|
||||||
|
|
||||||
def delete_all_UPerson(self):
|
def delete_all_UPerson(self):
|
||||||
"""
|
"""
|
||||||
|
@ -211,7 +223,7 @@ class Faceset:
|
||||||
"""
|
"""
|
||||||
add or update UImage in DB
|
add or update UImage in DB
|
||||||
|
|
||||||
uimage UImage object
|
uimage UImage or list
|
||||||
|
|
||||||
format('png') webp ( does not support lossless on 100 quality ! )
|
format('png') webp ( does not support lossless on 100 quality ! )
|
||||||
png ( lossless )
|
png ( lossless )
|
||||||
|
@ -229,13 +241,8 @@ class Faceset:
|
||||||
if not isinstance(uimage_or_list, Iterable):
|
if not isinstance(uimage_or_list, Iterable):
|
||||||
uimage_or_list = [uimage_or_list]
|
uimage_or_list = [uimage_or_list]
|
||||||
|
|
||||||
cur = self._cur
|
uimage_datas = []
|
||||||
cur.execute('BEGIN IMMEDIATE')
|
|
||||||
for uimage in uimage_or_list:
|
for uimage in uimage_or_list:
|
||||||
# TODO optimize move encoding to out of transaction
|
|
||||||
img = uimage.get_image()
|
|
||||||
uuid = uimage.get_uuid()
|
|
||||||
|
|
||||||
if format == 'webp':
|
if format == 'webp':
|
||||||
imencode_args = [int(cv2.IMWRITE_WEBP_QUALITY), quality]
|
imencode_args = [int(cv2.IMWRITE_WEBP_QUALITY), quality]
|
||||||
elif format == 'jpg':
|
elif format == 'jpg':
|
||||||
|
@ -244,15 +251,19 @@ class Faceset:
|
||||||
imencode_args = [int(cv2.IMWRITE_JPEG2000_COMPRESSION_X1000), quality*10]
|
imencode_args = [int(cv2.IMWRITE_JPEG2000_COMPRESSION_X1000), quality*10]
|
||||||
else:
|
else:
|
||||||
imencode_args = []
|
imencode_args = []
|
||||||
|
ret, data_bytes = cv2.imencode( f'.{format}', uimage.get_image(), imencode_args)
|
||||||
ret, data_bytes = cv2.imencode( f'.{format}', img, imencode_args)
|
|
||||||
if not ret:
|
if not ret:
|
||||||
raise Exception(f'Unable to encode image format {format}')
|
raise Exception(f'Unable to encode image format {format}')
|
||||||
|
uimage_datas.append(data_bytes.data)
|
||||||
|
|
||||||
|
cur = self._cur
|
||||||
|
cur.execute('BEGIN IMMEDIATE')
|
||||||
|
for uimage, data in zip(uimage_or_list, uimage_datas):
|
||||||
|
uuid = uimage.get_uuid()
|
||||||
if cur.execute('SELECT COUNT(*) from UImage where uuid=?', [uuid] ).fetchone()[0] != 0:
|
if cur.execute('SELECT COUNT(*) from UImage where uuid=?', [uuid] ).fetchone()[0] != 0:
|
||||||
cur.execute('UPDATE UImage SET name=?, format=?, data=? WHERE uuid=?', [uimage.get_name(), format, data_bytes.data, uuid])
|
cur.execute('UPDATE UImage SET name=?, format=?, data=? WHERE uuid=?', [uimage.get_name(), format, data, uuid])
|
||||||
else:
|
else:
|
||||||
cur.execute('INSERT INTO UImage VALUES (?, ?, ?, ?)', [uuid, uimage.get_name(), format, data_bytes.data])
|
cur.execute('INSERT INTO UImage VALUES (?, ?, ?, ?)', [uuid, uimage.get_name(), format, data])
|
||||||
cur.execute('COMMIT')
|
cur.execute('COMMIT')
|
||||||
|
|
||||||
def get_UImage_count(self) -> int: return self._cur.execute('SELECT COUNT(*) FROM UImage').fetchone()[0]
|
def get_UImage_count(self) -> int: return self._cur.execute('SELECT COUNT(*) FROM UImage').fetchone()[0]
|
||||||
|
@ -261,7 +272,7 @@ class Faceset:
|
||||||
"""
|
"""
|
||||||
if uuid is None:
|
if uuid is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
db_row = self._cur.execute('SELECT * FROM UImage where uuid=?', [uuid]).fetchone()
|
db_row = self._cur.execute('SELECT * FROM UImage where uuid=?', [uuid]).fetchone()
|
||||||
if db_row is None:
|
if db_row is None:
|
||||||
return None
|
return None
|
||||||
|
|
68
xlib/face/IState.py
Normal file
68
xlib/face/IState.py
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
from typing import Any, Union
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class IState:
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
def __getstate__(self):
|
||||||
|
return self.__dict__.copy()
|
||||||
|
|
||||||
|
def __setstate__(self, d):
|
||||||
|
self.__init__()
|
||||||
|
self.__dict__.update(d)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _dump_IState_obj(obj : Union[Any, None]) -> Union[Any, None]:
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
if obj is None:
|
||||||
|
return None
|
||||||
|
return obj.dump_state()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _dump_np_array(n : Union[np.ndarray, None] ) -> Union[Any, None]:
|
||||||
|
if n is None:
|
||||||
|
return None
|
||||||
|
return ( n.data.tobytes(), n.dtype, n.shape )
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _dump_enum(enum_obj : Union[Any, None]) -> Union[Any, None]:
|
||||||
|
if enum_obj is None:
|
||||||
|
return None
|
||||||
|
return enum_obj.value
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _restore_IState_obj(cls_, state : Union[Any, None]) -> Union[np.ndarray, None]:
|
||||||
|
if state is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
obj = cls_()
|
||||||
|
obj.restore_state(state)
|
||||||
|
return obj
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _restore_np_array(state : Union[Any, None]) -> Union[np.ndarray, None]:
|
||||||
|
if state is None:
|
||||||
|
return None
|
||||||
|
return np.frombuffer(state[0], dtype=state[1]).reshape(state[2])
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _restore_enum(enum_cls, state : Union[Any, None]) -> Union[Any, None]:
|
||||||
|
if state is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return enum_cls(state)
|
||||||
|
|
||||||
|
def restore_state(self, state : dict):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def dump_state(self) -> dict:
|
||||||
|
"""
|
||||||
|
returns import-independent state of class in dict
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
|
@ -1,41 +1,53 @@
|
||||||
import uuid
|
import uuid
|
||||||
from typing import List, Tuple, Union
|
from typing import List, Union
|
||||||
|
|
||||||
from ..math import Affine2DMat
|
|
||||||
from .ELandmarks2D import ELandmarks2D
|
from .ELandmarks2D import ELandmarks2D
|
||||||
from .EMaskType import EMaskType
|
from .EMaskType import EMaskType
|
||||||
from .FLandmarks2D import FLandmarks2D
|
from .FLandmarks2D import FLandmarks2D
|
||||||
from .FPose import FPose
|
from .FPose import FPose
|
||||||
from .FRect import FRect
|
from .FRect import FRect
|
||||||
|
from .IState import IState
|
||||||
|
|
||||||
|
|
||||||
class UFaceMark:
|
class UFaceMark(IState):
|
||||||
def __init__(self, _from_pickled=False):
|
def __init__(self):
|
||||||
"""
|
"""
|
||||||
Describes single face in the image.
|
Describes single face in the image.
|
||||||
"""
|
"""
|
||||||
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None
|
self._uuid : Union[bytes, None] = None
|
||||||
self._UImage_uuid : Union[bytes, None] = None
|
self._UImage_uuid : Union[bytes, None] = None
|
||||||
self._UPerson_uuid : Union[bytes, None] = None
|
self._UPerson_uuid : Union[bytes, None] = None
|
||||||
self._FRect : Union[FRect, None] = None
|
self._FRect : Union[FRect, None] = None
|
||||||
self._FLandmarks2D_list : List[FLandmarks2D] = []
|
self._FLandmarks2D_list : List[FLandmarks2D] = []
|
||||||
self._FPose : Union[FPose, None] = None
|
self._FPose : Union[FPose, None] = None
|
||||||
self._mask_info_list : List = []
|
#self._mask_info_list : List = []
|
||||||
|
|
||||||
def __getstate__(self):
|
|
||||||
return self.__dict__.copy()
|
|
||||||
|
|
||||||
def __setstate__(self, d):
|
|
||||||
self.__init__(_from_pickled=True)
|
|
||||||
self.__dict__.update(d)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
s = "Masks: "
|
|
||||||
return f"UFaceMark UUID:[...{self._uuid[-4:].hex()}]"
|
|
||||||
|
|
||||||
def __repr__(self): return self.__str__()
|
def __repr__(self): return self.__str__()
|
||||||
|
def __str__(self):
|
||||||
def get_uuid(self) -> Union[bytes, None]: return self._uuid
|
return f"UFaceMark UUID:[...{self.get_uuid()[-4:].hex()}]"
|
||||||
|
|
||||||
|
def restore_state(self, state : dict):
|
||||||
|
self._uuid = state.get('_uuid', None)
|
||||||
|
self._UImage_uuid = state.get('_UImage_uuid', None)
|
||||||
|
self._UPerson_uuid = state.get('_UPerson_uuid', None)
|
||||||
|
self._FRect = IState._restore_IState_obj(FRect, state.get('_FRect', None))
|
||||||
|
self._FLandmarks2D_list = [ IState._restore_IState_obj(FLandmarks2D, lmrks_state) for lmrks_state in state['_FLandmarks2D_list'] ]
|
||||||
|
self._FPose = IState._restore_IState_obj(FPose, state.get('_FPose', None))
|
||||||
|
|
||||||
|
def dump_state(self) -> dict:
|
||||||
|
return {'_uuid' : self._uuid,
|
||||||
|
'_UImage_uuid' : self._UImage_uuid,
|
||||||
|
'_UPerson_uuid' : self._UPerson_uuid,
|
||||||
|
'_FRect' : IState._dump_IState_obj(self._FRect),
|
||||||
|
'_FLandmarks2D_list': tuple( IState._dump_IState_obj(fl) for fl in self._FLandmarks2D_list),
|
||||||
|
'_FPose' : IState._dump_IState_obj(self._FPose),
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_uuid(self) -> Union[bytes, None]:
|
||||||
|
if self._uuid is None:
|
||||||
|
self._uuid = uuid.uuid4().bytes_le
|
||||||
|
return self._uuid
|
||||||
|
|
||||||
def set_uuid(self, uuid : Union[bytes, None]):
|
def set_uuid(self, uuid : Union[bytes, None]):
|
||||||
if uuid is not None and not isinstance(uuid, bytes):
|
if uuid is not None and not isinstance(uuid, bytes):
|
||||||
raise ValueError(f'uuid must be an instance of bytes or None')
|
raise ValueError(f'uuid must be an instance of bytes or None')
|
||||||
|
@ -85,15 +97,15 @@ class UFaceMark:
|
||||||
raise ValueError('face_pose must be an instance of FPose')
|
raise ValueError('face_pose must be an instance of FPose')
|
||||||
self._FPose = face_pose
|
self._FPose = face_pose
|
||||||
|
|
||||||
def get_mask_info_list(self) -> List[Tuple[EMaskType, bytes, Affine2DMat]]:
|
# def get_mask_info_list(self) -> List[Tuple[EMaskType, bytes, Affine2DMat]]:
|
||||||
return self._mask_info_list
|
# return self._mask_info_list
|
||||||
|
|
||||||
def add_mask_info(self, mask_type : EMaskType, UImage_uuid : bytes, mask_to_mark_uni_mat : Affine2DMat):
|
# def add_mask_info(self, mask_type : EMaskType, UImage_uuid : bytes, mask_to_mark_uni_mat : Affine2DMat):
|
||||||
if not isinstance(mask_type, EMaskType):
|
# if not isinstance(mask_type, EMaskType):
|
||||||
raise ValueError('mask_type must be an instance of EMaskType')
|
# raise ValueError('mask_type must be an instance of EMaskType')
|
||||||
if not isinstance(UImage_uuid, bytes):
|
# if not isinstance(UImage_uuid, bytes):
|
||||||
raise ValueError('UImage_uuid must be an instance of bytes')
|
# raise ValueError('UImage_uuid must be an instance of bytes')
|
||||||
if not isinstance(mask_to_mark_uni_mat, Affine2DMat):
|
# if not isinstance(mask_to_mark_uni_mat, Affine2DMat):
|
||||||
raise ValueError('mask_to_mark_uni_mat must be an instance of Affine2DMat')
|
# raise ValueError('mask_to_mark_uni_mat must be an instance of Affine2DMat')
|
||||||
|
|
||||||
self._mask_info_list.append( (mask_type, UImage_uuid, mask_to_mark_uni_mat) )
|
# self._mask_info_list.append( (mask_type, UImage_uuid, mask_to_mark_uni_mat) )
|
||||||
|
|
|
@ -3,27 +3,27 @@ from typing import Union
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
class UImage:
|
from .IState import IState
|
||||||
|
|
||||||
|
|
||||||
|
class UImage(IState):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"""
|
"""
|
||||||
represents uncompressed image uint8 HWC ( 1/3/4 channels )
|
represents uncompressed image uint8 HWC ( 1/3/4 channels )
|
||||||
"""
|
"""
|
||||||
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le
|
self._uuid : Union[bytes, None] = None
|
||||||
self._name : Union[str, None] = None
|
self._name : Union[str, None] = None
|
||||||
self._image : np.ndarray = None
|
self._image : np.ndarray = None
|
||||||
|
|
||||||
def __getstate__(self):
|
def __str__(self): return f"UImage UUID:[...{self.get_uuid()[-4:].hex()}] name:[{self._name}] image:[{ (self._image.shape, self._image.dtype) if self._image is not None else None}]"
|
||||||
return self.__dict__.copy()
|
|
||||||
|
|
||||||
def __setstate__(self, d):
|
|
||||||
self.__init__()
|
|
||||||
self.__dict__.update(d)
|
|
||||||
|
|
||||||
def __str__(self): return f"UImage UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] image:[{ (self._image.shape, self._image.dtype) if self._image is not None else None}]"
|
|
||||||
def __repr__(self): return self.__str__()
|
def __repr__(self): return self.__str__()
|
||||||
|
|
||||||
def get_uuid(self) -> Union[bytes, None]: return self._uuid
|
def get_uuid(self) -> Union[bytes, None]:
|
||||||
|
if self._uuid is None:
|
||||||
|
self._uuid = uuid.uuid4().bytes_le
|
||||||
|
return self._uuid
|
||||||
|
|
||||||
def set_uuid(self, uuid : Union[bytes, None]):
|
def set_uuid(self, uuid : Union[bytes, None]):
|
||||||
if uuid is not None and not isinstance(uuid, bytes):
|
if uuid is not None and not isinstance(uuid, bytes):
|
||||||
raise ValueError(f'uuid must be an instance of bytes or None')
|
raise ValueError(f'uuid must be an instance of bytes or None')
|
||||||
|
@ -47,7 +47,7 @@ class UImage:
|
||||||
if image is not None:
|
if image is not None:
|
||||||
if image.ndim == 2:
|
if image.ndim == 2:
|
||||||
image = image[...,None]
|
image = image[...,None]
|
||||||
|
|
||||||
if image.ndim != 3:
|
if image.ndim != 3:
|
||||||
raise ValueError('image must have ndim == 3')
|
raise ValueError('image must have ndim == 3')
|
||||||
_,_,C = image.shape
|
_,_,C = image.shape
|
||||||
|
|
|
@ -1,26 +1,36 @@
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
|
from .IState import IState
|
||||||
|
|
||||||
class UPerson:
|
|
||||||
def __init__(self, _from_pickled=False):
|
class UPerson(IState):
|
||||||
|
def __init__(self):
|
||||||
"""
|
"""
|
||||||
"""
|
"""
|
||||||
self._uuid : Union[bytes, None] = uuid.uuid4().bytes_le if not _from_pickled else None
|
self._uuid : Union[bytes, None] = None
|
||||||
self._name : Union[str, None] = None
|
self._name : Union[str, None] = None
|
||||||
self._age : Union[int, None] = None
|
self._age : Union[int, None] = None
|
||||||
|
|
||||||
def __getstate__(self):
|
|
||||||
return self.__dict__.copy()
|
|
||||||
|
|
||||||
def __setstate__(self, d):
|
|
||||||
self.__init__(_from_pickled=True)
|
|
||||||
self.__dict__.update(d)
|
|
||||||
|
|
||||||
def __str__(self): return f"UPerson UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] age:[{self._age}]"
|
def __str__(self): return f"UPerson UUID:[...{self._uuid[-4:].hex()}] name:[{self._name}] age:[{self._age}]"
|
||||||
def __repr__(self): return self.__str__()
|
def __repr__(self): return self.__str__()
|
||||||
|
|
||||||
def get_uuid(self) -> Union[bytes, None]: return self._uuid
|
def restore_state(self, state : dict):
|
||||||
|
self._uuid = state.get('_uuid', None)
|
||||||
|
self._name = state.get('_name', None)
|
||||||
|
self._age = state.get('_age', None)
|
||||||
|
|
||||||
|
def dump_state(self) -> dict:
|
||||||
|
return {'_uuid' : self._uuid,
|
||||||
|
'_name' : self._name,
|
||||||
|
'_age' : self._age,
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_uuid(self) -> Union[bytes, None]:
|
||||||
|
if self._uuid is None:
|
||||||
|
self._uuid = uuid.uuid4().bytes_le
|
||||||
|
return self._uuid
|
||||||
|
|
||||||
def set_uuid(self, uuid : Union[bytes, None]):
|
def set_uuid(self, uuid : Union[bytes, None]):
|
||||||
if uuid is not None and not isinstance(uuid, bytes):
|
if uuid is not None and not isinstance(uuid, bytes):
|
||||||
raise ValueError(f'uuid must be an instance of bytes or None')
|
raise ValueError(f'uuid must be an instance of bytes or None')
|
||||||
|
@ -36,4 +46,4 @@ class UPerson:
|
||||||
def set_age(self, age : Union[int, None]):
|
def set_age(self, age : Union[int, None]):
|
||||||
if age is not None and not isinstance(age, int):
|
if age is not None and not isinstance(age, int):
|
||||||
raise ValueError(f'age must be an instance of int or None')
|
raise ValueError(f'age must be an instance of int or None')
|
||||||
self._age = age
|
self._age = age
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue