From e755fa789827cdf28f78aeac21c23e267e87242a Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Fri, 12 Jul 2024 18:10:05 -0400 Subject: [PATCH 1/2] add ArrowRecordBatchCodec --- pyproject.toml | 1 + src/zarr/buffer.py | 5 +- src/zarr/codecs/__init__.py | 2 + src/zarr/codecs/arrow.py | 81 ++++++++++++++++++++++++++++++ tests/v3/test_codecs/test_arrow.py | 57 +++++++++++++++++++++ 5 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 src/zarr/codecs/arrow.py create mode 100644 tests/v3/test_codecs/test_arrow.py diff --git a/pyproject.toml b/pyproject.toml index f1be6725b6..2039fa6568 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -109,6 +109,7 @@ dependencies = [ "universal_pathlib" ] extra-dependencies = [ + "pyarrow", "coverage", "pytest", "pytest-cov", diff --git a/src/zarr/buffer.py b/src/zarr/buffer.py index 86f9b53477..bec992508e 100644 --- a/src/zarr/buffer.py +++ b/src/zarr/buffer.py @@ -283,7 +283,10 @@ class NDBuffer: def __init__(self, array: NDArrayLike): # assert array.ndim > 0 - assert array.dtype != object + + # Commented this out because string arrays have dtype object + # TODO: decide how to handle strings (e.g. numpy 2.0 StringDtype) + # assert array.dtype != object self._data = array @classmethod diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 9394284319..a7ff229c29 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations +from zarr.codecs.arrow import ArrowRecordBatchCodec from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle from zarr.codecs.bytes import BytesCodec, Endian from zarr.codecs.crc32c_ import Crc32cCodec @@ -10,6 +11,7 @@ from zarr.codecs.zstd import ZstdCodec __all__ = [ + "ArrowRecordBatchCodec", "BatchedCodecPipeline", "BloscCname", "BloscCodec", diff --git a/src/zarr/codecs/arrow.py b/src/zarr/codecs/arrow.py new file mode 100644 index 0000000000..4240c00d43 --- /dev/null +++ b/src/zarr/codecs/arrow.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import pyarrow as pa + +from zarr.abc.codec import ArrayBytesCodec +from zarr.array_spec import ArraySpec +from zarr.buffer import Buffer, NDBuffer +from zarr.codecs.registry import register_codec +from zarr.common import JSON, parse_named_configuration + +if TYPE_CHECKING: + from typing_extensions import Self + +CHUNK_FIELD_NAME = "zarr_chunk" + + +@dataclass(frozen=True) +class ArrowRecordBatchCodec(ArrayBytesCodec): + def __init__(self) -> None: + pass + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration( + data, "arrow", require_configuration=False + ) + configuration_parsed = configuration_parsed or {} + return cls(**configuration_parsed) + + def to_dict(self) -> dict[str, JSON]: + return {"name": "arrow"} + + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + return self + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + assert isinstance(chunk_bytes, Buffer) + + # TODO: make this compatible with buffer prototype + arrow_buffer = pa.py_buffer(chunk_bytes.to_bytes()) + with pa.ipc.open_stream(arrow_buffer) as reader: + batches = [b for b in reader] + assert len(batches) == 1 + arrow_array = batches[0][CHUNK_FIELD_NAME] + chunk_array = chunk_spec.prototype.nd_buffer.from_ndarray_like( + arrow_array.to_numpy(zero_copy_only=False) + ) + + # ensure correct chunk shape + if chunk_array.shape != chunk_spec.shape: + chunk_array = chunk_array.reshape( + chunk_spec.shape, + ) + return chunk_array + + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + assert isinstance(chunk_array, NDBuffer) + arrow_array = pa.array(chunk_array.as_ndarray_like().ravel()) + rb = pa.record_batch([arrow_array], names=[CHUNK_FIELD_NAME]) + # TODO: allocate buffer differently + sink = pa.BufferOutputStream() + with pa.ipc.new_stream(sink, rb.schema) as writer: + writer.write_batch(rb) + return chunk_spec.prototype.buffer.from_bytes(memoryview(sink.getvalue())) + + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise ValueError("Don't know how to compute encoded size!") + + +register_codec("arrow", ArrowRecordBatchCodec) diff --git a/tests/v3/test_codecs/test_arrow.py b/tests/v3/test_codecs/test_arrow.py new file mode 100644 index 0000000000..de1113ef25 --- /dev/null +++ b/tests/v3/test_codecs/test_arrow.py @@ -0,0 +1,57 @@ +import numpy as np +import pytest + +from zarr.abc.store import Store +from zarr.array import Array +from zarr.codecs import ArrowRecordBatchCodec +from zarr.store.core import StorePath + + +@pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"]) +@pytest.mark.parametrize( + "dtype", + [ + "uint8", + "uint16", + "uint32", + "uint64", + "int8", + "int16", + "int32", + "int64", + "float32", + "float64", + ], +) +def test_arrow_standard_dtypes(store: Store, dtype) -> None: + data = np.arange(0, 256, dtype=dtype).reshape((16, 16)) + + a = Array.create( + StorePath(store, path="arrow"), + shape=data.shape, + chunk_shape=(16, 16), + dtype=data.dtype, + fill_value=0, + codecs=[ArrowRecordBatchCodec()], + ) + + a[:, :] = data + assert np.array_equal(data, a[:, :]) + + +@pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"]) +def test_arrow_vlen_string(store: Store) -> None: + strings = ["hello", "world", "this", "is", "a", "test"] + data = np.array(strings).reshape((2, 3)) + + a = Array.create( + StorePath(store, path="arrow"), + shape=data.shape, + chunk_shape=data.shape, + dtype=data.dtype, + fill_value=0, + codecs=[ArrowRecordBatchCodec()], + ) + + a[:, :] = data + assert np.array_equal(data, a[:, :]) From 62adc60cb2e27cd7622723152cf7308a2e289988 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Fri, 12 Jul 2024 18:33:45 -0400 Subject: [PATCH 2/2] use memoryview instead of py_bytes --- src/zarr/codecs/arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/codecs/arrow.py b/src/zarr/codecs/arrow.py index 4240c00d43..1ac7128030 100644 --- a/src/zarr/codecs/arrow.py +++ b/src/zarr/codecs/arrow.py @@ -44,7 +44,7 @@ async def _decode_single( assert isinstance(chunk_bytes, Buffer) # TODO: make this compatible with buffer prototype - arrow_buffer = pa.py_buffer(chunk_bytes.to_bytes()) + arrow_buffer = memoryview(chunk_bytes.to_bytes()) with pa.ipc.open_stream(arrow_buffer) as reader: batches = [b for b in reader] assert len(batches) == 1