The load time of training data has been reduced significantly.

iperov 2020-12-20 19:17:24 +04:00
parent 554217d026
commit 977d8a2d77

@@ -7,52 +7,52 @@ class MPSharedList():
     """
     Provides read-only pickled list of constant objects via shared memory aka 'multiprocessing.Array'
     Thus no 4GB limit for subprocesses.
 
     supports list concat via + or sum()
     """
 
     def __init__(self, obj_list):
         if obj_list is None:
             self.obj_counts    = None
             self.table_offsets = None
             self.data_offsets  = None
             self.sh_bs         = None
         else:
             obj_count, table_offset, data_offset, sh_b = MPSharedList.bake_data(obj_list)
 
             self.obj_counts    = [obj_count]
             self.table_offsets = [table_offset]
             self.data_offsets  = [data_offset]
             self.sh_bs         = [sh_b]
 
     def __add__(self, o):
         if isinstance(o, MPSharedList):
             m = MPSharedList(None)
             m.obj_counts    = self.obj_counts    + o.obj_counts
             m.table_offsets = self.table_offsets + o.table_offsets
             m.data_offsets  = self.data_offsets  + o.data_offsets
             m.sh_bs         = self.sh_bs         + o.sh_bs
             return m
         elif isinstance(o, int):
             return self
         else:
             raise ValueError(f"MPSharedList object of class {o.__class__} is not supported for __add__ operator.")
 
     def __radd__(self, o):
         return self+o
 
     def __len__(self):
         return sum(self.obj_counts)
 
     def __getitem__(self, key):
         obj_count = sum(self.obj_counts)
         if key < 0:
             key = obj_count+key
         if key < 0 or key >= obj_count:
             raise ValueError("out of range")
 
         for i in range(len(self.obj_counts)):
             if key < self.obj_counts[i]:
                 table_offset = self.table_offsets[i]
                 data_offset  = self.data_offsets[i]
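
Note on the __add__/__radd__ pair above: Python's sum() starts from the integer 0, so 0 + shared_list must be accepted, and that is what the isinstance(o, int) branch handles by returning self unchanged. A minimal usage sketch of the concat behavior (the sample objects and the import path are assumptions, not part of this commit):

    # Hypothetical usage sketch; the import path is an assumption and the
    # sample dicts stand in for whatever the training pipeline stores.
    from core.mplib import MPSharedList

    a = MPSharedList([{'path': 'f1.jpg'}, {'path': 'f2.jpg'}])
    b = MPSharedList([{'path': 'f3.jpg'}])

    # __add__ only concatenates the per-segment bookkeeping lists;
    # the pickled payloads stay in their original shared buffers.
    merged = a + b
    assert len(merged) == 3

    # sum() begins with 0 + a, which __radd__ forwards to __add__,
    # where the isinstance(o, int) branch returns self.
    merged2 = sum([a, b])
    assert merged2[2] == {'path': 'f3.jpg'}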
@@ -63,85 +63,47 @@ class MPSharedList():
         offset_start, offset_end = struct.unpack('<QQ', bytes(sh_b[ table_offset + key*8 : table_offset + (key+2)*8 ]) )
 
         return pickle.loads( bytes(sh_b[ data_offset + offset_start : data_offset + offset_end ]) )
 
     def __iter__(self):
         for i in range(self.__len__()):
             yield self.__getitem__(i)
 
     @staticmethod
     def bake_data(obj_list):
         if not isinstance(obj_list, list):
             raise ValueError("MPSharedList: obj_list should be list type.")
 
         obj_count = len(obj_list)
 
         if obj_count != 0:
             obj_pickled_ar = [pickle.dumps(o, 4) for o in obj_list]
 
             table_offset = 0
             table_size   = (obj_count+1)*8
             data_offset  = table_offset + table_size
             data_size    = sum([len(x) for x in obj_pickled_ar])
 
             sh_b = multiprocessing.RawArray('B', table_size + data_size)
-            sh_b[0:8] = struct.pack('<Q', obj_count)
+            #sh_b[0:8] = struct.pack('<Q', obj_count)
+            sh_b_view = memoryview(sh_b).cast('B')
 
             offset = 0
 
             sh_b_table = bytes()
             offsets = []
 
             offset = 0
             for i in range(obj_count):
                 offsets.append(offset)
                 offset += len(obj_pickled_ar[i])
             offsets.append(offset)
 
-            sh_b[table_offset:table_offset+table_size] = struct.pack( '<'+'Q'*len(offsets), *offsets )
-
-            ArrayFillerSubprocessor(sh_b, [ (data_offset+offsets[i], obj_pickled_ar[i] ) for i in range(obj_count) ] ).run()
+            sh_b_view[table_offset:table_offset+table_size] = struct.pack( '<'+'Q'*len(offsets), *offsets )
+
+            for i, obj_pickled in enumerate(obj_pickled_ar):
+                offset = data_offset+offsets[i]
+                sh_b_view[offset:offset+len(obj_pickled)] = obj_pickled_ar[i]
 
             return obj_count, table_offset, data_offset, sh_b
 
         return 0, 0, 0, None
-
-
-class ArrayFillerSubprocessor(Subprocessor):
-    """
-    Much faster to fill shared memory via subprocesses rather than direct whole bytes fill.
-    """
-    #override
-    def __init__(self, sh_b, data_list ):
-        self.sh_b = sh_b
-        self.data_list = data_list
-        super().__init__('ArrayFillerSubprocessor', ArrayFillerSubprocessor.Cli, 60, io_loop_sleep_time=0.001)
-
-    #override
-    def process_info_generator(self):
-        for i in range(min(multiprocessing.cpu_count(), 8)):
-            yield 'CPU%d' % (i), {}, {'sh_b':self.sh_b}
-
-    #override
-    def get_data(self, host_dict):
-        if len(self.data_list) > 0:
-            return self.data_list.pop(-1)
-        return None
-
-    #override
-    def on_data_return (self, host_dict, data):
-        self.data_list.insert(0, data)
-
-    #override
-    def on_result (self, host_dict, data, result):
-        pass
-
-    class Cli(Subprocessor.Cli):
-        #overridable optional
-        def on_initialize(self, client_dict):
-            self.sh_b = client_dict['sh_b']
-
-        def process_data(self, data):
-            offset, b = data
-            self.sh_b[offset:offset+len(b)]=b
-            return 0
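
The deleted ArrayFillerSubprocessor existed because slice-assigning into a multiprocessing.RawArray('B', ...) goes through the ctypes proxy, which converts the source one element at a time; the class worked around that cost by spreading the writes across up to 8 worker processes. Casting the array to a flat memoryview first turns the same slice assignment into a single bulk copy, so the workers are no longer needed. A standalone sketch of the difference (the 64 MB buffer size, repeat count, and timing harness are illustrative assumptions, not from the commit):

    # Fill a shared RawArray through the ctypes proxy vs. a memoryview.
    import multiprocessing
    import timeit

    n = 64 * 1024 * 1024                      # 64 MB payload
    payload = bytes(n)
    sh_b = multiprocessing.RawArray('B', n)

    def fill_via_ctypes():
        # ctypes slice assignment converts the source element by element.
        sh_b[0:n] = payload

    def fill_via_memoryview():
        # memoryview(...).cast('B') exposes the same memory as a flat,
        # writable byte buffer, so the assignment is one bulk copy.
        memoryview(sh_b).cast('B')[0:n] = payload

    print('ctypes proxy :', timeit.timeit(fill_via_ctypes, number=3))
    print('memoryview   :', timeit.timeit(fill_via_memoryview, number=3))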