diff --git a/dags/cumulus/abrfc_qpe_01h.py b/dags/cumulus/abrfc_qpe_01h.py
index 181d035..cded0be 100644
--- a/dags/cumulus/abrfc_qpe_01h.py
+++ b/dags/cumulus/abrfc_qpe_01h.py
@@ -36,9 +36,9 @@
 )
 def cumulus_abrfc_qpe_01h():
     """This pipeline handles download, processing, and derivative product creation for \n
-    Arkansas-Red Basin River Forecast Center (ABRFC) 1HR QPE\n
+    ABRFC QPE\n
     URL Dir - https://tgftp.nws.noaa.gov/data/rfc/abrfc/xmrg_qpe/
-    Files matching abrfc_qpe_01hr_YYYYMMDDHHZ.nc - 1 hour\n
+    Files matching abrfc_qpe_01hr_YYYYMMDDHHZ.nc.gz - 1 hour\n
     Note: Delay observed when watching new product timestamp on file at source.
     Example: timestamp said 15:50, but was pushed to server at 16:07
     """
@@ -49,7 +49,7 @@ def cumulus_abrfc_qpe_01h():
     @task()
     def download_raw_qpe():
         logical_date = get_current_context()["logical_date"]
-        filename = f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc'
+        filename = f'abrfc_qpe_01hr_{logical_date.strftime("%Y%m%d%H")}Z.nc.gz'
         s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{PRODUCT_SLUG}/{filename}"
         print(f"Downloading file: {filename}")
         trigger_download(
diff --git a/dags/cumulus/abrfc_qpf_06h.py b/dags/cumulus/abrfc_qpf_06h.py
index b2590e4..4adee67 100644
--- a/dags/cumulus/abrfc_qpf_06h.py
+++ b/dags/cumulus/abrfc_qpf_06h.py
@@ -56,8 +56,8 @@ def qpf_filenames(edate):
     schedule="8 */6 * * *",
     tags=["cumulus", "precip", "QPF", "ABRFC"],
     doc_md=__doc__,
-    max_active_runs=1,
-    max_active_tasks=2,
+    max_active_runs=2,
+    max_active_tasks=4,
 )
 def cumulus_abrfc_qpf_06h():
     key_prefix = cumulus.S3_ACQUIRABLE_PREFIX
diff --git a/dags/cumulus/aprfc_qpe_06h.py b/dags/cumulus/aprfc_qpe_06h.py
index 07678cc..4fcb486 100644
--- a/dags/cumulus/aprfc_qpe_06h.py
+++ b/dags/cumulus/aprfc_qpe_06h.py
@@ -23,7 +23,7 @@
     "catchup_by_default": False,
     "email_on_failure": False,
     "email_on_retry": False,
-    "retries": 6,
+    "retries": 1,
     "retry_delay": timedelta(minutes=30),
 }
 
@@ -43,8 +43,8 @@ def get_filenames(edate):
     default_args=default_args,
     schedule="40 14,5 * * *",
     tags=["cumulus", "precip", "QPE", "APRFC"],
-    max_active_runs=2,
-    max_active_tasks=4,
+    max_active_runs=1,
+    max_active_tasks=1,
 )
 def cumulus_aprfc_qpe_06h():
     """This pipeline handles download, processing, and derivative product creation for \n
diff --git a/dags/cumulus/aprfc_qpf_06h.py b/dags/cumulus/aprfc_qpf_06h.py
index 99f319c..e87e559 100644
--- a/dags/cumulus/aprfc_qpf_06h.py
+++ b/dags/cumulus/aprfc_qpf_06h.py
@@ -25,7 +25,7 @@
     "catchup_by_default": False,
     "email_on_failure": False,
     "email_on_retry": False,
-    "retries": 6,
+    "retries": 1,
     "retry_delay": timedelta(minutes=30),
 }
 
@@ -128,9 +128,10 @@ def download_raw_qpf():
         url = f"{URL_ROOT}/{filename}"
         s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
         # Check if the file already exists in S3
-        if s3_file_exists(cumulus.S3_BUCKET, s3_key):
-            print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
-            continue  # Skip to the next file
+        # Needs an infrastructure change to re-enable this check
+        # if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+        #     print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+        #     continue  # Skip to the next file
         print(f"Downloading file: {filename}")
         try:
             trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
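The skip-if-exists guard commented out above (and again in `aprfc_qtf_01h.py` below) is the usual S3 idempotency check. For reference, a check of this kind is just a `HEAD` request against the object; the sketch below is a reviewer illustration using boto3, not the project's `s3_file_exists` helper, whose implementation isn't shown in this diff:

```python
import boto3
from botocore.exceptions import ClientError


def object_exists(bucket: str, key: str) -> bool:
    """Return True if s3://{bucket}/{key} already exists (HEAD request, no download)."""
    s3 = boto3.client("s3")
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as err:
        # S3 reports a missing key on HEAD as a bare 404
        if err.response["Error"]["Code"] in ("404", "NoSuchKey"):
            return False
        raise  # credential/permission errors should surface, not be skipped
```

Once the infrastructure change lands, re-enabling the original three lines restores idempotent re-runs without repeat downloads.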
- "retries": 6, + "retries": 1, "retry_delay": timedelta(minutes=30), } diff --git a/dags/cumulus/aprfc_qtf_01h.py b/dags/cumulus/aprfc_qtf_01h.py index 59c87f3..3ec1098 100644 --- a/dags/cumulus/aprfc_qtf_01h.py +++ b/dags/cumulus/aprfc_qtf_01h.py @@ -25,7 +25,7 @@ "catchup_by_default": False, "email_on_failure": False, "email_on_retry": False, - "retries": 6, + "retries": 1, "retry_delay": timedelta(minutes=30), } @@ -131,9 +131,10 @@ def download_raw_qtf(): url = f"{URL_ROOT}/{filename}" s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}" # Check if the file already exists in S3 - if s3_file_exists(cumulus.S3_BUCKET, s3_key): - print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}") - continue # Skip to the next file + #need infrastructure changes to reenable this check + #if s3_file_exists(cumulus.S3_BUCKET, s3_key): + # print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}") + # continue # Skip to the next file print(f"Downloading file: {filename}") try: trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key) diff --git a/dags/cumulus/ncep_stage4_conus_01h.py b/dags/cumulus/ncep_stage4_conus_01h.py index beb5cff..6a9ec7f 100644 --- a/dags/cumulus/ncep_stage4_conus_01h.py +++ b/dags/cumulus/ncep_stage4_conus_01h.py @@ -84,38 +84,7 @@ def notify_cumulus(payload): s3_key=p["s3_key"], ) - @task() - def download_last_n_products(): - - n = 120 # how many products back to download - logical_date = get_current_context()["logical_date"] - payload = [] - - for h in range(1, n + 1): - # Skip current hour, we just downloaded it above - lookback_date = logical_date - timedelta(hours=h) - # logging.info(f"lookback_date: {lookback_date}") - dirpath = f'pcpanl.{lookback_date.strftime("%Y%m%d")}' - filename = f'st4_conus.{lookback_date.strftime("%Y%m%d%H")}.01h.grb2' - filepath = f"{dirpath}/{filename}" - try: - s3_key = download_product(filepath) - payload.append( - {"datetime": lookback_date.isoformat(), "s3_key": s3_key} - ) - except Exception as err: - logging.error(f"Failed to download {filepath}.") - logging.error(err) - # prevent a single failure from stopping the whole task - pass - - # sleep to avoid hitting a rate limit on src server - time.sleep(2) - - return json.dumps(payload) - notify_cumulus(download_stage4_qpe()) - notify_cumulus(download_last_n_products()) stage4_dag = cumulus_ncep_stage4_conus_01h() diff --git a/dags/cumulus/ncep_stage4_conus_01h_revision.py b/dags/cumulus/ncep_stage4_conus_01h_revision.py new file mode 100644 index 0000000..7f6b333 --- /dev/null +++ b/dags/cumulus/ncep_stage4_conus_01h_revision.py @@ -0,0 +1,107 @@ +""" +Acquire and Process NCEP Stage 4 MOSAIC QPE Revisions +""" + +import os, json, logging, time +from datetime import datetime, timedelta, timezone + +# import calendar + +from airflow import DAG +from airflow.decorators import dag, task +from airflow.operators.python import get_current_context + +# from airflow.exceptions import AirflowSkipException +from helpers.downloads import trigger_download + +import helpers.cumulus as cumulus + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": (datetime.now(timezone.utc) - timedelta(hours=1)).replace( + minute=0, second=0 + ), + "catchup_by_default": True, + "email_on_failure": False, + "email_on_retry": False, + "retries": 6, + "retry_delay": timedelta(minutes=10), +} + + +@dag( + default_args=default_args, + schedule="0 18 * * *", + tags=["cumulus", "precip", "QPE", "CONUS", "stage4", "NCEP", 'revision'], + max_active_runs=1, + 
diff --git a/dags/cumulus/ncep_stage4_conus_01h_revision.py b/dags/cumulus/ncep_stage4_conus_01h_revision.py
new file mode 100644
index 0000000..7f6b333
--- /dev/null
+++ b/dags/cumulus/ncep_stage4_conus_01h_revision.py
@@ -0,0 +1,107 @@
+"""
+Acquire and Process NCEP Stage 4 MOSAIC QPE Revisions
+"""
+
+import os, json, logging, time
+from datetime import datetime, timedelta, timezone
+
+# import calendar
+
+from airflow import DAG
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+
+# from airflow.exceptions import AirflowSkipException
+from helpers.downloads import trigger_download
+
+import helpers.cumulus as cumulus
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=1)).replace(
+        minute=0, second=0
+    ),
+    "catchup_by_default": True,
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 6,
+    "retry_delay": timedelta(minutes=10),
+}
+
+
+@dag(
+    default_args=default_args,
+    schedule="0 18 * * *",
+    tags=["cumulus", "precip", "QPE", "CONUS", "stage4", "NCEP", "revision"],
+    max_active_runs=1,
+    max_active_tasks=2,
+)
+def cumulus_ncep_stage4_conus_01h_revision():
+    """This pipeline handles download, processing, and derivative product creation for \n
+    NCEP Stage 4 MOSAIC QPE\n
+    URL Dir - https://nomads.ncep.noaa.gov/pub/data/nccf/com/pcpanl/prod/pcpanl.YYYYMMDD/\n
+    Files matching st4_conus.YYYYMMDDHH.01h.grb2 - 1 hour
+    """
+
+    URL_ROOT = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/pcpanl/prod"
+    PRODUCT_SLUG = "ncep-stage4-mosaic-01h"
+
+    def download_product(filepath):
+        filename = os.path.basename(filepath)
+        s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{PRODUCT_SLUG}/{filename}"
+        # logging.info(f"Downloading file: {filepath}")
+        trigger_download(
+            url=f"{URL_ROOT}/{filepath}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key
+        )
+        return s3_key
+
+    @task()
+    def notify_cumulus(payload):
+        # Airflow will convert the parameter to a string, convert it back
+        payload = json.loads(payload)
+
+        for p in payload:
+            cumulus.notify_acquirablefile(
+                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
+                datetime=p["datetime"],
+                s3_key=p["s3_key"],
+            )
+
+    @task()
+    def download_stage4_qpe_revision():
+        n = 24 * 14  # how many products back to download
+        logical_date = get_current_context()["logical_date"]
+        payload = []
+
+        for h in range(1, n + 1):
+            # Skip current hour, we just downloaded it in another dag
+            lookback_date = logical_date - timedelta(hours=h)
+            # logging.info(f"lookback_date: {lookback_date}")
+            dirpath = f'pcpanl.{lookback_date.strftime("%Y%m%d")}'
+            filename = f'st4_conus.{lookback_date.strftime("%Y%m%d%H")}.01h.grb2'
+            filepath = f"{dirpath}/{filename}"
+            try:
+                s3_key = download_product(filepath)
+                payload.append(
+                    {"datetime": lookback_date.isoformat(), "s3_key": s3_key}
+                )
+            except Exception as err:
+                logging.error(f"Failed to download {filepath}.")
+                logging.error(err)
+                # prevent a single failure from stopping the whole task
+                pass
+
+            # sleep to avoid hitting a rate limit on src server
+            # time.sleep(2)
+
+        return json.dumps(payload)
+
+    notify_cumulus(download_stage4_qpe_revision())
+
+
+stage4_dag = cumulus_ncep_stage4_conus_01h_revision()
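With `time.sleep(2)` commented out, each run of this revision DAG issues up to 336 (24 × 14) back-to-back requests against NOMADS, which is what the sleep was guarding against. If rate limiting resurfaces, sleeping only on every Nth request keeps most of the speedup: a 2-second pause after every download costs about 11 minutes per run, while pausing after every tenth download costs about a minute. A sketch with illustrative, untuned parameters:

```python
import time


def polite_sleep(i: int, every: int = 10, seconds: float = 2.0) -> None:
    """Pause after every `every`-th request instead of after each one."""
    if i % every == 0:
        time.sleep(seconds)


# usage inside the download loop (short demo values)
for i in range(1, 11):
    # ... download product i here ...
    polite_sleep(i, every=10, seconds=0.1)
```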
diff --git a/dags/cumulus/prism_backfill_por.py b/dags/cumulus/prism_backfill_por.py
new file mode 100644
index 0000000..3f08952
--- /dev/null
+++ b/dags/cumulus/prism_backfill_por.py
@@ -0,0 +1,89 @@
+import json
+from calendar import monthrange
+from datetime import datetime, timedelta, timezone
+
+import helpers.cumulus as cumulus
+from airflow import DAG
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from helpers.downloads import trigger_download
+
+default_args_backfill = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    ### to backfill POR
+    "start_date": datetime(1981, 1, 1),  # Start from 1981
+    "end_date": (datetime.now(timezone.utc) - timedelta(days=180)).replace(
+        minute=0, second=0
+    ),  # Stop 6 months ago
+    "catchup": True,  # Enable backfill
+    ### to force backfill past 6 months
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=30),
+}
+
+
+@dag(
+    default_args=default_args_backfill,
+    schedule="30 01 1 * *",  # monthly schedule
+    tags=["cumulus", "backfill", "prism"],
+    max_active_runs=1,  # Limit concurrent runs
+    max_active_tasks=4,  # Limit concurrent tasks
+)
+def cumulus_prism_backfill_por():
+    """Backfill historical PRISM data month-by-month."""
+
+    URL_ROOT = f"https://data.prism.oregonstate.edu/time_series/us/an/4km"
+
+    @task()
+    def download_historical_prism_month(short_name="ppt"):
+        product_slug = f"prism-{short_name}-early"
+        logical_date = get_current_context()["logical_date"]
+        execution_date = logical_date.date()
+        year = execution_date.year
+        month = execution_date.month
+        results = []
+
+        # Get the number of days in the month
+        num_days = monthrange(year, month)[1]
+
+        for day in range(1, num_days + 1):
+            dt = datetime(year, month, day)
+            file_dir = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}'
+            filename = f'prism_{short_name}_us_25m_{dt.strftime("%Y%m%d")}.zip'
+            s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}"
+            print(f"Downloading {filename}")
+            try:
+                output = trigger_download(
+                    url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
+                )
+                results.append(
+                    {
+                        "datetime": logical_date.isoformat(),
+                        "s3_key": s3_key,
+                        "product_slug": product_slug,
+                        "filename": filename,
+                    }
+                )
+            except Exception as err:
+                print(f"Error downloading {filename}: {err}")
+        return json.dumps(results)
+
+    @task()
+    def notify_cumulus(payload):
+        payload = json.loads(payload)
+        for item in payload:
+            print("Notifying Cumulus: " + item["filename"])
+            cumulus.notify_acquirablefile(
+                acquirable_id=cumulus.acquirables[item["product_slug"]],
+                datetime=item["datetime"],
+                s3_key=item["s3_key"],
+            )
+
+    notify_cumulus(download_historical_prism_month(short_name="ppt"))
+    notify_cumulus(download_historical_prism_month(short_name="tmax"))
+    notify_cumulus(download_historical_prism_month(short_name="tmin"))
+
+
+backfill_dag = cumulus_prism_backfill_por()
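`monthrange(year, month)[1]` supplies the day count for the run's month, so leap years come along for free. One detail worth a second look: every result item records the run's `logical_date` rather than the per-file date `dt`, so all ~30 files in a month share a single timestamp; if Cumulus keys acquirables off this field, `dt.isoformat()` may be what's intended. A quick check of the month arithmetic:

```python
from calendar import monthrange

# monthrange returns (weekday_of_first_day, days_in_month)
assert monthrange(1984, 2) == (2, 29)  # Feb 1984: starts Wednesday, leap year
assert monthrange(1981, 2)[1] == 28    # Feb 1981: not a leap year

# one filename per day of the run's month, as the backfill task builds them
year, month = 1981, 1
days = [
    f"prism_ppt_us_25m_{year}{month:02d}{day:02d}.zip"
    for day in range(1, monthrange(year, month)[1] + 1)
]
print(days[0], "...", days[-1])
# prism_ppt_us_25m_19810101.zip ... prism_ppt_us_25m_19810131.zip
```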
get_current_context()["logical_date"] + execution_date = logical_date.date() + results = [] + + dt = execution_date + file_dir = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}' + filename = generate_filename_for_product(short_name, dt) + s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}" + print(f"Downloading {filename}") + try: + output = trigger_download( + url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key, + ) + results.append( + { + "datetime": logical_date.isoformat(), + "s3_key": s3_key, + "product_slug": product_slug, + "filename": filename, + } + ) + except: + print(f'Error downloading {filename}') + return json.dumps(results) + + # Notify Tasks + ################################################# + @task() + def notify_cumulus(payload): + payload = json.loads(payload) + for item in payload: + print("Notifying Cumulus: " + item["filename"]) + cumulus.notify_acquirablefile( + acquirable_id=cumulus.acquirables[item["product_slug"]], + datetime=item["datetime"], + s3_key=item["s3_key"], + ) + + notify_cumulus(download_raw_prism_early(short_name='ppt')) + notify_cumulus(download_raw_prism_early(short_name='tmax')) + notify_cumulus(download_raw_prism_early(short_name= 'tmin')) + + +prism_dag = cumulus_prism_early() diff --git a/dags/cumulus/prism_revision.py b/dags/cumulus/prism_revision.py new file mode 100644 index 0000000..b0f5ca0 --- /dev/null +++ b/dags/cumulus/prism_revision.py @@ -0,0 +1,110 @@ +""" +Acquire and Process PRISM Revisions +""" + +import json +from datetime import datetime, timedelta, timezone + +from airflow import DAG +from airflow.decorators import dag, task +from airflow.operators.python import get_current_context +from helpers.downloads import trigger_download + +import helpers.cumulus as cumulus + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": (datetime.now(timezone.utc)).replace( + minute=0, second=0 + ), + # "start_date": datetime(2021, 11, 9), + "catchup_by_default": True, + # "email": ["airflow@airflow.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 5, + "retry_delay": timedelta(minutes=30), + # 'queue': 'bash_queue', + # 'pool': 'backfill', + # 'priority_weight': 10, + # 'end_date': datetime(2016, 1, 1), +} + + +def generate_filename_for_product(product, date): + # product: 'tmin', 'tmax', 'ppt' + # date: datetime object + return f'prism_{product}_us_25m_{date.strftime("%Y%m%d")}.zip' + +@dag( + default_args=default_args, + schedule="30 12 * * 0", # run once per week sunday + tags=["cumulus", 'prism'], + max_active_runs=2, + max_active_tasks=4, +) + +def cumulus_prism_revision(): + """This pipeline handles download, processing, and derivative product creation for \n + PRISM: Min Temp (tmin) early, Max Temp (tmax) early and Precip (ppt) early + URL Dir - ftp://prism.nacse.org/daily/tmin/YYYY/ + Files matching prism_ppt_us_25m_YYYYMMDD.zip'- Daily around 12:30-14:30 UTC + """ + + URL_ROOT = f"ftp://prism.nacse.org/daily" + URL_ROOT = f"https://data.prism.oregonstate.edu/time_series/us/an/4km" + + DAYS_BACK = 190 + + # Download Tasks + ################################################# + @task() + def download_raw_prism_revision(short_name='ppt'): + product_slug = f"prism-{short_name}-early" + logical_date = get_current_context()["logical_date"] + execution_date = logical_date.date() + start_date = execution_date - timedelta(days=DAYS_BACK) + results = [] + + for offset in range(DAYS_BACK+1): # includes today + dt = start_date + timedelta(days=offset) 
diff --git a/dags/cumulus/prism_revision.py b/dags/cumulus/prism_revision.py
new file mode 100644
index 0000000..b0f5ca0
--- /dev/null
+++ b/dags/cumulus/prism_revision.py
@@ -0,0 +1,110 @@
+"""
+Acquire and Process PRISM Revisions
+"""
+
+import json
+from datetime import datetime, timedelta, timezone
+
+from airflow import DAG
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from helpers.downloads import trigger_download
+
+import helpers.cumulus as cumulus
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": (datetime.now(timezone.utc)).replace(
+        minute=0, second=0
+    ),
+    # "start_date": datetime(2021, 11, 9),
+    "catchup_by_default": True,
+    # "email": ["airflow@airflow.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 5,
+    "retry_delay": timedelta(minutes=30),
+    # 'queue': 'bash_queue',
+    # 'pool': 'backfill',
+    # 'priority_weight': 10,
+    # 'end_date': datetime(2016, 1, 1),
+}
+
+
+def generate_filename_for_product(product, date):
+    # product: 'tmin', 'tmax', 'ppt'
+    # date: datetime object
+    return f'prism_{product}_us_25m_{date.strftime("%Y%m%d")}.zip'
+
+
+@dag(
+    default_args=default_args,
+    schedule="30 12 * * 0",  # run once per week (Sunday)
+    tags=["cumulus", "prism"],
+    max_active_runs=2,
+    max_active_tasks=4,
+)
+def cumulus_prism_revision():
+    """This pipeline handles download, processing, and derivative product creation for \n
+    PRISM revisions: Min Temp (tmin) early, Max Temp (tmax) early, and Precip (ppt) early\n
+    URL Dir - https://data.prism.oregonstate.edu/time_series/us/an/4km/<product>/daily/YYYY/\n
+    Files matching prism_<product>_us_25m_YYYYMMDD.zip - trailing DAYS_BACK days, weekly
+    """
+
+    # URL_ROOT = f"ftp://prism.nacse.org/daily"  # legacy FTP source, superseded below
+    URL_ROOT = f"https://data.prism.oregonstate.edu/time_series/us/an/4km"
+
+    DAYS_BACK = 190
+
+    # Download Tasks
+    #################################################
+    @task()
+    def download_raw_prism_revision(short_name="ppt"):
+        product_slug = f"prism-{short_name}-early"
+        logical_date = get_current_context()["logical_date"]
+        execution_date = logical_date.date()
+        start_date = execution_date - timedelta(days=DAYS_BACK)
+        results = []
+
+        for offset in range(DAYS_BACK + 1):  # includes the logical date itself
+            dt = start_date + timedelta(days=offset)
+            file_dir = f'{URL_ROOT}/{short_name}/daily/{dt.strftime("%Y")}'
+            filename = generate_filename_for_product(short_name, dt)
+            s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}"
+            print(f"Downloading {filename}")
+            try:
+                output = trigger_download(
+                    url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
+                )
+                results.append(
+                    {
+                        "datetime": logical_date.isoformat(),
+                        "s3_key": s3_key,
+                        "product_slug": product_slug,
+                        "filename": filename,
+                    }
+                )
+            except Exception as err:
+                print(f"Error downloading {filename}: {err}")
+        return json.dumps(results)
+
+    # Notify Tasks
+    #################################################
+    @task()
+    def notify_cumulus(payload):
+        payload = json.loads(payload)
+        for item in payload:
+            print("Notifying Cumulus: " + item["filename"])
+            cumulus.notify_acquirablefile(
+                acquirable_id=cumulus.acquirables[item["product_slug"]],
+                datetime=item["datetime"],
+                s3_key=item["s3_key"],
+            )
+
+    notify_cumulus(download_raw_prism_revision(short_name="ppt"))
+    notify_cumulus(download_raw_prism_revision(short_name="tmax"))
+    notify_cumulus(download_raw_prism_revision(short_name="tmin"))
+
+
+prism_dag = cumulus_prism_revision()
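The revision window walks forward from `logical_date - DAYS_BACK`, so a single weekly run covers `DAYS_BACK + 1` = 191 days ending on the logical date itself; files already fetched by `cumulus_prism_early` are simply re-downloaded and overwritten in S3, which is what picks up upstream revisions. A small check of the window arithmetic:

```python
from datetime import date, timedelta

DAYS_BACK = 190
logical = date(2024, 6, 30)  # a Sunday, matching the "30 12 * * 0" schedule
start = logical - timedelta(days=DAYS_BACK)

window = [start + timedelta(days=offset) for offset in range(DAYS_BACK + 1)]
assert len(window) == 191
assert window[0] == date(2023, 12, 23)
assert window[-1] == logical
```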