Skip to content

Commit df4d35a

Browse files
committed
FIX: adjust paths in data_retrieval_utils + new hdf5 option
1 parent 9fcf2ce commit df4d35a

File tree

1 file changed

+94
-87
lines changed

1 file changed

+94
-87
lines changed

src/pyrad_proc/pyrad/util/data_retrieval_utils.py

+94-87
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,21 @@
2222
import fnmatch
2323
import re
2424
import pandas as pd # function used in retrieve_hzt_prod
25+
from io import BytesIO
2526

2627
# MeteoSwiss constants
2728
OFFSET_CCS4 = [297, -100]
2829
# Folder depends on server:
2930
if ("lom" in socket.gethostname()) or ("meteoswiss" in socket.gethostname()):
3031
FOLDER_RADAR = "/srn/data/"
3132
FOLDER_ISO0 = "/srn/data/HZT/"
32-
elif "tsa" or "balfrin" in socket.gethostname():
33-
FOLDER_DATABASE = "/store/msrad/radar/radar_database/"
34-
FOLDER_RADAR = "/store/msrad/radar/swiss/data/"
35-
FOLDER_RADARH = "/store/msrad/radar/polarHR/data/"
36-
FOLDER_CPCCV = "/store/msrad/radar/cpc_validation/daily/"
37-
FOLDER_ISO0 = "/store/msrad/radar/swiss/data/"
33+
elif "balfrin" in socket.gethostname():
34+
FOLDER_DATABASE = "/store_new/mch/msrad/radar/radar_database/"
35+
FOLDER_RADAR = "/store_new/mch//msrad/radar/swiss/data/"
36+
FOLDER_RADARH = "/store_new/mch//msrad/radar/polarHR/data/"
37+
FOLDER_CPCCV = "/store_new/mch//msrad/radar/cpc_validation/daily/"
38+
FOLDER_ISO0 = "/store_new/mch//msrad/radar/swiss/data/"
39+
FOLDER_RADAR_HDF5 = "/store_new/mch/msrad/radar/swiss/data/hdf5/"
3840

3941

4042
def _make_timezone_aware(dt, tz=datetime.timezone.utc):
@@ -323,28 +325,30 @@ def _retrieve_hzt_prod_daily(folder_out, start_time, end_time, pattern_type="she
323325

324326

325327
def retrieve_mch_prod(
326-
folder_out,
327328
start_time,
328329
end_time,
329330
product_name,
331+
folder_out=None,
330332
pattern=None,
331333
pattern_type="shell",
332334
sweeps=None,
335+
hdf5=False,
333336
):
334337
"""Retrieves radar data from the CSCS repository for a specified
335338
time range, unzips them and places them in a specified folder
336339
337340
Parameters
338341
----------
339342
340-
folder_out: str
341-
directory where to store the unzipped files
342343
start_time : datetime.datetime instance
343344
starting time of the time range
344345
end_time : datetime.datetime instance
345346
end time of the time range
346347
product_name: str
347348
name of the product, as stored on CSCS, e.g. RZC, CPCH, MZC, BZC...
349+
folder_out: str
350+
directory where to store the unzipped files, if set to None
351+
will read the file to memory
348352
pattern: str
349353
pattern constraint on file names, can be used for products which contain
350354
multiple filetypes, f.ex CPCH folders contain both rda and gif files,
@@ -355,31 +359,32 @@ def retrieve_mch_prod(
355359
sweeps: list of int (optional)
356360
For polar products, specifies which sweeps (elevations) must be
357361
retrieved, if not specified all available sweeps will be retrieved
362+
hdf5: bool
363+
If True will retrieve the hdf5 files for the given product (beware
364+
hdf5 is not available for all products)
358365
359366
Returns
360367
-------
361368
A list containing all the filepaths of the retrieved files
362369
363370
"""
364-
start_time = _make_timezone_aware(start_time)
365-
end_time = _make_timezone_aware(end_time)
366371

367372
if product_name == "ZZW" or product_name == "ZZP": # no vpr for PPM and WEI
368373
product_name = "ZZA"
369374

370-
if product_name == "CPC":
371-
folder_out = folder_out + "/CPC"
372-
if product_name == "CPCH":
373-
folder_out = folder_out + "/CPCH"
374-
375-
if not os.path.exists(folder_out):
376-
os.makedirs(folder_out)
375+
if folder_out:
376+
if not os.path.exists(folder_out):
377+
os.makedirs(folder_out)
378+
if product_name == "CPC":
379+
folder_out = os.path.join(folder_out, "CPC")
380+
if product_name == "CPCH":
381+
folder_out = os.path.join(folder_out, "CPCH")
377382

378383
# Check if times are aware or naive
379384
if start_time.tzinfo is None:
380-
start_time.replace(tzinfo=datetime.timezone.utc)
385+
start_time = start_time.replace(tzinfo=datetime.timezone.utc)
381386
if end_time.tzinfo is None:
382-
end_time.replace(tzinfo=datetime.timezone.utc)
387+
end_time = end_time.replace(tzinfo=datetime.timezone.utc)
383388

384389
dt = datetime.timedelta(minutes=5)
385390
delta = end_time - start_time
@@ -415,13 +420,14 @@ def retrieve_mch_prod(
415420
tzinfo=datetime.timezone.utc,
416421
)
417422
files = _retrieve_prod_daily(
418-
folder_out,
419423
start_time,
420424
end_time,
421425
product_name,
426+
folder_out,
422427
pattern,
423428
pattern_type,
424429
sweeps,
430+
hdf5,
425431
)
426432

427433
all_files.extend(files)
@@ -502,95 +508,96 @@ def retrieve_mch_prod_RT(
502508

503509

504510
def _retrieve_prod_daily(
    start_time,
    end_time,
    product_name,
    folder_out=None,
    pattern=None,
    pattern_type="shell",
    sweeps=None,
    hdf5=False,
):
    """Retrieve the files of one radar product from a single daily zip archive.

    The archive is looked up as
    ``<folder_radar>/<YYYY>/<yyjjj>/<product_name><yyjjj>.zip`` where the year
    and day of year are taken from ``start_time``, so ``start_time`` and
    ``end_time`` are expected to lie on the same day.

    Parameters
    ----------
    start_time : datetime.datetime instance
        starting time of the time range (inclusive); naive values are
        interpreted as UTC
    end_time : datetime.datetime instance
        end time of the time range (inclusive); naive values are interpreted
        as UTC
    product_name: str
        name of the product; names starting with "MH" are read from
        FOLDER_RADARH, all others from FOLDER_RADAR (unless hdf5 is True)
    folder_out: str
        directory where to store the unzipped files, if set to None (or an
        empty string) the files are read into memory instead
    pattern: str
        optional constraint applied to the basename of the archive members
    pattern_type: either 'shell' or 'regex' (optional)
        how ``pattern`` is interpreted: 'shell' uses fnmatch, 'regex' uses
        re.match
    sweeps: list of int (optional)
        if given, only members whose last three characters, read as an
        integer, are contained in this list are retrieved
    hdf5: bool
        If True the archive is read from FOLDER_RADAR_HDF5

    Returns
    -------
    If folder_out is None (or empty): a list of io.BytesIO objects, one per
    selected archive member, in archive order.
    Otherwise: a sorted list containing the filepaths of the selected files
    inside folder_out.

    Raises
    ------
    ValueError
        if pattern_type is unknown, or if no archive member matches the
        pattern, time range and sweeps
    """
    # Member timestamps are parsed as timezone-aware (UTC); comparing them
    # with naive datetimes would raise a TypeError, so normalise the bounds.
    if start_time.tzinfo is None:
        start_time = start_time.replace(tzinfo=datetime.timezone.utc)
    if end_time.tzinfo is None:
        end_time = end_time.replace(tzinfo=datetime.timezone.utc)

    if hdf5:
        folder_radar = FOLDER_RADAR_HDF5
    else:
        folder_radar = FOLDER_RADARH if product_name[:2] == "MH" else FOLDER_RADAR

    suffix = str(start_time.year)[-2:] + str(start_time.timetuple().tm_yday).zfill(3)
    folder_in = os.path.join(folder_radar, str(start_time.year), suffix)
    path_zipfile = os.path.join(folder_in, product_name + suffix + ".zip")

    with zipfile.ZipFile(path_zipfile, "r") as zipp:
        content_zip = zipp.namelist()

        # Filter files based on pattern if provided
        if pattern:
            if pattern_type == "shell":
                content_zip = [
                    c
                    for c in content_zip
                    if fnmatch.fnmatch(os.path.basename(c), pattern)
                ]
            elif pattern_type == "regex":
                content_zip = [
                    c for c in content_zip if re.match(pattern, os.path.basename(c))
                ]
            else:
                raise ValueError(
                    'Unknown pattern_type, must be either "shell" or "regex".'
                )

        # Characters 3 to 11 of a member name hold its timestamp as yyjjjHHMM
        times_zip = [
            datetime.datetime.strptime(c[3:12], "%y%j%H%M").replace(
                tzinfo=datetime.timezone.utc
            )
            for c in content_zip
        ]
        conditions = [start_time <= t <= end_time for t in times_zip]

        # Further filter based on sweeps if provided (last 3 characters)
        if sweeps is not None:
            sweeps_zip = [int(c[-3:]) for c in content_zip]
            conditions = [
                keep and s in sweeps for keep, s in zip(conditions, sweeps_zip)
            ]

        selected_files = [c for c, keep in zip(content_zip, conditions) if keep]
        if not selected_files:
            raise ValueError(
                "No file was found corresponding to this format, verify pattern and product_name"
            )

        if not folder_out:
            # Load selected files into memory as a list of file-like objects
            return [BytesIO(zipp.read(file_name)) for file_name in selected_files]

    # os.path.join instead of string concatenation so that folder_out works
    # with or without a trailing separator.
    files_to_retrieve = [
        fi
        for fi in selected_files
        if not os.path.exists(os.path.join(folder_out, fi))
    ]

    # Unzip only if at least one file does not exist yet. The arguments are
    # passed as a list (no shell) so that paths containing spaces or shell
    # metacharacters are handled safely. Requires `unzip` on the PATH.
    if files_to_retrieve:
        subprocess.call(
            ["unzip", "-j", "-o", "-qq", path_zipfile]
            + files_to_retrieve
            + ["-d", folder_out]
        )

    return sorted(os.path.join(folder_out, c) for c in selected_files)
594601

595602

596603
def retrieve_CPCCV(time, stations):

0 commit comments

Comments
 (0)