6 changes: 3 additions & 3 deletions dags/cumulus/abrfc_qpe_01h.py
@@ -36,9 +36,9 @@
)
def cumulus_abrfc_qpe_01h():
"""This pipeline handles download, processing, and derivative product creation for \n
Arkansas-Red Basin River Forecast Center (ABRFC) 1HR QPE\n
ABRFC QPE\n
URL Dir - https://tgftp.nws.noaa.gov/data/rfc/abrfc/xmrg_qpe/
Files matching abrfc_qpe_01hr_YYYYMMDDHHZ.nc - 1 hour\n
Files matching abrfc_qpe_01hr_YYYYMMDDHHZ.nc.gz - 1 hour\n
Note: Delay observed when watching new product timestamp on file at source.
Example: timestamp said 15:50, but was pushed to server at 16:07
"""
@@ -49,7 +49,7 @@ def cumulus_abrfc_qpe_01h():
@task()
def download_raw_qpe():
logical_date = get_current_context()["logical_date"]
filename = f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc'
filename = f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc.gz'
s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{PRODUCT_SLUG}/{filename}"
print(f"Downloading file: {filename}")
trigger_download(
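Since the source now serves gzipped NetCDF, the filename pattern in this DAG gains a .gz suffix. Below is a minimal sketch of the pattern and of how a downstream step might decompress the file before opening it, assuming a plain gzip wrapper around the NetCDF payload; the decompression helper is illustrative and not part of this DAG.

import gzip
import shutil
from datetime import datetime, timezone

def build_qpe_filename(logical_date: datetime) -> str:
    # Mirrors the pattern used in download_raw_qpe(), now with the .gz suffix
    return f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc.gz'

def gunzip_netcdf(gz_path: str, nc_path: str) -> None:
    # Hypothetical helper: unwrap the gzip layer so a NetCDF reader can open the file
    with gzip.open(gz_path, "rb") as src, open(nc_path, "wb") as dst:
        shutil.copyfileobj(src, dst)

print(build_qpe_filename(datetime(2024, 1, 1, 12, tzinfo=timezone.utc)))
# -> abrfc_qpe_01hr_2024010112Z.nc.gz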
4 changes: 2 additions & 2 deletions dags/cumulus/abrfc_qpf_06h.py
@@ -56,8 +56,8 @@ def qpf_filenames(edate):
schedule="8 */6 * * *",
tags=["cumulus", "precip", "QPF", "ABRFC"],
doc_md=__doc__,
max_active_runs=1,
max_active_tasks=2,
max_active_runs=2,
max_active_tasks=4,
)
def cumulus_abrfc_qpf_06h():
key_prefix = cumulus.S3_ACQUIRABLE_PREFIX
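For context on the two settings changed here: max_active_runs caps how many DAG runs may be in flight at once, and max_active_tasks caps task instances across those runs. A minimal, self-contained sketch at the new values; the dag_id and no-op task are placeholders for illustration, not DAGs in this repo.

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="concurrency_demo",  # placeholder id for illustration only
    schedule="8 */6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=2,   # at most two DAG runs in flight at once
    max_active_tasks=4,  # at most four task instances across those runs
)
def concurrency_demo():
    @task()
    def noop():
        print("placeholder task")

    noop()

concurrency_demo()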
6 changes: 3 additions & 3 deletions dags/cumulus/aprfc_qpe_06h.py
@@ -23,7 +23,7 @@
"catchup_by_default": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

@@ -43,8 +43,8 @@ def get_filenames(edate):
default_args=default_args,
schedule="40 14,5 * * *",
tags=["cumulus", "precip", "QPE", "APRFC"],
max_active_runs=2,
max_active_tasks=4,
max_active_runs=1,
max_active_tasks=1,
)
def cumulus_aprfc_qpe_06h():
"""This pipeline handles download, processing, and derivative product creation for \n
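The retry settings here drop from six retries to one, so a failed download is re-attempted once after the 30-minute retry_delay before the task instance is marked failed, and the DAG is throttled to a single active run and task. A minimal default_args sketch with the values from this diff; the remaining keys are illustrative.

from datetime import timedelta

default_args = {
    "owner": "airflow",
    "retries": 1,                          # one retry instead of six
    "retry_delay": timedelta(minutes=30),  # wait 30 minutes before retrying
    "email_on_failure": False,
    "email_on_retry": False,
}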
9 changes: 5 additions & 4 deletions dags/cumulus/aprfc_qpf_06h.py
@@ -25,7 +25,7 @@
"catchup_by_default": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

@@ -128,9 +128,10 @@ def download_raw_qpf():
url = f"{URL_ROOT}/{filename}"
s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
# Check if the file already exists in S3
if s3_file_exists(cumulus.S3_BUCKET, s3_key):
print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
continue # Skip to the next file
# Needs infrastructure change to re-enable this check
# if s3_file_exists(cumulus.S3_BUCKET, s3_key):
# print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
# continue # Skip to the next file
print(f"Downloading file: {filename}")
try:
trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
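The skip-if-already-in-S3 guard is disabled here pending an infrastructure change. For reference, a hedged sketch of how such a check is commonly written with boto3; the repo's s3_file_exists helper may be implemented differently, and the bucket and key below are placeholders.

import boto3
from botocore.exceptions import ClientError

def s3_object_exists(bucket: str, key: str) -> bool:
    # HEAD the object: a 404 means it does not exist, any other error is re-raised
    s3 = boto3.client("s3")
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as err:
        if err.response["Error"]["Code"] == "404":
            return False
        raise

# Usage inside the download loop (illustrative):
# if s3_object_exists("example-bucket", "path/to/object.grb"):
#     continue  # skip re-downloading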
2 changes: 1 addition & 1 deletion dags/cumulus/aprfc_qte_01h.py
@@ -26,7 +26,7 @@
"catchup_by_default": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

9 changes: 5 additions & 4 deletions dags/cumulus/aprfc_qtf_01h.py
@@ -25,7 +25,7 @@
"catchup_by_default": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

@@ -131,9 +131,10 @@ def download_raw_qtf():
url = f"{URL_ROOT}/{filename}"
s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
# Check if the file already exists in S3
if s3_file_exists(cumulus.S3_BUCKET, s3_key):
print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
continue # Skip to the next file
# Needs infrastructure changes to re-enable this check
# if s3_file_exists(cumulus.S3_BUCKET, s3_key):
# print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
# continue # Skip to the next file
print(f"Downloading file: {filename}")
try:
trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
31 changes: 0 additions & 31 deletions dags/cumulus/ncep_stage4_conus_01h.py
@@ -84,38 +84,7 @@ def notify_cumulus(payload):
s3_key=p["s3_key"],
)

@task()
def download_last_n_products():

n = 120 # how many products back to download
logical_date = get_current_context()["logical_date"]
payload = []

for h in range(1, n + 1):
# Skip current hour, we just downloaded it above
lookback_date = logical_date - timedelta(hours=h)
# logging.info(f"lookback_date: {lookback_date}")
dirpath = f'pcpanl.{lookback_date.strftime("%Y%m%d")}'
filename = f'st4_conus.{lookback_date.strftime("%Y%m%d%H")}.01h.grb2'
filepath = f"{dirpath}/{filename}"
try:
s3_key = download_product(filepath)
payload.append(
{"datetime": lookback_date.isoformat(), "s3_key": s3_key}
)
except Exception as err:
logging.error(f"Failed to download {filepath}.")
logging.error(err)
# prevent a single failure from stopping the whole task
pass

# sleep to avoid hitting a rate limit on src server
time.sleep(2)

return json.dumps(payload)

notify_cumulus(download_stage4_qpe())
notify_cumulus(download_last_n_products())


stage4_dag = cumulus_ncep_stage4_conus_01h()
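Both Stage 4 DAGs hand results between tasks as a JSON string: the download task returns a list of records carrying a datetime and an s3_key, and notify_cumulus decodes it before calling notify_acquirablefile. A minimal sketch of that round trip; the key prefix shown is a placeholder for cumulus.S3_ACQUIRABLE_PREFIX.

import json
from datetime import datetime, timezone

def build_payload(records):
    # Serialize so the list survives the trip between tasks as a plain string
    return json.dumps(records)

def consume_payload(payload: str):
    # The receiving task gets a string back; decode it to a list of dicts
    for p in json.loads(payload):
        print(f'would notify Cumulus for {p["s3_key"]} at {p["datetime"]}')

payload = build_payload([
    {
        "datetime": datetime(2024, 1, 15, 17, tzinfo=timezone.utc).isoformat(),
        # key prefix below is a placeholder, not the repo's actual prefix
        "s3_key": "acquirables/ncep-stage4-mosaic-01h/st4_conus.2024011517.01h.grb2",
    }
])
consume_payload(payload)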
107 changes: 107 additions & 0 deletions dags/cumulus/ncep_stage4_conus_01h_revision.py
@@ -0,0 +1,107 @@
"""
Acquire and Process NCEP Stage 4 MOSAIC QPE Revisions
"""

import os, json, logging, time
from datetime import datetime, timedelta, timezone

# import calendar

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

# from airflow.exceptions import AirflowSkipException
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": (datetime.now(timezone.utc) - timedelta(hours=1)).replace(
minute=0, second=0
),
"catchup_by_default": True,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retry_delay": timedelta(minutes=10),
}


@dag(
default_args=default_args,
schedule="0 18 * * *",
tags=["cumulus", "precip", "QPE", "CONUS", "stage4", "NCEP", 'revision'],
max_active_runs=1,
max_active_tasks=2,
)
def cumulus_ncep_stage4_conus_01h_revision():
"""This pipeline handles download, processing, and derivative product creation for \n
NCEP Stage 4 MOSAIC QPE\n
URL Dir - https://nomads.ncep.noaa.gov/pub/data/nccf/com/pcpanl/prod/pcpanl.20220808/st4_conus.YYYYMMDDHH.01h.grb2\n
Files matching st4_conus.YYYYMMDDHH.01h.grb2 - 1 hour
"""

URL_ROOT = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/pcpanl/prod"
PRODUCT_SLUG = "ncep-stage4-mosaic-01h"

def download_product(filepath):

filename = os.path.basename(filepath)
s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{PRODUCT_SLUG}/{filename}"
# logging.info(f"Downloading file: {filepath}")
trigger_download(
url=f"{URL_ROOT}/{filepath}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key
)
return s3_key


@task()
def notify_cumulus(payload):

# Airflow will convert the parameter to a string, convert it back
payload = json.loads(payload)

for p in payload:
cumulus.notify_acquirablefile(
acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
datetime=p["datetime"],
s3_key=p["s3_key"],
)

@task()
def download_stage4_qpe_revision():

n = 24*14 # how many products back to download
logical_date = get_current_context()["logical_date"]
payload = []

for h in range(1, n + 1):
# Skip current hour, we just downloaded it in another dag
lookback_date = logical_date - timedelta(hours=h)
# logging.info(f"lookback_date: {lookback_date}")
dirpath = f'pcpanl.{lookback_date.strftime("%Y%m%d")}'
filename = f'st4_conus.{lookback_date.strftime("%Y%m%d%H")}.01h.grb2'
filepath = f"{dirpath}/{filename}"
try:
s3_key = download_product(filepath)
payload.append(
{"datetime": lookback_date.isoformat(), "s3_key": s3_key}
)
except Exception as err:
logging.error(f"Failed to download {filepath}.")
logging.error(err)
# prevent a single failure from stopping the whole task
pass

# sleep to avoid hitting a rate limit on src server
# time.sleep(2)

return json.dumps(payload)

notify_cumulus(download_stage4_qpe_revision())


stage4_dag = cumulus_ncep_stage4_conus_01h_revision()
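The revision DAG steps 24*14 = 336 hours back from the logical date and rebuilds the remote path for each hour. A small, pure-Python sketch of that path construction, useful for sanity-checking the pcpanl.YYYYMMDD/st4_conus.YYYYMMDDHH.01h.grb2 pattern without an Airflow context.

from datetime import datetime, timedelta, timezone

def stage4_filepaths(logical_date: datetime, hours_back: int):
    # Yield (lookback datetime, relative filepath), skipping the current hour,
    # which the main hourly DAG already downloads
    for h in range(1, hours_back + 1):
        lookback = logical_date - timedelta(hours=h)
        dirpath = f'pcpanl.{lookback.strftime("%Y%m%d")}'
        filename = f'st4_conus.{lookback.strftime("%Y%m%d%H")}.01h.grb2'
        yield lookback, f"{dirpath}/{filename}"

for dt, path in stage4_filepaths(datetime(2024, 1, 15, 18, tzinfo=timezone.utc), 3):
    print(dt.isoformat(), path)
# 2024-01-15T17:00:00+00:00 pcpanl.20240115/st4_conus.2024011517.01h.grb2
# 2024-01-15T16:00:00+00:00 pcpanl.20240115/st4_conus.2024011516.01h.grb2
# 2024-01-15T15:00:00+00:00 pcpanl.20240115/st4_conus.2024011515.01h.grb2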
89 changes: 89 additions & 0 deletions dags/cumulus/prism_backfill_por.py
@@ -0,0 +1,89 @@
import json
from calendar import monthrange
from datetime import datetime, timedelta, timezone

import helpers.cumulus as cumulus
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

default_args_backfill = {
"owner": "airflow",
"depends_on_past": False,
### to backfill POR
"start_date": datetime(1981, 1, 1), # Start from 1981
"end_date": (datetime.now(timezone.utc) - timedelta(days=180)).replace(
minute=0, second=0
), # Stop 6 months ago
"catchup": True, # Enable backfill
### to force backfill past 6 months
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(minutes=30),
}

@dag(
default_args=default_args_backfill,
schedule="30 01 1 * *", # monthly schedule
tags=["cumulus", "backfill", "prism"],
max_active_runs=1, # Limit concurrent runs
max_active_tasks=4, # Limit concurrent tasks
catchup=True, # catchup is a DAG-level argument; the copy in default_args only reaches tasks
)
def cumulus_prism_backfill_por():
"""Backfill historical PRISM data month-by-month."""

URL_ROOT = f"https://data.prism.oregonstate.edu/time_series/us/an/4km"

@task()
def download_historical_prism_month(short_name='ppt'):
product_slug = f"prism-{short_name}-early"
logical_date = get_current_context()["logical_date"]
execution_date = logical_date.date()
year = execution_date.year
month = execution_date.month
results = []

# Get the number of days in the month
num_days = monthrange(year, month)[1]

for day in range(1, num_days + 1):
dt = datetime(year, month, day)
file_dir = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}'
filename = f'prism_{short_name}_us_25m_{dt.strftime("%Y%m%d")}.zip'
s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}"
print(f"Downloading {filename}")
try:
output = trigger_download(
url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
)
results.append(
{
"datetime": logical_date.isoformat(),
"s3_key": s3_key,
"product_slug": product_slug,
"filename": filename,
}
)
except Exception as err:
print(f"Error downloading {filename}: {err}")
return json.dumps(results)

@task()
def notify_cumulus(payload):
payload = json.loads(payload)
for item in payload:
print("Notifying Cumulus: " + item["filename"])
cumulus.notify_acquirablefile(
acquirable_id=cumulus.acquirables[item["product_slug"]],
datetime=item["datetime"],
s3_key=item["s3_key"],
)

notify_cumulus(download_historical_prism_month(short_name='ppt'))
notify_cumulus(download_historical_prism_month(short_name='tmax'))
notify_cumulus(download_historical_prism_month(short_name='tmin'))


backfill_dag = cumulus_prism_backfill_por()
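Each monthly backfill run expands into one download per day of that month via calendar.monthrange. A minimal sketch of the per-month URL expansion, using the URL layout and filename pattern from this file; the returned key suffix omits the cumulus.S3_ACQUIRABLE_PREFIX that the DAG prepends.

from calendar import monthrange
from datetime import datetime

URL_ROOT = "https://data.prism.oregonstate.edu/time_series/us/an/4km"

def prism_month_downloads(year: int, month: int, short_name: str = "ppt"):
    # One (url, key_suffix) pair per day in the month
    pairs = []
    for day in range(1, monthrange(year, month)[1] + 1):
        dt = datetime(year, month, day)
        filename = f'prism_{short_name}_us_25m_{dt.strftime("%Y%m%d")}.zip'
        url = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}/{filename}'
        pairs.append((url, f"prism-{short_name}-early/{filename}"))
    return pairs

print(len(prism_month_downloads(1981, 2)))  # 28 daily files for February 1981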