5 changes: 4 additions & 1 deletion dags/cumulus/aprfc_qpe_06h.py
@@ -9,7 +9,7 @@
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
-from helpers.downloads import trigger_download
+from helpers.downloads import s3_file_exists, trigger_download
 
 import helpers.cumulus as cumulus
 
@@ -65,6 +65,9 @@ def download_raw_qpe():
             url = f"{URL_ROOT}/{filename}"
             filename = filename.split("/")[1]
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue  # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
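Note: this file (and the matching changes in aprfc_qpf_06h.py, aprfc_qte_01h.py, and aprfc_qtf_01h.py below) now guards each download with s3_file_exists so reruns skip objects already in the bucket. The helper lives in helpers/downloads.py and is not part of this diff; as a hedged sketch only, such a check is typically a boto3 head_object call along these lines:

    import boto3
    from botocore.exceptions import ClientError

    def s3_file_exists(bucket: str, key: str) -> bool:
        """Hypothetical stand-in for helpers.downloads.s3_file_exists (not shown in this PR)."""
        s3 = boto3.client("s3")
        try:
            s3.head_object(Bucket=bucket, Key=key)  # metadata-only request, no object download
            return True
        except ClientError as err:
            if err.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
                return False
            raise  # propagate permission or transport errors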
70 changes: 43 additions & 27 deletions dags/cumulus/aprfc_qpf_06h.py
@@ -30,23 +30,34 @@
 }
 
 
+QPF_REGEX = re.compile(
+    r"^qpf06f_has_"  # literal prefix with underscore
+    r".*?"  # anything up to _awips_
+    r"_awips_"
+    r"(\d{12})"  # group 1: 12-digit forecast time (2025121112)
+    r"_"
+    r"(\d{10})"  # group 2: 10-digit issuance (202512030603)
+    r"f"
+    r"(\d{3})"  # group 3: forecast hour (192)
+    r"\.grb(?:\.gz)?$"  # .grb or .grb.gz, no extra .digits
+)
+
 def get_latest_files(filenames):
     # Dictionary to store the latest file for each unique timestamp
     latest_files = {}
 
-    # Corrected regular expression
-    pattern = r"^qpf06f_has_.*_awips_(\d{12})_(\d{10}f\d{3})\.\d+\.(grb|grb\.gz)$"
     for filename in filenames:
-        match = re.search(pattern, filename)
-        if match:
-            # Combine the issue timestamp and forecast timestamp as the key
-            issue_timestamp = match.group(1)
-            forecast_timestamp = match.group(2)
-            key = issue_timestamp + "_" + forecast_timestamp
-            # Update the latest file for the key if it's not present or if the current filename is greater
-            if key not in latest_files or filename > latest_files[key]:
-                latest_files[key] = filename
-
+        m = QPF_REGEX.match(filename)
+        if not m:
+            continue
+        issue_ts = m.group(2)  # 10-digit issuance
+        base_ts = m.group(1)  # 12-digit valid/base
+        forecast_hr = m.group(3)  # 3-digit forecast hour
+        key = f"{issue_ts}_{base_ts}_{forecast_hr}"
+        # Update the latest file for the key if it's not present or if the current filename is greater
+
+        if key not in latest_files or filename > latest_files[key]:
+            latest_files[key] = filename
     # Return the list of latest files
     return list(latest_files.values())
 
@@ -63,21 +74,20 @@ def get_filenames(edate, url):
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
-
-    regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
-    issue_re = re.compile(
-        r"_awips_(\d{12})"
-    )  # Capture the issuance timestamp after 'awips_'
-
     # Collect candidates with their issuance timestamp
     candidates = []
     for link in links:
-        if link and re.match(regex, link):
-            m = issue_re.search(link)
-            if m:
-                candidates.append((link, m.group(1)))
-
+        if not link:
+            continue
+        link = link.strip()
+        m = QPF_REGEX.match(link)
+        if m:
+            issue_ts = m.group(1)  # 12-digit issuance after _awips_
+            candidates.append((link, issue_ts))
+            # print(f"DEBUG link: {link}, issue_ts: {issue_ts}")
+
     # No matching files
     if not candidates:
+        print("DEBUG: no candidates matched QPF_REGEX")
        return []
 
     # Find the latest issuance
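Note: both get_latest_files and get_filenames now share the module-level QPF_REGEX; get_latest_files reads the 10-digit issuance from group(2), while get_filenames treats group(1) as the issuance timestamp. As a hedged illustration (the filename below is invented; real APRFC names may differ), the three capture groups come out like this:

    import re

    # Same pattern as the multi-line QPF_REGEX above, collapsed to one line
    QPF_REGEX = re.compile(r"^qpf06f_has_.*?_awips_(\d{12})_(\d{10})f(\d{3})\.grb(?:\.gz)?$")

    name = "qpf06f_has_20251211_awips_202512111200_2025120306f192.grb"  # made-up example
    m = QPF_REGEX.match(name)
    assert m is not None
    base_ts, issue_ts, forecast_hr = m.group(1), m.group(2), m.group(3)
    print(base_ts, issue_ts, forecast_hr)         # 202512111200 2025120306 192
    print(f"{issue_ts}_{base_ts}_{forecast_hr}")  # dedup key built in get_latest_files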
@@ -92,6 +102,13 @@ def get_filenames(edate, url):
     if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
         # Outside the 36-hour window or in the future: do not download anything
         return []
+    # print(f"DEBUG found {len(candidates)} matching files; latest_issue_str={latest_issue_str}")
+
+    # print(f"DEBUG latest_issue_str: {latest_issue_str}")
+    # print(f"DEBUG latest_issue_dt: {latest_issue_dt.isoformat()}")
+    # print(f"DEBUG edate (logical_date): {edate.isoformat()}")
+    # print(f"DEBUG age_hours: {(edate - latest_issue_dt).total_seconds() / 3600.0}")
+
 
     # Filter to only files from the latest issuance
     latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]
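Note: the unchanged context above keeps the 36-hour freshness gate: anything issued after the logical date or more than 36 hours before it is skipped. A minimal sketch of that comparison with made-up timestamps (edate and latest_issue_dt are assumed to be timezone-aware datetimes, as in the DAG):

    from datetime import datetime, timedelta, timezone

    edate = datetime(2025, 12, 3, 12, tzinfo=timezone.utc)           # logical date (example)
    latest_issue_dt = datetime(2025, 12, 2, 6, tzinfo=timezone.utc)  # parsed issuance (example)

    too_new_or_too_old = latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36)
    print(too_new_or_too_old)  # False: the issuance is 30 hours old, inside the window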
@@ -128,10 +145,9 @@ def download_raw_qpf():
             url = f"{URL_ROOT}/{filename}"
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
             # Check if the file already exists in S3
-            # needs infrastructure change to reenable this check
-            #if s3_file_exists(cumulus.S3_BUCKET, s3_key):
-            #    print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
-            #    continue  # Skip to the next file
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue  # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
86 changes: 53 additions & 33 deletions dags/cumulus/aprfc_qte_01h.py
@@ -13,17 +13,17 @@
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
 from airflow.utils.task_group import TaskGroup
-from helpers.downloads import trigger_download
+from helpers.downloads import s3_file_exists, trigger_download
 
 import helpers.cumulus as cumulus
 
 default_args = {
     "owner": "airflow",
     "depends_on_past": False,
-    "start_date": (datetime.now(timezone.utc) - timedelta(hours=48)).replace(
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=72)).replace(
         minute=0, second=0
     ),
-    "catchup_by_default": False,
+    "catchup": True,
     "email_on_failure": False,
     "email_on_retry": False,
     "retries": 1,
@@ -35,8 +35,8 @@
     default_args=default_args,
     tags=["cumulus", "AIRTEMP", "QTE", "APRFC"],
     schedule="45 * * * *",
-    max_active_runs=1,
-    max_active_tasks=1,
+    max_active_runs=2,
+    max_active_tasks=4,
 )
 def cumulus_aprfc_qte_01h():
     """
@@ -58,51 +58,71 @@ def cumulus_aprfc_qte_01h():
 
     URL_ROOT = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/"
     PRODUCT_SLUG = "aprfc-qte-01h"
+    LOOKBACK_HOURS = 12  # number of hours from runtime to look back for
 
-    filename_template = Template("akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2 ")
+    filename_template = Template("akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2")
 
     url_suffix_template = Template("akurma.${date_}")
 
     @task()
     def download_raw_qte():
         logical_date = get_current_context()["logical_date"]
-        date_only = logical_date.strftime("%Y%m%d")
+        anchor = get_current_context()["data_interval_end"].replace(minute=0, second=0, microsecond=0)
 
-        url_suffix = url_suffix_template.substitute(
-            date_=date_only,
-        )
+        results = []
 
-        filename = filename_template.substitute(
-            hr_=logical_date.strftime("%H"),
-        )
+        for offset in range(LOOKBACK_HOURS):
+            ts = anchor - timedelta(hours=1 + offset)  # last complete hour, then look back
+            date_only = ts.strftime("%Y%m%d")
+            hour_str = ts.strftime("%H")
 
-        file_dir = f"{URL_ROOT}{url_suffix}"
 
-        s3_filename = f"{date_only}_{filename}"
-        s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{s3_filename}"
+            url_suffix = url_suffix_template.substitute(
+                date_=date_only,
+            )
 
-        print(f"Downloading file: {filename}")
+            filename = filename_template.substitute(
+                hr_=hour_str,
+            )
 
-        trigger_download(
-            url=f"{file_dir}/{filename}", s3_bucket=s3_bucket, s3_key=s3_key
-        )
-        return json.dumps(
-            {
-                "execution": logical_date.isoformat(),
-                "s3_key": s3_key,
-                "filename": s3_filename,
-            }
-        )
+            file_dir = f"{URL_ROOT}{url_suffix}"
+
+            s3_filename = f"{date_only}_{filename}"
+            s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{s3_filename}"
+
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue  # Skip to the next file
+
+            print(f"Downloading file: {url_suffix}/{filename}")
+
+            try:
+
+                trigger_download(
+                    url=f"{file_dir}/{filename}", s3_bucket=s3_bucket, s3_key=s3_key
+                )
+            except:
+                print(f'Failed downloading {filename}')
+
+            results.append(
+                {
+                    "execution": ts.isoformat(),
+                    "s3_key": s3_key,
+                    "filename": s3_filename,
+                }
+            )
+        return json.dumps(results)
 
     @task()
     def notify_cumulus(payload):
         payload = json.loads(payload)
-        print("Notifying Cumulus: " + payload["filename"])
-        cumulus.notify_acquirablefile(
-            acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
-            datetime=payload["execution"],
-            s3_key=payload["s3_key"],
-        )
+        for item in payload:
+            print("Notifying Cumulus: " + item["filename"])
+            cumulus.notify_acquirablefile(
+                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
+                datetime=item["execution"],
+                s3_key=item["s3_key"],
+            )
 
     notify_cumulus(download_raw_qte())
 
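Note: download_raw_qte now walks back LOOKBACK_HOURS complete hours from data_interval_end, skips keys already in S3, and returns a JSON list that notify_cumulus iterates. A rough sketch of the hour enumeration, with a fixed anchor standing in for Airflow's data_interval_end (values are illustrative only):

    from datetime import datetime, timedelta, timezone

    LOOKBACK_HOURS = 12
    anchor = datetime(2025, 12, 3, 15, 0, tzinfo=timezone.utc)  # stands in for data_interval_end

    hours = [anchor - timedelta(hours=1 + offset) for offset in range(LOOKBACK_HOURS)]
    print(hours[0].strftime("akurma.t%Hz on %Y%m%d"))   # akurma.t14z on 20251203
    print(hours[-1].strftime("akurma.t%Hz on %Y%m%d"))  # akurma.t03z on 20251203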
7 changes: 3 additions & 4 deletions dags/cumulus/aprfc_qtf_01h.py
@@ -131,10 +131,9 @@ def download_raw_qtf():
             url = f"{URL_ROOT}/{filename}"
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
             # Check if the file already exists in S3
-            #need infrastructure changes to reenable this check
-            #if s3_file_exists(cumulus.S3_BUCKET, s3_key):
-            #    print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
-            #    continue  # Skip to the next file
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue  # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
29 changes: 14 additions & 15 deletions dags/cumulus/prism_early.py
@@ -62,28 +62,27 @@ def cumulus_prism_early():
     def download_raw_prism_early(short_name='ppt'):
         product_slug = f"prism-{short_name}-early"
         logical_date = get_current_context()["logical_date"]
-        execution_date = logical_date.date()
+        execution_date = (get_current_context()["data_interval_end"]).date()-timedelta(hours=24)
         results = []
 
         dt = execution_date
         file_dir = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}'
         filename = generate_filename_for_product(short_name, dt)
         s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}"
         print(f"Downloading {filename}")
-        try:
-            output = trigger_download(
-                url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
-            )
-            results.append(
-                {
-                    "datetime": logical_date.isoformat(),
-                    "s3_key": s3_key,
-                    "product_slug": product_slug,
-                    "filename": filename,
-                }
-            )
-        except:
-            print(f'Error downloading {filename}')
+
+        output = trigger_download(
+            url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
+        )
+        results.append(
+            {
+                "datetime": logical_date.isoformat(),
+                "s3_key": s3_key,
+                "product_slug": product_slug,
+                "filename": filename,
+            }
+        )
+
         return json.dumps(results)
 
     # Notify Tasks
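Note: prism_early (and prism_revision below) now anchor the product date a day earlier. Since timedelta(hours=24) normalizes to exactly one day, subtracting it from a date steps back one calendar day; a small illustration with an assumed midnight-UTC data_interval_end:

    from datetime import datetime, timedelta, timezone

    data_interval_end = datetime(2025, 12, 3, 0, 0, tzinfo=timezone.utc)  # example value
    execution_date = data_interval_end.date() - timedelta(hours=24)
    print(execution_date)  # 2025-12-02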
2 changes: 1 addition & 1 deletion dags/cumulus/prism_revision.py
@@ -62,7 +62,7 @@ def cumulus_prism_revision():
     @task()
     def download_raw_prism_revision(short_name='ppt'):
         product_slug = f"prism-{short_name}-early"
-        logical_date = get_current_context()["logical_date"]
+        logical_date = get_current_context()["logical_date"]-timedelta(hours=24)
         execution_date = logical_date.date()
         start_date = execution_date - timedelta(days=DAYS_BACK)
         results = []
14 changes: 8 additions & 6 deletions dags/wmes/wmes_daily_jobs.py → dags/wmes/cwms_daily_jobs.py
@@ -28,15 +28,17 @@
     max_active_runs=1,
     max_active_tasks=4,
 )
-def wmes_daily_jobs():
+def cwms_daily_jobs():
     job_configs = [
         {"office": "lrc", "office_group": "lrd", "enabled": False},
         {"office": "lre", "office_group": "lrd", "enabled": False},
         {"office": "lrh", "office_group": "lrd", "enabled": False},
         {"office": "lrl", "office_group": "lrd", "enabled": False},
         {"office": "lrn", "office_group": "lrd", "enabled": False},
         {"office": "lrp", "office_group": "lrd", "enabled": False},
-        {"office": "swt", "office_group": "swd",
+        {
+            "office": "swt",
+            "office_group": "swd",
             "github_branch": "cwbi-restructure",
             "enabled": True,
         },
@@ -62,15 +64,15 @@ def launch_batch(job_config):
 
         logical_date = get_current_context()["logical_date"]
         dag = DagContext.get_current_dag()
-        job_name = f"wmes-{job_config['office']}-daily-job-{logical_date.strftime('%Y%m%d-%H%M')}"
+        job_name = f"cwms-{job_config['office']}-daily-job-{logical_date.strftime('%Y%m%d-%H%M')}"
         return batch.batch_operator(
             dag=dag,
             task_id=job_name,
             deferrable=True,
             container_overrides={},
             job_name=job_name,
-            job_queue=f"wmes-{job_config['office_group']}-jq",
-            job_definition=f"wmes-{job_config['office']}-jobs-jobdef",
+            job_queue=f"cwms-{job_config['office_group']}-jq",
+            job_definition=f"cwms-{job_config['office']}-jobs-jobdef",
             local_command=[],  # local docker mock only
             tags={"Office": job_config["office"]},
         ).execute({})
@@ -79,4 +81,4 @@ def launch_batch(job_config):
         launch_batch.override(task_id=f"{jc['office']}-jobs")(jc)
 
 
-wmes_jobs_dag = wmes_daily_jobs()
+cwms_jobs_dag = cwms_daily_jobs()