diff --git a/cloudvolume/datasource/precomputed/image/common.py b/cloudvolume/datasource/precomputed/image/common.py index 5250308f..9f9f1360 100644 --- a/cloudvolume/datasource/precomputed/image/common.py +++ b/cloudvolume/datasource/precomputed/image/common.py @@ -63,12 +63,17 @@ def parallel_execution( ): global error_queue - error_queue = mp.Queue() - progress_queue = mp.Queue() - fs_lock = mp.Lock() + # Ensure processes do not accidentally inherit + # locks from forking and deadlock. Instead launch + # new interpreters. + ctx = mp.get_context("spawn") + + error_queue = ctx.Queue() + progress_queue = ctx.Queue() + fs_lock = ctx.Lock() if parallel is True: - parallel = mp.cpu_count() + parallel = ctx.cpu_count() elif parallel <= 0: raise ValueError(f"Parallel must be a positive number or boolean (True: all cpus). Got: {parallel}") @@ -104,11 +109,6 @@ def cleanup(signum, frame): ) proc.start() - # Ensure processes do not accidentally inherit - # locks from forking and deadlock. Instead launch - # new interpreters. 
- ctx = mp.get_context("spawn") - with concurrent.futures.ProcessPoolExecutor( max_workers=parallel, initializer=initialize_synchronization, diff --git a/cloudvolume/datasource/precomputed/image/rx.py b/cloudvolume/datasource/precomputed/image/rx.py index fdd5004a..ba90b0b2 100644 --- a/cloudvolume/datasource/precomputed/image/rx.py +++ b/cloudvolume/datasource/precomputed/image/rx.py @@ -562,6 +562,7 @@ def process(src_img, src_bbox): green=green, secrets=secrets, background_color=background_color ) + del dest_img array_like.close() return len(cloudpaths) diff --git a/cloudvolume/datasource/precomputed/image/tx.py b/cloudvolume/datasource/precomputed/image/tx.py index 3c361a8d..f7dca21d 100644 --- a/cloudvolume/datasource/precomputed/image/tx.py +++ b/cloudvolume/datasource/precomputed/image/tx.py @@ -205,17 +205,19 @@ def upload_aligned( secrets=secrets ) - parallel_execution( - cup, chunk_ranges, parallel, - progress, desc="Upload", - cleanup_shm=location - ) - - # If manual mode is enabled, it's the - # responsibilty of the user to clean up - if not use_shared_memory: - array_like.close() - shm.unlink(location) + try: + parallel_execution( + cup, chunk_ranges, parallel, + progress, desc="Upload", + cleanup_shm=location + ) + finally: + # If manual mode is enabled, it's the + # responsibility of the user to clean up + if not use_shared_memory: + del renderbuffer + array_like.close() + shm.unlink(location) def child_upload_process( meta, cache, @@ -267,6 +269,7 @@ def updatefn(): secrets=secrets, ) finally: + del renderbuffer array_like.close() def threaded_upload_chunks( diff --git a/cloudvolume/frontends/precomputed.py b/cloudvolume/frontends/precomputed.py index 8d3f7bba..a1cb8a34 100644 --- a/cloudvolume/frontends/precomputed.py +++ b/cloudvolume/frontends/precomputed.py @@ -1127,6 +1127,8 @@ def upload_from_shared_memory(self, location, bbox, order='F', cutout_bbox=None) order=order, use_shared_memory=True, ) + del cutout_image + del shared_image 
mmap_handle.close() def upload_from_file(self, location, bbox, order='F', cutout_bbox=None): @@ -1188,6 +1190,7 @@ def upload_from_file(self, location, bbox, order='F', cutout_bbox=None): order=order, use_file=True, ) + del shared_image mmap_handle.close() def viewer(self, port=1337): diff --git a/cloudvolume/sharedmemory.py b/cloudvolume/sharedmemory.py index 79aaee56..35efb8d2 100644 --- a/cloudvolume/sharedmemory.py +++ b/cloudvolume/sharedmemory.py @@ -127,8 +127,7 @@ def allocate_shm_file(filename, nbytes, dbytes, readonly): def ndarray_shm(shape, dtype, location, readonly=False, order='F', **kwargs): """Create a shared memory numpy array. Requires /dev/shm to exist.""" - import posix_ipc - from posix_ipc import O_CREAT + from multiprocessing import shared_memory import psutil nbytes = Vec(*shape).rectVolume() * np.dtype(dtype).itemsize @@ -170,21 +169,26 @@ def ndarray_shm(shape, dtype, location, readonly=False, order='F', **kwargs): # a threading condition where the condition of the shared memory # was adjusted between the check above and now. Better to make sure # that we don't accidently change anything if readonly is set. 
- flags = 0 if readonly else O_CREAT size = 0 if readonly else int(nbytes) + create = (not readonly) and (not preexisting) try: - shared = posix_ipc.SharedMemory(location, flags=flags, size=size) - array_like = mmap.mmap(shared.fd, shared.size) - os.close(shared.fd) - renderbuffer = np.ndarray(buffer=array_like, dtype=dtype, shape=shape, order=order, **kwargs) + shm = shared_memory.SharedMemory(name=location, create=create, size=size) + renderbuffer = np.frombuffer(buffer=shm.buf, dtype=dtype) + renderbuffer = renderbuffer.reshape(shape, order=order) + except FileExistsError: + if not readonly: + raise + shm = shared_memory.SharedMemory(name=location, create=False, size=size) + renderbuffer = np.frombuffer(buffer=shm.buf, dtype=dtype) + renderbuffer = renderbuffer.reshape(shape, order=order) except OSError as err: if err.errno == errno.ENOMEM: # Out of Memory - posix_ipc.unlink_shared_memory(location) + unlink_shm(location) raise renderbuffer.setflags(write=(not readonly)) - return array_like, renderbuffer + return shm, renderbuffer def unlink(location): if EMULATE_SHM: @@ -192,10 +196,11 @@ def unlink_shm(location): - import posix_ipc + from multiprocessing import shared_memory try: - posix_ipc.unlink_shared_memory(location) - except posix_ipc.ExistentialError: + shm = shared_memory.SharedMemory(name=location, create=False) + shm.unlink() + except FileNotFoundError: return False return True diff --git a/setup.py b/setup.py index 6e23ea6f..ae506f54 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,6 @@ def requirements(): "blosc", ], ':sys_platform!="win32"': [ - "posix_ipc>=1.0.4", "psutil>=5.4.3", ], "mesh_viewer": [ 'vtk' ], diff --git a/test/test_cloudvolume.py b/test/test_cloudvolume.py index 5bfa25ff..78fa327f 100644 --- a/test/test_cloudvolume.py +++ b/test/test_cloudvolume.py @@ -470,6 +470,7 @@ def test_parallel_shared_memory_write(): assert np.all(cv[0,0,:] == 1) assert np.all(cv[1,0,:] == 0) + del shareddata
mmapfh.close() shm.unlink(shm_location) diff --git a/test/test_sharedmemory.py b/test/test_sharedmemory.py index e1a61a90..3d93cfc1 100644 --- a/test/test_sharedmemory.py +++ b/test/test_sharedmemory.py @@ -68,10 +68,12 @@ def test_ndarray_sh(): array_like, array = shm.ndarray_shm(shape=(2,2,2), dtype=np.uint8, location=location) assert np.all(array == np.zeros(shape=(2,2,2), dtype=np.uint8)) array[:] = 100 + del array array_like.close() array_like, array = shm.ndarray_shm(shape=(2,2,2), dtype=np.uint8, location=location) assert np.all(array[:] == 100) + del array array_like.close() filename = os.path.join(shm.SHM_DIRECTORY, location) @@ -86,7 +88,9 @@ def test_ndarray_sh(): available = psutil.virtual_memory().available array_like, array = shm.ndarray_shm(shape=(available // 10,2,2), dtype=np.uint8, location=location) + del array array_like.close() + try: array_like, array = shm.ndarray_shm(shape=(available,2,2), dtype=np.uint8, location=location) assert False @@ -109,5 +113,7 @@ def test_ndarray_sh(): except shm.SharedMemoryReadError: pass + del array + array_like.close() assert shm.unlink_shm(location) == True assert shm.unlink_shm(location) == False