diff --git a/python/ndstorage/ndtiff_dataset.py b/python/ndstorage/ndtiff_dataset.py index fe5a9dc..edd0301 100644 --- a/python/ndstorage/ndtiff_dataset.py +++ b/python/ndstorage/ndtiff_dataset.py @@ -21,7 +21,7 @@ class NDTiffDataset(NDStorageBase, WritableNDStorageAPI): """ def __init__(self, dataset_path=None, file_io: NDTiffFileIO = BUILTIN_FILE_IO, summary_metadata=None, - name=None, writable=False, **kwargs): + name=None, writable=False, pixel_compression = 1, **kwargs): """ Provides access to an NDTiffStorage dataset, either one currently being acquired or one on disk @@ -44,10 +44,15 @@ def __init__(self, dataset_path=None, file_io: NDTiffFileIO = BUILTIN_FILE_IO, s self.file_io = file_io self._lock = threading.RLock() + self._put_image_lock = threading.Lock() if writable: self.major_version = MAJOR_VERSION self.minor_version = MINOR_VERSION self._index_file = None + if pixel_compression in [1,8]: + self._pixel_compression = pixel_compression + else: + raise ValueError("Compression scheme must be 1 (No) or 8 (zlib)") if summary_metadata is not None or writable: # this dataset is either: # - a view of an active acquisition. Image data is being written by code on the Java side @@ -165,10 +170,18 @@ def read_metadata(self, channel=None, z=None, time=None, position=None, row=None return self._do_read_metadata(axes) - def put_image(self, coordinates, image, metadata): + def put_image(self, coordinates, image, metadata, pixel_compression = 0): + # wait for put_image to finish before calling it again. + self._put_image_lock.acquire() if not self._writable: raise RuntimeError("Cannot write to a read-only dataset") + if pixel_compression == 0: + pixel_compression = self._pixel_compression + elif not pixel_compression in [1,8]: + warnings.warn(f"Pixel compression {pixel_compression}: only 1 (no compression) and 8 (zlib) are supported. Using {self._pixel_compression}.") + pixel_compression = self._pixel_compression + # add to write pending images self._write_pending_images[frozenset(coordinates.items())] = (image, metadata) @@ -184,7 +197,7 @@ def put_image(self, coordinates, image, metadata): filename = 'NDTiffStack.tif' if self.name is not None: filename = self.name + '_' + filename - self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata) + self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata, self._pixel_compression) self.file_index += 1 # create the index file self._index_file = open(os.path.join(self.path, "NDTiff.index"), "wb") @@ -193,16 +206,17 @@ def put_image(self, coordinates, image, metadata): filename = 'NDTiffStack_{}.tif'.format(self.file_index) if self.name is not None: filename = self.name + '_' + filename - self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata) + self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata, self._pixel_compression) self.file_index += 1 - index_data_entry = self.current_writer.write_image(frozenset(coordinates.items()), image, metadata) + index_data_entry = self.current_writer.write_image(frozenset(coordinates.items()), image, metadata, pixel_compression = pixel_compression) # create readers and update axes self.add_index_entry(index_data_entry, new_image_updates=False) # write the index to disk self._index_file.write(index_data_entry.as_byte_buffer().getvalue()) # remove from pending images del self._write_pending_images[frozenset(coordinates.items())] + self._put_image_lock.release() def finish(self): if self.current_writer is not None: @@ -258,7 +272,11 @@ def add_index_entry(self, data, new_image_updates=True): self.index[frozenset(image_coordinates.items())] = index_entry if index_entry.filename not in self._readers_by_filename: - new_reader = SingleNDTiffReader(os.path.join(self.path, index_entry.filename), file_io=self.file_io) + # prevent new reader object when writing: + if self._writable and self.current_writer.filename.split(os.sep)[-1] == index_entry.filename: + new_reader = self.current_writer.reader + else: + new_reader = SingleNDTiffReader(os.path.join(self.path, index_entry.filename), file_io=self.file_io) self._readers_by_filename[index_entry.filename] = new_reader # Should be the same on every file so resetting them is fine self.major_version, self.minor_version = new_reader.major_version, new_reader.minor_version diff --git a/python/ndstorage/ndtiff_file.py b/python/ndstorage/ndtiff_file.py index 5fcac31..552e6eb 100644 --- a/python/ndstorage/ndtiff_file.py +++ b/python/ndstorage/ndtiff_file.py @@ -5,6 +5,8 @@ import time import struct import warnings +import mmap +import zlib from collections import OrderedDict from io import BytesIO from .file_io import NDTiffFileIO, BUILTIN_FILE_IO @@ -49,7 +51,7 @@ class SingleNDTiffWriter: - def __init__(self, directory, filename, summary_md): + def __init__(self, directory, filename, summary_md, pixel_compression = 1): self.filename = os.path.join(directory, filename) self.index_map = {} self.next_ifd_offset_location = -1 @@ -59,10 +61,15 @@ def __init__(self, directory, filename, summary_md): self.buffers = deque() self.first_ifd = True - self.start_time = None + if pixel_compression in [1, 8]: + self.pixel_compression = pixel_compression + else: + raise ValueError("Invalid pixel compression, only 1 (no compression) and 8 (zlib) are supported") + self.start_time = None + os.makedirs(directory, exist_ok=True) - # pre-allocate the file + # pre-allocate the file file_path = os.path.join(directory, filename) with open(file_path, 'wb') as f: f.seek(MAX_FILE_SIZE - 1) @@ -132,12 +139,12 @@ def _write_null_offset_after_last_image(self): self.file.write(buffer) self.file.seek(current_pos) - def write_image(self, index_key, pixels, metadata, bit_depth='auto'): + def write_image(self, index_key, pixels, metadata, bit_depth='auto', pixel_compression = 0): """ Write an image to the file Parameters - ---------- + ---------- index_key : frozenset The key to index the image pixels : np.ndarray or bytearray @@ -152,14 +159,25 @@ def write_image(self, index_key, pixels, metadata, bit_depth='auto'): NDTiffIndexEntry The index entry for the image """ + if pixel_compression == 0: + pixel_compression = self.pixel_compression + image_height, image_width = pixels.shape rgb = pixels.ndim == 3 and pixels.shape[2] == 3 + + if rgb and pixel_compression in [8]: + warnings.warn(f"Pixel compression {pixel_compression} is not supported for RGB images. Using no compression.") + pixel_compression = 1 + if not pixel_compression in [1,8]: + warnings.warn(f"Invalid pixel compression {pixel_compression}: only 1 (no compression) and 8 (zlib) are supported. Using 1 (no compression).") + pixel_compression = 1 + if bit_depth == 'auto': bit_depth = 8 if pixels.dtype == np.uint8 else 16 # if metadata is a dict, serialize it to a json string and make it a utf8 byte buffer if isinstance(metadata, dict): metadata = self._get_bytes_from_string(json.dumps(metadata)) - ied = self._write_ifd(index_key, pixels, metadata, rgb, image_height, image_width, bit_depth) + ied = self._write_ifd(index_key, pixels, metadata, rgb, image_height, image_width, bit_depth, pixel_compression) while self.buffers: self.file.write(self.buffers.popleft()) # make sure the file is flushed to disk @@ -168,12 +186,26 @@ def write_image(self, index_key, pixels, metadata, bit_depth='auto'): return ied - def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width, bit_depth): + def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width, bit_depth, pixel_compression): if self.file.tell() % 2 == 1: self.file.seek(self.file.tell() + 1) # Make IFD start on word - byte_depth = 1 if isinstance(pixels, bytearray) else 2 - bytes_per_image_pixels = self._bytes_per_image_pixels(pixels, rgb) + if isinstance(pixels, bytearray): + byte_depth = 1 + # if the pixel object is a numpy array, it is type of + # when using np_array.tobytes it is + # therefore taking the the bit_depth information "pixels.dtype" into account + elif bit_depth == 8: + byte_depth = 1 + else: + byte_depth = 2 + + if pixel_compression == 8: + compressed_pixels = zlib.compress(pixels) + bytes_per_image_pixels = len(compressed_pixels) + else: + bytes_per_image_pixels = self._bytes_per_image_pixels(pixels, rgb) + num_entries = 13 # 2 bytes for number of directory entries, 12 bytes per directory entry, 4 byte offset of next IFD @@ -202,7 +234,7 @@ def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, HEIGHT, 4, 1, image_height, buffer_position) buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, BITS_PER_SAMPLE, 3, 3 if rgb else 1, bits_per_sample_offset if rgb else byte_depth * 8, buffer_position) - buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, COMPRESSION, 3, 1, 1, buffer_position) + buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, COMPRESSION, 3, 1, pixel_compression, buffer_position) buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, PHOTOMETRIC_INTERPRETATION, 3, 1, 2 if rgb else 1, buffer_position) buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, STRIP_OFFSETS, 4, 1, pixel_data_offset, @@ -235,7 +267,10 @@ def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width buffer_position += 8 self.buffers.append(ifd_and_small_vals_buffer) - self.buffers.append(self._get_pixel_buffer(pixels, rgb)) + if pixel_compression in [8]: + self.buffers.append(compressed_pixels) + else: + self.buffers.append(self._get_pixel_buffer(pixels, rgb)) self.buffers.append(metadata) self.first_ifd = False @@ -251,7 +286,7 @@ def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width }.get(bit_depth, NDTiffIndexEntry.EIGHT_BIT_RGB if rgb else None) return NDTiffIndexEntry(index_key, pixel_type, pixel_data_offset, image_width, image_height, metadata_offset, - len(metadata), self.filename.split(os.sep)[-1]) + len(metadata), self.filename.split(os.sep)[-1], pixel_compression) def _write_ifd_entry(self, buffer, tag, dtype, count, value, buffer_position): struct.pack_into('= 3, "For test_write_multithreaded_zlib Python >= 3.13 is recommended" + #assert sys.version_info[1] >= 13, "For test_write_multithreaded_zlib Python >= 3.13 is recommended" + + full_path = os.path.join(test_data_path, 'test_write_full_dataset') + dataset = NDTiffDataset(full_path, summary_metadata={}, writable=True, pixel_compression=8) + image_deque = deque() + run_event = threading.Event() + run_event.set() + + image_height = 256 + image_width = 256 + + thread = threading.Thread(target=image_write_loop, args=(image_deque, dataset, run_event)) + thread.start() + + time_counter = 0 + time_limit = 10 + + while True: + if len(image_deque) < 4: + pixels = np.ones(image_height * image_width, dtype=np.uint16).reshape((image_height, image_width)) * time_counter + image_deque.append((time_counter, pixels)) + time_counter += 1 + if time_counter >= time_limit: + break + else: + time.sleep(0.001) # if the deque is full, wait a bit + + run_event.clear() + thread.join() + dataset.finish() + + # read the file back in + dataset = NDTiffDataset(full_path) + for time_index in range(time_limit): + pixels = np.ones(image_height * image_width, dtype=np.uint16).reshape((image_height, image_width)) * time_index + axes = {'time': time_index} + read_image = dataset.read_image(**axes) + assert np.all(read_image == pixels) + assert dataset.read_metadata(**axes) == {'time_metadata': time_index} +