diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 4287b6c9..6cd96367 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -12,6 +12,8 @@ jobs:
   build_wheels:
     name: Build and test on ${{ matrix.os }}${{ matrix.numpy-version && format(' (numpy {0})', matrix.numpy-version) || '' }}
     runs-on: ${{ matrix.os }}
+    env:
+      CMAKE_GENERATOR: Ninja
     strategy:
       matrix:
        os: [ubuntu-latest, windows-latest, macos-latest]
@@ -30,15 +32,65 @@ jobs:
         with:
           python-version: ${{ matrix.python-version }}
 
+      - name: Install sccache (Windows)
+        if: runner.os == 'Windows'
+        run: choco install sccache --yes
+
+      - name: Cache sccache (Windows)
+        if: runner.os == 'Windows'
+        uses: actions/cache@v4
+        with:
+          path: C:\Users\runneradmin\AppData\Local\sccache
+          key: sccache-${{ runner.os }}-${{ github.sha }}
+          restore-keys: |
+            sccache-${{ runner.os }}-
+
+      - name: Cache pip (Windows)
+        if: runner.os == 'Windows'
+        uses: actions/cache@v4
+        with:
+          path: C:\Users\runneradmin\AppData\Local\pip\Cache
+          key: pip-${{ runner.os }}-${{ hashFiles('pyproject.toml') }}
+          restore-keys: |
+            pip-${{ runner.os }}-
+
       - name: Install Ninja
         uses: seanmiddleditch/gha-setup-ninja@master
 
+      - name: Add LLVM to PATH (Windows)
+        if: runner.os == 'Windows'
+        run: echo "C:\\Program Files\\LLVM\\bin" >> $env:GITHUB_PATH
+
       - name: Install specific numpy version
         if: matrix.numpy-version
         run: pip install "numpy==${{ matrix.numpy-version }}.*"
 
-      - name: Build
+      - name: Build (Windows)
+        if: runner.os == 'Windows'
+        id: build_windows
+        run: pip install -e .[test]
+        env:
+          CMAKE_C_COMPILER_LAUNCHER: sccache
+          CMAKE_CXX_COMPILER_LAUNCHER: sccache
+          SCCACHE_DIR: C:\Users\runneradmin\AppData\Local\sccache
+          CC: clang-cl
+          CXX: clang-cl
+          CMAKE_BUILD_PARALLEL_LEVEL: 8
+          SKBUILD_PARALLEL_LEVEL: 8
+
+      - name: Build (non-Windows)
+        if: runner.os != 'Windows'
+        id: build_non_windows
         run: pip install -e .[test]
 
-      - name: Test
+      - name: Test (Windows)
+        if: runner.os == 'Windows'
+        run: python -m pytest -m "not heavy and (network or not network)"
+#        env:
+#          BLOSC_NTHREADS: "1"
+#          NUMEXPR_NUM_THREADS: "1"
+#          OMP_NUM_THREADS: "1"
+
+      - name: Test (non-Windows)
+        if: runner.os != 'Windows'
         run: python -m pytest -m "not heavy and (network or not network)"
diff --git a/.github/workflows/cibuildwheels.yml b/.github/workflows/cibuildwheels.yml
index f4bb64eb..3444946a 100644
--- a/.github/workflows/cibuildwheels.yml
+++ b/.github/workflows/cibuildwheels.yml
@@ -17,6 +17,11 @@ env:
   # Skip PyPy wheels for now (numexpr needs some adjustments first)
   # musllinux takes too long to build, and it's not worth it for now
   CIBW_SKIP: "pp* *musllinux* *-win32"
+  # Use explicit generator/compiler env vars; CMAKE_ARGS with spaces is not split on Windows.
+  CIBW_ENVIRONMENT_WINDOWS: >-
+    CMAKE_GENERATOR=Ninja
+    CC=clang-cl
+    CXX=clang-cl
 
 jobs:
@@ -77,6 +82,10 @@ jobs:
         id: ninja
         uses: turtlesec-no/get-ninja@main
 
+      - name: Add LLVM to PATH (Windows)
+        if: ${{ matrix.os == 'windows-latest' }}
+        run: echo "C:\\Program Files\\LLVM\\bin" >> $env:GITHUB_PATH
+
       - name: Install MSVC amd64
         uses: ilammy/msvc-dev-cmd@v1
         with:
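The `>-` folded scalar used for `CIBW_ENVIRONMENT_WINDOWS` collapses its indented lines into one space-separated string, which is the `VAR=value` list format that cibuildwheel expects. A quick way to confirm the folding behavior (assuming PyYAML is available):

```python
# A `>-` folded scalar joins its lines with single spaces and strips the
# trailing newline, yielding a whitespace-separated VAR=value list.
import yaml

doc = """
env: >-
  CMAKE_GENERATOR=Ninja
  CC=clang-cl
  CXX=clang-cl
"""
print(yaml.safe_load(doc)["env"])
# -> 'CMAKE_GENERATOR=Ninja CC=clang-cl CXX=clang-cl'
```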
diff --git a/.github/workflows/wasm.yml b/.github/workflows/wasm.yml
index ab008eea..8f98cbe3 100644
--- a/.github/workflows/wasm.yml
+++ b/.github/workflows/wasm.yml
@@ -21,6 +21,7 @@ jobs:
     env:
       CIBW_BUILD: ${{ matrix.cibw_build }}
       CMAKE_ARGS: "-DWITH_OPTIM=OFF"
+      DEACTIVATE_OPENZL: "1"
       CIBW_TEST_COMMAND: "pytest {project}/tests/ndarray/test_reductions.py"
     strategy:
       matrix:
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 097ae709..74b8b067 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,5 +1,22 @@
 cmake_minimum_required(VERSION 3.15.0)
+
+if(WIN32)
+    cmake_policy(SET CMP0091 NEW)
+    set(CMAKE_MSVC_RUNTIME_LIBRARY "MultiThreaded$<$<CONFIG:Debug>:Debug>DLL" CACHE STRING "" FORCE)
+endif()
+
+
+if(WIN32 AND CMAKE_GENERATOR MATCHES "Visual Studio")
+    if(NOT DEFINED CMAKE_GENERATOR_TOOLSET)
+        set(CMAKE_GENERATOR_TOOLSET "ClangCL" CACHE STRING "Use ClangCL toolset for C99/C11 support on Windows." FORCE)
+    endif()
+endif()
+
 project(python-blosc2)
+
+if(WIN32 AND NOT CMAKE_C_COMPILER_ID STREQUAL "Clang")
+    message(FATAL_ERROR "Windows builds require clang-cl. Set CC/CXX to clang-cl or configure CMake with -T ClangCL.")
+endif()
 # Specifying Python version below is tricky, but if you don't specify the minimum version here,
 # it would not consider python3 when looking for the executable. This is problematic since Fedora
 # does not include a python symbolic link to python3.
@@ -23,11 +40,55 @@ add_custom_command(
             "${CMAKE_CURRENT_SOURCE_DIR}/src/blosc2/blosc2_ext.pyx" --output-file blosc2_ext.c
     DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/src/blosc2/blosc2_ext.pyx"
     VERBATIM)
+
 # ...and add it to the target
 Python_add_library(blosc2_ext MODULE blosc2_ext.c WITH_SOABI)
+
 # We need to link against NumPy
 target_link_libraries(blosc2_ext PRIVATE Python::NumPy)
 
+# Fetch and build miniexpr library
+include(FetchContent)
+
+set(CMAKE_POSITION_INDEPENDENT_CODE ON)
+set(MINIEXPR_BUILD_SHARED OFF CACHE BOOL "Build miniexpr shared library" FORCE)
+set(MINIEXPR_BUILD_TESTS OFF CACHE BOOL "Build miniexpr tests" FORCE)
+set(MINIEXPR_BUILD_EXAMPLES OFF CACHE BOOL "Build miniexpr examples" FORCE)
+set(MINIEXPR_BUILD_BENCH OFF CACHE BOOL "Build miniexpr benchmarks" FORCE)
+
+FetchContent_Declare(miniexpr
+    GIT_REPOSITORY https://github.com/Blosc/miniexpr.git
+    GIT_TAG sleef  # latest SIMD additions
+    # In case you want to use a local copy of miniexpr for development, uncomment the line below
+    # SOURCE_DIR "/Users/faltet/blosc/miniexpr"
+)
+FetchContent_MakeAvailable(miniexpr)
+
+# Link against miniexpr static library
+target_link_libraries(blosc2_ext PRIVATE miniexpr_static)
+
+target_compile_features(blosc2_ext PRIVATE c_std_11)
+if(WIN32 AND CMAKE_C_COMPILER_ID STREQUAL "Clang")
+    execute_process(
+        COMMAND "${CMAKE_C_COMPILER}" -print-resource-dir
+        OUTPUT_VARIABLE _clang_resource_dir
+        OUTPUT_STRIP_TRAILING_WHITESPACE
+        ERROR_QUIET
+    )
+    if(_clang_resource_dir)
+        if(CMAKE_SIZEOF_VOID_P EQUAL 8)
+            set(_clang_builtins "${_clang_resource_dir}/lib/windows/clang_rt.builtins-x86_64.lib")
+        else()
+            set(_clang_builtins "${_clang_resource_dir}/lib/windows/clang_rt.builtins-i386.lib")
+        endif()
+        if(EXISTS "${_clang_builtins}")
+            target_link_libraries(blosc2_ext PRIVATE "${_clang_builtins}")
+        endif()
+        unset(_clang_builtins)
+    endif()
+    unset(_clang_resource_dir)
+endif()
+
 if(DEFINED ENV{USE_SYSTEM_BLOSC2})
     set(USE_SYSTEM_BLOSC2 ON)
 endif()
@@ -44,13 +105,20 @@ else()
     set(BUILD_EXAMPLES OFF CACHE BOOL "Build C-Blosc2 examples")
     set(BUILD_BENCHMARKS OFF CACHE BOOL "Build C-Blosc2 benchmarks")
     set(BUILD_FUZZERS OFF CACHE BOOL "Build C-Blosc2 fuzzers")
+    if(DEFINED ENV{DEACTIVATE_OPENZL})
+        set(DEACTIVATE_OPENZL ON CACHE BOOL "Do not include support for the OpenZL library.")
+    endif()
     set(CMAKE_POSITION_INDEPENDENT_CODE ON)
     # we want the binaries of the C-Blosc2 library to go into the wheels
     set(BLOSC_INSTALL ON)
     include(FetchContent)
     FetchContent_Declare(blosc2
         GIT_REPOSITORY https://github.com/Blosc/c-blosc2
-        GIT_TAG 5a2b0ed9c4d801230c118fbc5811817055b5a3f5  # v2.22.0
+        # GIT_TAG 9d250c2201f6e385c56a372b08037f7debc6fa1b  # openzl (disposable output)
+        GIT_TAG add_openzl
+        GIT_SHALLOW TRUE  # fetch only the latest commit (only works with a branch in GIT_TAG)
+        # in case you want to use a local copy of c-blosc2 for development, uncomment the line below
+        # SOURCE_DIR "/Users/faltet/blosc/c-blosc2"
     )
     FetchContent_MakeAvailable(blosc2)
     include_directories("${blosc2_SOURCE_DIR}/include")
diff --git a/README.rst b/README.rst
index 3d442419..7480d64f 100644
--- a/README.rst
+++ b/README.rst
@@ -53,6 +53,17 @@ Conda users can install from conda-forge:
 
     conda install -c conda-forge python-blosc2
 
+Windows note
+============
+
+When building from source on Windows, clang-cl is required (OpenZL depends on C11 support).
+Make sure LLVM is on PATH and use the Ninja generator, for example::
+
+    CMAKE_GENERATOR=Ninja
+    CC=clang-cl
+    CXX=clang-cl
+    pip install -e .
+
 Documentation
 =============
diff --git a/README_DEVELOPERS.md b/README_DEVELOPERS.md
index 5c2ee0cc..acf8493a 100644
--- a/README_DEVELOPERS.md
+++ b/README_DEVELOPERS.md
@@ -22,6 +22,16 @@ You are done!
 pip install .   # add -e for editable mode
 ```
 
+On Windows, clang-cl is required (OpenZL depends on C11 support). Make sure LLVM
+is on PATH and build with Ninja, for example:
+
+```bash
+CMAKE_GENERATOR=Ninja \
+CC=clang-cl \
+CXX=clang-cl \
+pip install -e .
+```
+
 There are situations where you may want to build the C-Blosc2 library separately,
 for example, when debugging issues in the C library. In that case, let's assume
 you have the C-Blosc2 library installed in `/usr/local`:
 
 ```bash
@@ -38,6 +48,37 @@ LD_LIBRARY_PATH=/usr/local/lib pytest
 
 That's it! You can now proceed to the testing section.
 
+### Speeding up local builds (sccache + Ninja)
+
+If you do frequent local rebuilds, sccache can significantly speed up C/C++ rebuilds.
+
+```bash
+brew install sccache ninja
+```
+
+Then run:
+
+```bash
+CMAKE_GENERATOR=Ninja \
+CMAKE_C_COMPILER=clang \
+CMAKE_CXX_COMPILER=clang++ \
+CMAKE_C_COMPILER_LAUNCHER=sccache \
+CMAKE_CXX_COMPILER_LAUNCHER=sccache \
+CMAKE_BUILD_PARALLEL_LEVEL=8 \
+SKBUILD_PARALLEL_LEVEL=8 \
+SKBUILD_BUILD_DIR=build \
+pip install -e .
+```
+
+Using `SKBUILD_BUILD_DIR` keeps a stable build directory between runs, which
+improves incremental rebuilds and sccache hit rates.
+
+Check cache stats with:
+
+```bash
+sccache --show-stats
+```
+
 ## Testing
 
 We are using pytest for testing. You can run the tests by executing
diff --git a/bench/ndarray/miniexpr-eval.py b/bench/ndarray/miniexpr-eval.py
new file mode 100644
index 00000000..c7b89035
--- /dev/null
+++ b/bench/ndarray/miniexpr-eval.py
@@ -0,0 +1,47 @@
+from time import time
+import blosc2
+import numpy as np
+import numexpr as ne
+
+N = 10_000
+# dtype = np.int32
+dtype = np.float32
+# dtype = np.float64
+cparams = blosc2.CParams(codec=blosc2.Codec.BLOSCLZ, clevel=1)
+
+t0 = time()
+# a = blosc2.ones((N, N), dtype=dtype, cparams=cparams)
+# a = blosc2.arange(np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams)
+a = blosc2.linspace(0., 1., np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams)
+print(f"Time to create data: {(time() - t0) * 1000:.4f} ms")
+t0 = time()
+b = a.copy()
+c = a.copy()
+print(f"Time to copy data: {(time() - t0) * 1000:.4f} ms")
+
+t0 = time()
+res = (2 * a**2 - 3 * b + c + 1.2).compute(cparams=cparams)
+t = time() - t0
+print(f"Time to evaluate: {t * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(a.nbytes * 4 / 1e9) / t:.2f}")
+# print(res.info)
+
+na = a[:]
+nb = b[:]
+nc = c[:]
+
+t0 = time()
+nres = 2 * na**2 - 3 * nb + nc + 1.2
+nt = time() - t0
+print(f"Time to evaluate with NumPy: {nt * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(na.nbytes * 4 / 1e9) / nt:.2f}")
+print(f"Speedup Blosc2 vs NumPy: {nt / t:.2f}x")
+np.testing.assert_allclose(res, nres, rtol=1e-5)
+
+t0 = time()
+neres = ne.evaluate("2 * na**2 - 3 * nb + nc + 1.2")
+net = time() - t0
+print(f"Time to evaluate with NumExpr: {net * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(na.nbytes * 4 / 1e9) / net:.2f}")
+print(f"Speedup Blosc2 vs NumExpr: {net / t:.2f}x")
+np.testing.assert_allclose(res, neres, rtol=1e-5)
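A note on the throughput figures in this benchmark: the `* 4` factor presumably counts total memory traffic rather than input size alone, i.e. three operands read plus one result written, each of `a.nbytes`; the reduction benchmarks below use `* 3` because only the three inputs are streamed. Back-of-envelope:

```python
# Assumes the x4 factor counts 3 operand reads + 1 output write of a.nbytes each.
nbytes = 10_000 * 10_000 * 4       # one float32 (N, N) operand, in bytes
print(nbytes * 4 / 1e9)            # -> 1.6 GB of traffic per evaluation
```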
diff --git a/bench/ndarray/miniexpr-reduct-sum-multi.py b/bench/ndarray/miniexpr-reduct-sum-multi.py
new file mode 100644
index 00000000..badd9647
--- /dev/null
+++ b/bench/ndarray/miniexpr-reduct-sum-multi.py
@@ -0,0 +1,49 @@
+from time import time
+import blosc2
+import numpy as np
+import numexpr as ne
+
+N = 10_000
+dtype = np.float32
+cparams = blosc2.CParams(codec=blosc2.Codec.BLOSCLZ, clevel=1)
+
+t0 = time()
+# a = blosc2.ones((N, N), dtype=dtype, cparams=cparams)
+# a = blosc2.arange(np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams)
+a = blosc2.linspace(0., 1., np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams)
+# rng = np.random.default_rng(1234)
+# a = rng.integers(0, 2, size=(N, N), dtype=dtype)
+# a = blosc2.asarray(a, cparams=cparams, urlpath="a.b2nd", mode="w")
+print(f"Time to create data: {(time() - t0) * 1000:.4f} ms")
+t0 = time()
+b = a.copy()
+c = a.copy()
+print(f"Time to copy data: {(time() - t0) * 1000:.4f} ms")
+
+t0 = time()
+res = blosc2.sum(2 * a**2 - 3 * b + c + 1.2)
+t = time() - t0
+print(f"Time to evaluate: {t * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(a.nbytes * 3 / 1e9) / t:.2f}")
+print("Result:", res, "Mean:", res / (N * N))
+
+na = a[:]
+nb = b[:]
+nc = c[:]
+
+t0 = time()
+nres = np.sum(2 * na**2 - 3 * nb + nc + 1.2)
+nt = time() - t0
+print(f"Time to evaluate with NumPy: {nt * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(na.nbytes * 3 / 1e9) / nt:.2f}")
+print("Result:", nres, "Mean:", nres / (N * N))
+print(f"Speedup Blosc2 vs NumPy: {nt / t:.2f}x")
+assert np.allclose(res, nres)
+
+t0 = time()
+neres = ne.evaluate("sum(2 * na**2 - 3 * nb + nc + 1.2)")
+net = time() - t0
+print(f"Time to evaluate with NumExpr: {net * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(na.nbytes * 3 / 1e9) / net:.2f}")
+print("Result:", neres, "Mean:", neres / (N * N))
+print(f"Speedup Blosc2 vs NumExpr: {net / t:.2f}x")
diff --git a/bench/ndarray/miniexpr-reduct-sum.py b/bench/ndarray/miniexpr-reduct-sum.py
new file mode 100644
index 00000000..e8a76119
--- /dev/null
+++ b/bench/ndarray/miniexpr-reduct-sum.py
@@ -0,0 +1,42 @@
+from time import time
+import blosc2
+import numpy as np
+import numexpr as ne
+
+N = 10_000
+# dtype = np.int32
+dtype = np.float32
+# dtype = np.float64
+cparams = blosc2.CParams(codec=blosc2.Codec.BLOSCLZ, clevel=1)
+
+t0 = time()
+# a = blosc2.ones((N, N), dtype=dtype, cparams=cparams)
+# a = blosc2.arange(np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams)
+a = blosc2.linspace(0., 1., np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams)
+print(f"Time to create data: {(time() - t0) * 1000:.4f} ms")
+
+t0 = time()
+res = blosc2.sum(a)
+t = time() - t0
+print(f"Time to evaluate: {t * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(a.nbytes / 1e9) / t:.2f}")
+print("Result:", res, "Mean:", res / (N * N))
+
+na = a[:]
+
+t0 = time()
+nres = np.sum(na)
+nt = time() - t0
+print(f"Time to evaluate with NumPy: {nt * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(na.nbytes / 1e9) / nt:.2f}")
+print("Result:", nres, "Mean:", nres / (N * N))
+print(f"Speedup Blosc2 vs NumPy: {nt / t:.2f}x")
+assert np.allclose(res, nres)
+
+t0 = time()
+neres = ne.evaluate("sum(na)")
+net = time() - t0
+print(f"Time to evaluate with NumExpr: {net * 1000:.4f} ms", end=" ")
+print(f"Speed (GB/s): {(na.nbytes / 1e9) / net:.2f}")
+print("Result:", neres, "Mean:", neres / (N * N))
+print(f"Speedup Blosc2 vs NumExpr: {net / t:.2f}x")
diff --git a/pyproject.toml b/pyproject.toml
index 93073710..86cf1999 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -65,7 +65,8 @@ dev = [
 test = [
     "pytest",
     "psutil; platform_machine != 'wasm32'",
-    "torch; platform_machine != 'wasm32'",
+    # torch is optional because it is quite large (but will still be used if found)
+    # "torch; platform_machine != 'wasm32'",
 ]
 doc = [
     "sphinx>=8",
diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx
index 91276883..c36181e6 100644
--- a/src/blosc2/blosc2_ext.pyx
+++ b/src/blosc2/blosc2_ext.pyx
@@ -23,12 +23,13 @@ from cpython cimport (
     PyBytes_FromStringAndSize,
     PyObject_GetBuffer,
 )
+from cpython.ref cimport Py_INCREF, Py_DECREF
 from cpython.pycapsule cimport PyCapsule_GetPointer, PyCapsule_New
 from cython.operator cimport dereference
 from libc.stdint cimport uintptr_t
-from libc.stdlib cimport free, malloc, realloc
+from libc.stdlib cimport free, malloc, realloc, calloc
 from libc.stdlib cimport abs as c_abs
-from libc.string cimport memcpy, strcpy, strdup, strlen
+from libc.string cimport memcpy, memset, strcpy, strdup, strlen
 from libcpp cimport bool as c_bool
 
 from enum import Enum
@@ -53,6 +54,8 @@ cdef extern from "<stdint.h>":
     ctypedef unsigned int uint32_t
     ctypedef unsigned long long uint64_t
 
+cdef extern from "<stdio.h>":
+    int printf(const char *format, ...) nogil
 
 cdef extern from "blosc2.h":
@@ -179,7 +182,7 @@ cdef extern from "blosc2.h":
     int blosc2_free_resources()
 
     int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes,
-                             int32_t* cbytes, int32_t* blocksize)
+                             int32_t* cbytes, int32_t* blocksize) nogil
 
     int blosc1_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes)
@@ -206,6 +209,7 @@ cdef extern from "blosc2.h":
         uint8_t* ttmp
         size_t ttmp_nbytes
         blosc2_context* ctx
+        c_bool output_is_disposable
 
     ctypedef struct blosc2_postfilter_params:
         void *user_data
@@ -257,7 +261,7 @@ cdef extern from "blosc2.h":
 
     blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) nogil
 
-    blosc2_context* blosc2_create_dctx(blosc2_dparams dparams)
+    blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) nogil
 
     void blosc2_free_ctx(blosc2_context* context) nogil
@@ -280,7 +284,7 @@ cdef extern from "blosc2.h":
 
     int blosc2_getitem_ctx(blosc2_context* context, const void* src,
                            int32_t srcsize, int start, int nitems, void* dest,
-                           int32_t destsize)
+                           int32_t destsize) nogil
@@ -518,13 +522,77 @@ cdef extern from "b2nd.h":
                               int64_t *buffershape, int64_t buffersize)
 
     int b2nd_from_schunk(blosc2_schunk *schunk, b2nd_array_t **array)
 
-    void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index)
+    void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index) nogil
 
     int b2nd_copy_buffer2(int8_t ndim, int32_t itemsize,
                           const void *src, const int64_t *src_pad_shape,
                           const int64_t *src_start, const int64_t *src_stop,
                           void *dst, const int64_t *dst_pad_shape,
-                          const int64_t *dst_start);
+                          const int64_t *dst_start)
+
+
+# miniexpr C API declarations
+cdef extern from "miniexpr.h":
+    ctypedef enum me_dtype:
+        ME_AUTO
+        ME_BOOL
+        ME_INT8
+        ME_INT16
+        ME_INT32
+        ME_INT64
+        ME_UINT8
+        ME_UINT16
+        ME_UINT32
+        ME_UINT64
+        ME_FLOAT32
+        ME_FLOAT64
+        ME_COMPLEX64
+        ME_COMPLEX128
+
+    # typedef struct me_variable
+    ctypedef struct me_variable:
+        const char *name
+        me_dtype dtype
+        const void *address
+        int type
+        void *context
+
+    ctypedef struct me_expr:
+        int type
+        double value
+        const double *bound
+        const void *function
+        void *output
+        int nitems
+        me_dtype dtype
+        me_dtype input_dtype
+        void *bytecode
+        int ncode
+        void *parameters[1]
+
+    int me_compile(const char *expression, const me_variable *variables,
+                   int var_count, me_dtype dtype, int *error, me_expr **out)
+
+    cdef enum me_compile_status:
+        ME_COMPILE_SUCCESS
+        ME_COMPILE_ERR_OOM
+        ME_COMPILE_ERR_PARSE
+        ME_COMPILE_ERR_INVALID_ARG
+        ME_COMPILE_ERR_COMPLEX_UNSUPPORTED
+        ME_COMPILE_ERR_REDUCTION_INVALID
+        ME_COMPILE_ERR_VAR_MIXED
+        ME_COMPILE_ERR_VAR_UNSPECIFIED
+        ME_COMPILE_ERR_INVALID_ARG_TYPE
+
+    int me_eval(const me_expr *expr, const void **vars_chunk,
+                int n_vars, void *output_chunk, int chunk_nitems) nogil
+
+    void me_print(const me_expr *n) nogil
+    void me_free(me_expr *n) nogil
+
+
+cdef extern from "miniexpr_numpy.h":
+    me_dtype me_dtype_from_numpy(int numpy_type_num)
 
 
 ctypedef struct user_filters_udata:
@@ -547,6 +615,15 @@ ctypedef struct udf_udata:
     int64_t chunks_in_array[B2ND_MAX_DIM]
     int64_t blocks_in_chunk[B2ND_MAX_DIM]
 
+ctypedef struct me_udata:
+    b2nd_array_t** inputs
+    int ninputs
+    b2nd_array_t *array
+    void* aux_reduc_ptr
+    int64_t chunks_in_array[B2ND_MAX_DIM]
+    int64_t blocks_in_chunk[B2ND_MAX_DIM]
+    me_expr* miniexpr_handle
+
 MAX_TYPESIZE = BLOSC2_MAXTYPESIZE
 MAX_BUFFERSIZE = BLOSC2_MAX_BUFFERSIZE
 MAX_BLOCKSIZE = BLOSC2_MAXBLOCKSIZE
@@ -770,7 +847,8 @@ cdef _check_cparams(blosc2_cparams *cparams):
         if ufilters[i] and cparams.filters[i] in blosc2.ufilters_registry.keys():
             raise ValueError("Cannot use multi-threading with user defined Python filters")
-    if cparams.prefilter != NULL:
+    if cparams.prefilter != NULL and cparams.prefilter != miniexpr_prefilter:
+        # Note: miniexpr_prefilter uses the miniexpr C API, which is thread-friendly
         raise ValueError("`nthreads` must be 1 when a prefilter is set")
 
 cdef _check_dparams(blosc2_dparams* dparams, blosc2_cparams* cparams=NULL):
@@ -1368,6 +1446,7 @@ cdef class SChunk:
         cdef int size
         cdef int32_t len_chunk = <int32_t> (buf.len + BLOSC2_MAX_OVERHEAD)
         cdef uint8_t* chunk = <uint8_t*> malloc(len_chunk)
+        self.schunk.current_nchunk = nchunk  # prefilter needs this value to be set
         if RELEASEGIL:
             with nogil:
                 # No need to create another cctx
@@ -1406,6 +1485,7 @@ cdef class SChunk:
         cdef int size
         cdef int32_t len_chunk = <int32_t> (buf.len + BLOSC2_MAX_OVERHEAD)
         cdef uint8_t* chunk = <uint8_t*> malloc(len_chunk)
+        self.schunk.current_nchunk = nchunk  # prefilter needs this value to be set
         if RELEASEGIL:
             with nogil:
                 size = blosc2_compress_ctx(self.schunk.cctx, buf.buf, buf.len, chunk, len_chunk)
@@ -1428,6 +1508,22 @@ cdef class SChunk:
             raise RuntimeError("Could not update the desired chunk")
         return rc
 
+    # This is used internally for prefiltering
+    def _prefilter_data(self, nchunk, data, chunk_data):
+        cdef Py_buffer buf
+        PyObject_GetBuffer(data, &buf, PyBUF_SIMPLE)
+        cdef Py_buffer chunk_buf
+        PyObject_GetBuffer(chunk_data, &chunk_buf, PyBUF_SIMPLE)
+        self.schunk.current_nchunk = nchunk  # prefilter needs this value to be set
+        cdef int size = blosc2_compress_ctx(self.schunk.cctx, buf.buf, buf.len, chunk_buf.buf, chunk_buf.len)
+        PyBuffer_Release(&buf)
+        PyBuffer_Release(&chunk_buf)
+        if size < 0:
+            raise RuntimeError("Could not compress the data")
+        elif size == 0:
+            raise RuntimeError("The result could not fit into the destination buffer")
+        return size
+
     def get_slice(self, start=0, stop=None, out=None):
         cdef int64_t nitems = self.schunk.nbytes // self.schunk.typesize
         start, stop, _ = slice(start, stop, 1).indices(nitems)
@@ -1616,7 +1712,7 @@ cdef class SChunk:
 
         cdef blosc2_cparams* cparams = self.schunk.storage.cparams
         cparams.prefilter = general_filler
-        cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params*> malloc(sizeof(blosc2_prefilter_params))
+        cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params*> calloc(1, sizeof(blosc2_prefilter_params))
         cdef filler_udata* fill_udata = <filler_udata*> malloc(sizeof(filler_udata))
         fill_udata.py_func = <char *> malloc(strlen(func_id) + 1)
         strcpy(fill_udata.py_func, func_id)
@@ -1649,7 +1745,7 @@ cdef class SChunk:
 
         cdef blosc2_cparams* cparams = self.schunk.storage.cparams
         cparams.prefilter = general_prefilter
-        cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params*> malloc(sizeof(blosc2_prefilter_params))
+        cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params*> calloc(1, sizeof(blosc2_prefilter_params))
         cdef user_filters_udata* pref_udata = <user_filters_udata*> malloc(sizeof(user_filters_udata))
         pref_udata.py_func = <char *> malloc(strlen(func_id) + 1)
         strcpy(pref_udata.py_func, func_id)
@@ -1661,24 +1757,47 @@ cdef class SChunk:
         cparams.preparams = preparams
         _check_cparams(cparams)
 
-        blosc2_free_ctx(self.schunk.cctx)
+        if self.schunk.cctx != NULL:
+            # Freeing a NULL context can lead to a segmentation fault
+            blosc2_free_ctx(self.schunk.cctx)
         self.schunk.cctx = blosc2_create_cctx(dereference(cparams))
         if self.schunk.cctx == NULL:
             raise RuntimeError("Could not create compression context")
 
     cpdef remove_prefilter(self, func_name, _new_ctx=True):
-        if func_name is not None:
+        cdef me_udata* me_data
+        cdef user_filters_udata* udata
+
+        if func_name is not None and func_name in blosc2.prefilter_funcs:
             del blosc2.prefilter_funcs[func_name]
-        # From Python the preparams->udata with always have the field py_func
-        cdef user_filters_udata * udata = <user_filters_udata *> self.schunk.storage.cparams.preparams.user_data
-        free(udata.py_func)
-        free(self.schunk.storage.cparams.preparams.user_data)
-        free(self.schunk.storage.cparams.preparams)
+
+        # Clean up the miniexpr handle if this is a miniexpr_prefilter
+        if self.schunk.storage.cparams.prefilter == miniexpr_prefilter:
+            if self.schunk.storage.cparams.preparams != NULL:
+                me_data = <me_udata *> self.schunk.storage.cparams.preparams.user_data
+                if me_data != NULL:
+                    if me_data.inputs != NULL:
+                        free(me_data.inputs)
+                    if me_data.miniexpr_handle != NULL:  # XXX do we really need the conditional?
+                        me_free(me_data.miniexpr_handle)
+                    free(me_data)
+        elif self.schunk.storage.cparams.prefilter != NULL:
+            # From Python the preparams->udata will always have the field py_func
+            if self.schunk.storage.cparams.preparams != NULL:
+                udata = <user_filters_udata *> self.schunk.storage.cparams.preparams.user_data
+                if udata != NULL:
+                    if udata.py_func != NULL:
+                        free(udata.py_func)
+                    free(udata)
+
+        if self.schunk.storage.cparams.preparams != NULL:
+            free(self.schunk.storage.cparams.preparams)
         self.schunk.storage.cparams.preparams = NULL
         self.schunk.storage.cparams.prefilter = NULL
-        blosc2_free_ctx(self.schunk.cctx)
+        if self.schunk.cctx != NULL:
+            # Freeing a NULL context can lead to a segmentation fault
+            blosc2_free_ctx(self.schunk.cctx)
         if _new_ctx:
             self.schunk.cctx = blosc2_create_cctx(dereference(self.schunk.storage.cparams))
             if self.schunk.cctx == NULL:
@@ -1741,6 +1860,107 @@ cdef int general_filler(blosc2_prefilter_params *params):
     return 0
 
+# Auxiliary function for just miniexpr as a prefilter
+# Only meant for (input and output) arrays that:
+# 1) Are blosc2.NDArray objects
+# 2) Do not have padding
+cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock,
+                      c_bool is_postfilter, uint8_t *params_output, int32_t typesize) nogil:
+    # Declare all C variables at the beginning
+    cdef int64_t chunk_ndim[B2ND_MAX_DIM]
+    cdef int64_t block_ndim[B2ND_MAX_DIM]
+    cdef int64_t start_ndim[B2ND_MAX_DIM]
+    cdef int64_t stop_ndim[B2ND_MAX_DIM]
+    cdef int64_t buffershape[B2ND_MAX_DIM]
+
+    cdef b2nd_array_t* ndarr
+    cdef int rc
+    cdef void** input_buffers = <void**> malloc(udata.ninputs * sizeof(uint8_t*))
+    cdef float *buf
+    cdef void* src
+    cdef int32_t chunk_nbytes, chunk_cbytes, block_nbytes
+    cdef int start, blocknitems, expected_blocknitems
+    cdef int32_t input_typesize
+    cdef blosc2_context* dctx
+    expected_blocknitems = -1
+    for i in range(udata.ninputs):
+        ndarr = udata.inputs[i]
+        input_buffers[i] = malloc(ndarr.sc.blocksize)
+        # A way to check for top speed
+        if False:
+            buf = <float *> input_buffers[i]
+            for j in range(ndarr.blocknitems):
+                buf[j] = 1.
+        else:
+            src = ndarr.sc.data[nchunk]
+            rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, &block_nbytes)
+            if rc < 0:
+                raise ValueError("miniexpr: error getting cbuffer sizes")
+            input_typesize = ndarr.sc.typesize
+            blocknitems = block_nbytes // input_typesize
+            if expected_blocknitems == -1:
+                expected_blocknitems = blocknitems
+            elif blocknitems != expected_blocknitems:
+                raise ValueError("miniexpr: inconsistent block element counts across inputs")
+            start = nblock * blocknitems
+            # A way to check for top speed
+            if False:
+                # Unsafe, but it works for special arrays (e.g. blosc2.ones), and can be fast
+                dctx = ndarr.sc.dctx
+            else:
+                # This can add a significant overhead, but it is needed for thread safety.
+                # Perhaps one can create a specific (serial) context just for blosc2_getitem_ctx?
+                dctx = blosc2_create_dctx(BLOSC2_DPARAMS_DEFAULTS)
+            if nchunk * ndarr.chunknitems + start + blocknitems > ndarr.nitems:
+                blocknitems = <int> (ndarr.nitems - (nchunk * ndarr.chunknitems + start))
+                if blocknitems <= 0:
+                    # Should never happen, but anyway (free the context before skipping)
+                    blosc2_free_ctx(dctx)
+                    continue
+            rc = blosc2_getitem_ctx(dctx, src, chunk_cbytes, start, blocknitems,
+                                    input_buffers[i], block_nbytes)
+            blosc2_free_ctx(dctx)
+            if rc < 0:
+                raise ValueError("miniexpr: error decompressing the chunk")
+
+    cdef me_expr* miniexpr_handle = udata.miniexpr_handle
+    cdef void* aux_reduc_ptr
+    # Calculate blocks per chunk using CEILING division (chunks are padded to fit whole blocks)
+    cdef int nblocks_per_chunk = <int> ((udata.array.chunknitems + udata.array.blocknitems - 1) // udata.array.blocknitems)
+    # Calculate the global linear block index: nchunk * blocks_per_chunk + nblock
+    # This works because blocks never span chunks (chunks are padded to block boundaries)
+    cdef int64_t linear_block_index = nchunk * nblocks_per_chunk + nblock
+    cdef uintptr_t offset_bytes = typesize * linear_block_index
+
+    if miniexpr_handle == NULL:
+        raise ValueError("miniexpr: handle not assigned")
+
+    # Skip evaluation if blocknitems is invalid (can happen for padding blocks beyond data)
+    if blocknitems <= 0:
+        # Free resources
+        for i in range(udata.ninputs):
+            free(input_buffers[i])
+        free(input_buffers)
+        return 0
+
+    # Call the thread-safe miniexpr C API
+    if udata.aux_reduc_ptr == NULL:
+        rc = me_eval(miniexpr_handle, <const void **> input_buffers, udata.ninputs,
+                     params_output, blocknitems)
+    else:
+        # Reduction operation
+        aux_reduc_ptr = <void *> (<uintptr_t> udata.aux_reduc_ptr + offset_bytes)
+        rc = me_eval(miniexpr_handle, <const void **> input_buffers, udata.ninputs, aux_reduc_ptr, blocknitems)
+    if rc != 0:
+        raise RuntimeError(f"miniexpr: issues during evaluation; error code: {rc}")
+
+    # Free resources
+    for i in range(udata.ninputs):
+        free(input_buffers[i])
+    free(input_buffers)
+
+    return 0
+
+
 # Aux function for prefilter and postfilter udf
 cdef int aux_udf(udf_udata *udata, int64_t nchunk, int32_t nblock,
                  c_bool is_postfilter, uint8_t *params_output, int32_t typesize):
@@ -1814,6 +2034,11 @@ cdef int aux_udf(udf_udata *udata, int64_t nchunk, int32_t nblock,
     return 0
 
 
+cdef int miniexpr_prefilter(blosc2_prefilter_params *params):
+    return aux_miniexpr(<me_udata *> params.user_data, params.nchunk, params.nblock, False,
+                        params.output, params.output_typesize)
+
+
 cdef int general_udf_prefilter(blosc2_prefilter_params *params):
     cdef udf_udata *udata = <udf_udata *> params.user_data
     return aux_udf(udata, params.nchunk, params.nblock, False, params.output, params.output_typesize)
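A worked illustration of the linear block indexing used in `aux_miniexpr` (partition sizes are made up): since chunks are padded to whole blocks, a block's global index is simply `nchunk * blocks_per_chunk + nblock`, with `blocks_per_chunk` computed via ceiling division:

```python
# Chunks padded to whole blocks: blocks never straddle chunk boundaries.
chunknitems, blocknitems = 1000, 300
nblocks_per_chunk = (chunknitems + blocknitems - 1) // blocknitems  # -> 4
for nchunk, nblock in [(0, 0), (0, 3), (2, 1)]:
    linear_block_index = nchunk * nblocks_per_chunk + nblock
    print(nchunk, nblock, "->", linear_block_index)  # 0, 3 and 9
```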
@@ -2361,6 +2586,10 @@ cdef class NDArray:
         self.array = <b2nd_array_t *> PyCapsule_GetPointer(array, "b2nd_array_t*")
         self.base = base  # add reference to base if NDArray is a view
 
+    @property
+    def c_array(self):
+        return <uintptr_t> self.array
+
     @property
     def shape(self) -> tuple[int]:
         return tuple([self.array.shape[i] for i in range(self.array.ndim)])
@@ -2607,11 +2836,11 @@ cdef class NDArray:
     def as_ffi_ptr(self):
         return PyCapsule_New(self.array, "b2nd_array_t*", NULL)
 
-    cdef udf_udata *_fill_udf_udata(self, func_id, inputs_id):
+    cdef udf_udata *_fill_udf_udata(self, func_id, inputs):
         cdef udf_udata *udata = <udf_udata *> malloc(sizeof(udf_udata))
         udata.py_func = <char *> malloc(strlen(func_id) + 1)
         strcpy(udata.py_func, func_id)
-        udata.inputs_id = inputs_id
+        udata.inputs_id = id(inputs)
         udata.output_cdtype = np.dtype(self.dtype).num
         udata.array = self.array
         # Save these in udf_udata to avoid computing them for each block
@@ -2621,6 +2850,81 @@ cdef class NDArray:
 
         return udata
 
+    cdef me_udata *_fill_me_udata(self, inputs, aux_reduc):
+        cdef me_udata *udata = <me_udata *> malloc(sizeof(me_udata))
+        operands = list(inputs.values())
+        ninputs = len(operands)
+        cdef b2nd_array_t** inputs_ = <b2nd_array_t**> malloc(ninputs * sizeof(b2nd_array_t*))
+        for i, operand in enumerate(operands):
+            inputs_[i] = <b2nd_array_t*> <uintptr_t> operand.c_array
+        udata.inputs = inputs_
+        udata.ninputs = ninputs
+        udata.array = self.array
+        cdef void* aux_reduc_ptr = NULL
+        if aux_reduc is not None:
+            if not isinstance(aux_reduc, np.ndarray):
+                raise TypeError("aux_reduc must be a NumPy array")
+            aux_reduc_ptr = np.PyArray_DATA(<np.ndarray> aux_reduc)
+        udata.aux_reduc_ptr = aux_reduc_ptr
+        # Save these in udf_udata to avoid computing them for each block
+        for i in range(self.array.ndim):
+            udata.chunks_in_array[i] = udata.array.extshape[i] // udata.array.chunkshape[i]
+            udata.blocks_in_chunk[i] = udata.array.extchunkshape[i] // udata.array.blockshape[i]
+
+        return udata
+
+    def _set_pref_expr(self, expression, inputs, aux_reduc=None):
+        # Set prefilter for miniexpr
+        cdef blosc2_cparams* cparams = self.array.sc.storage.cparams
+        cparams.prefilter = miniexpr_prefilter
+
+        cdef me_udata* udata = self._fill_me_udata(inputs, aux_reduc)
+
+        # Get the compiled expression handle for multi-threading
+        cdef Py_ssize_t n = len(inputs)
+        cdef me_variable* variables = <me_variable*> malloc(sizeof(me_variable) * n)
+        if variables == NULL:
+            raise MemoryError()
+        cdef me_variable *var
+        for i, (k, v) in enumerate(inputs.items()):
+            var = &variables[i]
+            var_name = k.encode("utf-8") if isinstance(k, str) else k
+            var.name = <char *> malloc(strlen(var_name) + 1)
+            strcpy(<char *> var.name, var_name)
+            var.dtype = me_dtype_from_numpy(v.dtype.num)
+            var.address = NULL  # chunked compile: addresses provided later
+            var.type = 0  # auto-set to ME_VARIABLE inside compiler
+            var.context = NULL
+
+        cdef int error = 0
+        expression = expression.encode("utf-8") if isinstance(expression, str) else expression
+        cdef me_dtype dtype = me_dtype_from_numpy(self.dtype.num)
+        cdef me_expr *out_expr
+        cdef int rc = me_compile(expression, variables, n, dtype, &error, &out_expr)
+        if rc == ME_COMPILE_ERR_INVALID_ARG_TYPE:
+            raise TypeError(f"miniexpr does not support operand or output dtype: {expression}")
+        if rc != ME_COMPILE_SUCCESS:
+            raise NotImplementedError(f"Cannot compile expression: {expression}")
+        udata.miniexpr_handle = out_expr
+
+        # Free resources
+        for i in range(len(inputs)):
+            free(<char *> variables[i].name)
+        free(variables)
+
+        cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params*> calloc(1, sizeof(blosc2_prefilter_params))
+        preparams.user_data = udata
+        preparams.output_is_disposable = False if aux_reduc is None else True
+        cparams.preparams = preparams
+        _check_cparams(cparams)
+
+        if self.array.sc.cctx != NULL:
+            # Freeing a NULL context can lead to a segmentation fault
+            blosc2_free_ctx(self.array.sc.cctx)
+        self.array.sc.cctx = blosc2_create_cctx(dereference(cparams))
+        if self.array.sc.cctx == NULL:
+            raise RuntimeError("Could not create compression context")
+
     def _set_pref_udf(self, func, inputs_id):
         if self.array.sc.storage.cparams.nthreads > 1:
             raise AttributeError("compress `nthreads` must be 1 when assigning a prefilter")
@@ -2633,7 +2937,7 @@ cdef class NDArray:
 
         cdef blosc2_cparams* cparams = self.array.sc.storage.cparams
         cparams.prefilter = general_udf_prefilter
-        cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params*> malloc(sizeof(blosc2_prefilter_params))
+        cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params*> calloc(1, sizeof(blosc2_prefilter_params))
         preparams.user_data = self._fill_udf_udata(func_id, inputs_id)
         cparams.preparams = preparams
         _check_cparams(cparams)
@@ -2661,7 +2965,9 @@ cdef class NDArray:
         dparams.postparams = postparams
         _check_dparams(dparams, self.array.sc.storage.cparams)
 
-        blosc2_free_ctx(self.array.sc.dctx)
+        if self.array.sc.dctx != NULL:
+            # Freeing a NULL context can lead to a segmentation fault
+            blosc2_free_ctx(self.array.sc.dctx)
         self.array.sc.dctx = blosc2_create_dctx(dereference(dparams))
         if self.array.sc.dctx == NULL:
             raise RuntimeError("Could not create decompression context")
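A hedged sketch of how these private hooks are meant to be driven; the public entry point remains `LazyExpr.compute()`, which adds eligibility checks and falls back on any failure, so this is illustrative only (names mirror the code above):

```python
# Illustrative driver for the miniexpr prefilter (private API, sketch only).
import blosc2
import numpy as np

a = blosc2.linspace(0., 1., 1_000_000, shape=(1000, 1000), dtype=np.float32)
b = a.copy()
res = blosc2.uninit(a.shape, np.float32, chunks=a.chunks, blocks=a.blocks)
res._set_pref_expr("2 * x + y", {"x": a, "y": b})  # compile once, share handle
data = np.empty(res.schunk.chunksize, dtype=np.uint8)  # contents are ignored
for nchunk in range(res.schunk.nchunks):
    res.schunk.update_data(nchunk, data, copy=False)  # prefilter fills each chunk
res.schunk.remove_prefilter("miniexpr")
```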
diff --git a/src/blosc2/core.py b/src/blosc2/core.py
index 4b348c79..15f58ed3 100644
--- a/src/blosc2/core.py
+++ b/src/blosc2/core.py
@@ -1547,12 +1547,14 @@ def compute_chunks_blocks(  # noqa: C901
         # min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 4
     elif platform.system() == "Darwin" and "arm" in platform.machine():
         # For Apple Silicon, experiments say we can use 4x the L1 size
-        min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 4
+        # min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 4
+        # However, let's adjust for several operands in cache, so let's use just L1
+        min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 1
     elif "l1_data_cache_size" in blosc2.cpu_info and isinstance(
         blosc2.cpu_info["l1_data_cache_size"], int
     ):
-        # For other archs, we don't have hints; be conservative and use 2x the L1 size
-        min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2
+        # For other archs, we don't have hints; be conservative and use 1x the L1 size
+        min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 1
     if blocksize < min_blocksize:
         blocksize = min_blocksize
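Rough intuition for dropping to 1x L1 (cache sizes here are hypothetical): when an expression streams several operands per block, the per-block working set is a multiple of the blocksize, so a smaller block keeps it within a few L1s instead of thrashing:

```python
# With 3 inputs + 1 output resident per evaluated block:
l1 = 64 * 1024                        # hypothetical L1 data cache, bytes
print((l1 * 1) * 4 // l1, "x L1")     # 1x L1 blocks -> ~4x L1 working set
print((l1 * 4) * 4 // l1, "x L1")     # 4x L1 blocks -> ~16x L1 working set
```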
diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py
index 7613c9f2..2247995a 100644
--- a/src/blosc2/lazyexpr.py
+++ b/src/blosc2/lazyexpr.py
@@ -91,6 +91,13 @@ safe_numpy_globals["matrix_transpose"] = np.transpose
 safe_numpy_globals["vecdot"] = npvecdot
 
 
+# Set this to False if miniexpr should not be tried out
+try_miniexpr = True
+if blosc2.IS_WASM:
+    try_miniexpr = False
+if sys.platform == "win32":
+    try_miniexpr = False
+
 
 def ne_evaluate(expression, local_dict=None, **kwargs):
     """Safely evaluate expressions using numexpr when possible, falling back to numpy."""
@@ -1216,12 +1223,24 @@ def fast_eval(  # noqa: C901
     :ref:`NDArray` or np.ndarray
         The output array.
     """
+    global try_miniexpr
+
+    # Use a local copy so we don't modify the global
+    use_miniexpr = try_miniexpr
+
+    # Disable miniexpr for UDFs (callable expressions)
+    if callable(expression):
+        use_miniexpr = False
+
     out = kwargs.pop("_output", None)
     ne_args: dict = kwargs.pop("_ne_args", {})
     if ne_args is None:
         ne_args = {}
     dtype = kwargs.pop("dtype", None)
     where: dict | None = kwargs.pop("_where_args", None)
+    if where is not None:
+        # miniexpr does not support where(); use the regular path.
+        use_miniexpr = False
     if isinstance(out, blosc2.NDArray):
         # If 'out' has been passed, and is a NDArray, use it as the base array
         basearr = out
@@ -1261,6 +1280,62 @@ def fast_eval(  # noqa: C901
         # WebAssembly does not support threading, so we cannot use the iter_disk option
         iter_disk = False
 
+    # Check whether we can use miniexpr
+    # Miniexpr only supports a subset of functions - disable for unsupported ones
+    unsupported_funcs = [
+        "clip",
+        "maximum",
+        "minimum",
+        "contains",
+    ] + reducers  # miniexpr doesn't support reduction functions
+
+    if isinstance(expression, str) and any(func in expression for func in unsupported_funcs):
+        use_miniexpr = False
+
+    if use_miniexpr:
+        # Avoid padding issues except for 1D arrays (contiguous along the only axis).
+        if len(shape) != 1 and builtins.any(s % c != 0 for s, c in zip(shape, chunks, strict=True)):
+            use_miniexpr = False
+        for op in operands.values():
+            # Only NDArray in-memory operands
+            if not (isinstance(op, blosc2.NDArray) and op.urlpath is None and out is None):
+                use_miniexpr = False
+                break
+            # Ensure blocks fit exactly in chunks for the n-dim case
+            blocks_fit = builtins.all(c % b == 0 for c, b in zip(op.chunks, op.blocks, strict=True))
+            if len(op.shape) != 1 and not blocks_fit:
+                use_miniexpr = False
+                break
+
+    if use_miniexpr:
+        cparams = kwargs.pop("cparams", blosc2.CParams())
+        # All values will be overwritten, so we can use an uninitialized array
+        res_eval = blosc2.uninit(shape, dtype, chunks=chunks, blocks=blocks, cparams=cparams, **kwargs)
+        try:
+            print("expr->miniexpr:", expression)
+            res_eval._set_pref_expr(expression, operands)
+            # Data to compress is fetched from operands, so it can be uninitialized here
+            data = np.empty(res_eval.schunk.chunksize, dtype=np.uint8)
+            # Exercise the prefilter for each chunk
+            for nchunk in range(res_eval.schunk.nchunks):
+                res_eval.schunk.update_data(nchunk, data, copy=False)
+        except Exception:
+            use_miniexpr = False
+        finally:
+            res_eval.schunk.remove_prefilter("miniexpr")
+            global iter_chunks
+            # Ensure any background reading thread is closed
+            iter_chunks = None
+
+        if not use_miniexpr:
+            # If miniexpr failed, fall back to regular evaluation
+            # (continue to the manual chunked evaluation below)
+            pass
+        else:
+            if getitem:
+                return res_eval[()]
+            return res_eval
+
     chunk_operands = {}
     # Check which chunks intersect with _slice
     all_chunks = get_intersecting_chunks((), shape, chunks)  # if _slice is (), returns all chunks
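The gating above can be pictured with made-up shapes: miniexpr is only tried when the chunks tile the shape exactly and the blocks tile the chunks exactly, so every prefilter block maps to a contiguous, pad-free region:

```python
# Eligibility predicate for the miniexpr fast path (illustrative numbers).
shape, chunks, blocks = (10_000, 10_000), (1_000, 1_000), (250, 500)
chunks_fit = all(s % c == 0 for s, c in zip(shape, chunks))
blocks_fit = all(c % b == 0 for c, b in zip(chunks, blocks))
print(chunks_fit and blocks_fit)  # -> True: eligible for the miniexpr path
```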
@@ -1744,6 +1819,8 @@ def infer_reduction_dtype(dtype, operation):
             dtype, np.float32 if dtype in (np.float32, np.complex64) else blosc2.DEFAULT_FLOAT
         )
     if operation in {ReduceOp.SUM, ReduceOp.PROD}:
+        if np.issubdtype(dtype, np.bool_):
+            return np.int64
         if np.issubdtype(dtype, np.unsignedinteger):
             return np.result_type(dtype, np.uint64)
         return np.result_type(dtype, np.int64 if np.issubdtype(dtype, np.integer) else my_float)
@@ -1808,6 +1885,11 @@ def reduce_slices(  # noqa: C901
     :ref:`NDArray` or np.ndarray
         The resulting output array.
     """
+    global try_miniexpr
+
+    # Use a local copy so we don't modify the global
+    use_miniexpr = try_miniexpr
+
     out = kwargs.pop("_output", None)
     res_out_ = None  # temporary required to store max/min for argmax/argmin
     ne_args: dict = kwargs.pop("_ne_args", {})
@@ -1815,6 +1897,7 @@ def reduce_slices(  # noqa: C901
         ne_args = {}
     where: dict | None = kwargs.pop("_where_args", None)
     reduce_op = reduce_args.pop("op")
+    reduce_op_str = reduce_args.pop("op_str", None)
     axis = reduce_args["axis"]
     keepdims = reduce_args["keepdims"]
     dtype = reduce_args.get("dtype", None)
@@ -1861,7 +1944,10 @@ def reduce_slices(  # noqa: C901
     # Note: we could have expr = blosc2.lazyexpr('numpy_array + 1') (i.e. no choice for chunks)
     blosc2_arrs = tuple(o for o in operands.values() if hasattr(o, "chunks"))
     fast_path = False
+    all_ndarray = False
+    any_persisted = False
     chunks = None
+    blocks = None
     if blosc2_arrs:
         # fast path only relevant if there are blosc2 arrays
         operand = max(blosc2_arrs, key=lambda x: len(x.shape))
@@ -1871,9 +1957,11 @@ def reduce_slices(  # noqa: C901
         same_chunks = all(operand.chunks == o.chunks for o in operands.values() if hasattr(o, "chunks"))
         same_blocks = all(operand.blocks == o.blocks for o in operands.values() if hasattr(o, "blocks"))
         fast_path = same_shape and same_chunks and same_blocks and (0 not in operand.chunks)
-        aligned, iter_disk = dict.fromkeys(operands.keys(), False), False
+        aligned = dict.fromkeys(operands.keys(), False)
+        iter_disk = False
         if fast_path:
             chunks = operand.chunks
+            blocks = operand.blocks
             # Check that all operands are NDArray for fast path
             all_ndarray = all(
                 isinstance(value, blosc2.NDArray) and value.shape != () for value in operands.values()
             )
@@ -1907,6 +1995,73 @@ def reduce_slices(  # noqa: C901
             chunks = temp.chunks
             del temp
 
+    # The miniexpr reduction path is only supported for some cases so far
+    if not (where is None and fast_path and all_ndarray and not any_persisted and reduced_shape == ()):
+        use_miniexpr = False
+
+    # Some reductions are not supported yet in miniexpr
+    if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN):
+        use_miniexpr = False
+
+    # Only well-behaved partitions are supported in miniexpr reductions
+    if use_miniexpr:
+        # Avoid padding issues except for 1D arrays (contiguous along the only axis).
+        if len(shape) != 1 and builtins.any(s % c != 0 for s, c in zip(shape, chunks, strict=True)):
+            use_miniexpr = False
+        if use_miniexpr and isinstance(expression, str):
+            has_complex = any(
+                isinstance(op, blosc2.NDArray) and blosc2.isdtype(op.dtype, "complex floating")
+                for op in operands.values()
+            )
+            if has_complex and any(tok in expression for tok in ("!=", "==", "<=", ">=", "<", ">")):
+                use_miniexpr = False
+        for op in operands.values():
+            # Ensure blocks fit exactly in chunks for the n-dim case
+            blocks_fit = builtins.all(c % b == 0 for c, b in zip(op.chunks, op.blocks, strict=True))
+            if len(op.shape) != 1 and not blocks_fit:
+                use_miniexpr = False
+                break
+
+    if use_miniexpr:
+        # Experiments say that not splitting is best (at least on Apple Silicon M4 Pro)
+        cparams = kwargs.pop("cparams", blosc2.CParams(splitmode=blosc2.SplitMode.NEVER_SPLIT))
+        # Create a fake NDArray just to drive the miniexpr evaluation (values won't be used)
+        res_eval = blosc2.uninit(shape, dtype, chunks=chunks, blocks=blocks, cparams=cparams, **kwargs)
+        # Compute the number of blocks in the result
+        nblocks = res_eval.nbytes // res_eval.blocksize
+        # Initialize to zeros since some blocks may be padding and won't be written
+        aux_reduc = np.zeros(nblocks, dtype=dtype)
+        try:
+            print("expr->miniexpr:", expression, reduce_op)
+            expression = f"{reduce_op_str}({expression})"
+            res_eval._set_pref_expr(expression, operands, aux_reduc)
+            # Data won't even try to be compressed, so buffers can be uninitialized and reused
+            data = np.empty(res_eval.schunk.chunksize, dtype=np.uint8)
+            chunk_data = np.empty(res_eval.schunk.chunksize + blosc2.MAX_OVERHEAD, dtype=np.uint8)
+            # Exercise the prefilter for each chunk
+            for nchunk in range(res_eval.schunk.nchunks):
+                res_eval.schunk._prefilter_data(nchunk, data, chunk_data)
+        except Exception:
+            use_miniexpr = False
+        finally:
+            res_eval.schunk.remove_prefilter("miniexpr")
+            global iter_chunks
+            # Ensure any background reading thread is closed
+            iter_chunks = None
+
+        if not use_miniexpr:
+            # If miniexpr failed, fall back to regular evaluation
+            # (continue to the manual chunked evaluation below)
+            pass
+        else:
+            if reduce_op == ReduceOp.ANY:
+                result = np.any(aux_reduc, **reduce_args)
+            elif reduce_op == ReduceOp.ALL:
+                result = np.all(aux_reduc, **reduce_args)
+            else:
+                result = reduce_op.value.reduce(aux_reduc, **reduce_args)
+            return result
+
     # Iterate over the operands and get the chunks
     chunk_operands = {}
     # Check which chunks intersect with _slice
@@ -2129,7 +2284,7 @@ def convert_none_out(dtype, reduce_op, reduced_shape):
     return out if isinstance(out, tuple) else (out, None)
 
 
-def chunked_eval(  # noqa: C901
+def chunked_eval(
     expression: str | Callable[[tuple, np.ndarray, tuple[int]], None], operands: dict, item=(), **kwargs
 ):
     """
@@ -2223,10 +2378,9 @@ def chunked_eval(
             return slices_eval(expression, operands, getitem=getitem, _slice=item, shape=shape, **kwargs)
 
     finally:
-        # Deactivate cache for NDField instances
-        for op in operands:
-            if isinstance(operands[op], blosc2.NDField):
-                operands[op].ndarr.keep_last_read = False
+        global iter_chunks
+        # Ensure any background reading thread is closed
+        iter_chunks = None
 
 
 def fuse_operands(operands1, operands2):
@@ -2688,6 +2842,7 @@ def where(self, value1=None, value2=None):
 def sum(self, axis=None, dtype=None, keepdims=False, **kwargs):
     reduce_args = {
         "op": ReduceOp.SUM,
+        "op_str": "sum",
         "axis": axis,
         "dtype": dtype,
         "keepdims": keepdims,
@@ -2697,6 +2852,7 @@ def sum(self, axis=None, dtype=None, keepdims=False, **kwargs):
 def prod(self, axis=None, dtype=None, keepdims=False, **kwargs):
     reduce_args = {
         "op": ReduceOp.PROD,
+        "op_str": "prod",
         "axis": axis,
         "dtype": dtype,
         "keepdims": keepdims,
@@ -2787,6 +2943,7 @@ def var(self, axis=None, dtype=None, keepdims=False, ddof=0, **kwargs):
 def min(self, axis=None, keepdims=False, **kwargs):
     reduce_args = {
         "op": ReduceOp.MIN,
+        "op_str": "min",
         "axis": axis,
         "keepdims": keepdims,
     }
@@ -2795,6 +2952,7 @@ def min(self, axis=None, keepdims=False, **kwargs):
 def max(self, axis=None, keepdims=False, **kwargs):
     reduce_args = {
         "op": ReduceOp.MAX,
+        "op_str": "max",
         "axis": axis,
         "keepdims": keepdims,
     }
@@ -2803,6 +2961,7 @@ def max(self, axis=None, keepdims=False, **kwargs):
 def any(self, axis=None, keepdims=False, **kwargs):
     reduce_args = {
         "op": ReduceOp.ANY,
+        "op_str": "any",
         "axis": axis,
         "keepdims": keepdims,
     }
@@ -2811,6 +2970,7 @@ def any(self, axis=None, keepdims=False, **kwargs):
 def all(self, axis=None, keepdims=False, **kwargs):
     reduce_args = {
         "op": ReduceOp.ALL,
+        "op_str": "all",
         "axis": axis,
         "keepdims": keepdims,
     }
@@ -3363,7 +3523,9 @@ def compute(self, item=(), **kwargs):
         #     # Register a prefilter for eval
         #     res_eval._set_pref_udf(self.func, id(self.inputs))
+        #     # This line would NOT allocate physical RAM on any modern OS:
         #     aux = np.empty(res_eval.shape, res_eval.dtype)
+        #     # Physical allocation happens here (when writing):
         #     res_eval[...] = aux
         #     res_eval.schunk.remove_prefilter(self.func.__name__)
         #     res_eval.schunk.cparams.nthreads = self._cnthreads
diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py
index 41367ea6..85d4dcb2 100644
--- a/src/blosc2/ndarray.py
+++ b/src/blosc2/ndarray.py
@@ -3773,7 +3773,7 @@ def ndim(self) -> int:
 
     @property
     def size(self) -> int:
-        """The size (in bytes) for this container."""
+        """The size (in elements) for this container."""
         return super().size
 
     @property
diff --git a/tests/conftest.py b/tests/conftest.py
index 35768f59..e4a505dd 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -9,6 +9,7 @@
 import sys
 
 import pytest
+import requests
 
 import blosc2
 
@@ -36,15 +37,11 @@ def cat2_context():
     yield c2params
 
 
-# This is to avoid sporadic failures in the CI when reaching network,
-# but this makes the tests to stuck in local. Perhaps move this to
-# every test module that needs it?
-# def pytest_runtest_call(item):
-#     try:
-#         item.runtest()
-#     except requests.ConnectTimeout:
-#         pytest.skip("Skipping test due to sporadic requests.ConnectTimeout")
-#     except requests.ReadTimeout:
-#         pytest.skip("Skipping test due to sporadic requests.ReadTimeout")
-#     except requests.Timeout:
-#         pytest.skip("Skipping test due to sporadic requests.Timeout")
+def pytest_runtest_call(item):
+    # Skip network-marked tests on transient request failures to keep CI stable.
+    if item.get_closest_marker("network") is None:
+        return
+    try:
+        item.runtest()
+    except requests.exceptions.RequestException as exc:
+        pytest.skip(f"Skipping network test due to request failure: {exc}")
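With the hook above, a test opts into the skip-on-failure behavior by carrying the `network` marker (assumed to be registered in the project's pytest configuration). A hypothetical example:

```python
# Hypothetical network-marked test; transient request failures become skips.
import pytest
import requests


@pytest.mark.network
def test_fetch_catalog():
    r = requests.get("https://example.com", timeout=5)  # hypothetical URL
    assert r.status_code == 200
```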
diff --git a/tests/ndarray/test_elementwise_funcs.py b/tests/ndarray/test_elementwise_funcs.py
index 0a03a4a2..fdce9239 100644
--- a/tests/ndarray/test_elementwise_funcs.py
+++ b/tests/ndarray/test_elementwise_funcs.py
@@ -3,7 +3,6 @@
 
 import numpy as np
 import pytest
-import torch
 
 import blosc2
 
@@ -311,9 +310,10 @@ def test_unary_funcs(np_func, blosc_func, dtype, shape, chunkshape):
 @pytest.mark.parametrize(("np_func", "blosc_func"), UNARY_FUNC_PAIRS)
 @pytest.mark.parametrize("dtype", STR_DTYPES)
 @pytest.mark.parametrize("shape", [(10,), (20, 20)])
-@pytest.mark.parametrize("xp", [torch])
-def test_unfuncs_proxy(np_func, blosc_func, dtype, shape, xp):
-    _test_unary_func_proxy(np_func, blosc_func, dtype, shape, xp)
+def test_unary_funcs_torch_proxy(np_func, blosc_func, dtype, shape):
+    """Test unary functions with torch tensors as input (via proxy)."""
+    torch = pytest.importorskip("torch")
+    _test_unary_func_proxy(np_func, blosc_func, dtype, shape, torch)
 
 
 @pytest.mark.heavy
@@ -334,9 +334,10 @@ def test_binary_funcs(np_func, blosc_func, dtype, shape, chunkshape):
 @pytest.mark.parametrize(("np_func", "blosc_func"), BINARY_FUNC_PAIRS)
 @pytest.mark.parametrize("dtype", STR_DTYPES)
 @pytest.mark.parametrize(("shape", "chunkshape"), SHAPES_CHUNKS)
-@pytest.mark.parametrize("xp", [torch])
-def test_binfuncs_proxy(np_func, blosc_func, dtype, shape, chunkshape, xp):
-    _test_binary_func_proxy(np_func, blosc_func, dtype, shape, chunkshape, xp)
+def test_binary_funcs_torch_proxy(np_func, blosc_func, dtype, shape, chunkshape):
+    """Test binary functions with torch tensors as input (via proxy)."""
+    torch = pytest.importorskip("torch")
+    _test_binary_func_proxy(np_func, blosc_func, dtype, shape, chunkshape, torch)
 
 
 @pytest.mark.heavy
diff --git a/tests/ndarray/test_lazyexpr.py b/tests/ndarray/test_lazyexpr.py
index 88136602..2f441497 100644
--- a/tests/ndarray/test_lazyexpr.py
+++ b/tests/ndarray/test_lazyexpr.py
@@ -10,12 +10,20 @@
 
 import numpy as np
 import pytest
-import torch
 
 import blosc2
 from blosc2.lazyexpr import ne_evaluate
 from blosc2.utils import get_chunks_idx, npvecdot
 
+# Conditionally import torch for proxy tests
+try:
+    import torch
+
+    PROXY_TEST_XP = [torch, np]
+except ImportError:
+    torch = None
+    PROXY_TEST_XP = [np]
+
 NITEMS_SMALL = 100
 NITEMS = 1000
 
@@ -172,7 +180,10 @@ def test_simple_expression(array_fixture):
     expr = a1 + a2 - a3 * a4
     nres = ne_evaluate("na1 + na2 - na3 * na4")
     res = expr.compute(cparams=blosc2.CParams())
-    np.testing.assert_allclose(res[:], nres)
+    if na1.dtype == np.float32:
+        np.testing.assert_allclose(res[:], nres, rtol=1e-6, atol=1e-6)
+    else:
+        np.testing.assert_allclose(res[:], nres)
 
 
 # Mix Proxy and NDArray operands
@@ -197,10 +208,16 @@ def test_iXXX(array_fixture):
     expr **= 2.3  # __ipow__
     res = expr.compute()
     if not blosc2.IS_WASM:
-        nres = ne_evaluate("(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7) ** 2.3")
+        expr_str = "(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7) ** 2.3"
     else:
-        nres = ne_evaluate("(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7)")
-    np.testing.assert_allclose(res[:], nres)
+        expr_str = "(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7)"
+    if na1.dtype == np.float32:
+        with np.errstate(invalid="ignore"):
+            nres = eval(expr_str, {"np": np}, {"na1": na1, "na2": na2, "na3": na3, "na4": na4})
+        np.testing.assert_allclose(res[:], nres, rtol=1e-5, atol=1e-6)
+    else:
+        nres = ne_evaluate(expr_str)
+        np.testing.assert_allclose(res[:], nres)
 
 
 def test_complex_evaluate(array_fixture):
@@ -209,7 +226,10 @@ def test_complex_evaluate(array_fixture):
     expr += 2
     nres = ne_evaluate("tan(na1) * (sin(na2) * sin(na2) + cos(na3)) + (sqrt(na4) * 2) + 2")
     res = expr.compute()
-    np.testing.assert_allclose(res[:], nres)
+    if na1.dtype == np.float32:
+        np.testing.assert_allclose(res[:], nres, rtol=1e-5)
+    else:
+        np.testing.assert_allclose(res[:], nres)
 
 
 def test_complex_getitem(array_fixture):
@@ -218,7 +238,10 @@ def test_complex_getitem(array_fixture):
     expr += 2
     nres = ne_evaluate("tan(na1) * (sin(na2) * sin(na2) + cos(na3)) + (sqrt(na4) * 2) + 2")
     res = expr[:]
-    np.testing.assert_allclose(res, nres)
+    if na1.dtype == np.float32:
+        np.testing.assert_allclose(res[:], nres, rtol=1e-5)
+    else:
+        np.testing.assert_allclose(res[:], nres)
 
 
 def test_complex_getitem_slice(array_fixture):
@@ -236,8 +259,11 @@ def test_func_expression(array_fixture):
     expr = (a1 + a2) * a3 - a4
     expr = blosc2.sin(expr) + blosc2.cos(expr)
     nres = ne_evaluate("sin((na1 + na2) * na3 - na4) + cos((na1 + na2) * na3 - na4)")
-    res = expr.compute(storage={})
-    np.testing.assert_allclose(res[:], nres)
+    res = expr.compute()
+    if na1.dtype == np.float32:
+        np.testing.assert_allclose(res[:], nres, rtol=1e-5)
+    else:
+        np.testing.assert_allclose(res[:], nres)
 
 
 def test_expression_with_constants(array_fixture):
@@ -245,7 +271,11 @@ def test_expression_with_constants(array_fixture):
     # Test with operands with same chunks and blocks
     expr = a1 + 2 - a3 * 3.14
     nres = ne_evaluate("na1 + 2 - na3 * 3.14")
-    np.testing.assert_allclose(expr[:], nres)
+    res = expr.compute()
+    if na1.dtype == np.float32:
+        np.testing.assert_allclose(res[:], nres, rtol=1e-5)
+    else:
+        np.testing.assert_allclose(res[:], nres)
 
 
 @pytest.mark.parametrize("compare_expressions", [True, False])
@@ -312,9 +342,9 @@ def test_functions(function, dtype_fixture, shape_fixture):
     expr_string = f"{function}(na1)"
     res_numexpr = ne_evaluate(expr_string)
     # Compare the results
-    np.testing.assert_allclose(res_lazyexpr[:], res_numexpr)
-    np.testing.assert_allclose(expr.slice(slice(0, 10, 1)), res_numexpr[:10])  # slice test
-    np.testing.assert_allclose(expr[:10], res_numexpr[:10])  # getitem test
+    np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=1e-5)
+    np.testing.assert_allclose(expr.slice(slice(0, 10, 1)), res_numexpr[:10], rtol=1e-5)  # slice test
+    np.testing.assert_allclose(expr[:10], res_numexpr[:10], rtol=1e-5)  # getitem test
 
     # For some reason real and imag are not supported by numpy's assert_allclose
     # (TypeError: bad operand type for abs(): 'LazyExpr' and segfaults are observed)
@@ -324,7 +354,7 @@ def test_functions(function, dtype_fixture, shape_fixture):
     # Using numpy functions
     expr = eval(f"np.{function}(a1)", {"a1": a1, "np": np})
     # Compare the results
-    np.testing.assert_allclose(expr[()], res_numexpr)
+    np.testing.assert_allclose(expr[()], res_numexpr, rtol=1e-5)
 
     # In combination with other operands
     na2 = np.linspace(0, 10, nelems, dtype=dtype_fixture).reshape(shape_fixture)
@@ -338,7 +368,11 @@ def test_functions(function, dtype_fixture, shape_fixture):
     expr_string = f"na1 + {function}(na2)"
     res_numexpr = ne_evaluate(expr_string)
     # Compare the results
-    np.testing.assert_allclose(res_lazyexpr[:], res_numexpr)
+    if function == "tan":
+        # tan in miniexpr does not have much precision for values close to 0
+        np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=5e-4)
+    else:
+        np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=1e-5)
 
     # Functions of the form np.function(a1 + a2)
     expr = eval(f"np.{function}(a1 + a2)", {"a1": a1, "a2": a2, "np": np})
@@ -346,7 +380,7 @@ def test_functions(function, dtype_fixture, shape_fixture):
     expr_string = f"{function}(na1 + na2)"
     res_numexpr = ne_evaluate(expr_string)
     # Compare the results
-    np.testing.assert_allclose(expr[()], res_numexpr)
+    np.testing.assert_allclose(expr[()], res_numexpr, rtol=1e-5)
@@ -1843,7 +1877,7 @@ def test_lazyexpr_2args():
 
 @pytest.mark.parametrize(
     "xp",
-    [torch, np],
+    PROXY_TEST_XP,
 )
 @pytest.mark.parametrize(
     "dtype",
diff --git a/tests/ndarray/test_lazyudf.py b/tests/ndarray/test_lazyudf.py
index 29b44e39..4a66a19d 100644
--- a/tests/ndarray/test_lazyudf.py
+++ b/tests/ndarray/test_lazyudf.py
@@ -21,7 +21,13 @@ def udf1p(inputs_tuple, output, offset):
 if blosc2._HAS_NUMBA:
     import numba
 
-    @numba.jit(parallel=True)
+    # We should avoid parallel=True here because it makes the complete test suite crash
+    # in test_save_ludf. I am not sure why, but it might be some interference with
+    # a previous test, leaving the threading state in a bad way.
+    # However, all the examples and benchmarks seem to work with parallel=True.
+    # XXX Investigate more.
+    # @numba.jit(parallel=True)
+    @numba.jit(nopython=True)
     def udf1p_numba(inputs_tuple, output, offset):
         x = inputs_tuple[0]
         output[:] = x + 1
diff --git a/tests/ndarray/test_linalg.py b/tests/ndarray/test_linalg.py
index 9c1a110d..33ff45d6 100644
--- a/tests/ndarray/test_linalg.py
+++ b/tests/ndarray/test_linalg.py
@@ -3,12 +3,20 @@
 
 import numpy as np
 import pytest
-import torch
 
 import blosc2
 from blosc2.lazyexpr import linalg_funcs
 from blosc2.utils import npvecdot
 
+# Conditionally import torch for proxy tests
+try:
+    import torch
+
+    PROXY_TEST_XP = [torch, np]
+except ImportError:
+    torch = None
+    PROXY_TEST_XP = [np]
+
 
 @pytest.mark.parametrize(
     ("ashape", "achunks", "ablocks"),
@@ -826,7 +834,7 @@ def test_diagonal(shape, chunkshape, offset):
 
 @pytest.mark.parametrize(
     "xp",
-    [torch, np],
+    PROXY_TEST_XP,
 )
 @pytest.mark.parametrize(
     "dtype",
diff --git a/tests/ndarray/test_reductions.py b/tests/ndarray/test_reductions.py
index e1bbfb22..054184ee 100644
--- a/tests/ndarray/test_reductions.py
+++ b/tests/ndarray/test_reductions.py
@@ -571,4 +571,4 @@ def test_reduce_string():
     d = blosc2.lazyexpr("sl + c.sum() + a.std()", operands={"a": a, "c": c, "sl": a.slice((1, 1))})
     sum = d.compute()[()]
     npsum = npa[1, 1] + np.sum(npc) + np.std(npa)
-    assert np.allclose(sum, npsum)
+    np.testing.assert_allclose(sum, npsum)
diff --git a/tests/ndarray/test_setitem.py b/tests/ndarray/test_setitem.py
index a2145b90..71137779 100644
--- a/tests/ndarray/test_setitem.py
+++ b/tests/ndarray/test_setitem.py
@@ -8,7 +8,6 @@
 
 import numpy as np
 import pytest
-import torch
 
 import blosc2
 
@@ -45,14 +44,6 @@ def test_setitem(shape, chunks, blocks, slices, dtype):
     nparray[slices] = val
     np.testing.assert_almost_equal(a[...], nparray)
 
-    # Object called via SimpleProxy
-    slice_shape = a[slices].shape
-    dtype_ = {np.float32: torch.float32, np.int32: torch.int32, np.float64: torch.float64}[dtype]
-    val = torch.ones(slice_shape, dtype=dtype_)
-    a[slices] = val
-    nparray[slices] = val
-    np.testing.assert_almost_equal(a[...], nparray)
-
     # blosc2.NDArray
+    slice_shape = a[slices].shape  # still needed below, now that the torch block is gone
     if np.prod(slice_shape) == 1 or len(slice_shape) != len(blocks):
         chunks = None
@@ -64,6 +55,22 @@ def test_setitem(shape, chunks, blocks, slices, dtype):
     np.testing.assert_almost_equal(a[...], nparray)
 
 
+@pytest.mark.parametrize(argnames, argvalues)
+def test_setitem_torch_proxy(shape, chunks, blocks, slices, dtype):
+    torch = pytest.importorskip("torch")
+    size = int(np.prod(shape))
+    nparray = np.arange(size, dtype=dtype).reshape(shape)
+    a = blosc2.frombuffer(bytes(nparray), nparray.shape, dtype=dtype, chunks=chunks, blocks=blocks)
+
+    # Object called via SimpleProxy (torch tensor)
+    slice_shape = a[slices].shape
+    dtype_ = {np.float32: torch.float32, np.int32: torch.int32, np.float64: torch.float64}[dtype]
+    val = torch.ones(slice_shape, dtype=dtype_)
+    a[slices] = val
+    nparray[slices] = val
+    np.testing.assert_almost_equal(a[...], nparray)
+
+
 @pytest.mark.parametrize(
     ("shape", "slices"),
     [
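A brief note on the `assert np.allclose(...)` to `np.testing.assert_allclose(...)` change in `test_reductions.py`: the latter reports the mismatching values and the violated tolerance on failure, instead of a bare `AssertionError`:

```python
# assert_allclose gives actionable diagnostics on failure.
import numpy as np

try:
    np.testing.assert_allclose(1.0, 1.1)
except AssertionError as exc:
    print(exc)  # shows desired/actual values and the rtol that was exceeded
```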