From 31c3c353a3b069df7b710f52595d1b8310e615df Mon Sep 17 00:00:00 2001 From: aber-sandag Date: Wed, 10 Apr 2024 10:24:36 -0700 Subject: [PATCH] Don't use shared memory when running singleprocessed (temp fix) --- activitysim/core/skim_dict_factory.py | 37 +++++++++++++++------------ 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/activitysim/core/skim_dict_factory.py b/activitysim/core/skim_dict_factory.py index 5755964d5..a3b37aea2 100644 --- a/activitysim/core/skim_dict_factory.py +++ b/activitysim/core/skim_dict_factory.py @@ -409,7 +409,7 @@ def allocate_skim_buffer(self, skim_info, shared=False): f"total size: {util.INT(csz)} ({util.GB(csz)})" ) - # if shared: + if shared: # if dtype_name == "float64": # typecode = "d" # elif dtype_name == "float32": @@ -420,15 +420,15 @@ def allocate_skim_buffer(self, skim_info, shared=False): # ) # buffer = multiprocessing.RawArray(typecode, buffer_size) - shared_mem_name = f"skim_shared_memory__{skim_info.skim_tag}" - try: - buffer = multiprocessing.shared_memory.SharedMemory(name=shared_mem_name) - logger.info(f"skim buffer already allocated in shared memory: {shared_mem_name}, size: {buffer.size}") - except FileNotFoundError: - buffer = multiprocessing.shared_memory.SharedMemory(create=True, size=csz, name=shared_mem_name) - logger.info(f"allocating skim buffer in shared memory: {shared_mem_name}, size: {buffer.size}") - # else: - # buffer = np.zeros(buffer_size, dtype=dtype) + shared_mem_name = f"skim_shared_memory__{skim_info.skim_tag}" + try: + buffer = multiprocessing.shared_memory.SharedMemory(name=shared_mem_name) + logger.info(f"skim buffer already allocated in shared memory: {shared_mem_name}, size: {buffer.size}") + except FileNotFoundError: + buffer = multiprocessing.shared_memory.SharedMemory(create=True, size=csz, name=shared_mem_name) + logger.info(f"allocating skim buffer in shared memory: {shared_mem_name}, size: {buffer.size}") + else: + buffer = np.zeros(buffer_size, dtype=dtype) return buffer @@ -447,11 +447,16 @@ def _skim_data_from_buffer(self, skim_info, skim_buffer): """ dtype = np.dtype(skim_info.dtype_name) - # assert len(skim_buffer) == util.iprod(skim_info.skim_data_shape) - assert skim_buffer.size >= util.iprod(skim_info.skim_data_shape) * dtype.itemsize - skim_data = np.frombuffer(skim_buffer.buf, dtype=dtype, count=util.iprod(skim_info.skim_data_shape)).reshape( - skim_info.skim_data_shape - ) + if isinstance(skim_buffer, multiprocessing.shared_memory.SharedMemory): + assert skim_buffer.size >= util.iprod(skim_info.skim_data_shape) * dtype.itemsize + skim_data = np.frombuffer(skim_buffer.buf, dtype=dtype, count=util.iprod(skim_info.skim_data_shape)).reshape( + skim_info.skim_data_shape + ) + else: + assert len(skim_buffer) == util.iprod(skim_info.skim_data_shape) + skim_data = np.frombuffer(skim_buffer, dtype=dtype).reshape( + skim_info.skim_data_shape + ) return skim_data def load_skims_to_buffer(self, skim_info, skim_buffer): @@ -470,7 +475,7 @@ def load_skims_to_buffer(self, skim_info, skim_buffer): skim_data = self._skim_data_from_buffer(skim_info, skim_buffer) assert skim_data.shape == skim_info.skim_data_shape - if skim_data.any(): + if isinstance(skim_buffer, multiprocessing.shared_memory.SharedMemory) and skim_data.any(): return if read_cache: