Merged
2 changes: 1 addition & 1 deletion _docker/gitsync/Dockerfile
@@ -1,5 +1,5 @@
# BUILD GO BINARIES
FROM alpine:3.22.1
FROM alpine:3.22.2

# Create User; https://github.com/mhart/alpine-node/issues/48
# NOTE: Using same UID/GID as airflow user in container for shared volume convenience
81 changes: 59 additions & 22 deletions dags/cumulus/aprfc_qpf_06h.py
@@ -2,19 +2,18 @@
Acquire and Process APRFC QPF 06h
"""

import json
from datetime import datetime, timedelta, timezone
import calendar
from bs4 import BeautifulSoup
import json
import re
import requests
from datetime import datetime, timedelta, timezone

import helpers.cumulus as cumulus
import requests
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus
from bs4 import BeautifulSoup
from helpers.downloads import s3_file_exists, trigger_download

# Default arguments
default_args = {
@@ -35,13 +34,16 @@ def get_latest_files(filenames):
# Dictionary to store the latest file for each unique timestamp
latest_files = {}

# Regular expression to extract the timestamp
pattern = r"qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)"

# Updated regular expression to extract the issue timestamp and forecast timestamp
pattern = r"^qpf06f_has_.*_awips_(\d{12})_(\d{10}f\d{3})\.\d+\.(grb|grb\.gz)$"
for filename in filenames:
match = re.search(pattern, filename)
if match:
key = match.group(1) + "_" + match.group(2)
# Combine the issue timestamp and forecast timestamp as the key
issue_timestamp = match.group(1)
forecast_timestamp = match.group(2)
key = issue_timestamp + "_" + forecast_timestamp
# Update the latest file for the key if it's not present or if the current filename is greater
if key not in latest_files or filename > latest_files[key]:
latest_files[key] = filename

@@ -52,28 +54,59 @@ def get_latest_files(filenames):
# ALR QPF filename generator
def get_filenames(edate, url):
"""
date at end of filename hour and min can not be predicted
scraping data from website and finding all matching filenames
for the sprcified date.
Scrape website, collect matching filenames, then:
- find the latest issuance timestamp (the 12-digit value after 'awips_')
- if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
- otherwise return an empty list (nothing to download)
"""
d_t1 = edate.strftime("%Y%m%d")

page = requests.get(url)
page = requests.get(url, timeout=30)
soup = BeautifulSoup(page.content, "html.parser")
links = [node.get("href") for node in soup.find_all("a")]
filenames = []
regex = f"^qpf06f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
filenames = [link for link in links if re.match(regex, link)]

return get_latest_files(filenames)
regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
issue_re = re.compile(
r"_awips_(\d{12})"
) # Capture the issuance timestamp after 'awips_'

# Collect candidates with their issuance timestamp
candidates = []
for link in links:
if link and re.match(regex, link):
m = issue_re.search(link)
if m:
candidates.append((link, m.group(1)))

# No matching files
if not candidates:
return []

# Find the latest issuance
latest_issue_str = max(issue for _, issue in candidates)

# Convert to an aware UTC datetime for comparison against Airflow's logical_date
latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
tzinfo=timezone.utc
)

# Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
# Outside the 36-hour window or in the future: do not download anything
return []

# Filter to only files from the latest issuance
latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]

# Deduplicate within the latest issuance set using the existing issue+forecast key logic
return get_latest_files(latest_issue_files)


@dag(
default_args=default_args,
schedule="20 9,15,19 * * *",
schedule="25 * * * *",
tags=["cumulus", "precip", "QPF", "APRFC"],
max_active_runs=1,
max_active_tasks=1,
catchup=False, # Disable backfills
)
def cumulus_aprfc_qpf_06h():
"""This pipeline handles download, processing, and derivative product creation for \n
@@ -94,6 +127,10 @@ def download_raw_qpf():
for filename in filenames:
url = f"{URL_ROOT}/{filename}"
s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
# Check if the file already exists in S3
if s3_file_exists(cumulus.S3_BUCKET, s3_key):
print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
continue # Skip to the next file
print(f"Downloading file: {filename}")
try:
trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
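To make the new selection behavior in get_filenames easier to review, the short standalone sketch below applies the same regex, latest-issuance pick, and 36-hour window as the code above; the directory listing and the edate value are invented for the example and are not part of this PR.

import re
from datetime import datetime, timedelta, timezone

# Invented directory listing; real names come from the scraped APRFC index page.
links = [
    "qpf06f_has_72f_20240101_12_awips_202401011200_2024010118f006.1.grb",
    "qpf06f_has_72f_20240102_12_awips_202401021200_2024010218f006.1.grb",
    "qpf06f_has_72f_20240102_12_awips_202401021200_2024010300f012.1.grb.gz",
]

regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
issue_re = re.compile(r"_awips_(\d{12})")  # 12-digit issuance timestamp

candidates = []
for link in links:
    if re.match(regex, link):
        m = issue_re.search(link)
        if m:
            candidates.append((link, m.group(1)))

latest_issue_str = max(issue for _, issue in candidates)  # "202401021200"
latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
    tzinfo=timezone.utc
)

edate = datetime(2024, 1, 3, 0, 25, tzinfo=timezone.utc)  # stand-in for Airflow's logical_date

if latest_issue_dt <= edate and (edate - latest_issue_dt) <= timedelta(hours=36):
    selected = [fn for fn, issue in candidates if issue == latest_issue_str]
else:
    selected = []

print(selected)  # the two files issued 2024-01-02 12:00 UTC; the 01-01 issuance is dropped

With the hourly "25 * * * *" schedule, the 36-hour window means a run quietly returns nothing when APRFC has not published a fresh issuance, instead of re-fetching stale files.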
81 changes: 60 additions & 21 deletions dags/cumulus/aprfc_qtf_01h.py
@@ -2,19 +2,18 @@
Acquire and Process APRFC qtf 01h
"""

import json
from datetime import datetime, timedelta, timezone
import calendar
from bs4 import BeautifulSoup
import json
import re
import requests
from datetime import datetime, timedelta, timezone

import helpers.cumulus as cumulus
import requests
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus
from bs4 import BeautifulSoup
from helpers.downloads import s3_file_exists, trigger_download

# Default arguments
default_args = {
@@ -35,13 +34,17 @@ def get_latest_files(filenames):
# Dictionary to store the latest file for each unique timestamp
latest_files = {}

# Regular expression to extract the timestamp
pattern = r"ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)"
# Updated regular expression to extract the issue timestamp and forecast timestamp
pattern = r"^ta01.*_awips_(\d{12})_(\d{10}f\d{3})"

for filename in filenames:
match = re.search(pattern, filename)
if match:
key = match.group(1) + "_" + match.group(2)
# Combine the issue timestamp and forecast timestamp as the key
issue_timestamp = match.group(1)
forecast_timestamp = match.group(2)
key = issue_timestamp + "_" + forecast_timestamp
# Update the latest file for the key if it's not present or if the current filename is greater
if key not in latest_files or filename > latest_files[key]:
latest_files[key] = filename

@@ -52,29 +55,61 @@ def get_latest_files(filenames):
# APRFC qtf filename generator
def get_filenames(edate, url):
"""
date at end of filename hour and min can not be predicted
scraping data from website and finding all matching filenames
for the specified date.
Scrape website, collect matching filenames, then:
- find the latest issuance timestamp (the 12-digit value after 'awips_')
- if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
- otherwise return an empty list (nothing to download)
"""
d_t1 = edate.strftime("%Y%m%d")

page = requests.get(url)
# Fetch page and extract links
page = requests.get(url, timeout=30)
soup = BeautifulSoup(page.content, "html.parser")
links = [node.get("href") for node in soup.find_all("a")]
filenames = []

regex = f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
filenames = [link for link in links if re.match(regex, link)]
# Match the target files; allows optional integer version before .grb and optional .gz
regex = r"^ta01.*_awips_\d{12}_\d{10}f\d{3}(?:\.\d+)?\.grb(\.gz)?$"
issue_re = re.compile(
r"_awips_(\d{12})"
) # Capture the issuance timestamp after 'awips_'

# Collect candidates with their issuance timestamp
candidates = []
for link in links:
if link and re.match(regex, link):
m = issue_re.search(link)
if m:
candidates.append((link, m.group(1)))

# No matching files
if not candidates:
return []

# Find the latest issuance
latest_issue_str = max(issue for _, issue in candidates)

# Convert to an aware UTC datetime for comparison against Airflow's logical_date
latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
tzinfo=timezone.utc
)

# Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
# Outside the 36-hour window or in the future: do not download anything
return []

# Filter to only files from the latest issuance
latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]

return get_latest_files(filenames)
# Deduplicate within the latest issuance set using the existing issue+forecast key logic
return get_latest_files(latest_issue_files)


@dag(
default_args=default_args,
schedule="21 9,15,19 * * *",
schedule="25 * * * *",
tags=["cumulus", "temp", "QTF", "APRFC"],
max_active_runs=1,
max_active_tasks=1,
catchup=False, # Disable backfills
)
def cumulus_aprfc_qtf_01h():
"""This pipeline handles download, processing, and derivative product creation for \n
@@ -95,6 +130,10 @@ def download_raw_qtf():
for filename in filenames:
url = f"{URL_ROOT}/{filename}"
s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
# Check if the file already exists in S3
if s3_file_exists(cumulus.S3_BUCKET, s3_key):
print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
continue # Skip to the next file
print(f"Downloading file: {filename}")
try:
trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
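Both DAGs now guard the download with s3_file_exists from helpers.downloads, which is not shown in this diff. As a rough sketch only (an assumption about the helper, not its actual implementation), such a check is typically a boto3 head_object call:

# Hypothetical sketch; the real helpers.downloads.s3_file_exists may differ
# in client setup, credentials, and error handling.
import boto3
from botocore.exceptions import ClientError


def s3_file_exists(bucket: str, key: str) -> bool:
    """Return True if s3://{bucket}/{key} already exists."""
    s3 = boto3.client("s3")
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as err:
        # head_object raises a ClientError with a 404/NotFound code for missing objects
        if err.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
            return False
        raise

Skipping keys that already exist keeps the more frequent hourly schedule idempotent: reruns only transfer objects that are not yet in the bucket.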
5 changes: 2 additions & 3 deletions dags/wmes/wmes_daily_jobs.py
@@ -36,12 +36,11 @@ def wmes_daily_jobs():
{"office": "lrl", "office_group": "lrd", "enabled": False},
{"office": "lrn", "office_group": "lrd", "enabled": False},
{"office": "lrp", "office_group": "lrd", "enabled": False},
{
"office": "swt",
"office_group": "swd",
{"office": "swt", "office_group": "swd",
"github_branch": "cwbi-restructure",
"enabled": True,
},
{"office": "spk", "office_group": "spd", "enabled": True},
]

# Organize configs by office_group
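For context on the "# Organize configs by office_group" step that closes this hunk, a grouping along the lines below is typical; the variable names are illustrative and not necessarily the DAG's actual code.

# Illustrative only: group enabled office configs by their office_group.
from collections import defaultdict

office_configs = [
    {"office": "swt", "office_group": "swd", "github_branch": "cwbi-restructure", "enabled": True},
    {"office": "spk", "office_group": "spd", "enabled": True},
    {"office": "lrl", "office_group": "lrd", "enabled": False},
]

configs_by_group = defaultdict(list)
for cfg in office_configs:
    if cfg.get("enabled"):
        configs_by_group[cfg["office_group"]].append(cfg)

# configs_by_group -> {"swd": [swt config], "spd": [spk config]}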
7 changes: 6 additions & 1 deletion dags/wmes/wmes_hourly_jobs.py
Expand Up @@ -48,7 +48,7 @@ def wmes_hourly_jobs():
{
"office": "lrl",
"office_group": "lrd",
"enabled": False,
"enabled": True,
},
{
"office": "lrn",
@@ -64,6 +64,11 @@
"office": "swt",
"office_group": "swd",
"enabled": True,
},
{
"office": "spk",
"office_group": "spd",
"enabled": True,
},
]
