Enhancement to ERA5 Data Retrieval and Download Process #397

Open
wants to merge 9 commits into base: master
184 changes: 174 additions & 10 deletions atlite/datasets/era5.py
@@ -8,15 +8,18 @@
https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation
"""

import hashlib
import logging
import os
import time
import warnings
import weakref
from tempfile import mkstemp

import cdsapi
import numpy as np
import pandas as pd
import requests
import xarray as xr
from dask import compute, delayed
from dask.array import arctan2, sqrt
@@ -25,6 +28,10 @@
from atlite.gis import maybe_swap_spatial_dims
from atlite.pv.solar_position import SolarPosition

download_status = {}
file_aliases = {}
MAX_DISPLAY_FILES = 3

# Null context for running with statements without any context
try:
from contextlib import nullcontext
@@ -96,6 +103,26 @@
ds = maybe_swap_spatial_dims(ds)
if add_lon_lat:
ds = ds.assign_coords(lon=ds.coords["x"], lat=ds.coords["y"])

# Combine ERA5 and ERA5T data into a single dimension.
# See https://github.com/PyPSA/atlite/issues/190
if "expver" in ds.coords:
unique_expver = np.unique(ds["expver"].values)

if len(unique_expver) > 1:
expver_dim = xr.DataArray(

unique_expver, dims=["expver"], coords={"expver": unique_expver}
)
ds = (

ds.assign_coords({"expver_dim": expver_dim})
.drop_vars("expver")
.rename({"expver_dim": "expver"})
.set_index(expver="expver")
)
for var in ds.data_vars:
ds[var] = ds[var].expand_dims("expver")

# expver 0001 is ERA5 data, expver 0005 is ERA5T data. This combines both
# by filling in NaNs from ERA5 data with values from ERA5T.
ds = ds.sel(expver="0001").combine_first(ds.sel(expver="0005"))

ds = ds.drop_vars(["expver", "number"], errors="ignore")

return ds
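
# --- Illustrative sketch, not part of the diff: how combine_first merges the
# final ERA5 product (expver 0001) with the preliminary ERA5T product
# (expver 0005). Values present in the ERA5 slice win; NaNs are filled from
# ERA5T. The toy arrays below are fabricated for the example.
import numpy as np
import xarray as xr

era5 = xr.DataArray([1.0, np.nan, np.nan], dims="time", name="t2m")
era5t = xr.DataArray([9.0, 2.0, 3.0], dims="time", name="t2m")

merged = era5.combine_first(era5t)
# merged.values -> array([1., 2., 3.]): ERA5 kept where available, gaps filled from ERA5T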
@@ -331,6 +358,125 @@
logger.error(f"Unable to delete file {path}, as it is still in use.")


def get_cache_filename(request, cache_dir):
"""
Generate a unique cache filename based on the request parameters.
"""
# Serialize the request dictionary into a sorted string to ensure consistency
request_str = "_".join(
f"{key}-{sorted(value) if isinstance(value, list) else value}"
for key, value in sorted(request.items())
)
# Generate a hash of the request string
request_hash = hashlib.md5(request_str.encode("utf-8")).hexdigest()
# Use the full hexadecimal digest as the cache filename
return f"{request_hash}.nc"
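
# --- Illustrative sketch, not part of the diff: the same request always maps to
# the same cache filename, regardless of key or list ordering, because keys and
# list values are sorted before hashing. The request keys below are hypothetical.
request_a = {"variable": ["2m_temperature"], "year": 2020, "month": 1}
request_b = {"month": 1, "year": 2020, "variable": ["2m_temperature"]}
assert get_cache_filename(request_a, "cache") == get_cache_filename(request_b, "cache")
# A repeated retrieval with an identical request can therefore be served from disk.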


def custom_download(url, size, target, lock, filename):
"""
Optimized download function that uses a simple progress bar and removes
completed files from the display.
"""
if target is None:
target = url.split("/")[-1]

# Assign a short alias for the filename (e.g. f1, f2, ...)
file_number = len(file_aliases) + 1
file_aliases[filename] = f"f{file_number}"

logging.info(f"Downloading {filename} to {target} ({size} bytes)")
start = time.time()

mode = "wb"
total = 0
sleep = 10
tries = 0
headers = None

while tries < 5:
r = requests.get(url, stream=True, headers=headers)
try:
r.raise_for_status()

with open(target, mode) as f:

for chunk in r.iter_content(chunk_size=1024 * 1024):
if chunk:
f.write(chunk)
total += len(chunk)
with lock:
download_status[filename] = total / size * 100
update_progress_bar()

except requests.exceptions.ConnectionError as e:
logging.error(f"Download interrupted: {e}")
break

finally:
r.close()

if total >= size:
break

logging.error(f"Download incomplete, downloaded {total} bytes out of {size}")
logging.warning(f"Sleeping {sleep} seconds")
time.sleep(sleep)
mode = "ab"
total = os.path.getsize(target)
sleep *= 1.5
headers = {"Range": f"bytes={total}-"}
tries += 1

if total != size:
raise Exception(f"Download failed: downloaded {total} bytes out of {size}")

elapsed = time.time() - start

if elapsed:
logging.info(f"Download rate {total / elapsed:.2f} bytes/s")

return target
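
# --- Illustrative sketch, not part of the diff: the resume pattern used above,
# reduced to its core. After an interrupted transfer, only the missing byte
# range is re-requested via an HTTP Range header and the file is reopened in
# append mode. The URL and helper name are placeholders.
def resume_download(url, target):
    already = os.path.getsize(target) if os.path.exists(target) else 0
    headers = {"Range": f"bytes={already}-"} if already else None
    with requests.get(url, stream=True, headers=headers, timeout=60) as r:
        r.raise_for_status()
        with open(target, "ab" if already else "wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
    return target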


def update_progress_bar():
"""
Update a progress bar that shows the percentage of all files being
downloaded.

Files that have reached 100% are removed from the display. Only
short aliases are displayed.
"""
completed_files = [

file for file, progress in download_status.items() if progress >= 100
]

# Remove completed files from the progress dictionary
for file in completed_files:
del download_status[file]
del file_aliases[file] # Remove alias as well

if not download_status:
# If no active downloads, clear the progress bar
print("\r", end="")
return

# Only display the top N files to avoid multi-line output
displayed_files = list(download_status.items())[:MAX_DISPLAY_FILES]

# Create progress string using the short aliases
progress = " | ".join(

[
f"{file_aliases[file]}: {int(progress)}%"
for file, progress in displayed_files
]
)

# If there are more files, show a summary
if len(download_status) > MAX_DISPLAY_FILES:
progress += f" | ... and {len(download_status) - MAX_DISPLAY_FILES} more"

# Use \r to overwrite the same line
print(f"\r{progress}", end="")
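
# --- Illustrative sketch, not part of the diff: what the single-line output
# looks like. download_status and file_aliases are normally filled by
# custom_download; the values below are fabricated for the demo.
file_aliases["a.nc"], file_aliases["b.nc"] = "f1", "f2"
download_status["a.nc"], download_status["b.nc"] = 42.0, 100.0
update_progress_bar()
# b.nc has reached 100% and is dropped from both dicts; the rendered line is
# roughly "\rf1: 42%", overwritten in place on the next call.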


def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
"""
Download data like ERA5 from the Climate Data Store (CDS).
@@ -345,6 +491,21 @@
request
), "Need to specify at least 'variable', 'year' and 'month'"

# Use tmpdir for cache directory; if not provided, use current working directory
if tmpdir is None:
tmpdir = os.getcwd()

cache_dir = tmpdir
os.makedirs(cache_dir, exist_ok=True)

# Generate cache filename based on request
cache_filename = get_cache_filename(request, cache_dir)
cache_filepath = os.path.join(cache_dir, cache_filename)

if os.path.exists(cache_filepath):
logging.info(f"Using cached file for request: {cache_filename}")
ds = xr.open_dataset(cache_filepath, chunks=chunks or {})
return ds

client = cdsapi.Client(
info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
)
@@ -357,17 +518,20 @@
fd, target = mkstemp(suffix=".nc", dir=tmpdir)
os.close(fd)

# Inform user about data being downloaded as "* variable (year-month)"
timestr = f"{request['year']}-{request['month']}"
variables = atleast_1d(request["variable"])
varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
result.download(target)

ds = xr.open_dataset(target, chunks=chunks or {})
# Inform user about data being downloaded as "* variable (year-month)"
timestr = f"{request['year']}-{request['month']}"
variables = atleast_1d(request["variable"])
varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
filename = f"{variables[0]}_{timestr}.nc"
logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
custom_download(result.location, result.content_length, target, lock, filename)

# Move the downloaded file to cache directory
os.rename(target, cache_filepath)
ds = xr.open_dataset(cache_filepath, chunks=chunks or {})

if tmpdir is None:
logger.debug(f"Adding finalizer for {target}")
weakref.finalize(ds._file_obj._manager, noisy_unlink, target)
logger.debug(f"Adding finalizer for {cache_filepath}")
weakref.finalize(ds._file_obj._manager, noisy_unlink, cache_filepath)

return ds
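
# --- Illustrative sketch, not part of the diff: calling retrieve_data twice with
# the same request. The first call downloads via CDS and stores the result under
# tmpdir as <md5-hash>.nc; the second call finds that cached file and opens it
# without contacting CDS. Dataset name and argument values are hypothetical.
ds_first = retrieve_data(
    "reanalysis-era5-single-levels",
    tmpdir="era5_cache",
    variable=["2m_temperature"],
    year=2020,
    month=1,
)
ds_again = retrieve_data(  # served from era5_cache/<md5-hash>.nc
    "reanalysis-era5-single-levels",
    tmpdir="era5_cache",
    variable=["2m_temperature"],
    year=2020,
    month=1,
)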
