88 changes: 62 additions & 26 deletions dags/cumulus/aprfc_qpf_06h.py
@@ -2,75 +2,111 @@
Acquire and Process APRFC QPF 06h
"""

import json
from datetime import datetime, timedelta, timezone
import calendar
from bs4 import BeautifulSoup
import json
import re
import requests
from datetime import datetime, timedelta, timezone

import helpers.cumulus as cumulus
import requests
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus
from bs4 import BeautifulSoup
from helpers.downloads import s3_file_exists, trigger_download

# Default arguments
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(minute=0, second=0),
"start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(
minute=0, second=0
),
"catchup_by_default": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retry_delay": timedelta(minutes=30),
}


def get_latest_files(filenames):
# Dictionary to store the latest file for each unique timestamp
latest_files = {}

# Regular expression to extract the timestamp
pattern = r'qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'


# Corrected regular expression
pattern = r"^qpf06f_has_.*_awips_(\d{12})_(\d{10}f\d{3})\.\d+\.(grb|grb\.gz)$"
for filename in filenames:
match = re.search(pattern, filename)
if match:
key = match.group(1) + '_' + match.group(2)
# Combine the issue timestamp and forecast timestamp as the key
issue_timestamp = match.group(1)
forecast_timestamp = match.group(2)
key = issue_timestamp + "_" + forecast_timestamp
# Update the latest file for the key if it's not present or if the current filename is greater
if key not in latest_files or filename > latest_files[key]:
latest_files[key] = filename

# Return the list of latest files
return list(latest_files.values())
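
As a quick illustration of the deduplication key, here is a minimal sketch using hypothetical filenames (invented to fit the pattern, not taken from the real APRFC listing) that share the same issuance and forecast timestamps; get_latest_files keeps only the lexicographically greatest name:

# Assumes the get_latest_files defined above is in scope; filenames are hypothetical.
example = [
    "qpf06f_has_6f_20240115_06_awips_202401150600_2024011506f006.1.grb",
    "qpf06f_has_6f_20240115_06_awips_202401150600_2024011506f006.2.grb",
]
print(get_latest_files(example))
# ['qpf06f_has_6f_20240115_06_awips_202401150600_2024011506f006.2.grb']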


# ALR QPF filename generator
def get_filenames(edate, url):
"""
date at end of filename hour and min can not be predicted
scraping data from website and finding all matching filenames
for the specified date.
Scrape website, collect matching filenames, then:
- find the latest issuance timestamp (the 12-digit value after 'awips_')
- if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
- otherwise return an empty list (nothing to download)
"""
d_t1 = edate.strftime("%Y%m%d")


page = requests.get(url)
page = requests.get(url, timeout=30)
soup = BeautifulSoup(page.content, "html.parser")
links = [node.get("href") for node in soup.find_all("a")]
filenames = []
regex = f"^qpf06f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
filenames = [link for link in links if re.match(regex, link)]

return get_latest_files(filenames)
regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
issue_re = re.compile(
r"_awips_(\d{12})"
) # Capture the issuance timestamp after 'awips_'

# Collect candidates with their issuance timestamp
candidates = []
for link in links:
if link and re.match(regex, link):
m = issue_re.search(link)
if m:
candidates.append((link, m.group(1)))

# No matching files
if not candidates:
return []

# Find the latest issuance
latest_issue_str = max(issue for _, issue in candidates)

# Convert to an aware UTC datetime for comparison against Airflow's logical_date
latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
tzinfo=timezone.utc
)

# Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
# Outside the 36-hour window or in the future: do not download anything
return []

# Filter to only files from the latest issuance
latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]

# Deduplicate within that issuance set by issue+forecast key, reusing the existing get_latest_files logic
return get_latest_files(latest_issue_files)
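
For reference, a hedged sketch of the 36-hour freshness check in isolation (the timestamps below are made up; in the DAG, edate comes from Airflow's logical_date and the issuance is parsed from the directory listing):

from datetime import datetime, timedelta, timezone

latest_issue_dt = datetime(2024, 1, 15, 6, 0, tzinfo=timezone.utc)  # hypothetical latest issuance

# Logical date 12 hours after issuance: condition is False, so the files are returned
edate = datetime(2024, 1, 15, 18, 0, tzinfo=timezone.utc)
print(latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36))  # False

# Logical date three days after issuance: condition is True, so get_filenames returns []
edate = datetime(2024, 1, 18, 6, 0, tzinfo=timezone.utc)
print(latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36))  # True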


@dag(
default_args=default_args,
schedule="20 9,15,19 * * *",
schedule="25 * * * *",
tags=["cumulus", "precip", "QPF", "APRFC"],
max_active_runs=1,
max_active_tasks=1,
catchup=False, # Disable backfills
)
def cumulus_aprfc_qpf_06h():
"""This pipeline handles download, processing, and derivative product creation for \n
86 changes: 60 additions & 26 deletions dags/cumulus/aprfc_qtf_01h.py
@@ -2,80 +2,114 @@
Acquire and Process APRFC qtf 01h
"""

import json
from datetime import datetime, timedelta
import calendar
from bs4 import BeautifulSoup
import json
import re
import requests
from datetime import datetime, timedelta, timezone

import helpers.cumulus as cumulus
import requests
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus
from bs4 import BeautifulSoup
from helpers.downloads import s3_file_exists, trigger_download

# Default arguments
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": (datetime.utcnow() - timedelta(hours=36)).replace(minute=0, second=0),
"start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(
minute=0, second=0
),
"catchup_by_default": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retry_delay": timedelta(minutes=30),
}


def get_latest_files(filenames):
# Dictionary to store the latest file for each unique timestamp
latest_files = {}
# Regular expression to extract the timestamp
pattern = r'ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'

# Updated regular expression to extract the issue timestamp and forecast timestamp
pattern = r"^ta01.*_awips_(\d{12})_(\d{10}f\d{3})"

for filename in filenames:
match = re.search(pattern, filename)
if match:
key = match.group(1) + '_' + match.group(2)
# Combine the issue timestamp and forecast timestamp as the key
issue_timestamp = match.group(1)
forecast_timestamp = match.group(2)
key = issue_timestamp + "_" + forecast_timestamp
# Update the latest file for the key if it's not present or if the current filename is greater
if key not in latest_files or filename > latest_files[key]:
latest_files[key] = filename

# Return the list of latest files
return list(latest_files.values())


# APRFC qtf filename generator
def get_filenames(edate, url):
"""
date at end of filename hour and min can not be predicted
scraping data from website and finding all matching filenames
for the specified date.
Scrape website, collect matching filenames, then:
- find the latest issuance timestamp (the 12-digit value after 'awips_')
- if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
- otherwise return an empty list (nothing to download)
"""
d_t1 = edate.strftime("%Y%m%d")

page = requests.get(url)
# Fetch page and extract links
page = requests.get(url, timeout=30)
soup = BeautifulSoup(page.content, "html.parser")
links = [node.get("href") for node in soup.find_all("a")]
filenames = []


regex = f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
filenames = [link for link in links if re.match(regex, link)]
# Match the target files; allows optional integer version before .grb and optional .gz
regex = r"^ta01.*_awips_\d{12}_\d{10}f\d{3}(?:\.\d+)?\.grb(\.gz)?$"
issue_re = re.compile(
r"_awips_(\d{12})"
) # Capture the issuance timestamp after 'awips_'

# Collect candidates with their issuance timestamp
candidates = []
for link in links:
if link and re.match(regex, link):
m = issue_re.search(link)
if m:
candidates.append((link, m.group(1)))

# No matching files
if not candidates:
return []

# Find the latest issuance
latest_issue_str = max(issue for _, issue in candidates)

# Convert to an aware UTC datetime for comparison against Airflow's logical_date
latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
tzinfo=timezone.utc
)

# Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
# Outside the 36-hour window or in the future: do not download anything
return []

return get_latest_files(filenames)
# Filter to only files from the latest issuance
latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]

# Deduplicate within that issuance set by issue+forecast key, reusing the existing get_latest_files logic
return get_latest_files(latest_issue_files)
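
A small self-contained check of the ta01 pattern and the issuance capture (the filename below is hypothetical, constructed to fit the pattern rather than copied from the live APRFC page):

import re

regex = r"^ta01.*_awips_\d{12}_\d{10}f\d{3}(?:\.\d+)?\.grb(\.gz)?$"
issue_re = re.compile(r"_awips_(\d{12})")

name = "ta01f_has_6f_20240115_06_awips_202401150600_2024011506f001.grb.gz"
print(bool(re.match(regex, name)))     # True
print(issue_re.search(name).group(1))  # 202401150600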


@dag(
default_args=default_args,
schedule="21 9,15,19 * * *",
schedule="25 * * * *",
tags=["cumulus", "temp", "QTF", "APRFC"],
max_active_runs=1,
max_active_tasks=1,
catchup=False, # Disable backfills
)
def cumulus_aprfc_qtf_01h():
"""This pipeline handles download, processing, and derivative product creation for \n