diff --git a/dags/cumulus/aprfc_qpe_06h.py b/dags/cumulus/aprfc_qpe_06h.py
index 4fcb486..91c7ea8 100644
--- a/dags/cumulus/aprfc_qpe_06h.py
+++ b/dags/cumulus/aprfc_qpe_06h.py
@@ -9,7 +9,7 @@
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
-from helpers.downloads import trigger_download
+from helpers.downloads import s3_file_exists, trigger_download
 
 import helpers.cumulus as cumulus
 
@@ -65,6 +65,9 @@ def download_raw_qpe():
             url = f"{URL_ROOT}/{filename}"
             filename = filename.split("/")[1]
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
diff --git a/dags/cumulus/aprfc_qpf_06h.py b/dags/cumulus/aprfc_qpf_06h.py
index e87e559..33717bb 100644
--- a/dags/cumulus/aprfc_qpf_06h.py
+++ b/dags/cumulus/aprfc_qpf_06h.py
@@ -30,23 +30,34 @@
 }
 
 
+QPF_REGEX = re.compile(
+    r"^qpf06f_has_"     # literal prefix with underscore
+    r".*?"              # anything up to _awips_
+    r"_awips_"
+    r"(\d{12})"         # group 1: 12-digit forecast time (2025121112)
+    r"_"
+    r"(\d{10})"         # group 2: 10-digit issuance (202512030603)
+    r"f"
+    r"(\d{3})"          # group 3: forecast hour (192)
+    r"\.grb(?:\.gz)?$"  # .grb or .grb.gz, no extra .digits
+)
+
+
 def get_latest_files(filenames):
     # Dictionary to store the latest file for each unique timestamp
     latest_files = {}
 
-    # Corrected regular expression
-    pattern = r"^qpf06f_has_.*_awips_(\d{12})_(\d{10}f\d{3})\.\d+\.(grb|grb\.gz)$"
     for filename in filenames:
-        match = re.search(pattern, filename)
-        if match:
-            # Combine the issue timestamp and forecast timestamp as the key
-            issue_timestamp = match.group(1)
-            forecast_timestamp = match.group(2)
-            key = issue_timestamp + "_" + forecast_timestamp
-            # Update the latest file for the key if it's not present or if the current filename is greater
-            if key not in latest_files or filename > latest_files[key]:
-                latest_files[key] = filename
-
+        m = QPF_REGEX.match(filename)
+        if not m:
+            continue
+        issue_ts = m.group(2)     # 10-digit issuance
+        base_ts = m.group(1)      # 12-digit valid/base
+        forecast_hr = m.group(3)  # 3-digit forecast hour
+        key = f"{issue_ts}_{base_ts}_{forecast_hr}"
+        # Update the latest file for the key if it's not present or if the current filename is greater
+
+        if key not in latest_files or filename > latest_files[key]:
+            latest_files[key] = filename
     # Return the list of latest files
     return list(latest_files.values())
 
@@ -63,21 +74,20 @@ def get_filenames(edate, url):
     soup = BeautifulSoup(page.content, "html.parser")
     links = [node.get("href") for node in soup.find_all("a")]
 
-    regex = r"^qpf06f_has_.*_awips_\d{12}_\d{10}f\d{3}\.\d+\.(grb|grb\.gz)$"
-    issue_re = re.compile(
-        r"_awips_(\d{12})"
-    )  # Capture the issuance timestamp after 'awips_'
-
-    # Collect candidates with their issuance timestamp
     candidates = []
     for link in links:
-        if link and re.match(regex, link):
-            m = issue_re.search(link)
-            if m:
-                candidates.append((link, m.group(1)))
-
+        if not link:
+            continue
+        link = link.strip()
+        m = QPF_REGEX.match(link)
+        if m:
+            issue_ts = m.group(1)  # 12-digit issuance after _awips_
+            candidates.append((link, issue_ts))
+            # print(f"DEBUG link: {link}, issue_ts: {issue_ts}")
+    # No matching files
     if not candidates:
+        print("DEBUG: no candidates matched QPF_REGEX")
         return []
 
     # Find the latest issuance
@@ -92,6 +102,13 @@ def get_filenames(edate, url):
     if latest_issue_dt > edate or (edate - latest_issue_dt) > timedelta(hours=36):
         # Outside the 36-hour window or in the future: do not download anything
         return []
+    # print(f"DEBUG found {len(candidates)} matching files; latest_issue_str={latest_issue_str}")
+
+    # print(f"DEBUG latest_issue_str: {latest_issue_str}")
+    # print(f"DEBUG latest_issue_dt: {latest_issue_dt.isoformat()}")
+    # print(f"DEBUG edate (logical_date): {edate.isoformat()}")
+    # print(f"DEBUG age_hours: {(edate - latest_issue_dt).total_seconds() / 3600.0}")
+
     # Filter to only files from the latest issuance
     latest_issue_files = [fn for fn, issue in candidates if issue == latest_issue_str]
 
@@ -128,10 +145,9 @@ def download_raw_qpf():
             url = f"{URL_ROOT}/{filename}"
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
             # Check if the file already exists in S3
-            # needs infrastructure change to reenable this check
-            #if s3_file_exists(cumulus.S3_BUCKET, s3_key):
-            #    print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
-            #    continue # Skip to the next file
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
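Note on the QPF_REGEX change above: the old pattern required an extra ".digits" segment and bundled the 10-digit issuance and forecast hour into a single group, while the new compiled regex captures the 12-digit, 10-digit, and 3-digit fields separately. A minimal standalone sketch of how get_latest_files keys on those groups, using a made-up filename (the real APRFC names may differ in the middle segment):

    import re

    QPF_REGEX = re.compile(
        r"^qpf06f_has_.*?_awips_(\d{12})_(\d{10})f(\d{3})\.grb(?:\.gz)?$"
    )

    # Hypothetical filename for illustration only.
    sample = "qpf06f_has_xxx_awips_202512030600_2025120306f006.grb"

    m = QPF_REGEX.match(sample)
    if m:
        base_ts, issue_ts, forecast_hr = m.group(1), m.group(2), m.group(3)
        key = f"{issue_ts}_{base_ts}_{forecast_hr}"
        print(key)  # -> 2025120306_202512030600_006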
diff --git a/dags/cumulus/aprfc_qte_01h.py b/dags/cumulus/aprfc_qte_01h.py
index 7dbaddc..f8f3a66 100644
--- a/dags/cumulus/aprfc_qte_01h.py
+++ b/dags/cumulus/aprfc_qte_01h.py
@@ -13,17 +13,17 @@
 from airflow.decorators import dag, task
 from airflow.operators.python import get_current_context
 from airflow.utils.task_group import TaskGroup
-from helpers.downloads import trigger_download
+from helpers.downloads import s3_file_exists, trigger_download
 
 import helpers.cumulus as cumulus
 
 default_args = {
     "owner": "airflow",
     "depends_on_past": False,
-    "start_date": (datetime.now(timezone.utc) - timedelta(hours=48)).replace(
-        minute=0, second=0
-    ),
-    "catchup_by_default": False,
+    "start_date": (datetime.now(timezone.utc) - timedelta(hours=72)).replace(
+        minute=0, second=0
+    ),
+    "catchup": True,
     "email_on_failure": False,
     "email_on_retry": False,
     "retries": 1,
@@ -35,8 +35,8 @@
     default_args=default_args,
     tags=["cumulus", "AIRTEMP", "QTE", "APRFC"],
     schedule="45 * * * *",
-    max_active_runs=1,
-    max_active_tasks=1,
+    max_active_runs=2,
+    max_active_tasks=4,
 )
 def cumulus_aprfc_qte_01h():
     """
@@ -58,51 +58,71 @@ def cumulus_aprfc_qte_01h():
     URL_ROOT = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/"
     PRODUCT_SLUG = "aprfc-qte-01h"
+    LOOKBACK_HOURS = 12  # number of hours from runtime to look back for
 
-    filename_template = Template("akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2 ")
+    filename_template = Template("akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2")
 
     url_suffix_template = Template("akurma.${date_}")
 
     @task()
     def download_raw_qte():
         logical_date = get_current_context()["logical_date"]
-        date_only = logical_date.strftime("%Y%m%d")
+        anchor = get_current_context()["data_interval_end"].replace(minute=0, second=0, microsecond=0)
 
-        url_suffix = url_suffix_template.substitute(
-            date_=date_only,
-        )
+        results = []
 
-        filename = filename_template.substitute(
-            hr_=logical_date.strftime("%H"),
-        )
+        for offset in range(LOOKBACK_HOURS):
+            ts = anchor - timedelta(hours=1 + offset)  # last complete hour, then look back
+            date_only = ts.strftime("%Y%m%d")
+            hour_str = ts.strftime("%H")
 
-        file_dir = f"{URL_ROOT}{url_suffix}"
-        s3_filename = f"{date_only}_{filename}"
-        s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{s3_filename}"
+            url_suffix = url_suffix_template.substitute(
+                date_=date_only,
+            )
 
-        print(f"Downloading file: {filename}")
+            filename = filename_template.substitute(
+                hr_=hour_str,
+            )
 
-        trigger_download(
-            url=f"{file_dir}/{filename}", s3_bucket=s3_bucket, s3_key=s3_key
-        )
-        return json.dumps(
-            {
-                "execution": logical_date.isoformat(),
-                "s3_key": s3_key,
-                "filename": s3_filename,
-            }
-        )
+            file_dir = f"{URL_ROOT}{url_suffix}"
+
+            s3_filename = f"{date_only}_{filename}"
+            s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{s3_filename}"
+
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue # Skip to the next file
+
+            print(f"Downloading file: {url_suffix}/{filename}")
+
+            try:
+
+                trigger_download(
+                    url=f"{file_dir}/{filename}", s3_bucket=s3_bucket, s3_key=s3_key
+                )
+            except:
+                print(f'Failed downloading {filename}')
+
+            results.append(
+                {
+                    "execution": ts.isoformat(),
+                    "s3_key": s3_key,
+                    "filename": s3_filename,
+                }
+            )
+        return json.dumps(results)
 
     @task()
     def notify_cumulus(payload):
         payload = json.loads(payload)
-        print("Notifying Cumulus: " + payload["filename"])
-        cumulus.notify_acquirablefile(
-            acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
-            datetime=payload["execution"],
-            s3_key=payload["s3_key"],
-        )
+        for item in payload:
+            print("Notifying Cumulus: " + item["filename"])
+            cumulus.notify_acquirablefile(
+                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
+                datetime=item["execution"],
+                s3_key=item["s3_key"],
+            )
 
     notify_cumulus(download_raw_qte())
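The reworked download_raw_qte above walks back hour by hour from the last complete hour before data_interval_end, instead of downloading a single file for logical_date. A minimal sketch of just that timestamp arithmetic, with a hard-coded anchor standing in for the Airflow context:

    from datetime import datetime, timedelta, timezone

    LOOKBACK_HOURS = 12

    # Hard-coded stand-in for get_current_context()["data_interval_end"].
    anchor = datetime(2025, 12, 3, 6, 45, tzinfo=timezone.utc).replace(
        minute=0, second=0, microsecond=0
    )

    for offset in range(LOOKBACK_HOURS):
        ts = anchor - timedelta(hours=1 + offset)  # last complete hour, then further back
        print(ts.strftime("%Y%m%d"), ts.strftime("%H"))  # 20251203 05, 20251203 04, ...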
diff --git a/dags/cumulus/aprfc_qtf_01h.py b/dags/cumulus/aprfc_qtf_01h.py
index 3ec1098..dcc760f 100644
--- a/dags/cumulus/aprfc_qtf_01h.py
+++ b/dags/cumulus/aprfc_qtf_01h.py
@@ -131,10 +131,9 @@ def download_raw_qtf():
             url = f"{URL_ROOT}/{filename}"
             s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
             # Check if the file already exists in S3
-            #need infrastructure changes to reenable this check
-            #if s3_file_exists(cumulus.S3_BUCKET, s3_key):
-            #    print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
-            #    continue # Skip to the next file
+            if s3_file_exists(cumulus.S3_BUCKET, s3_key):
+                print(f"Skipping existing S3 object: s3://{cumulus.S3_BUCKET}/{s3_key}")
+                continue # Skip to the next file
             print(f"Downloading file: {filename}")
             try:
                 trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
diff --git a/dags/cumulus/prism_early.py b/dags/cumulus/prism_early.py
index c6dff2f..dd82df1 100644
--- a/dags/cumulus/prism_early.py
+++ b/dags/cumulus/prism_early.py
@@ -62,7 +62,7 @@ def cumulus_prism_early():
     def download_raw_prism_early(short_name='ppt'):
         product_slug = f"prism-{short_name}-early"
         logical_date = get_current_context()["logical_date"]
-        execution_date = logical_date.date()
+        execution_date = (get_current_context()["data_interval_end"]).date()-timedelta(hours=24)
 
         results = []
         dt = execution_date
@@ -70,20 +70,19 @@ def cumulus_prism_early():
         filename = generate_filename_for_product(short_name, dt)
         s3_key = f"{cumulus.S3_ACQUIRABLE_PREFIX}/{product_slug}/{filename}"
         print(f"Downloading {filename}")
-        try:
-            output = trigger_download(
-                url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
-            )
-            results.append(
-                {
-                    "datetime": logical_date.isoformat(),
-                    "s3_key": s3_key,
-                    "product_slug": product_slug,
-                    "filename": filename,
-                }
-            )
-        except:
-            print(f'Error downloading {filename}')
+
+        output = trigger_download(
+            url=f"{file_dir}/{filename}", s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key,
+        )
+        results.append(
+            {
+                "datetime": logical_date.isoformat(),
+                "s3_key": s3_key,
+                "product_slug": product_slug,
+                "filename": filename,
+            }
+        )
+
         return json.dumps(results)
 
     # Notify Tasks
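For prism_early, execution_date is now derived from data_interval_end rather than logical_date. A quick sketch of the arithmetic only (subtracting timedelta(hours=24) from a date shifts it back exactly one day), with a made-up interval end in place of the Airflow context:

    from datetime import datetime, timedelta, timezone

    # Stand-in for get_current_context()["data_interval_end"].
    data_interval_end = datetime(2025, 12, 3, 12, 0, tzinfo=timezone.utc)

    execution_date = data_interval_end.date() - timedelta(hours=24)
    print(execution_date)  # 2025-12-02, one day before the interval end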
diff --git a/dags/cumulus/prism_revision.py b/dags/cumulus/prism_revision.py
index b0f5ca0..5c4e451 100644
--- a/dags/cumulus/prism_revision.py
+++ b/dags/cumulus/prism_revision.py
@@ -62,7 +62,7 @@ def cumulus_prism_revision():
     @task()
     def download_raw_prism_revision(short_name='ppt'):
         product_slug = f"prism-{short_name}-early"
-        logical_date = get_current_context()["logical_date"]
+        logical_date = get_current_context()["logical_date"]-timedelta(hours=24)
         execution_date = logical_date.date()
         start_date = execution_date - timedelta(days=DAYS_BACK)
         results = []
diff --git a/dags/wmes/wmes_daily_jobs.py b/dags/wmes/cwms_daily_jobs.py
similarity index 89%
rename from dags/wmes/wmes_daily_jobs.py
rename to dags/wmes/cwms_daily_jobs.py
index 0be7129..1a5ad0e 100644
--- a/dags/wmes/wmes_daily_jobs.py
+++ b/dags/wmes/cwms_daily_jobs.py
@@ -28,7 +28,7 @@
     max_active_runs=1,
     max_active_tasks=4,
 )
-def wmes_daily_jobs():
+def cwms_daily_jobs():
     job_configs = [
         {"office": "lrc", "office_group": "lrd", "enabled": False},
         {"office": "lre", "office_group": "lrd", "enabled": False},
@@ -36,7 +36,9 @@ def wmes_daily_jobs():
         {"office": "lrl", "office_group": "lrd", "enabled": False},
         {"office": "lrn", "office_group": "lrd", "enabled": False},
         {"office": "lrp", "office_group": "lrd", "enabled": False},
-        {"office": "swt", "office_group": "swd",
+        {
+            "office": "swt",
+            "office_group": "swd",
             "github_branch": "cwbi-restructure",
             "enabled": True,
         },
@@ -62,15 +64,15 @@ def launch_batch(job_config):
         logical_date = get_current_context()["logical_date"]
         dag = DagContext.get_current_dag()
 
-        job_name = f"wmes-{job_config['office']}-daily-job-{logical_date.strftime('%Y%m%d-%H%M')}"
+        job_name = f"cwms-{job_config['office']}-daily-job-{logical_date.strftime('%Y%m%d-%H%M')}"
         return batch.batch_operator(
             dag=dag,
             task_id=job_name,
             deferrable=True,
             container_overrides={},
             job_name=job_name,
-            job_queue=f"wmes-{job_config['office_group']}-jq",
-            job_definition=f"wmes-{job_config['office']}-jobs-jobdef",
+            job_queue=f"cwms-{job_config['office_group']}-jq",
+            job_definition=f"cwms-{job_config['office']}-jobs-jobdef",
             local_command=[], # local docker mock only
             tags={"Office": job_config["office"]},
         ).execute({})
@@ -79,4 +81,4 @@ def launch_batch(job_config):
         launch_batch.override(task_id=f"{jc['office']}-jobs")(jc)
 
 
-wmes_jobs_dag = wmes_daily_jobs()
+cwms_jobs_dag = cwms_daily_jobs()
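The renamed daily job DAG above now builds every AWS Batch identifier from a cwms- prefix. A small sketch of the naming pattern only (batch.batch_operator is a project helper and is not reproduced here); the job_config dict mirrors one entry in job_configs:

    from datetime import datetime, timezone

    job_config = {"office": "swt", "office_group": "swd"}
    logical_date = datetime(2025, 12, 3, 6, 0, tzinfo=timezone.utc)

    job_name = f"cwms-{job_config['office']}-daily-job-{logical_date.strftime('%Y%m%d-%H%M')}"
    job_queue = f"cwms-{job_config['office_group']}-jq"
    job_definition = f"cwms-{job_config['office']}-jobs-jobdef"
    print(job_name, job_queue, job_definition)
    # cwms-swt-daily-job-20251203-0600 cwms-swd-jq cwms-swt-jobs-jobdef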
diff --git a/dags/wmes/wmes_hourly_jobs.py b/dags/wmes/cwms_hourly_jobs.py
similarity index 92%
rename from dags/wmes/wmes_hourly_jobs.py
rename to dags/wmes/cwms_hourly_jobs.py
index dae705d..b420420 100644
--- a/dags/wmes/wmes_hourly_jobs.py
+++ b/dags/wmes/cwms_hourly_jobs.py
@@ -28,7 +28,7 @@
     max_active_runs=1,
     max_active_tasks=4,
 )
-def wmes_hourly_jobs():
+def cwms_hourly_jobs():
     job_configs = [
         {
             "office": "lrc",
@@ -65,7 +65,7 @@ def wmes_hourly_jobs():
             "office_group": "swd",
             "enabled": True,
         },
-        { 
+        {
             "office": "spk",
             "office_group": "spd",
             "enabled": True,
@@ -91,7 +91,7 @@ def launch_batch(job_config):
         logical_date = get_current_context()["logical_date"]
         dag = DagContext.get_current_dag()
 
-        job_name = f"wmes-{job_config['office']}-hourly-job-{logical_date.strftime('%Y%m%d-%H%M')}"
+        job_name = f"cwms-{job_config['office']}-hourly-job-{logical_date.strftime('%Y%m%d-%H%M')}"
         return batch.batch_operator(
             dag=dag,
             task_id=job_name,
@@ -103,8 +103,8 @@ def launch_batch(job_config):
                 "command": ["/jobs/bin/hourly.sh"],
             },
             job_name=job_name,
-            job_queue=f"wmes-{job_config['office_group']}-jq",
-            job_definition=f"wmes-{job_config['office']}-jobs-jobdef",
+            job_queue=f"cwms-{job_config['office_group']}-jq",
+            job_definition=f"cwms-{job_config['office']}-jobs-jobdef",
             local_command=[], # local docker mock only
             tags={"Office": job_config["office"]},
         ).execute({})
@@ -113,4 +113,4 @@ def launch_batch(job_config):
         launch_batch.override(task_id=f"{jc['office']}-jobs")(jc)
 
 
-wmes_jobs_dag = wmes_hourly_jobs()
+cwms_jobs_dag = cwms_hourly_jobs()
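Several of the DAGs in this change re-enable the same skip-if-exists guard around trigger_download. A condensed sketch of that control flow with stubbed helpers (s3_file_exists and trigger_download live in helpers.downloads; the stubs below only stand in for them):

    def s3_file_exists(bucket, key):
        # Stub for helpers.downloads.s3_file_exists; always reports "missing" here.
        return False

    def trigger_download(url, s3_bucket, s3_key):
        # Stub for helpers.downloads.trigger_download.
        print(f"downloading {url} -> s3://{s3_bucket}/{s3_key}")

    S3_BUCKET = "example-bucket"  # stand-in for cumulus.S3_BUCKET

    for filename in ["a.grb", "b.grb"]:
        s3_key = f"prefix/product/{filename}"
        if s3_file_exists(S3_BUCKET, s3_key):
            print(f"Skipping existing S3 object: s3://{S3_BUCKET}/{s3_key}")
            continue  # already acquired on a previous run
        trigger_download(url=f"https://example.invalid/{filename}", s3_bucket=S3_BUCKET, s3_key=s3_key)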