Skip to content

Commit cf4080d

Browse files
committed
✨ Using the Dask arrays is now optional.
1 parent 3165a7a commit cf4080d

Some content is hidden

Large commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+3685
-3104
lines changed

.pre-commit-config.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ repos:
1515
- id: trailing-whitespace
1616
exclude: conda/meta.yaml
1717
- repo: https://github.com/asottile/pyupgrade
18-
rev: "v3.3.1"
18+
rev: "v3.4.0"
1919
hooks:
2020
- id: pyupgrade
2121
args: [--py38-plus]
@@ -43,7 +43,7 @@ repos:
4343
additional_dependencies:
4444
- toml
4545
- repo: https://github.com/myint/docformatter
46-
rev: "v1.6.3"
46+
rev: "v1.6.5"
4747
hooks:
4848
- id: docformatter
4949
- repo: https://github.com/codespell-project/codespell

conftest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"""
1010

1111

12-
def pytest_addoption(parser):
12+
def pytest_addoption(parser) -> None:
1313
"""Add command line options to pytest."""
1414
parser.addoption(
1515
'--s3',

docs/source/api.rst

+13-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,19 @@ Merging of existing datasets in a partition.
3131
zcollection.merging.time_series
3232
zcollection.merging.period
3333

34+
Variable
35+
========
36+
37+
Variables handled by the datasets. These objects manage access to the data
38+
stored in the collection.
39+
40+
.. autosummary::
41+
:toctree: _generated/
42+
43+
zcollection.variable.abc
44+
zcollection.variable.array
45+
zcollection.variable.delayed_array
46+
3447
Collection
3548
==========
3649

@@ -45,7 +58,6 @@ Collection
4558
zcollection.meta
4659
zcollection.sync
4760
zcollection.type_hints
48-
zcollection.variable
4961
zcollection.view
5062

5163
Indexing

examples/ex_collection.py

+40-25
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"""
88
from __future__ import annotations
99

10+
from typing import Iterator
1011
import datetime
1112
import pprint
1213

@@ -23,19 +24,21 @@
2324
# ---------------------------------
2425
#
2526
# Before we create our first collection, we will create a dataset to record.
26-
def create_dataset():
27+
def create_dataset() -> zcollection.Dataset:
2728
"""Create a dataset to record."""
28-
generator = zcollection.tests.data.create_test_dataset_with_fillvalue()
29+
generator: Iterator[zcollection.Dataset] = \
30+
zcollection.tests.data.create_test_dataset_with_fillvalue()
2931
return next(generator)
3032

3133

32-
ds = create_dataset()
33-
ds.to_xarray()
34+
zds: zcollection.Dataset | None = create_dataset()
35+
assert zds is not None
36+
zds.to_xarray()
3437

3538
# %%
3639
# We will create the file system that we will use. In this example, a file
3740
# system in memory.
38-
fs = fsspec.filesystem('memory')
41+
fs: fsspec.AbstractFileSystem = fsspec.filesystem('memory')
3942

4043
# %%
4144
# Finally we create a local dask cluster using only threads in order to work
@@ -54,11 +57,8 @@ def create_dataset():
5457

5558
# %%
5659
# Finally, we create our collection:
57-
collection = zcollection.create_collection('time',
58-
ds,
59-
partition_handler,
60-
'/my_collection',
61-
filesystem=fs)
60+
collection: zcollection.Collection = zcollection.create_collection(
61+
'time', zds, partition_handler, '/my_collection', filesystem=fs)
6262

6363
# %%
6464
# .. note::
@@ -75,7 +75,7 @@ def create_dataset():
7575

7676
# %%
7777
# Now that the collection has been created, we can insert new records.
78-
collection.insert(ds)
78+
collection.insert(zds)
7979

8080
# %%
8181
# .. note::
@@ -103,9 +103,15 @@ def create_dataset():
103103
# To load the dataset call the method
104104
# :py:meth:`load<zcollection.collection.Collection.load>` on the instance. By
105105
# default, the method loads all partitions stored in the collection.
106-
collection.load()
106+
collection.load(delayed=True)
107107

108108
# %%
109+
# .. note::
110+
#
111+
# By default, the data is loaded as a :py:class:`dask.array<da.Array>`. It is
112+
# possible to load the data as a :py:class:`numpy.ndarray` by specifying the
113+
# parameter ``delayed=False``.
114+
#
109115
# You can also filter the partitions to be considered by filtering the
110116
# partitions using keywords used for partitioning in a valid Python expression.
111117
collection.load(filters='year == 2000 and month == 2')
@@ -145,13 +151,13 @@ def create_dataset():
145151
# %%
146152
# The :py:meth:`add_variable<zcollection.collection.Collection.add_variable>`
147153
# method allows you to add a new variable to the collection.
148-
collection.add_variable(ds.metadata().variables['var2'])
154+
collection.add_variable(zds.metadata().variables['var2'])
149155

150156
# %%
151157
# The newly created variable is initialized with its default value.
152-
ds = collection.load()
153-
assert ds is not None
154-
ds.variables['var2'].values
158+
zds = collection.load()
159+
assert zds is not None
160+
zds.variables['var2'].values
155161

156162

157163
# %%
@@ -161,19 +167,28 @@ def create_dataset():
161167
#
162168
# In this example, we will alter the variable ``var2`` by setting it to 1
163169
# anywhere the variable ``var1`` is defined.
164-
def ones(ds):
170+
def ones(zds) -> dict[str, numpy.ndarray]:
165171
"""Returns a variable with ones everywhere."""
166-
return dict(var2=ds.variables['var1'].values * 0 + 1)
172+
return dict(var2=zds.variables['var1'].values * 0 + 1)
167173

168174

169175
collection.update(ones) # type: ignore[arg-type]
170176

171-
ds = collection.load()
172-
assert ds is not None
173-
ds.variables['var2'].values
177+
zds = collection.load()
178+
assert zds is not None
179+
zds.variables['var2'].values
174180

175181

176182
# %%
183+
# .. note::
184+
#
185+
# The method :py:meth:`update<zcollection.collection.Collection.update>`
186+
# supports the ``delayed`` parameter. If ``delayed=True``, the function
187+
# ``ones`` is applied to each partition using a Dask array as container
188+
# for the variables data stored in the provided dataset. This is the default
189+
# behavior. If ``delayed=False``, the function ``ones`` is applied to each
190+
# partition using a Numpy array as container.
191+
#
177192
# Sometime is it important to know the values of the neighboring partitions.
178193
# This can be done using the
179194
# :py:meth:`update<zcollection.collection.Collection.update>` method with the
@@ -188,7 +203,7 @@ def ones(ds):
188203
# start of the slice is 0, it means that the left partition is missing. If the
189204
# stop of the slice is equal to the length of the given dataset, it means that
190205
# the right partition is missing.
191-
def twos(ds, partition_info: tuple[str, slice]):
206+
def twos(ds, partition_info: tuple[str, slice]) -> dict[str, numpy.ndarray]:
192207
"""Returns a variable with twos everywhere if the partition is surrounded
193208
by partitions on both sides, -1 if the left partition is missing and -2 if
194209
the right partition is missing."""
@@ -206,9 +221,9 @@ def twos(ds, partition_info: tuple[str, slice]):
206221

207222
collection.update(twos, depth=1) # type: ignore[arg-type]
208223

209-
ds = collection.load()
210-
assert ds is not None
211-
ds.variables['var2'].values
224+
zds = collection.load()
225+
assert zds is not None
226+
zds.variables['var2'].values
212227

213228
# %%
214229
# Map a function over the collection

examples/ex_indexing.py

+23-17
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import zcollection
1616
import zcollection.indexing
17-
import zcollection.tests.data
17+
import zcollection.partitioning.tests.data
1818

1919
# %%
2020
# Initialization of the environment
@@ -32,18 +32,18 @@
3232
#
3333
# For this latest example, we will index another data set. This one contains
3434
# measurements of a fictitious satellite on several half-orbits.
35-
ds = zcollection.Dataset.from_xarray(
36-
zcollection.tests.data.create_test_sequence(5, 20, 10))
37-
ds
35+
zds: zcollection.Dataset = zcollection.Dataset.from_xarray(
36+
zcollection.partitioning.tests.data.create_test_sequence(5, 20, 10))
37+
print(zds)
3838

3939
# %%
40-
collection = zcollection.create_collection(
40+
collection: zcollection.Collection = zcollection.create_collection(
4141
'time',
42-
ds,
42+
zds,
4343
zcollection.partitioning.Date(('time', ), 'M'),
4444
partition_base_dir='/one_other_collection',
4545
filesystem=fs)
46-
collection.insert(ds, merge_callable=zcollection.merging.merge_time_series)
46+
collection.insert(zds, merge_callable=zcollection.merging.merge_time_series)
4747

4848
# %%
4949
# Here we have created a collection partitioned by month.
@@ -87,7 +87,7 @@ def split_half_orbit(
8787
# Now we will compute these constant parts from a dataset contained in a
8888
# partition.
8989
def _half_orbit(
90-
ds: zcollection.Dataset,
90+
zds: zcollection.Dataset,
9191
*args,
9292
**kwargs,
9393
) -> numpy.ndarray:
@@ -100,8 +100,8 @@ def _half_orbit(
100100
"""
101101
pass_number_varname = kwargs.pop('pass_number', 'pass_number')
102102
cycle_number_varname = kwargs.pop('cycle_number', 'cycle_number')
103-
pass_number = ds.variables[pass_number_varname].values
104-
cycle_number = ds.variables[cycle_number_varname].values
103+
pass_number = zds.variables[pass_number_varname].values
104+
cycle_number = zds.variables[cycle_number_varname].values
105105

106106
generator = ((
107107
i0,
@@ -141,7 +141,7 @@ def dtype(cls, /, **kwargs) -> List[Tuple[str, str]]:
141141
def create(
142142
cls,
143143
path: Union[pathlib.Path, str],
144-
ds: zcollection.Collection,
144+
zds: zcollection.Collection,
145145
filesystem: Optional[fsspec.AbstractFileSystem] = None,
146146
**kwargs,
147147
) -> 'HalfOrbitIndexer':
@@ -155,13 +155,13 @@ def create(
155155
The created index.
156156
"""
157157
return super()._create(path,
158-
ds,
158+
zds,
159159
meta=dict(attribute=b'value'),
160160
filesystem=filesystem) # type: ignore
161161

162162
def update(
163163
self,
164-
ds: zcollection.Collection,
164+
zds: zcollection.Collection,
165165
partition_size: Optional[int] = None,
166166
npartitions: Optional[int] = None,
167167
**kwargs,
@@ -177,15 +177,18 @@ def update(
177177
pass_number: The name of the pass number variable stored in the
178178
collection. Defaults to "pass_number".
179179
"""
180-
super()._update(ds, _half_orbit, partition_size, npartitions, **kwargs)
180+
super()._update(zds, _half_orbit, partition_size, npartitions,
181+
**kwargs)
181182

182183

183184
# %%
184185
# Using the index
185186
# ---------------
186187
#
187188
# Now we can create our index and fill it.
188-
indexer = HalfOrbitIndexer.create('/index.parquet', collection, filesystem=fs)
189+
indexer: HalfOrbitIndexer = HalfOrbitIndexer.create('/index.parquet',
190+
collection,
191+
filesystem=fs)
189192
indexer.update(collection)
190193

191194
# The following command allows us to view the information stored in our index:
@@ -195,9 +198,12 @@ def update(
195198

196199
# %%
197200
# This index can now be used to load a part of a collection.
198-
selection = collection.load(indexer=indexer.query(dict(pass_number=[1, 2])))
201+
selection: zcollection.Dataset | None = collection.load(
202+
indexer=indexer.query(dict(pass_number=[1, 2])),
203+
delayed=False,
204+
)
199205
assert selection is not None
200-
selection.to_xarray().compute()
206+
selection.to_xarray()
201207

202208
# %%
203209
# Close the local cluster to avoid printing warning messages in the other

0 commit comments

Comments
 (0)