Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
708987f
refactor: drop posix_ipc and use native python shared_memory
william-silversmith Apr 9, 2025
51a972f
fix: missing import
william-silversmith Apr 9, 2025
29e645c
fix: track is not well supported
william-silversmith Apr 9, 2025
25a6015
fix: incorrect name for buffer attribute
william-silversmith Apr 9, 2025
0a68c6a
fix: return shm instead of old array_like
william-silversmith Apr 9, 2025
35b5910
fix: try deleting shared image first
william-silversmith Apr 9, 2025
5030279
fix: try just letting __del__ handle things
william-silversmith Apr 9, 2025
c27b9c1
fix: try just deleting the shared array
william-silversmith Apr 9, 2025
f19b523
fix: delete renderbuffer before calling close
william-silversmith Apr 9, 2025
a1b035d
fix: ensure shm is closed
william-silversmith Apr 9, 2025
2291fbc
fix: delete renderbuffer in child process
william-silversmith Apr 9, 2025
5188cb8
fix: ensure all views are closed
william-silversmith Sep 29, 2025
d20f3e6
fix: ensure fs_lock is created in a spawn context
william-silversmith Sep 29, 2025
d0d3a10
fix: use shmloc not bare location
william-silversmith Sep 29, 2025
48e4586
Revert "fix: use shmloc not bare location"
william-silversmith Sep 29, 2025
2df2fbc
fix: check for preexisting
william-silversmith Sep 29, 2025
3448295
fixtest: delete hanging reference
william-silversmith Sep 29, 2025
12b903b
fix: handle file exists
william-silversmith Sep 29, 2025
42ee492
fixtest: remove dangling references
william-silversmith Sep 29, 2025
8977a4e
fixtest: remove other references to array before closing
william-silversmith Sep 29, 2025
b6e1c84
fix: delete dangling reference
william-silversmith Sep 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions cloudvolume/datasource/precomputed/image/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ def parallel_execution(
):
global error_queue

error_queue = mp.Queue()
progress_queue = mp.Queue()
fs_lock = mp.Lock()
# Ensure processes do not accidentally inherit
# locks from forking and deadlock. Instead launch
# new interpreters.
ctx = mp.get_context("spawn")

error_queue = ctx.Queue()
progress_queue = ctx.Queue()
fs_lock = ctx.Lock()

if parallel is True:
parallel = mp.cpu_count()
parallel = ctx.cpu_count()
elif parallel <= 0:
raise ValueError(f"Parallel must be a positive number or boolean (True: all cpus). Got: {parallel}")

Expand Down Expand Up @@ -104,11 +109,6 @@ def cleanup(signum, frame):
)
proc.start()

# Ensure processes do not accidentally inherit
# locks from forking and deadlock. Instead launch
# new interpreters.
ctx = mp.get_context("spawn")

with concurrent.futures.ProcessPoolExecutor(
max_workers=parallel,
initializer=initialize_synchronization,
Expand Down
1 change: 1 addition & 0 deletions cloudvolume/datasource/precomputed/image/rx.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ def process(src_img, src_bbox):
green=green, secrets=secrets, background_color=background_color
)

del dest_img
array_like.close()

return len(cloudpaths)
Expand Down
25 changes: 14 additions & 11 deletions cloudvolume/datasource/precomputed/image/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,19 @@ def upload_aligned(
secrets=secrets
)

parallel_execution(
cup, chunk_ranges, parallel,
progress, desc="Upload",
cleanup_shm=location
)

# If manual mode is enabled, it's the
# responsibilty of the user to clean up
if not use_shared_memory:
array_like.close()
shm.unlink(location)
try:
parallel_execution(
cup, chunk_ranges, parallel,
progress, desc="Upload",
cleanup_shm=location
)
finally:
# If manual mode is enabled, it's the
# responsibilty of the user to clean up
if not use_shared_memory:
del renderbuffer
array_like.close()
shm.unlink(location)

def child_upload_process(
meta, cache,
Expand Down Expand Up @@ -267,6 +269,7 @@ def updatefn():
secrets=secrets,
)
finally:
del renderbuffer
array_like.close()

def threaded_upload_chunks(
Expand Down
3 changes: 3 additions & 0 deletions cloudvolume/frontends/precomputed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,8 @@ def upload_from_shared_memory(self, location, bbox, order='F', cutout_bbox=None)
order=order,
use_shared_memory=True,
)
del cutout_image
del shared_image
mmap_handle.close()

def upload_from_file(self, location, bbox, order='F', cutout_bbox=None):
Expand Down Expand Up @@ -1188,6 +1190,7 @@ def upload_from_file(self, location, bbox, order='F', cutout_bbox=None):
order=order,
use_file=True,
)
del shared_image
mmap_handle.close()

def viewer(self, port=1337):
Expand Down
29 changes: 17 additions & 12 deletions cloudvolume/sharedmemory.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ def allocate_shm_file(filename, nbytes, dbytes, readonly):

def ndarray_shm(shape, dtype, location, readonly=False, order='F', **kwargs):
"""Create a shared memory numpy array. Requires /dev/shm to exist."""
import posix_ipc
from posix_ipc import O_CREAT
from multiprocessing import shared_memory
import psutil

nbytes = Vec(*shape).rectVolume() * np.dtype(dtype).itemsize
Expand Down Expand Up @@ -170,32 +169,38 @@ def ndarray_shm(shape, dtype, location, readonly=False, order='F', **kwargs):
# a threading condition where the condition of the shared memory
# was adjusted between the check above and now. Better to make sure
# that we don't accidently change anything if readonly is set.
flags = 0 if readonly else O_CREAT
size = 0 if readonly else int(nbytes)
create = (not readonly) and (not preexisting)

try:
shared = posix_ipc.SharedMemory(location, flags=flags, size=size)
array_like = mmap.mmap(shared.fd, shared.size)
os.close(shared.fd)
renderbuffer = np.ndarray(buffer=array_like, dtype=dtype, shape=shape, order=order, **kwargs)
shm = shared_memory.SharedMemory(name=location, create=create, size=size)
renderbuffer = np.frombuffer(buffer=shm.buf, dtype=dtype)
renderbuffer = renderbuffer.reshape(shape, order=order)
except FileExistsError:
if not readonly:
raise
shm = shared_memory.SharedMemory(name=location, create=False, size=size)
renderbuffer = np.frombuffer(buffer=shm.buf, dtype=dtype)
renderbuffer = renderbuffer.reshape(shape, order=order)
except OSError as err:
if err.errno == errno.ENOMEM: # Out of Memory
posix_ipc.unlink_shared_memory(location)
unlink_shm(location)
raise

renderbuffer.setflags(write=(not readonly))
return array_like, renderbuffer
return shm, renderbuffer

def unlink(location):
if EMULATE_SHM:
return unlink_fs(location)
return unlink_shm(location)

def unlink_shm(location):
import posix_ipc
from multiprocessing import shared_memory
try:
posix_ipc.unlink_shared_memory(location)
except posix_ipc.ExistentialError:
shm = shared_memory.SharedMemory(name=location, create=False)
shm.unlink()
except FileNotFoundError:
return False
return True

Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def requirements():
"blosc",
],
':sys_platform!="win32"': [
"posix_ipc>=1.0.4",
"psutil>=5.4.3",
],
"mesh_viewer": [ 'vtk' ],
Expand Down
1 change: 1 addition & 0 deletions test/test_cloudvolume.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ def test_parallel_shared_memory_write():
assert np.all(cv[0,0,:] == 1)
assert np.all(cv[1,0,:] == 0)

del shareddata
mmapfh.close()
shm.unlink(shm_location)

Expand Down
6 changes: 6 additions & 0 deletions test/test_sharedmemory.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ def test_ndarray_sh():
array_like, array = shm.ndarray_shm(shape=(2,2,2), dtype=np.uint8, location=location)
assert np.all(array == np.zeros(shape=(2,2,2), dtype=np.uint8))
array[:] = 100
del array
array_like.close()

array_like, array = shm.ndarray_shm(shape=(2,2,2), dtype=np.uint8, location=location)
assert np.all(array[:] == 100)
del array
array_like.close()

filename = os.path.join(shm.SHM_DIRECTORY, location)
Expand All @@ -86,7 +88,9 @@ def test_ndarray_sh():

available = psutil.virtual_memory().available
array_like, array = shm.ndarray_shm(shape=(available // 10,2,2), dtype=np.uint8, location=location)
del array
array_like.close()

try:
array_like, array = shm.ndarray_shm(shape=(available,2,2), dtype=np.uint8, location=location)
assert False
Expand All @@ -109,5 +113,7 @@ def test_ndarray_sh():
except shm.SharedMemoryReadError:
pass

del array
array_like.close()
assert shm.unlink_shm(location) == True
assert shm.unlink_shm(location) == False