diff --git a/_docker/gitsync/Dockerfile b/_docker/gitsync/Dockerfile
index 8a35b61..d1ec1c1 100644
--- a/_docker/gitsync/Dockerfile
+++ b/_docker/gitsync/Dockerfile
@@ -1,5 +1,5 @@
 # BUILD GO BINARIES
-FROM alpine:3.22.1
+FROM alpine:3.22.2
 
 # Create User; https://github.com/mhart/alpine-node/issues/48
 # NOTE: Using same UID/GID as airflow user in container for shared volume convenience
diff --git a/dags/cumulus/aprfc_qpf_06h.py b/dags/cumulus/aprfc_qpf_06h.py
index f98eb84..99f319c 100644
--- a/dags/cumulus/aprfc_qpf_06h.py
+++ b/dags/cumulus/aprfc_qpf_06h.py
@@ -2,19 +2,18 @@
 Acquire and Process APRFC QPF 06h
 """
 
-import json
-from datetime import datetime, timedelta, timezone
 import calendar
-from bs4 import BeautifulSoup
+import json
 import re
-import requests
-
+from datetime import datetime, timedelta, timezone
+
+import helpers.cumulus as cumulus
+import requests
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
-from helpers.downloads import trigger_download
-
-import helpers.cumulus as cumulus
+from bs4 import BeautifulSoup
+from helpers.downloads import s3_file_exists, trigger_download
 
 # Default arguments
 default_args = {
@@ -35,13 +34,16 @@ def get_latest_files(filenames):
     # Dictionary to store the latest file for each unique timestamp
     latest_files = {}
 
-    # Regular expression to extract the timestamp
-    pattern = r"qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)"
-
+    # Corrected regular expression
+    pattern = r"^qpf06f_has_.*_awips_(\d{12})_(\d{10}f\d{3})\.\d+\.(grb|grb\.gz)$"
     for filename in filenames:
         match = re.search(pattern, filename)
         if match:
-            key = match.group(1) + "_" + match.group(2)
+            # Combine the issue timestamp and forecast timestamp as the key
+            issue_timestamp = match.group(1)
+            forecast_timestamp = match.group(2)
+            key = issue_timestamp + "_" + forecast_timestamp
+            # Update the latest file for the key if it's not present or if the current filename is greater
             if key not in latest_files or filename > latest_files[key]:
                 latest_files[key] = filename
 
@@ -52,28 +54,59 @@ def get_latest_files(filenames):
 # ALR QPF filename generator
 def get_filenames(edate, url):
     """
-    date at end of filename hour and min can not be predicted
-    scraping data from website and finding all matching filenames
-    for the sprcified date.
+    Scrape website, collect matching filenames, then:
+    - find the latest issuance timestamp (the 12-digit value after 'awips_')
+    - if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
+    - otherwise return an empty list (nothing to download)
     """
-    d_t1 = edate.strftime("%Y%m%d")
-
-    page = requests.get(url)
+    page = requests.get(url, timeout=30)
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
-    filenames = []
-    regex = f"^qpf06f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
-    filenames = [link for link in links if re.match(regex, link)]
 
-    return get_latest_files(filenames)
+    regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
+    issue_re = re.compile(
+        r"_awips_(\d{12})"
+    )  # Capture the issuance timestamp after 'awips_'
+
+    # Collect candidates with their issuance timestamp
+    candidates = []
+    for link in links:
+        if link and re.match(regex, link):
+            m = issue_re.search(link)
+            if m:
+                candidates.append((link, m.group(1)))
+
+    # No matching files
+    if not candidates:
+        return []
+
+    # Find the latest issuance
+    latest_issue_str = max(issue for _, issue in candidates)
+
+    # Convert to an aware UTC datetime for comparison against Airflow's logical_date
+    latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
+        tzinfo=timezone.utc
+    )
+
+    # Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
+    if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
+        # Outside the 36-hour window or in the future: do not download anything
+        return []
+
+    # Filter to only files from the latest issuance
+    latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]
+
+    # Deduplicate within that issuance set by issue+forecast key as your existing logic defines
+    return get_latest_files(latest_issue_files)
 
 
 @dag(
     default_args=default_args,
-    schedule="20 9,15,19 * * *",
+    schedule="25 * * * *",
    tags=["cumulus", "precip", "QPF", "APRFC"],
     max_active_runs=1,
     max_active_tasks=1,
+    catchup=False,  # Disable backfills
 )
 def cumulus_aprfc_qpf_06h():
     """This pipeline handles download, processing, and derivative product creation for \n
@@ -94,6 +127,10 @@ def download_raw_qpf():
         for filename in filenames:
             url = f"{URL_ROOT}/{filename}"
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
+            # Check if the file already exists in S3
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue  # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
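The new `get_filenames()` above keeps only files from the newest issuance and returns nothing when that issuance is more than 36 hours behind the logical date. A minimal standalone sketch of that selection rule, using hypothetical filenames that follow the expected `qpf06f_has_*_awips_*` pattern (not real APRFC listings), behaves like this:

```python
# Sketch only: mirrors the selection rule in get_filenames(); the filenames
# below are hypothetical examples in the expected pattern.
import re
from datetime import datetime, timedelta, timezone

links = [
    "qpf06f_has_6f_20240101_12_awips_202401011200_2024010118f006.1.grb",
    "qpf06f_has_6f_20240101_12_awips_202401011200_2024010200f012.1.grb",
    "qpf06f_has_6f_20231230_12_awips_202312301200_2023123018f006.1.grb",  # older issuance
]
edate = datetime(2024, 1, 2, 9, 0, tzinfo=timezone.utc)  # stand-in for Airflow's logical_date

regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
issue_re = re.compile(r"_awips_(\d{12})")

candidates = [(ln, issue_re.search(ln).group(1)) for ln in links if re.match(regex, ln)]
latest = max(issue for _, issue in candidates)
latest_dt = datetime.strptime(latest, "%Y%m%d%H%M").replace(tzinfo=timezone.utc)

# Same window test as the DAG: latest issuance must not be in the future and
# must be no more than 36 hours behind the logical date.
if latest_dt <= edate and (edate - latest_dt) <= timedelta(hours=36):
    print([fn for fn, issue in candidates if issue == latest])
    # prints the two files issued at 202401011200; the 20231230 file is dropped
```

Because the issuance strings are fixed-width, zero-padded digits, taking `max()` of the raw strings selects the same issuance as comparing parsed datetimes.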
diff --git a/dags/cumulus/aprfc_qtf_01h.py b/dags/cumulus/aprfc_qtf_01h.py
index 077d5ed..59c87f3 100644
--- a/dags/cumulus/aprfc_qtf_01h.py
+++ b/dags/cumulus/aprfc_qtf_01h.py
@@ -2,19 +2,18 @@
 Acquire and Process APRFC qtf 01h
 """
 
-import json
-from datetime import datetime, timedelta, timezone
 import calendar
-from bs4 import BeautifulSoup
+import json
 import re
-import requests
-
+from datetime import datetime, timedelta, timezone
+
+import helpers.cumulus as cumulus
+import requests
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
-from helpers.downloads import trigger_download
-
-import helpers.cumulus as cumulus
+from bs4 import BeautifulSoup
+from helpers.downloads import s3_file_exists, trigger_download
 
 # Default arguments
 default_args = {
@@ -35,13 +34,17 @@ def get_latest_files(filenames):
     # Dictionary to store the latest file for each unique timestamp
     latest_files = {}
 
-    # Regular expression to extract the timestamp
-    pattern = r"ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)"
+    # Updated regular expression to extract the issue timestamp and forecast timestamp
+    pattern = r"^ta01.*_awips_(\d{12})_(\d{10}f\d{3})"
 
     for filename in filenames:
         match = re.search(pattern, filename)
         if match:
-            key = match.group(1) + "_" + match.group(2)
+            # Combine the issue timestamp and forecast timestamp as the key
+            issue_timestamp = match.group(1)
+            forecast_timestamp = match.group(2)
+            key = issue_timestamp + "_" + forecast_timestamp
+            # Update the latest file for the key if it's not present or if the current filename is greater
             if key not in latest_files or filename > latest_files[key]:
                 latest_files[key] = filename
 
@@ -52,29 +55,61 @@ def get_latest_files(filenames):
 # APRFC qtf filename generator
 def get_filenames(edate, url):
     """
-    date at end of filename hour and min can not be predicted
-    scraping data from website and finding all matching filenames
-    for the specified date.
+    Scrape website, collect matching filenames, then:
+    - find the latest issuance timestamp (the 12-digit value after 'awips_')
+    - if latest issuance is within 36 hours of edate (Airflow logical_date), return only those files
+    - otherwise return an empty list (nothing to download)
     """
-    d_t1 = edate.strftime("%Y%m%d")
-
-    page = requests.get(url)
+    # Fetch page and extract links
+    page = requests.get(url, timeout=30)
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
-    filenames = []
-    regex = f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
-    filenames = [link for link in links if re.match(regex, link)]
+    # Match the target files; allows optional integer version before .grb and optional .gz
+    regex = r"^ta01.*_awips_\d{12}_\d{10}f\d{3}(?:\.\d+)?\.grb(\.gz)?$"
+    issue_re = re.compile(
+        r"_awips_(\d{12})"
+    )  # Capture the issuance timestamp after 'awips_'
+
+    # Collect candidates with their issuance timestamp
+    candidates = []
+    for link in links:
+        if link and re.match(regex, link):
+            m = issue_re.search(link)
+            if m:
+                candidates.append((link, m.group(1)))
+
+    # No matching files
+    if not candidates:
+        return []
+
+    # Find the latest issuance
+    latest_issue_str = max(issue for _, issue in candidates)
+
+    # Convert to an aware UTC datetime for comparison against Airflow's logical_date
+    latest_issue_dt = datetime.strptime(latest_issue_str, "%Y%m%d%H%M").replace(
+        tzinfo=timezone.utc
+    )
+
+    # Only proceed if latest issuance is in the past and within the last 36 hours relative to edate (logical_date)
+    if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
+        # Outside the 36-hour window or in the future: do not download anything
+        return []
+
+    # Filter to only files from the latest issuance
+    latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]
 
-    return get_latest_files(filenames)
+    # Deduplicate within that issuance set by issue+forecast key as your existing logic defines
+    return get_latest_files(latest_issue_files)
 
 
 @dag(
     default_args=default_args,
-    schedule="21 9,15,19 * * *",
+    schedule="25 * * * *",
     tags=["cumulus", "temp", "QTF", "APRFC"],
     max_active_runs=1,
     max_active_tasks=1,
+    catchup=False,  # Disable backfills
 )
 def cumulus_aprfc_qtf_01h():
     """This pipeline handles download, processing, and derivative product creation for \n
@@ -95,6 +130,10 @@ def download_raw_qtf():
         for filename in filenames:
             url = f"{URL_ROOT}/{filename}"
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
+            # Check if the file already exists in S3
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue  # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
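Both DAGs now import `s3_file_exists` from `helpers.downloads`, but that helper's implementation is not part of this diff. Based on the call sites, where `s3_file_exists(cumulus.S3_BUCKET, s3_key)` is used as a boolean, an assumed boto3-based shape might look like the sketch below; the repository's actual helper may differ.

```python
# Assumed shape only -- the real helpers.downloads.s3_file_exists is not shown
# in this diff. A head_object call is a common way to test for an S3 key.
import boto3
from botocore.exceptions import ClientError


def s3_file_exists(bucket: str, key: str) -> bool:
    """Return True if s3://bucket/key exists, False if it does not."""
    client = boto3.client("s3")
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as err:
        # head_object reports a missing key as a 404 error code
        if err.response["Error"]["Code"] == "404":
            return False
        raise  # propagate permission or transport errors
```

With this guard in place in both download tasks, re-running a DAG for an issuance whose files are already in the bucket becomes a cheap no-op.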
diff --git a/dags/wmes/wmes_daily_jobs.py b/dags/wmes/wmes_daily_jobs.py
index ce720d8..0be7129 100644
--- a/dags/wmes/wmes_daily_jobs.py
+++ b/dags/wmes/wmes_daily_jobs.py
@@ -36,12 +36,11 @@ def wmes_daily_jobs():
         {"office": "lrl", "office_group": "lrd", "enabled": False},
         {"office": "lrn", "office_group": "lrd", "enabled": False},
         {"office": "lrp", "office_group": "lrd", "enabled": False},
-        {
-            "office": "swt",
-            "office_group": "swd",
+        {"office": "swt", "office_group": "swd",
            "github_branch": "cwbi-restructure",
            "enabled": True,
        },
+        {"office": "spk", "office_group": "spd", "enabled": True},
     ]
 
     # Organize configs by office_group
diff --git a/dags/wmes/wmes_hourly_jobs.py b/dags/wmes/wmes_hourly_jobs.py
index 7bb2ea8..dae705d 100644
--- a/dags/wmes/wmes_hourly_jobs.py
+++ b/dags/wmes/wmes_hourly_jobs.py
@@ -48,7 +48,7 @@ def wmes_hourly_jobs():
         {
             "office": "lrl",
             "office_group": "lrd",
-            "enabled": False,
+            "enabled": True,
         },
         {
             "office": "lrn",
@@ -64,6 +64,11 @@ def wmes_hourly_jobs():
             "office": "swt",
             "office_group": "swd",
             "enabled": True,
+        },
+        {
+            "office": "spk",
+            "office_group": "spd",
+            "enabled": True,
         },
     ]
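The wmes changes are configuration only: `lrl` hourly jobs are enabled, and a new `spk` office is added under the `spd` group in both the daily and hourly config lists. The grouping step referenced by the `# Organize configs by office_group` context line is not shown in this diff; an illustrative (not the repository's actual) grouping of enabled offices could look like:

```python
# Illustrative only: the actual grouping code in the wmes DAGs is outside this diff.
from collections import defaultdict

configs = [
    {"office": "lrl", "office_group": "lrd", "enabled": True},
    {"office": "swt", "office_group": "swd", "enabled": True},
    {"office": "spk", "office_group": "spd", "enabled": True},  # newly added office
]

by_group = defaultdict(list)
for cfg in configs:
    if cfg["enabled"]:
        by_group[cfg["office_group"]].append(cfg["office"])

print(dict(by_group))  # {'lrd': ['lrl'], 'swd': ['swt'], 'spd': ['spk']}
```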