Concatenate arrays with varchunks #374

Draft · wants to merge 8 commits into main
Conversation

ivirshup
Contributor

@ivirshup ivirshup commented Oct 16, 2023

Allow kerchunk-based concatenation of zarr arrays with variable-length chunks. This is mostly to let me play around with some downstream use cases.

This is basically feature complete at the moment. The remaining tasks are largely polish (error handling, further testing) or upstream (ZEP 3 approval).

Works off of zarr-developers/zarr-python#1483

TODO

Upstream tasks

  • Get the zarr PR merged (😉)
  • Make work for v3 (downstream of Kerchunk and Zarr V3 #235)
    • Figure out if this should ONLY work for v3

This PR

  • Allow mixed input chunk types (but only if there is no remainder for any fixed length chunks)
    • The more I think about allowing extra values in any of the variable length chunks the less I want to support it
  • Better tests
    • Parameterize success cases
    • Parameterize failure cases
  • Potentially allow going from all fixed chunks to variable chunks
  • Better error messages; in this case we have far more limited expectations for chunking on the concatenation dimension
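For intuition, here is an illustrative sketch (plain Python, not this PR's actual code) of the concatenation rule described above: fixed-size chunk specs may be mixed with variable-length ones, but a fixed size must divide its array's extent with no remainder:

```python
# Illustrative sketch only, not this PR's implementation.
# Each input is (length, chunks), where chunks is an int (fixed size)
# or a list of ints (variable-length, ZEP 3 style).
def concat_chunk_specs(specs):
    out = []
    for length, chunks in specs:
        if isinstance(chunks, int):
            if length % chunks:
                raise ValueError("fixed chunks with a remainder are not supported")
            out.extend([chunks] * (length // chunks))
        else:
            if sum(chunks) != length:
                raise ValueError("variable chunks must sum to the array length")
            out.extend(chunks)
    return out

print(concat_chunk_specs([(4, 2), (5, [2, 3])]))  # [2, 2, 2, 3]
```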

@martindurant
Member

I would really like to see some success examples, even if they are POCs built on POCs, to help justify the whole idea!

One thing I have been meaning to check: I believe that passing a zarr array with complex chunks to dask will do the right thing, since dask just reads the .chunks attribute, which is already in the right format. This should be tested.
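One way to check part of this (a minimal sketch against a plain NumPy array, not a variable-chunked zarr array) is to hand dask an explicit per-chunk spec and confirm it is taken as-is, since that tuple-of-tuples format is what a variable-length .chunks attribute would need to match:

```python
import numpy as np
import dask.array as da

# Sketch: dask accepts an explicit per-chunk spec (tuple of tuples) directly.
x = np.ones(7)
d = da.from_array(x, chunks=((2, 2, 3),))
print(d.chunks)  # ((2, 2, 3),)
```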

@ivirshup
Contributor Author

Demo gist

I've got virtual concatenation of sparse arrays working. Dataframes should be easier. Unfortunately I can't use this with existing stores (data has to be rewritten) since the chunk boundaries need to be exact.

Dask does not seem to work out of the box, because of:

https://github.com/dask/dask/blob/a6be172ecdddb7a16c923190a561cb8fc88bcf21/dask/array/core.py#L3097-L3098

traceback
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[21], line 1
----> 1 da.from_zarr(result_group["data"], chunks=result_group["data"].chunks)

File ~/miniforge3/envs/variable-chunks/lib/python3.11/site-packages/dask/array/core.py:3600, in from_zarr(url, component, storage_options, chunks, name, inline_array, **kwargs)
   3598 if name is None:
   3599     name = "from-zarr-" + tokenize(z, component, storage_options, chunks, **kwargs)
-> 3600 return from_array(z, chunks, name=name, inline_array=inline_array)

File ~/miniforge3/envs/variable-chunks/lib/python3.11/site-packages/dask/array/core.py:3483, in from_array(x, chunks, name, lock, asarray, fancy, getitem, meta, inline_array)
   3479     asarray = not hasattr(x, "__array_function__")
   3481 previous_chunks = getattr(x, "chunks", None)
-> 3483 chunks = normalize_chunks(
   3484     chunks, x.shape, dtype=x.dtype, previous_chunks=previous_chunks
   3485 )
   3487 if name in (None, True):
   3488     token = tokenize(x, chunks, lock, asarray, fancy, getitem, inline_array)

File ~/miniforge3/envs/variable-chunks/lib/python3.11/site-packages/dask/array/core.py:3098, in normalize_chunks(chunks, shape, limit, dtype, previous_chunks)
   3095     chunks = auto_chunks(chunks, shape, limit, dtype, previous_chunks)
   3097 if shape is not None:
-> 3098     chunks = tuple(c if c not in {None, -1} else s for c, s in zip(chunks, shape))
   3100 if chunks and shape is not None:
   3101     chunks = sum(
   3102         (
   3103             blockdims_from_blockshape((s,), (c,))
   (...)
   3108         (),
   3109     )

File ~/miniforge3/envs/variable-chunks/lib/python3.11/site-packages/dask/array/core.py:3098, in <genexpr>(.0)
   3095     chunks = auto_chunks(chunks, shape, limit, dtype, previous_chunks)
   3097 if shape is not None:
-> 3098     chunks = tuple(c if c not in {None, -1} else s for c, s in zip(chunks, shape))
   3100 if chunks and shape is not None:
   3101     chunks = sum(
   3102         (
   3103             blockdims_from_blockshape((s,), (c,))
   (...)
   3108         (),
   3109     )

TypeError: unhashable type: 'list'
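The root cause is visible in the failing line: `c not in {None, -1}` hashes each chunk entry against a set, and a variable-length chunk spec is a Python list, which is unhashable. A minimal reproduction:

```python
# Minimal reproduction of the failure mode in normalize_chunks:
# set membership hashes the candidate, and lists are unhashable.
chunk = [2, 2, 3]  # a variable-length chunk spec, as zarr would report it
try:
    chunk in {None, -1}
except TypeError as err:
    print(err)  # unhashable type: 'list'
```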

But we can do something like:

da.from_array(
    zarr_array,
    chunks=tuple(tuple(c) if isinstance(c, list) else c for c in zarr_array.chunks),
)

@martindurant
Member

I was just looking at normalize_chunks, since it turns out it gets called a lot when constructing an xr.Dataset with chunks={}. It should not be called at all in the case where no rechunking is requested (and in any case, xarray should only call it when a variable is accessed, as currently happens on the non-dask path).
Each call of normalize_chunks is only ~10 ms, almost all of it in the final tuple(tuple(..) for ..) line, but for a dataset with many variables it adds up.

(cc @rsignell-usgs )

@martindurant
Member

dask/dask#10579

@NikosAlexandris

> (quoting @ivirshup's demo-gist comment and workaround above)

I can run the example from the gist. Now trying to understand how I can adapt this to my use-case.

@ivirshup What does indptr(s) mean? Kindest request: please spell out names to make them easy to grasp.

@ivirshup
Contributor Author

@NikosAlexandris, that's the indptr array from a CSR or CSC matrix. Its entries are offsets into the indices and data arrays. Was this your question? Also, it may be better to discuss in comments on the gist.
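For anyone else puzzling over the name, a toy illustration (plain Python, hypothetical values) of how indptr offsets slice the flat indices and data arrays, one row of a CSR matrix at a time:

```python
# Hypothetical CSR representation of the matrix [[1, 0, 2], [0, 0, 3]].
indptr = [0, 2, 3]    # row i occupies the half-open range indptr[i]:indptr[i+1]
indices = [0, 2, 2]   # column index of each stored value
data = [1, 2, 3]      # the stored values themselves

# Reassemble each row as (column, value) pairs using the indptr offsets.
rows = [
    list(zip(indices[indptr[i]:indptr[i + 1]], data[indptr[i]:indptr[i + 1]]))
    for i in range(len(indptr) - 1)
]
print(rows)  # [[(0, 1), (2, 2)], [(2, 3)]]
```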

@ivirshup
Contributor Author

ivirshup commented Oct 31, 2023

Updated to allow inference of variable-chunked output from inputs with fixed chunking. E.g., arrays like these can now be concatenated:

[zarr.ones(4, chunks=(2,)), zarr.ones(3, chunks=(3,))]
# result has chunking ([2, 2, 3],)
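The inference above amounts to expanding each fixed chunk size into an explicit per-chunk list (a sketch, not this PR's code; a trailing remainder becomes a final short chunk, as zarr allows for the last chunk of an array):

```python
# Sketch: expand a fixed chunk size into an explicit per-chunk list.
def expand_fixed(length, chunk):
    full, rem = divmod(length, chunk)
    return [chunk] * full + ([rem] if rem else [])

# Mirrors the example above: zarr.ones(4, chunks=(2,)) then zarr.ones(3, chunks=(3,)).
print(expand_fixed(4, 2) + expand_fixed(3, 3))  # [2, 2, 3]
```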

Should there be a switch so users can turn this off? It would probably be better to error on this input if you know downstream consumers won't be able to handle the output.

@tinaok
Contributor

tinaok commented Nov 17, 2023

This would fix my problem if it works!! I tried to apply your approach, but I think I missed something and cannot apply it to my workflow... @martindurant, any thoughts?
https://gist.github.com/tinaok/d232cb7b9f31fd0cee26ce7c3c865958

Merging this pull request may close: concatenate_arrays with (slightly) different array shapes