Skip to content

Commit

Permalink
toggle for (concurrent) monthly ERA5 requests; progressbar disabled by default (#372)
Browse files Browse the repository at this point in the history

* add toggle to request in annual or monthly chunks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* pass monthly_requests correctly to retrieval_times; skip static

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add release note [no ci]

* add option to post monthly concurrent requests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add release note [no ci]

* cutout_prepare: add option to skip progressbar, and pass dask_kwargs

* default to no progressbar

* add test for monthly and concurrent requests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
fneum and pre-commit-ci[bot] authored Aug 23, 2024
1 parent 55098dd commit 0b0933f
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 23 deletions.
10 changes: 9 additions & 1 deletion RELEASE_NOTES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ Release Notes
Upcoming Release
================

- Adds option to toggle whether ERA5 downloads are requested in monthly or
annual chunks with keyword argument ``cutout.prepare(monthly_requests=True)``.
The default is now annual requests. The monthly requests can also be posted
concurrently using ``cutout.prepare(monthly_requests=True,
concurrent_requests=True)``.

- Improved parallelization of ``atlite.convert.build_line_rating`` by adding
keyword arguments for ``dask.compute`` (``dask_kwargs={}``) and an option to
disable the progressbar (``show_progressbar=True``).
disable the progressbar (``show_progress=False``).

- Default to ``show_progress=False`` for performance reasons.

Version 0.2.13
==============
Expand Down
10 changes: 5 additions & 5 deletions atlite/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def convert_and_aggregate(
return_capacity=False,
capacity_factor=False,
capacity_factor_timeseries=False,
show_progress=True,
show_progress=False,
dask_kwargs={},
**convert_kwds,
):
Expand Down Expand Up @@ -91,7 +91,7 @@ def convert_and_aggregate(
capacity_factor_timeseries : boolean
If True, the capacity factor time series of the chosen resource for
each grid cell is computed.
show_progress : boolean, default True
show_progress : boolean, default False
Whether to show a progress bar.
dask_kwargs : dict, default {}
Dict with keyword arguments passed to `dask.compute`.
Expand Down Expand Up @@ -882,7 +882,7 @@ def hydro(
hydrobasins,
flowspeed=1,
weight_with_height=False,
show_progress=True,
show_progress=False,
**kwargs,
):
"""
Expand Down Expand Up @@ -1047,7 +1047,7 @@ def convert_line_rating(


def line_rating(
cutout, shapes, line_resistance, show_progress=True, dask_kwargs={}, **params
cutout, shapes, line_resistance, show_progress=False, dask_kwargs={}, **params
):
"""
Create a dynamic line rating time series based on the IEEE-738 standard.
Expand All @@ -1073,7 +1073,7 @@ def line_rating(
line_resistance : float/series
Resistance of the lines in Ohm/meter. Alternatively in p.u. system in
Ohm/1000km (see example below).
show_progress : boolean, default True
show_progress : boolean, default False
Whether to show a progress bar.
dask_kwargs : dict, default {}
Dict with keyword arguments passed to `dask.compute`.
Expand Down
51 changes: 46 additions & 5 deletions atlite/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@
from atlite.datasets import modules as datamodules


def get_features(cutout, module, features, tmpdir=None):
def get_features(
cutout,
module,
features,
tmpdir=None,
monthly_requests=False,
concurrent_requests=False,
):
"""
Load the feature data for a given module.
Expand All @@ -39,7 +46,13 @@ def get_features(cutout, module, features, tmpdir=None):

for feature in features:
feature_data = delayed(get_data)(
cutout, feature, tmpdir=tmpdir, lock=lock, **parameters
cutout,
feature,
tmpdir=tmpdir,
lock=lock,
monthly_requests=monthly_requests,
concurrent_requests=concurrent_requests,
**parameters,
)
datasets.append(feature_data)

Expand Down Expand Up @@ -115,6 +128,10 @@ def cutout_prepare(
tmpdir=None,
overwrite=False,
compression={"zlib": True, "complevel": 9, "shuffle": True},
show_progress=False,
dask_kwargs=None,
monthly_requests=False,
concurrent_requests=False,
):
"""
Prepare all or a selection of features in a cutout.
Expand Down Expand Up @@ -147,12 +164,26 @@ def cutout_prepare(
To efficiently reduce cutout sizes, specify the number of 'least_significant_digits': n here.
To disable compression, set "complevel" to None.
Default is {'zlib': True, 'complevel': 9, 'shuffle': True}.
show_progress : bool, optional
If True, a progress bar is shown. The default is False.
dask_kwargs : dict, default {}
Dict with keyword arguments passed to `dask.compute`.
monthly_requests : bool, optional
If True, the data is requested on a monthly basis in ERA5. This is useful for
large cutouts, where the data is requested in smaller chunks. The
default is False
concurrent_requests : bool, optional
If True, the monthly data requests are posted concurrently.
Only has an effect if `monthly_requests` is True. The default is False.
Returns
-------
cutout : atlite.Cutout
Cutout with prepared data. The variables are stored in `cutout.data`.
"""
if dask_kwargs is None:
dask_kwargs = {}

if cutout.prepared and not overwrite:
logger.info("Cutout already prepared.")
return cutout
Expand All @@ -174,7 +205,14 @@ def cutout_prepare(
continue
logger.info(f"Calculating and writing with module {module}:")
missing_features = missing_vars.index.unique("feature")
ds = get_features(cutout, module, missing_features, tmpdir=tmpdir)
ds = get_features(
cutout,
module,
missing_features,
tmpdir=tmpdir,
monthly_requests=monthly_requests,
concurrent_requests=concurrent_requests,
)
prepared |= set(missing_features)

cutout.data.attrs.update(dict(prepared_features=list(prepared)))
Expand All @@ -198,8 +236,11 @@ def cutout_prepare(
# Delayed writing for large cutout
# cf. https://stackoverflow.com/questions/69810367/python-how-to-write-large-netcdf-with-xarray
write_job = ds.to_netcdf(tmp, compute=False)
with ProgressBar():
write_job.compute()
if show_progress:
with ProgressBar(minimum=2):
write_job.compute(**dask_kwargs)
else:
write_job.compute(**dask_kwargs)
if cutout.path.exists():
cutout.data.close()
cutout.path.unlink()
Expand Down
49 changes: 42 additions & 7 deletions atlite/datasets/era5.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import numpy as np
import pandas as pd
import xarray as xr
from dask import compute, delayed
from numpy import atleast_1d

from atlite.gis import maybe_swap_spatial_dims
Expand Down Expand Up @@ -274,7 +275,7 @@ def _area(coords):
return [y1, x0, y0, x1]


def retrieval_times(coords, static=False):
def retrieval_times(coords, static=False, monthly_requests=False):
"""
Get list of retrieval cdsapi arguments for time dimension in coordinates.
Expand All @@ -287,6 +288,11 @@ def retrieval_times(coords, static=False):
Parameters
----------
coords : atlite.Cutout.coords
static : bool, optional
monthly_requests : bool, optional
If True, the data is requested on a monthly basis. This is useful for
large cutouts, where the data is requested in smaller chunks. The
default is False
Returns
-------
Expand All @@ -305,12 +311,21 @@ def retrieval_times(coords, static=False):
times = []
for year in time.year.unique():
t = time[time.year == year]
for month in t.month.unique():
if monthly_requests:
for month in t.month.unique():
query = {
"year": str(year),
"month": str(month),
"day": list(t[t.month == month].day.unique()),
"time": ["%02d:00" % h for h in t[t.month == month].hour.unique()],
}
times.append(query)
else:
query = {
"year": str(year),
"month": str(month),
"day": list(t[t.month == month].day.unique()),
"time": ["%02d:00" % h for h in t[t.month == month].hour.unique()],
"month": list(t.month.unique()),
"day": list(t.day.unique()),
"time": ["%02d:00" % h for h in t.hour.unique()],
}
times.append(query)
return times
Expand Down Expand Up @@ -377,7 +392,15 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
return ds


def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
def get_data(
cutout,
feature,
tmpdir,
lock=None,
monthly_requests=False,
concurrent_requests=False,
**creation_parameters,
):
"""
Retrieve data from ECMWFs ERA5 dataset (via CDS).
Expand All @@ -392,6 +415,13 @@ def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
`atlite.datasets.era5.features`
tmpdir : str/Path
Directory where the temporary netcdf files are stored.
monthly_requests : bool, optional
If True, the data is requested on a monthly basis in ERA5. This is useful for
large cutouts, where the data is requested in smaller chunks. The
default is False
concurrent_requests : bool, optional
If True, the monthly data requests are posted concurrently.
Only has an effect if `monthly_requests` is True.
**creation_parameters :
Additional keyword arguments. The only effective argument is 'sanitize'
(default True) which sets sanitization of the data on or off.
Expand Down Expand Up @@ -428,6 +458,11 @@ def retrieve_once(time):
if feature in static_features:
return retrieve_once(retrieval_times(coords, static=True)).squeeze()

datasets = map(retrieve_once, retrieval_times(coords))
time_chunks = retrieval_times(coords, monthly_requests=monthly_requests)
if concurrent_requests:
delayed_datasets = [delayed(retrieve_once)(chunk) for chunk in time_chunks]
datasets = compute(*delayed_datasets)
else:
datasets = map(retrieve_once, time_chunks)

return xr.concat(datasets, dim="time").sel(time=coords["time"])
13 changes: 12 additions & 1 deletion atlite/datasets/gebco.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ def get_data_gebco_height(xs, ys, gebco_path):
)


def get_data(cutout, feature, tmpdir, **creation_parameters):
def get_data(
cutout,
feature,
tmpdir,
monthly_requests=False,
concurrent_requests=False,
**creation_parameters
):
"""
Get the gebco height data.
Expand All @@ -56,6 +63,10 @@ def get_data(cutout, feature, tmpdir, **creation_parameters):
Takes no effect, only here for consistency with other dataset modules.
tmpdir : str
Takes no effect, only here for consistency with other dataset modules.
monthly_requests : bool
Takes no effect, only here for consistency with other dataset modules.
concurrent_requests : bool
Takes no effect, only here for consistency with other dataset modules.
**creation_parameters :
Must include `gebco_path`.
Expand Down
8 changes: 7 additions & 1 deletion atlite/datasets/sarah.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def hourly_mean(ds):
return ds


def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
def get_data(
cutout, feature, tmpdir, lock=None, monthly_requests=False, **creation_parameters
):
"""
Load stored SARAH data and reformat to matching the given cutout.
Expand All @@ -173,6 +175,10 @@ def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
feature : str
Name of the feature data to retrieve. Must be in
`atlite.datasets.sarah.features`
monthly_requests : bool
Takes no effect, only here for consistency with other dataset modules.
concurrent_requests : bool
Takes no effect, only here for consistency with other dataset modules.
**creation_parameters :
Mandatory arguments are:
* 'sarah_dir', str. Directory of the stored SARAH data.
Expand Down
2 changes: 1 addition & 1 deletion atlite/gis.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ def _process_func(i):


def compute_availabilitymatrix(
cutout, shapes, excluder, nprocesses=None, disable_progressbar=False
cutout, shapes, excluder, nprocesses=None, disable_progressbar=True
):
"""
Compute the eligible share within cutout cells in the overlap with shapes.
Expand Down
4 changes: 2 additions & 2 deletions atlite/hydro.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def find_upstream_basins(meta, hid):
return hids


def determine_basins(plants, hydrobasins, show_progress=True):
def determine_basins(plants, hydrobasins, show_progress=False):
if isinstance(hydrobasins, str):
hydrobasins = gpd.read_file(hydrobasins)

Expand Down Expand Up @@ -81,7 +81,7 @@ def determine_basins(plants, hydrobasins, show_progress=True):


def shift_and_aggregate_runoff_for_plants(
basins, runoff, flowspeed=1, show_progress=True
basins, runoff, flowspeed=1, show_progress=False
):
inflow = xr.DataArray(
np.zeros((len(basins.plants), runoff.indexes["time"].size)),
Expand Down
7 changes: 7 additions & 0 deletions test/test_preparation_and_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,13 @@ def test_pv_era5_and_era5t(tmp_path_factory):
cutout, time=str(first_day_prev_month), skip_optimal_sum_test=True
)

@staticmethod
@pytest.mark.parametrize("concurrent_requests", [True, False])
def test_era5_monthly_requests(tmp_path_factory, concurrent_requests):
    """Prepare a small ERA5 cutout using monthly download requests.

    Parametrized to run both sequentially and with concurrently posted
    monthly requests, exercising both branches of ``cutout.prepare``.
    """
    work_dir = tmp_path_factory.mktemp("era5_mon")
    cutout = Cutout(
        path=work_dir / "era5",
        module="era5",
        bounds=BOUNDS,
        time=TIME,
    )
    cutout.prepare(monthly_requests=True, concurrent_requests=concurrent_requests)

@staticmethod
def test_wind_era5(cutout_era5):
return wind_test(cutout_era5)
Expand Down

0 comments on commit 0b0933f

Please sign in to comment.