import multiprocessing
import pickle
import struct

from core.joblib import Subprocessor


class MPSharedList():
    """
    Provides a read-only pickled list of constant objects via shared memory
    aka 'multiprocessing.Array', thus no 4GB pickle-transfer limit when the
    list is used from subprocesses.

    Supports list concatenation via + or sum().
    """

    def __init__(self, obj_list):
        if obj_list is None:
            # Empty shell: used by __add__ to build a concatenated view.
            self.obj_counts    = None
            self.table_offsets = None
            self.data_offsets  = None
            self.sh_bs         = None
        else:
            obj_count, table_offset, data_offset, sh_b = MPSharedList.bake_data(obj_list)
            self.obj_counts    = [obj_count]
            self.table_offsets = [table_offset]
            self.data_offsets  = [data_offset]
            self.sh_bs         = [sh_b]

    def __add__(self, o):
        if isinstance(o, MPSharedList):
            # Concatenation only merges the per-segment bookkeeping lists;
            # no shared-memory data is copied.
            m = MPSharedList(None)
            m.obj_counts    = self.obj_counts    + o.obj_counts
            m.table_offsets = self.table_offsets + o.table_offsets
            m.data_offsets  = self.data_offsets  + o.data_offsets
            m.sh_bs         = self.sh_bs         + o.sh_bs
            return m
        elif isinstance(o, int):
            # Accept int so that sum([...]) (which starts from 0) works.
            return self
        else:
            raise ValueError(f"MPSharedList object of class {o.__class__} is not supported for __add__ operator.")

    def __radd__(self, o):
        return self+o

    def __len__(self):
        return sum(self.obj_counts)

    def __getitem__(self, key):
        obj_count = sum(self.obj_counts)
        if key < 0:
            key = obj_count+key
        if key < 0 or key >= obj_count:
            raise ValueError("out of range")

        # Locate which baked segment holds `key` and localize the index to it.
        for i in range(len(self.obj_counts)):
            if key < self.obj_counts[i]:
                table_offset = self.table_offsets[i]
                data_offset  = self.data_offsets[i]
                sh_b         = self.sh_bs[i]
                break
            key -= self.obj_counts[i]

        # The offset table stores little-endian uint64 start offsets; entries
        # key and key+1 delimit the pickled bytes of object `key`.
        # NOTE(review): the '<QQ' format was destroyed by corruption in the
        # reviewed copy; it is recovered from the 8-byte table stride visible
        # in the slice arithmetic — confirm against VCS history.
        offset_start, offset_end = struct.unpack('<QQ', bytes(sh_b[table_offset + key*8 : table_offset + (key+2)*8]))

        return pickle.loads(bytes(sh_b[data_offset + offset_start : data_offset + offset_end]))

    def __iter__(self):
        for i in range(self.__len__()):
            yield self[i]

    @staticmethod
    def bake_data(obj_list):
        """
        Pickle every object of obj_list into one shared RawArray.

        Layout: [ (n+1) little-endian uint64 offsets ][ concatenated pickles ].
        Returns (obj_count, table_offset, data_offset, sh_b).

        NOTE(review): this method body was lost to corruption in the reviewed
        copy and is reconstructed from the layout implied by __getitem__ and
        from the surviving ArrayFillerSubprocessor fragments — verify against
        VCS history before relying on it.
        """
        obj_count = len(obj_list)
        obj_pickled_ar = [pickle.dumps(o, 4) for o in obj_list]

        table_offset = 0
        table_size   = (obj_count+1)*8
        data_offset  = table_offset + table_size
        data_size    = sum(len(x) for x in obj_pickled_ar)

        sh_b = multiprocessing.RawArray('B', table_size + data_size)

        # Build the (n+1)-entry offset table; the final entry is the total
        # data size so entry pairs always delimit one object.
        offsets = []
        offset = 0
        for b in obj_pickled_ar:
            offsets.append(offset)
            offset += len(b)
        offsets.append(offset)

        sh_b[table_offset:table_offset+table_size] = struct.pack('<' + 'Q'*len(offsets), *offsets)

        # Fill the data region via worker subprocesses.
        ArrayFillerSubprocessor(sh_b, [(data_offset + offsets[i], obj_pickled_ar[i]) for i in range(obj_count)]).run()

        return obj_count, table_offset, data_offset, sh_b


class ArrayFillerSubprocessor(Subprocessor):
    """
    Fills a shared RawArray from a list of (offset, bytes) work items using
    worker subprocesses.

    NOTE(review): the head of this class was lost to corruption in the
    reviewed copy; its name, __init__ and process_info_generator are
    reconstructed from the surviving method bodies — verify the exact
    original against VCS history.
    """
    #override
    def __init__(self, sh_b, data_list):
        self.sh_b = sh_b
        self.data_list = data_list
        super().__init__('ArrayFillerSubprocessor', ArrayFillerSubprocessor.Cli, 60)

    #override
    def process_info_generator(self):
        for i in range(multiprocessing.cpu_count()):
            yield 'CPU%d' % (i), {}, {'sh_b':self.sh_b}

    #override
    def get_data(self, host_dict):
        # Hand out work items from the tail until the queue is drained.
        if len(self.data_list) > 0:
            return self.data_list.pop(-1)
        return None

    #override
    def on_data_return (self, host_dict, data):
        # A failed/returned work item goes back to the front of the queue.
        self.data_list.insert(0, data)

    #override
    def on_result (self, host_dict, data, result):
        pass

    class Cli(Subprocessor.Cli):
        #overridable optional
        def on_initialize(self, client_dict):
            self.sh_b = client_dict['sh_b']

        def process_data(self, data):
            # Write the pickled bytes at their target offset in shared memory.
            offset, b = data
            self.sh_b[offset:offset+len(b)] = b
            return 0