diff --git a/dags/cumulus/aprfc_qpf_06h.py b/dags/cumulus/aprfc_qpf_06h.py
index 574c706..12712e5 100644
--- a/dags/cumulus/aprfc_qpf_06h.py
+++ b/dags/cumulus/aprfc_qpf_06h.py
@@ -2,25 +2,26 @@
 Acquire and Process APRFC QPF 06h
 """
 
-import json
-from datetime import datetime, timedelta, timezone
 import calendar
-from bs4 import BeautifulSoup
+import json
 import re
-import requests
+from datetime import datetime, timedelta, timezone
 
+import helpers.cumulus as cumulus
+import requests
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
-from helpers.downloads import trigger_download
-
-import helpers.cumulus as cumulus
+from bs4 import BeautifulSoup
+from helpers.downloads import s3_file_exists, trigger_download
 
 # Default arguments
 default_args = {
     "owner": "airflow",
     "depends_on_past": False,
-    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(minute=0, second=0),
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(
+        minute=0, second=0
+    ),
     "catchup_by_default": False,
     "email_on_failure": False,
     "email_on_retry": False,
@@ -28,49 +29,84 @@
     "retry_delay": timedelta(minutes=30),
 }
 
+
 def get_latest_files(filenames):
     # Dictionary to store the latest file for each unique timestamp
     latest_files = {}
-
-    # Regular expression to extract the timestamp
-    pattern = r'qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'
-
+
+    # Corrected regular expression
+    pattern = r"^qpf06f_has_.*_awips_(\d{12})_(\d{10}f\d{3})\.\d+\.(grb|grb\.gz)$"
     for filename in filenames:
         match = re.search(pattern, filename)
         if match:
-            key = match.group(1) + '_' + match.group(2)
+            # Combine the issue timestamp and forecast timestamp as the key
+            issue_timestamp = match.group(1)
+            forecast_timestamp = match.group(2)
+            key = issue_timestamp + "_" + forecast_timestamp
+            # Update the latest file for the key if it's not present or if the current filename is greater
             if key not in latest_files or filename > latest_files[key]:
                 latest_files[key] = filename
-
+
     # Return the list of latest files
     return list(latest_files.values())
 
+
 # ALR QPF filename generator
 def get_filenames(edate, url):
     """
-    date at end of filename hour and min can not be predicted
-    scraping data from website and finding all matching filenames
-    for the sprcified date.
+    Scrape website, collect matching filenames, then:
+    - find the latest issuance timestamp (the 12-digit value after 'awips_')
+    - if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
+    - otherwise return an empty list (nothing to download)
     """
-    d_t1 = edate.strftime("%Y%m%d")
-
-
-    page = requests.get(url)
+    page = requests.get(url, timeout=30)
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
-    filenames = []
-    regex = f"^qpf06f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
-    filenames = [link for link in links if re.match(regex, link)]
-    return get_latest_files(filenames)
+    regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
+    issue_re = re.compile(
+        r"_awips_(\d{12})"
+    )  # Capture the issuance timestamp after 'awips_'
+
+    # Collect candidates with their issuance timestamp
+    candidates = []
+    for link in links:
+        if link and re.match(regex, link):
+            m = issue_re.search(link)
+            if m:
+                candidates.append((link, m.group(1)))
+
+    # No matching files
+    if not candidates:
+        return []
+
+    # Find the latest issuance
+    latest_issue_str = max(issue for _, issue in candidates)
+
+    # Convert to an aware UTC datetime for comparison against Airflow's logical_date
+    latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
+        tzinfo=timezone.utc
+    )
+
+    # Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
+    if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
+        # Outside the 36-hour window or in the future: do not download anything
+        return []
+
+    # Filter to only files from the latest issuance
+    latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]
+
+    # Deduplicate within that issuance set by issue+forecast key as your existing logic defines
+    return get_latest_files(latest_issue_files)
 
 
 @dag(
     default_args=default_args,
-    schedule="20 9,15,19 * * *",
+    schedule="25 * * * *",
     tags=["cumulus", "precip", "QPF", "APRFC"],
     max_active_runs=1,
     max_active_tasks=1,
+    catchup=False,  # Disable backfills
 )
 def cumulus_aprfc_qpf_06h():
     """This pipeline handles download, processing, and derivative product creation for \n
diff --git a/dags/cumulus/aprfc_qtf_01h.py b/dags/cumulus/aprfc_qtf_01h.py
index 54a9485..0a6e4e5 100644
--- a/dags/cumulus/aprfc_qtf_01h.py
+++ b/dags/cumulus/aprfc_qtf_01h.py
@@ -2,25 +2,26 @@
 Acquire and Process APRFC qtf 01h
 """
 
-import json
-from datetime import datetime, timedelta
 import calendar
-from bs4 import BeautifulSoup
+import json
 import re
-import requests
+from datetime import datetime, timedelta, timezone
 
+import helpers.cumulus as cumulus
+import requests
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
-from helpers.downloads import trigger_download
-
-import helpers.cumulus as cumulus
+from bs4 import BeautifulSoup
+from helpers.downloads import s3_file_exists, trigger_download
 
 # Default arguments
 default_args = {
     "owner": "airflow",
     "depends_on_past": False,
-    "start_date": (datetime.utcnow() - timedelta(hours=36)).replace(minute=0, second=0),
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(
+        minute=0, second=0
+    ),
     "catchup_by_default": False,
     "email_on_failure": False,
     "email_on_retry": False,
@@ -28,54 +29,87 @@
     "retry_delay": timedelta(minutes=30),
 }
 
+
 def get_latest_files(filenames):
     # Dictionary to store the latest file for each unique timestamp
     latest_files = {}
-
-    # Regular expression to extract the timestamp
-    pattern = r'ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'
-
+
+    # Updated regular expression to extract the issue timestamp and forecast timestamp
+    pattern = r"^ta01.*_awips_(\d{12})_(\d{10}f\d{3})"
+
     for filename in filenames:
         match = re.search(pattern, filename)
         if match:
-            key = match.group(1) + '_' + match.group(2)
+            # Combine the issue timestamp and forecast timestamp as the key
+            issue_timestamp = match.group(1)
+            forecast_timestamp = match.group(2)
+            key = issue_timestamp + "_" + forecast_timestamp
+            # Update the latest file for the key if it's not present or if the current filename is greater
             if key not in latest_files or filename > latest_files[key]:
                 latest_files[key] = filename
-
+
     # Return the list of latest files
     return list(latest_files.values())
 
+
 # APRFC qtf filename generator
 def get_filenames(edate, url):
     """
-    date at end of filename hour and min can not be predicted
-    scraping data from website and finding all matching filenames
-    for the specified date.
+    Scrape website, collect matching filenames, then:
+    - find the latest issuance timestamp (the 12-digit value after 'awips_')
+    - if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
+    - otherwise return an empty list (nothing to download)
     """
-    d_t1 = edate.strftime("%Y%m%d")
-
-    page = requests.get(url)
+    # Fetch page and extract links
+    page = requests.get(url, timeout=30)
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
-    filenames = []
-
-    regex = f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
-    filenames = [link for link in links if re.match(regex, link)]
+    # Match the target files; allows optional integer version before .grb and optional .gz
+    regex = r"^ta01.*_awips_\d{12}_\d{10}f\d{3}(?:\.\d+)?\.grb(\.gz)?$"
+    issue_re = re.compile(
+        r"_awips_(\d{12})"
+    )  # Capture the issuance timestamp after 'awips_'
+
+    # Collect candidates with their issuance timestamp
+    candidates = []
+    for link in links:
+        if link and re.match(regex, link):
+            m = issue_re.search(link)
+            if m:
+                candidates.append((link, m.group(1)))
+
+    # No matching files
+    if not candidates:
+        return []
+    # Find the latest issuance
+    latest_issue_str = max(issue for _, issue in candidates)
+    # Convert to an aware UTC datetime for comparison against Airflow's logical_date
+    latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
+        tzinfo=timezone.utc
+    )
+    # Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
+    if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
+        # Outside the 36-hour window or in the future: do not download anything
+        return []
 
-    return get_latest_files(filenames)
+    # Filter to only files from the latest issuance
+    latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]
+    # Deduplicate within that issuance set by issue+forecast key as your existing logic defines
+    return get_latest_files(latest_issue_files)
 
 
 @dag(
     default_args=default_args,
-    schedule="21 9,15,19 * * *",
+    schedule="25 * * * *",
    tags=["cumulus", "temp", "QTF", "APRFC"],
     max_active_runs=1,
     max_active_tasks=1,
+    catchup=False,  # Disable backfills
 )
 def cumulus_aprfc_qtf_01h():
     """This pipeline handles download, processing, and derivative product creation for \n
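For reference, the issuance-window check that both revised get_filenames() functions apply can be exercised in isolation. The sketch below is not part of the patch: the listing entries and the edate value are hypothetical, invented only to show how the latest 12-digit issuance timestamp after "awips_" is selected and compared against the 36-hour window.

# Standalone sketch (not part of the patch); filenames and edate are hypothetical.
import re
from datetime import datetime, timedelta, timezone

links = [
    "qpf06f_has_72f_awips_202401150600_2024011512f006.1.grb",
    "qpf06f_has_72f_awips_202401150000_2024011506f006.1.grb",  # older issuance
]

issue_re = re.compile(r"_awips_(\d{12})")  # 12-digit issuance timestamp

# Latest issuance as a string, then as an aware UTC datetime
latest_issue_str = max(issue_re.search(link).group(1) for link in links)
latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
    tzinfo=timezone.utc
)

edate = datetime(2024, 1, 15, 18, tzinfo=timezone.utc)  # stand-in for logical_date

# Same acceptance rule as the patch: not in the future, and at most 36 hours old
accept = timedelta(0) <= (edate - latest_issue_dt) <= timedelta(hours=36)
latest_files = [
    link for link in links if issue_re.search(link).group(1) == latest_issue_str
]

print(accept)        # True: the issuance is 12 hours before edate
print(latest_files)  # only the 202401150600 file survives the filter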