diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index 9d9b5dc4..b348d026 100755 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -11,9 +11,17 @@ Release Notes Upcoming Release ================ +- Adds option to toggle whether ERA5 downloads are requested in monthly or + annual chunks with keyword argument ``cutout.prepare(monthly_requests=True)``. + The default is now annual requests. The monthly requests can also be posted + concurrently using ``cutout.prepare(monthly_requests=True, + concurrent_requests=True)``. + - Improved parallelization of ``atlite.convert.build_line_rating`` by adding keyword arguments for ``dask.compute`` (``dask_kwargs={}``) and an option to - disable the progressbar (``show_progressbar=True``). + disable the progressbar (``show_progress=False``). + +- Default to ``show_progress=False`` for performance reasons. Version 0.2.13 ============== diff --git a/atlite/convert.py b/atlite/convert.py index f46156c8..97e354b8 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -51,7 +51,7 @@ def convert_and_aggregate( return_capacity=False, capacity_factor=False, capacity_factor_timeseries=False, - show_progress=True, + show_progress=False, dask_kwargs={}, **convert_kwds, ): @@ -91,7 +91,7 @@ def convert_and_aggregate( capacity_factor_timeseries : boolean If True, the capacity factor time series of the chosen resource for each grid cell is computed. - show_progress : boolean, default True + show_progress : boolean, default False Whether to show a progress bar. dask_kwargs : dict, default {} Dict with keyword arguments passed to `dask.compute`. 
@@ -882,7 +882,7 @@ def hydro( hydrobasins, flowspeed=1, weight_with_height=False, - show_progress=True, + show_progress=False, **kwargs, ): """ @@ -1047,7 +1047,7 @@ def convert_line_rating( def line_rating( - cutout, shapes, line_resistance, show_progress=True, dask_kwargs={}, **params + cutout, shapes, line_resistance, show_progress=False, dask_kwargs={}, **params ): """ Create a dynamic line rating time series based on the IEEE-738 standard. @@ -1073,7 +1073,7 @@ def line_rating( line_resistance : float/series Resistance of the lines in Ohm/meter. Alternatively in p.u. system in Ohm/1000km (see example below). - show_progress : boolean, default True + show_progress : boolean, default False Whether to show a progress bar. dask_kwargs : dict, default {} Dict with keyword arguments passed to `dask.compute`. diff --git a/atlite/data.py b/atlite/data.py index 34620d3c..5e92b0e3 100644 --- a/atlite/data.py +++ b/atlite/data.py @@ -25,7 +25,14 @@ from atlite.datasets import modules as datamodules -def get_features(cutout, module, features, tmpdir=None): +def get_features( + cutout, + module, + features, + tmpdir=None, + monthly_requests=False, + concurrent_requests=False, +): """ Load the feature data for a given module. @@ -39,7 +46,13 @@ def get_features(cutout, module, features, tmpdir=None): for feature in features: feature_data = delayed(get_data)( - cutout, feature, tmpdir=tmpdir, lock=lock, **parameters + cutout, + feature, + tmpdir=tmpdir, + lock=lock, + monthly_requests=monthly_requests, + concurrent_requests=concurrent_requests, + **parameters, ) datasets.append(feature_data) @@ -115,6 +128,10 @@ def cutout_prepare( tmpdir=None, overwrite=False, compression={"zlib": True, "complevel": 9, "shuffle": True}, + show_progress=False, + dask_kwargs=None, + monthly_requests=False, + concurrent_requests=False, ): """ Prepare all or a selection of features in a cutout. 
@@ -147,12 +164,26 @@ def cutout_prepare( To efficiently reduce cutout sizes, specify the number of 'least_significant_digits': n here. To disable compression, set "complevel" to None. Default is {'zlib': True, 'complevel': 9, 'shuffle': True}. + show_progress : bool, optional + If True, a progress bar is shown. The default is False. + dask_kwargs : dict, default {} + Dict with keyword arguments passed to `dask.compute`. + monthly_requests : bool, optional + If True, the data is requested on a monthly basis in ERA5. This is useful for + large cutouts, where the data is requested in smaller chunks. The + default is False + concurrent_requests : bool, optional + If True, the monthly data requests are posted concurrently. + Only has an effect if `monthly_requests` is True. The default is False. Returns ------- cutout : atlite.Cutout Cutout with prepared data. The variables are stored in `cutout.data`. """ + if dask_kwargs is None: + dask_kwargs = {} + if cutout.prepared and not overwrite: logger.info("Cutout already prepared.") return cutout @@ -174,7 +205,14 @@ def cutout_prepare( continue logger.info(f"Calculating and writing with module {module}:") missing_features = missing_vars.index.unique("feature") - ds = get_features(cutout, module, missing_features, tmpdir=tmpdir) + ds = get_features( + cutout, + module, + missing_features, + tmpdir=tmpdir, + monthly_requests=monthly_requests, + concurrent_requests=concurrent_requests, + ) prepared |= set(missing_features) cutout.data.attrs.update(dict(prepared_features=list(prepared))) @@ -198,8 +236,11 @@ def cutout_prepare( # Delayed writing for large cutout # cf. 
https://stackoverflow.com/questions/69810367/python-how-to-write-large-netcdf-with-xarray write_job = ds.to_netcdf(tmp, compute=False) - with ProgressBar(): - write_job.compute() + if show_progress: + with ProgressBar(minimum=2): + write_job.compute(**dask_kwargs) + else: + write_job.compute(**dask_kwargs) if cutout.path.exists(): cutout.data.close() cutout.path.unlink() diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py index 4e4e2337..4c1f04d4 100644 --- a/atlite/datasets/era5.py +++ b/atlite/datasets/era5.py @@ -20,6 +20,7 @@ import numpy as np import pandas as pd import xarray as xr +from dask import compute, delayed from numpy import atleast_1d from atlite.gis import maybe_swap_spatial_dims @@ -274,7 +275,7 @@ def _area(coords): return [y1, x0, y0, x1] -def retrieval_times(coords, static=False): +def retrieval_times(coords, static=False, monthly_requests=False): """ Get list of retrieval cdsapi arguments for time dimension in coordinates. @@ -287,6 +288,11 @@ def retrieval_times(coords, static=False): Parameters ---------- coords : atlite.Cutout.coords + static : bool, optional + monthly_requests : bool, optional + If True, the data is requested on a monthly basis. This is useful for + large cutouts, where the data is requested in smaller chunks. 
The + default is False Returns ------- @@ -305,12 +311,21 @@ def retrieval_times(coords, static=False): times = [] for year in time.year.unique(): t = time[time.year == year] - for month in t.month.unique(): + if monthly_requests: + for month in t.month.unique(): + query = { + "year": str(year), + "month": str(month), + "day": list(t[t.month == month].day.unique()), + "time": ["%02d:00" % h for h in t[t.month == month].hour.unique()], + } + times.append(query) + else: query = { "year": str(year), - "month": str(month), - "day": list(t[t.month == month].day.unique()), - "time": ["%02d:00" % h for h in t[t.month == month].hour.unique()], + "month": list(t.month.unique()), + "day": list(t.day.unique()), + "time": ["%02d:00" % h for h in t.hour.unique()], } times.append(query) return times @@ -377,7 +392,15 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates): return ds -def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters): +def get_data( + cutout, + feature, + tmpdir, + lock=None, + monthly_requests=False, + concurrent_requests=False, + **creation_parameters, +): """ Retrieve data from ECMWFs ERA5 dataset (via CDS). @@ -392,6 +415,13 @@ def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters): `atlite.datasets.era5.features` tmpdir : str/Path Directory where the temporary netcdf files are stored. + monthly_requests : bool, optional + If True, the data is requested on a monthly basis in ERA5. This is useful for + large cutouts, where the data is requested in smaller chunks. The + default is False + concurrent_requests : bool, optional + If True, the monthly data requests are posted concurrently. + Only has an effect if `monthly_requests` is True. **creation_parameters : Additional keyword arguments. The only effective argument is 'sanitize' (default True) which sets sanitization of the data on or off. 
@@ -428,6 +458,11 @@ def retrieve_once(time):
     if feature in static_features:
         return retrieve_once(retrieval_times(coords, static=True)).squeeze()
 
-    datasets = map(retrieve_once, retrieval_times(coords))
+    time_chunks = retrieval_times(coords, monthly_requests=monthly_requests)
+    if concurrent_requests:
+        delayed_datasets = [delayed(retrieve_once)(chunk) for chunk in time_chunks]
+        datasets = compute(*delayed_datasets)
+    else:
+        datasets = map(retrieve_once, time_chunks)
 
     return xr.concat(datasets, dim="time").sel(time=coords["time"])
diff --git a/atlite/datasets/gebco.py b/atlite/datasets/gebco.py
index 1e78ed47..34a7b5f2 100755
--- a/atlite/datasets/gebco.py
+++ b/atlite/datasets/gebco.py
@@ -45,7 +45,14 @@ def get_data_gebco_height(xs, ys, gebco_path):
     )
 
 
-def get_data(cutout, feature, tmpdir, **creation_parameters):
+def get_data(
+    cutout,
+    feature,
+    tmpdir,
+    monthly_requests=False,
+    concurrent_requests=False,
+    **creation_parameters,
+):
     """
     Get the gebco height data.
 
@@ -56,6 +63,10 @@ def get_data(cutout, feature, tmpdir, **creation_parameters):
         Takes no effect, only here for consistency with other dataset modules.
     tmpdir : str
         Takes no effect, only here for consistency with other dataset modules.
+    monthly_requests : bool
+        Takes no effect, only here for consistency with other dataset modules.
+    concurrent_requests : bool
+        Takes no effect, only here for consistency with other dataset modules.
     **creation_parameters :
         Must include `gebco_path`.
 
diff --git a/atlite/datasets/sarah.py b/atlite/datasets/sarah.py
index fd53e3c5..3aff621f 100644
--- a/atlite/datasets/sarah.py
+++ b/atlite/datasets/sarah.py
@@ -160,7 +160,15 @@ def hourly_mean(ds):
     return ds
 
 
-def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
+def get_data(
+    cutout,
+    feature,
+    tmpdir,
+    lock=None,
+    monthly_requests=False,
+    concurrent_requests=False,
+    **creation_parameters,
+):
     """
     Load stored SARAH data and reformat to matching the given cutout.
@@ -173,6 +175,10 @@ def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters): feature : str Name of the feature data to retrieve. Must be in `atlite.datasets.sarah.features` + monthly_requests : bool + Takes no effect, only here for consistency with other dataset modules. + concurrent_requests : bool + Takes no effect, only here for consistency with other dataset modules. **creation_parameters : Mandatory arguments are: * 'sarah_dir', str. Directory of the stored SARAH data. diff --git a/atlite/gis.py b/atlite/gis.py index ebf943ac..66e3a244 100644 --- a/atlite/gis.py +++ b/atlite/gis.py @@ -674,7 +674,7 @@ def _process_func(i): def compute_availabilitymatrix( - cutout, shapes, excluder, nprocesses=None, disable_progressbar=False + cutout, shapes, excluder, nprocesses=None, disable_progressbar=True ): """ Compute the eligible share within cutout cells in the overlap with shapes. diff --git a/atlite/hydro.py b/atlite/hydro.py index a94bbd47..58e34799 100644 --- a/atlite/hydro.py +++ b/atlite/hydro.py @@ -41,7 +41,7 @@ def find_upstream_basins(meta, hid): return hids -def determine_basins(plants, hydrobasins, show_progress=True): +def determine_basins(plants, hydrobasins, show_progress=False): if isinstance(hydrobasins, str): hydrobasins = gpd.read_file(hydrobasins) @@ -81,7 +81,7 @@ def determine_basins(plants, hydrobasins, show_progress=True): def shift_and_aggregate_runoff_for_plants( - basins, runoff, flowspeed=1, show_progress=True + basins, runoff, flowspeed=1, show_progress=False ): inflow = xr.DataArray( np.zeros((len(basins.plants), runoff.indexes["time"].size)), diff --git a/test/test_preparation_and_conversion.py b/test/test_preparation_and_conversion.py index a1501ac5..822d8d3d 100644 --- a/test/test_preparation_and_conversion.py +++ b/test/test_preparation_and_conversion.py @@ -678,6 +678,13 @@ def test_pv_era5_and_era5t(tmp_path_factory): cutout, time=str(first_day_prev_month), skip_optimal_sum_test=True ) + @staticmethod + 
@pytest.mark.parametrize("concurrent_requests", [True, False]) + def test_era5_monthly_requests(tmp_path_factory, concurrent_requests): + tmp_path = tmp_path_factory.mktemp("era5_mon") + cutout = Cutout(path=tmp_path / "era5", module="era5", bounds=BOUNDS, time=TIME) + cutout.prepare(monthly_requests=True, concurrent_requests=concurrent_requests) + @staticmethod def test_wind_era5(cutout_era5): return wind_test(cutout_era5)