37 changes: 26 additions & 11 deletions dags/cumulus/aprfc_qpf_06h.py
@@ -3,7 +3,7 @@
"""

import json
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import calendar
from bs4 import BeautifulSoup
import re
@@ -20,14 +20,30 @@
 default_args = {
     "owner": "airflow",
     "depends_on_past": False,
-    "start_date": (datetime.utcnow() - timedelta(hours=72)).replace(minute=0, second=0),
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(minute=0, second=0),
     "catchup_by_default": False,
     "email_on_failure": False,
     "email_on_retry": False,
     "retries": 6,
     "retry_delay": timedelta(minutes=30),
 }

+def get_latest_files(filenames):
+    # Dictionary to store the latest file for each unique timestamp key
+    latest_files = {}
+
+    # Regex capturing the valid date/hour and the trailing AWIPS timestamp
+    pattern = r'qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'
+
+    for filename in filenames:
+        match = re.search(pattern, filename)
+        if match:
+            key = match.group(1) + '_' + match.group(2)
+            if key not in latest_files or filename > latest_files[key]:
+                latest_files[key] = filename
+
+    # Return the list of latest files
+    return list(latest_files.values())

 # ALR QPF filename generator
 def get_filenames(edate, url):
@@ -37,25 +53,24 @@ def get_filenames(edate, url):
     for the specified date.
     """
     d_t1 = edate.strftime("%Y%m%d")
-    d_t2 = (edate - timedelta(hours=24)).strftime("%Y%m%d")

     page = requests.get(url)
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
     filenames = []
-    for d in [d_t2, d_t1]:
-        regex = f"^qpf06f_has_.*.awips_{d}\d+.grb.gz$"
-        filenames = filenames + [link for link in links if re.match(regex, link)]
+    regex = f"^qpf06f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
+    filenames = [link for link in links if re.match(regex, link)]

-    return filenames
+    return get_latest_files(filenames)


 @dag(
     default_args=default_args,
-    schedule="40 14,5 * * *",
+    schedule="20 9,15,19 * * *",
     tags=["cumulus", "precip", "QPF", "APRFC"],
-    max_active_runs=2,
-    max_active_tasks=4,
+    max_active_runs=1,
+    max_active_tasks=1,
 )
 def cumulus_aprfc_qpf_06h():
     """This pipeline handles download, processing, and derivative product creation for \n
@@ -105,4 +120,4 @@ def notify_cumulus(payload):
     notify_cumulus(download_raw_qpf())


-aprfc_qpe_dag = cumulus_aprfc_qpf_06h()
+aprfc_qpf_dag = cumulus_aprfc_qpf_06h()
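
The new get_latest_files helper de-duplicates the scraped listing, keeping one file per (valid date/hour, AWIPS timestamp) key and preferring the lexically greatest name when several share a key. A minimal sketch of that behavior, using invented filenames that follow the qpf06f pattern above:

listing = [
    "qpf06f_has_006f_20241219_08_awips_202412150008.grb",
    "qpf06f_has_012f_20241219_08_awips_202412150008.grb",  # same key, sorts later
    "qpf06f_has_006f_20241219_14_awips_202412150008.grb",  # different valid hour
]
print(get_latest_files(listing))
# ['qpf06f_has_012f_20241219_08_awips_202412150008.grb',
#  'qpf06f_has_006f_20241219_14_awips_202412150008.grb']
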
128 changes: 128 additions & 0 deletions dags/cumulus/aprfc_qtf_01h.py
@@ -0,0 +1,128 @@
"""
Acquire and Process APRFC qtf 01h
"""

import json
from datetime import datetime, timedelta, timezone
import calendar
from bs4 import BeautifulSoup
import re
import requests

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus

# Default arguments
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(minute=0, second=0),
    "catchup_by_default": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 6,
    "retry_delay": timedelta(minutes=30),
}

def get_latest_files(filenames):
    # Dictionary to store the latest file for each unique timestamp key
    latest_files = {}

    # Regex capturing the valid date/hour and the trailing AWIPS timestamp
    pattern = r'ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)'

    for filename in filenames:
        match = re.search(pattern, filename)
        if match:
            key = match.group(1) + '_' + match.group(2)
            if key not in latest_files or filename > latest_files[key]:
                latest_files[key] = filename

    # Return the list of latest files
    return list(latest_files.values())

# APRFC qtf filename generator
def get_filenames(edate, url):
    """
    The hour and minute at the end of the filename cannot be predicted,
    so scrape the directory listing and collect all filenames matching
    the specified date.
    """
    d_t1 = edate.strftime("%Y%m%d")

    page = requests.get(url)
    soup = BeautifulSoup(page.content, "html.parser")
    links = [node.get("href") for node in soup.find_all("a")]

    regex = f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
    filenames = [link for link in links if re.match(regex, link)]

    return get_latest_files(filenames)



@dag(
    default_args=default_args,
    schedule="21 9,15,19 * * *",
    tags=["cumulus", "temp", "QTF", "APRFC"],
    max_active_runs=1,
    max_active_tasks=1,
)
def cumulus_aprfc_qtf_01h():
    """This pipeline handles download, processing, and derivative product creation for \n
    APRFC QTF\n
    URL Dir - https://cbt.crohms.org/akgrids\n
    Files matching ta01f_has_92f_20241219_08_awips_202412150008.grb - 1 hour\n
    """
    key_prefix = cumulus.S3_ACQUIRABLE_PREFIX
    URL_ROOT = "https://cbt.crohms.org/akgrids"
    PRODUCT_SLUG = "aprfc-qtf-01h"

    @task()
    def download_raw_qtf():
        logical_date = get_current_context()["logical_date"]

        return_list = list()
        filenames = get_filenames(logical_date, URL_ROOT)
        for filename in filenames:
            url = f"{URL_ROOT}/{filename}"
            s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
            print(f"Downloading file: {filename}")
            try:
                trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
                return_list.append(
                    {
                        "execution": logical_date.isoformat(),
                        "s3_key": s3_key,
                        "filename": filename,
                    }
                )
            except Exception as e:
                print(f"{filename} is not available to download: {e}")

        return json.dumps(return_list)

    @task()
    def notify_cumulus(payload):
        payload = json.loads(payload)
        for item in payload:
            print("Notifying Cumulus: " + item["filename"])
            cumulus.notify_acquirablefile(
                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
                datetime=item["execution"],
                s3_key=item["s3_key"],
            )

    notify_cumulus(download_raw_qtf())


aprfc_qtf_dag = cumulus_aprfc_qtf_01h()
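
A quick sanity check of the filename regex in get_filenames: with d_t1 pinned to the AWIPS issue date of the sample name quoted in the DAG docstring, the pattern matches as intended (a standalone sketch, not part of the DAG):

import re

d_t1 = "20241215"  # issue date taken from the docstring's sample filename
regex = f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
assert re.match(regex, "ta01f_has_92f_20241219_08_awips_202412150008.grb")
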
1 change: 1 addition & 0 deletions plugins/helpers/cumulus.py
@@ -15,6 +15,7 @@
"aprfc-qpe-06h": "1f67d822-7cbc-11ee-b962-0242ac120002",
"aprfc-qpf-06h": "a64cb16f-01a8-45c0-a069-9afda805d3a7",
"aprfc-qte-01h": "7f8b2d6a-1f3e-11ee-be56-0242ac120002",
"aprfc-qtf-01h": "80f33047-6234-4949-9c2f-eec6bfcf7b0f",
"cnrfc-qpe-06h": "34a89c35-090d-46e8-964a-c621403301b9",
"cnrfc-qpf-06h": "c22785cd-400e-4664-aef8-426734825c2c",
"cnrfc-nbm-qpf-06h": "40cfce36-cfad-4a10-8b2d-eb8862378ca5",
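
This new acquirables entry is what lets the QTF DAG resolve its acquirable id at runtime; the notify_cumulus task above performs, in effect:

acquirable_id = cumulus.acquirables["aprfc-qtf-01h"]
# -> "80f33047-6234-4949-9c2f-eec6bfcf7b0f", passed as acquirable_id to
#    cumulus.notify_acquirablefile()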