diff --git a/dags/cumulus/aprfc_qpf_06h.py b/dags/cumulus/aprfc_qpf_06h.py
index fc5822f..574c706 100644
--- a/dags/cumulus/aprfc_qpf_06h.py
+++ b/dags/cumulus/aprfc_qpf_06h.py
@@ -3,7 +3,7 @@
 """
 
 import json
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 import calendar
 from bs4 import BeautifulSoup
 import re
@@ -20,7 +20,7 @@
 default_args = {
     "owner": "airflow",
     "depends_on_past": False,
-    "start_date": (datetime.utcnow() - timedelta(hours=72)).replace(minute=0, second=0),
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(minute=0, second=0),
     "catchup_by_default": False,
     "email_on_failure": False,
     "email_on_retry": False,
@@ -28,6 +28,22 @@
     "retry_delay": timedelta(minutes=30),
 }
 
+def get_latest_files(filenames):
+    # Dictionary to store the latest file for each unique timestamp
+    latest_files = {}
+
+    # Regular expression to extract the timestamp
+    pattern = r'qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'
+
+    for filename in filenames:
+        match = re.search(pattern, filename)
+        if match:
+            key = match.group(1) + '_' + match.group(2)
+            if key not in latest_files or filename > latest_files[key]:
+                latest_files[key] = filename
+
+    # Return the list of latest files
+    return list(latest_files.values())
 
 # ALR QPF filename generator
 def get_filenames(edate, url):
@@ -37,25 +53,24 @@ def get_filenames(edate, url):
     for the sprcified date.
     """
     d_t1 = edate.strftime("%Y%m%d")
-    d_t2 = (edate - timedelta(hours=24)).strftime("%Y%m%d")
+
     page = requests.get(url)
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
     filenames = []
 
-    for d in [d_t2, d_t1]:
-        regex = f"^qpf06f_has_.*.awips_{d}\d+.grb.gz$"
-        filenames = filenames + [link for link in links if re.match(regex, link)]
+    regex = f"^qpf06f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
+    filenames = [link for link in links if re.match(regex, link)]
 
-    return filenames
+    return get_latest_files(filenames)
 
 
 @dag(
     default_args=default_args,
-    schedule="40 14,5 * * *",
+    schedule="20 9,15,19 * * *",
     tags=["cumulus", "precip", "QPF", "APRFC"],
-    max_active_runs=2,
-    max_active_tasks=4,
+    max_active_runs=1,
+    max_active_tasks=1,
 )
 def cumulus_aprfc_qpf_06h():
     """This pipeline handles download, processing, and derivative product creation for \n
@@ -105,4 +120,4 @@ def notify_cumulus(payload):
 
     notify_cumulus(download_raw_qpf())
 
-aprfc_qpe_dag = cumulus_aprfc_qpf_06h()
+aprfc_qpf_dag = cumulus_aprfc_qpf_06h()
diff --git a/dags/cumulus/aprfc_qtf_01h.py b/dags/cumulus/aprfc_qtf_01h.py
new file mode 100644
index 0000000..54a9485
--- /dev/null
+++ b/dags/cumulus/aprfc_qtf_01h.py
@@ -0,0 +1,125 @@
+"""
+Acquire and Process APRFC qtf 01h
+"""
+
+import json
+from datetime import datetime, timedelta, timezone
+import calendar
+from bs4 import BeautifulSoup
+import re
+import requests
+
+from airflow import DAG
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from helpers.downloads import trigger_download
+
+import helpers.cumulus as cumulus
+
+# Default arguments
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(minute=0, second=0),
+    "catchup_by_default": False,
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 6,
+    "retry_delay": timedelta(minutes=30),
+}
+
+
+def get_latest_files(filenames):
+    # Dictionary to store the latest file for each unique timestamp
+    latest_files = {}
+
+    # Regular expression to extract the timestamp
+    pattern = r'ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'
+
+    for filename in filenames:
+        match = re.search(pattern, filename)
+        if match:
+            key = match.group(1) + '_' + match.group(2)
+            if key not in latest_files or filename > latest_files[key]:
+                latest_files[key] = filename
+
+    # Return the list of latest files
+    return list(latest_files.values())
+
+
+# APRFC qtf filename generator
+def get_filenames(edate, url):
+    """
+    date at end of filename hour and min can not be predicted
+    scraping data from website and finding all matching filenames
+    for the specified date.
+    """
+    d_t1 = edate.strftime("%Y%m%d")
+
+    page = requests.get(url)
+    soup = BeautifulSoup(page.content, "html.parser")
+    links = [node.get("href") for node in soup.find_all("a")]
+    filenames = []
+
+    regex = f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
+    filenames = [link for link in links if re.match(regex, link)]
+
+    return get_latest_files(filenames)
+
+
+@dag(
+    default_args=default_args,
+    schedule="21 9,15,19 * * *",
+    tags=["cumulus", "temp", "QTF", "APRFC"],
+    max_active_runs=1,
+    max_active_tasks=1,
+)
+def cumulus_aprfc_qtf_01h():
+    """This pipeline handles download, processing, and derivative product creation for \n
+    APRFC QTF\n
+    URL Dir - https://cbt.crohms.org/akgrids
+    Files matching ta01f_has_92f_20241219_08_awips_202412150008.grb. - 1 hour\n
+    """
+    key_prefix = cumulus.S3_ACQUIRABLE_PREFIX
+    URL_ROOT = f"https://cbt.crohms.org/akgrids"
+    PRODUCT_SLUG = "aprfc-qtf-01h"
+
+    @task()
+    def download_raw_qtf():
+        logical_date = get_current_context()["logical_date"]
+
+        return_list = list()
+        filenames = get_filenames(logical_date, URL_ROOT)
+        for filename in filenames:
+            url = f"{URL_ROOT}/{filename}"
+            s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
+            print(f"Downloading file: {filename}")
+            try:
+                trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
+                return_list.append(
+                    {
+                        "execution": logical_date.isoformat(),
+                        "s3_key": s3_key,
+                        "filename": filename,
+                    }
+                )
+            except Exception:
+                print(f"{filename} is not available to download")
+
+        return json.dumps(return_list)
+
+    @task()
+    def notify_cumulus(payload):
+        payload = json.loads(payload)
+        for item in payload:
+            print("Notifying Cumulus: " + item["filename"])
+            cumulus.notify_acquirablefile(
+                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
+                datetime=item["execution"],
+                s3_key=item["s3_key"],
+            )
+
+    notify_cumulus(download_raw_qtf())
+
+
+aprfc_qtf_dag = cumulus_aprfc_qtf_01h()
diff --git a/plugins/helpers/cumulus.py b/plugins/helpers/cumulus.py
index 6f182f2..9516f12 100644
--- a/plugins/helpers/cumulus.py
+++ b/plugins/helpers/cumulus.py
@@ -15,6 +15,7 @@
     "aprfc-qpe-06h": "1f67d822-7cbc-11ee-b962-0242ac120002",
     "aprfc-qpf-06h": "a64cb16f-01a8-45c0-a069-9afda805d3a7",
     "aprfc-qte-01h": "7f8b2d6a-1f3e-11ee-be56-0242ac120002",
+    "aprfc-qtf-01h": "80f33047-6234-4949-9c2f-eec6bfcf7b0f",
     "cnrfc-qpe-06h": "34a89c35-090d-46e8-964a-c621403301b9",
     "cnrfc-qpf-06h": "c22785cd-400e-4664-aef8-426734825c2c",
     "cnrfc-nbm-qpf-06h": "40cfce36-cfad-4a10-8b2d-eb8862378ca5",
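
Reviewer note (not part of the diff): a minimal standalone sketch of the
get_latest_files() dedup behavior added in both DAGs. The function body is
copied from aprfc_qpf_06h.py above; the sample filenames below are invented
for illustration only.

import re

# Copied from the diff: keep only the lexicographically greatest filename
# for each (valid-time, issuance-stamp) pair extracted by the regex.
def get_latest_files(filenames):
    latest_files = {}
    pattern = r'qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'
    for filename in filenames:
        match = re.search(pattern, filename)
        if match:
            key = match.group(1) + '_' + match.group(2)
            if key not in latest_files or filename > latest_files[key]:
                latest_files[key] = filename
    return list(latest_files.values())

# Invented sample listing: the first two forecast-hour variants share one
# key, so only one survives; the third file has a distinct key.
sample = [
    "qpf06f_has_12f_20241219_08_awips_202412190800.grb.gz",
    "qpf06f_has_6f_20241219_08_awips_202412190800.grb.gz",
    "qpf06f_has_18f_20241219_14_awips_202412190800.grb.gz",
]
print(get_latest_files(sample))
# ['qpf06f_has_6f_20241219_08_awips_202412190800.grb.gz',
#  'qpf06f_has_18f_20241219_14_awips_202412190800.grb.gz']

The qtf variant in aprfc_qtf_01h.py is identical apart from the ta01f_
prefix in the pattern. Note that the "latest" comparison is plain string
ordering, so has_6f outranks has_12f; if numeric forecast-hour precedence
is intended, the comparison would need to parse the hour instead.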