Merged
6 changes: 3 additions & 3 deletions dags/cumulus/abrfc_qpe_01h.py
@@ -36,9 +36,9 @@
)
def cumulus_abrfc_qpe_01h():
"""This pipeline handles download, processing, and derivative product creation for \n
Arkansas-Red Basin River Forecast Center (ABRFC) 1HR QPE\n
ABRFC QPE\n
URL Dir - https://tgftp.nws.noaa.gov/data/rfc/abrfc/xmrg_qpe/
Files matching abrfc_qpe_01hr_YYYYMMDDHHZ.nc - 1 hour\n
Files matching abrfc_qpe_01hr_YYYYMMDDHHZ.nc.gz - 1 hour\n
Note: Delay observed when watching new product timestamp on file at source.
Example: timestamp said 15:50, but was pushed to server at 16:07
"""
@@ -49,7 +49,7 @@ def cumulus_abrfc_qpe_01h():
@task()
def download_raw_qpe():
logical_date = get_current_context()["logical_date"]
filename = f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc'
filename = f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc.gz'
s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{PRODUCT_SLUG}/{filename}"
print(f"Downloading file: {filename}")
trigger_download(
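For reference, a minimal sketch of how the renamed acquirable key is built from the DAG's logical date after this change; S3_ACQUIRABLE_PREFIX and PRODUCT_SLUG are placeholders for the values that come from helpers.cumulus and the surrounding DAG.

# Sketch: gzipped ABRFC QPE filename and S3 key for one logical hour.
from datetime import datetime, timezone

S3_ACQUIRABLE_PREFIX = "cumulus/acquirables"  # placeholder for helpers.cumulus value
PRODUCT_SLUG = "abrfc-qpe-01h"                # placeholder for the DAG's slug

logical_date = datetime(2022, 8, 8, 16, tzinfo=timezone.utc)
filename = f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc.gz'
s3_key = f"{S3_ACQUIRABLE_PREFIX}/{PRODUCT_SLUG}/{filename}"
print(s3_key)  # cumulus/acquirables/abrfc-qpe-01h/abrfc_qpe_01hr_2022080816Z.nc.gz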
31 changes: 0 additions & 31 deletions dags/cumulus/ncep_stage4_conus_01h.py
@@ -84,38 +84,7 @@ def notify_cumulus(payload):
s3_key=p["s3_key"],
)

@task()
def download_last_n_products():

n = 120 # how many products back to download
logical_date = get_current_context()["logical_date"]
payload = []

for h in range(1, n + 1):
# Skip current hour, we just downloaded it above
lookback_date = logical_date - timedelta(hours=h)
# logging.info(f"lookback_date: {lookback_date}")
dirpath = f'pcpanl.{lookback_date.strftime("%Y%m%d")}'
filename = f'st4_conus.{lookback_date.strftime("%Y%m%d%H")}.01h.grb2'
filepath = f"{dirpath}/{filename}"
try:
s3_key = download_product(filepath)
payload.append(
{"datetime": lookback_date.isoformat(), "s3_key": s3_key}
)
except Exception as err:
logging.error(f"Failed to download {filepath}.")
logging.error(err)
# prevent a single failure from stopping the whole task
pass

# sleep to avoid hitting a rate limit on src server
time.sleep(2)

return json.dumps(payload)

notify_cumulus(download_stage4_qpe())
notify_cumulus(download_last_n_products())


stage4_dag = cumulus_ncep_stage4_conus_01h()
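The lookback loop removed above is relocated to the new revision DAG below. In both pipelines, tasks hand results to notify_cumulus as a JSON-encoded list, roughly as in this sketch; the example s3_key and prefix are illustrative values following the patterns in these DAGs.

# Sketch: payload exchanged between the download and notify tasks.
# Airflow passes the upstream return value as a string, so the list is
# serialized with json.dumps on one side and json.loads on the other.
import json
from datetime import datetime, timezone

payload = json.dumps(
    [
        {
            "datetime": datetime(2022, 8, 8, 15, tzinfo=timezone.utc).isoformat(),
            "s3_key": "cumulus/acquirables/ncep-stage4-mosaic-01h/st4_conus.2022080815.01h.grb2",
        }
    ]
)

for p in json.loads(payload):  # convert back before notifying Cumulus
    print(p["datetime"], p["s3_key"])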
107 changes: 107 additions & 0 deletions dags/cumulus/ncep_stage4_conus_01h_revision.py
@@ -0,0 +1,107 @@
"""
Acquire and Process NCEP Stage 4 MOSAIC QPE Revisions
"""

import os, json, logging, time
from datetime import datetime, timedelta, timezone

# import calendar

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

# from airflow.exceptions import AirflowSkipException
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": (datetime.now(timezone.utc) - timedelta(hours=1)).replace(
minute=0, second=0
),
"catchup_by_default": True,
"email_on_failure": False,
"email_on_retry": False,
"retries": 6,
"retry_delay": timedelta(minutes=10),
}


@dag(
default_args=default_args,
schedule="0 18 * * *",
tags=["cumulus", "precip", "QPE", "CONUS", "stage4", "NCEP", "revision"],
max_active_runs=1,
max_active_tasks=2,
)
def cumulus_ncep_stage4_conus_01h_revision():
"""This pipeline handles download, processing, and derivative product creation for \n
NCEP Stage 4 MOSAIC QPE revisions (re-acquires recently published hourly products)\n
URL Dir - https://nomads.ncep.noaa.gov/pub/data/nccf/com/pcpanl/prod/pcpanl.20220808/st4_conus.YYYYMMDDHH.01h.grb2\n
Files matching st4_conus.YYYYMMDDHH.01h.grb2 - 1 hour
"""

URL_ROOT = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/pcpanl/prod"
PRODUCT_SLUG = "ncep-stage4-mosaic-01h"

def download_product(filepath):

filename = os.path.basename(filepath)
s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{PRODUCT_SLUG}/{filename}"
# logging.info(f"Downloading file: {filepath}")
trigger_download(
url=f"{URL_ROOT}/{filepath}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key
)
return s3_key


@task()
def notify_cumulus(payload):

# Airflow will convert the parameter to a string, convert it back
payload = json.loads(payload)

for p in payload:
cumulus.notify_acquirablefile(
acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
datetime=p["datetime"],
s3_key=p["s3_key"],
)

@task()
def download_stage4_qpe_revision():

n = 24 * 14  # how many products back to download (14 days of hourly files)
logical_date = get_current_context()["logical_date"]
payload = []

for h in range(1, n + 1):
# Skip the current hour; it is downloaded by the primary hourly DAG
lookback_date = logical_date - timedelta(hours=h)
# logging.info(f"lookback_date: {lookback_date}")
dirpath = f'pcpanl.{lookback_date.strftime("%Y%m%d")}'
filename = f'st4_conus.{lookback_date.strftime("%Y%m%d%H")}.01h.grb2'
filepath = f"{dirpath}/{filename}"
try:
s3_key = download_product(filepath)
payload.append(
{"datetime": lookback_date.isoformat(), "s3_key": s3_key}
)
except Exception as err:
logging.error(f"Failed to download {filepath}.")
logging.error(err)
# prevent a single failure from stopping the whole task
pass

# optionally sleep between requests to avoid hitting a rate limit on the source server
# time.sleep(2)

return json.dumps(payload)

notify_cumulus(download_stage4_qpe_revision())


stage4_dag = cumulus_ncep_stage4_conus_01h_revision()
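A minimal sketch of the path construction the revision task loops over, assuming the directory and file patterns shown above (the example logical date is arbitrary).

# Sketch: source-relative path for a Stage 4 hourly product h hours back.
from datetime import datetime, timedelta, timezone

logical_date = datetime(2022, 8, 22, 18, tzinfo=timezone.utc)
h = 36  # any value in range(1, 24 * 14 + 1)
lookback_date = logical_date - timedelta(hours=h)
dirpath = f'pcpanl.{lookback_date.strftime("%Y%m%d")}'
filename = f'st4_conus.{lookback_date.strftime("%Y%m%d%H")}.01h.grb2'
print(f"{dirpath}/{filename}")  # pcpanl.20220821/st4_conus.2022082106.01h.grb2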
89 changes: 89 additions & 0 deletions dags/cumulus/prism_backfill_por.py
@@ -0,0 +1,89 @@
import json
from calendar import monthrange
from datetime import datetime, timedelta, timezone

import helpers.cumulus as cumulus
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

default_args_backfill = {
"owner": "airflow",
"depends_on_past": False,
### to backfill POR
"start_date": datetime(1981, 1, 1), # Start from 1981
"end_date": (datetime.now(timezone.utc) - timedelta(days=180)).replace(
minute=0, second=0
), # Stop 6 months ago
"catchup": True, # Enable backfill
### to force backfill past 6 months
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(minutes=30),
}

@dag(
default_args=default_args_backfill,
schedule="30 01 1 * *", # monthly schedule
tags=["cumulus", "backfill", "prism"],
max_active_runs=1, # Limit concurrent runs
max_active_tasks=4, # Limit concurrent tasks
)
def cumulus_prism_backfill_por():
"""Backfill historical PRISM data month-by-month."""

URL_ROOT = "https://data.prism.oregonstate.edu/time_series/us/an/4km"

@task()
def download_historical_prism_month(short_name='ppt'):
product_slug = f"prism-{short_name}-early"
logical_date = get_current_context()["logical_date"]
execution_date = logical_date.date()
year = execution_date.year
month = execution_date.month
results = []

# Get the number of days in the month
num_days = monthrange(year, month)[1]

for day in range(1, num_days + 1):
dt = datetime(year, month, day)
file_dir = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}'
filename = f'prism_{short_name}_us_25m_{dt.strftime("%Y%m%d")}.zip'
s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}"
print(f"Downloading {filename}")
try:
output = trigger_download(
url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
)
results.append(
{
"datetime": logical_date.isoformat(),
"s3_key": s3_key,
"product_slug": product_slug,
"filename": filename,
}
)
except Exception as err:
print(f"Error downloading {filename}: {err}")
return json.dumps(results)

@task()
def notify_cumulus(payload):
payload = json.loads(payload)
for item in payload:
print("Notifying Cumulus: " + item["filename"])
cumulus.notify_acquirablefile(
acquirable_id=cumulus.acquirables[item["product_slug"]],
datetime=item["datetime"],
s3_key=item["s3_key"],
)

notify_cumulus(download_historical_prism_month(short_name='ppt'))
notify_cumulus(download_historical_prism_month(short_name='tmax'))
notify_cumulus(download_historical_prism_month(short_name='tmin'))


backfill_dag = cumulus_prism_backfill_por()
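For reference, a short sketch of the month-by-month expansion the backfill task performs, using calendar.monthrange to enumerate every day of the scheduled month; filenames follow the prism_*_us_25m pattern above.

# Sketch: daily PRISM zip filenames for one backfill month.
from calendar import monthrange
from datetime import datetime

year, month, short_name = 1981, 1, "ppt"
num_days = monthrange(year, month)[1]  # second element is the number of days in the month
for day in range(1, num_days + 1):
    dt = datetime(year, month, day)
    print(f'prism_{short_name}_us_25m_{dt.strftime("%Y%m%d")}.zip')
# prism_ppt_us_25m_19810101.zip ... prism_ppt_us_25m_19810131.zip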
107 changes: 107 additions & 0 deletions dags/cumulus/prism_early.py
@@ -0,0 +1,107 @@
"""
Acquire and Process PRISM Real-time
"""

import json
from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": (datetime.now(timezone.utc)-timedelta(days=14)).replace(
minute=0, second=0
),
# "start_date": datetime(2021, 11, 9),
"catchup_by_default": True,
# "email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 5,
"retry_delay": timedelta(minutes=30),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}


def generate_filename_for_product(product, date):
# product: 'tmin', 'tmax', 'ppt'
# date: datetime object
return f'prism_{product}_us_25m_{date.strftime("%Y%m%d")}.zip'

@dag(
default_args=default_args,
schedule="30 12 * * *",
tags=["cumulus", "prism"],
max_active_runs=2,
max_active_tasks=4,
)

def cumulus_prism_early():
"""This pipeline handles download, processing, and derivative product creation for \n
PRISM: Min Temp (tmin) early, Max Temp (tmax) early and Precip (ppt) early
URL Dir - ftp://prism.nacse.org/daily/tmin/YYYY/
Files matching prism_ppt_us_25m_YYYYMMDD.zip - Daily around 12:30-14:30 UTC
"""

# URL_ROOT = "ftp://prism.nacse.org/daily"  # legacy FTP source, superseded by the HTTPS mirror below
URL_ROOT = "https://data.prism.oregonstate.edu/time_series/us/an/4km"


# Download Tasks
#################################################
@task()
def download_raw_prism_early(short_name='ppt'):
product_slug = f"prism-{short_name}-early"
logical_date = get_current_context()["logical_date"]
execution_date = logical_date.date()
results = []

dt = execution_date
file_dir = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}'
filename = generate_filename_for_product(short_name, dt)
s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}"
print(f"Downloading {filename}")
try:
output = trigger_download(
url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
)
results.append(
{
"datetime": logical_date.isoformat(),
"s3_key": s3_key,
"product_slug": product_slug,
"filename": filename,
}
)
except Exception as err:
print(f"Error downloading {filename}: {err}")
return json.dumps(results)

# Notify Tasks
#################################################
@task()
def notify_cumulus(payload):
payload = json.loads(payload)
for item in payload:
print("Notifying Cumulus: " + item["filename"])
cumulus.notify_acquirablefile(
acquirable_id=cumulus.acquirables[item["product_slug"]],
datetime=item["datetime"],
s3_key=item["s3_key"],
)

notify_cumulus(download_raw_prism_early(short_name='ppt'))
notify_cumulus(download_raw_prism_early(short_name='tmax'))
notify_cumulus(download_raw_prism_early(short_name='tmin'))


prism_dag = cumulus_prism_early()
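A quick usage sketch of generate_filename_for_product and the daily URL it feeds, using the HTTPS URL_ROOT set in the DAG above; the example date is arbitrary.

# Sketch: compose the download URL for one PRISM "early" product on a given day.
from datetime import datetime

URL_ROOT = "https://data.prism.oregonstate.edu/time_series/us/an/4km"


def generate_filename_for_product(product, date):
    # product: 'tmin', 'tmax', or 'ppt'; date: datetime object
    return f'prism_{product}_us_25m_{date.strftime("%Y%m%d")}.zip'


dt = datetime(2024, 6, 1)
filename = generate_filename_for_product("ppt", dt)
print(f'{URL_ROOT}/ppt/daily/{dt.strftime("%Y")}/{filename}')
# .../4km/ppt/daily/2024/prism_ppt_us_25m_20240601.zip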