From 410dd9609714103abccecae8341ea73443509158 Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 2 May 2025 12:37:11 -0600 Subject: [PATCH 1/3] added waterlevels --- .gitignore | 1 + backend/config.py | 2 + backend/persisters/geoserver.py | 5 ++- dagsterio/__init__.py | 10 +++++ dagsterio/assets.py | 67 ++++++++++++++++++++++++++++ dagsterio/config/example_config.yaml | 24 ++++++++++ frontend/cli.py | 39 ++++++++-------- 7 files changed, 127 insertions(+), 21 deletions(-) create mode 100644 dagsterio/__init__.py create mode 100644 dagsterio/assets.py create mode 100644 dagsterio/config/example_config.yaml diff --git a/.gitignore b/.gitignore index cc04cc7..af17ee7 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ frontend/requirements.txt frontend/output_timeseries backend/test00112233.csv frontend/api/cache +dagsterio/config/*.dev.yaml # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/backend/config.py b/backend/config.py index 96d379d..0ce94d2 100644 --- a/backend/config.py +++ b/backend/config.py @@ -557,6 +557,8 @@ def _update_output_units(self): parameter = self.parameter.lower() if parameter == "ph": self.analyte_output_units = "" + elif parameter == "waterlevels": + self.analyte_output_units = "dtwbgs (ft)" @property def start_dt(self): diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index a6a38de..7ffbb20 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -36,6 +36,7 @@ def session_factory(connection: dict): url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" engine = create_engine(url) + SessionFactory = sessionmaker(autocommit=False, autoflush=False, bind=engine) return SessionFactory @@ -102,7 +103,7 @@ def __init__(self, *args, **kwargs): self._connection = None self._connect() - def dump_sites(self, path: str): + def dump_sites(self, path: str = None): if self.sites: db = self.config.get('geoserver').get('db') dbname = db.get('db_name') @@ -111,7 +112,7 @@ def dump_sites(self, path: str): else: self.log("no sites to dump", fg="red") - def dump_summary(self, path: str): + def dump_summary(self, path: str = None): if self.records: db = self.config.get('geoserver').get('db') dbname = db.get('db_name') diff --git a/dagsterio/__init__.py b/dagsterio/__init__.py new file mode 100644 index 0000000..28a0970 --- /dev/null +++ b/dagsterio/__init__.py @@ -0,0 +1,10 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== + + +# ============= EOF ============================================= diff --git a/dagsterio/assets.py b/dagsterio/assets.py new file mode 100644 index 0000000..f569d2b --- /dev/null +++ b/dagsterio/assets.py @@ -0,0 +1,67 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== +import os +import dagster as dg + +from backend.config import Config +from backend.logger import setup_logging +from backend.unifier import unify_analytes, unify_waterlevels + + +@dg.asset +def tds(): + """TDS asset""" + config_path = os.path.join( + os.path.dirname(__file__), 'config', 'tds_config.dev.yaml' + ) + + config = Config(path=config_path) + config.parameter = 'tds' + config.finalize() + + # setup logging here so that the path can be set to config.output_path + #setup_logging(path=config.output_path) + + # with geoserver.get_connection() as conn: + unify_analytes(config) + + +@dg.asset +def summary_waterlevels(): + """Summary water levels asset""" + config_path = os.path.join( + os.path.dirname(__file__), 'config', 'summary_waterlevels_config.dev.yaml' + ) + + config = Config(path=config_path) + config.parameter = 'waterlevels' + config.finalize() + + # setup logging here so that the path can be set to config.output_path + #setup_logging(path=config.output_path) + + # with geoserver.get_connection() as conn: + unify_waterlevels(config) + + +defs = dg.Definitions( + assets=[tds, summary_waterlevels], + schedules=[ + dg.ScheduleDefinition( + name='tds_schedule', + # job=tds.to_job(), + target=dg.AssetSelection.keys("tds"), + cron_schedule='0 0 * * *', + execution_timezone='America/Denver', + ) + ], + # resources={ + # 'config': dg.ResourceDefinition.hardcoded_resource(Config()), + # }, +) +# ============= EOF ============================================= diff --git a/dagsterio/config/example_config.yaml b/dagsterio/config/example_config.yaml new file mode 100644 index 0000000..ccfbbda --- /dev/null +++ b/dagsterio/config/example_config.yaml @@ -0,0 +1,24 @@ +yes: True +output_format: geoserver +output_summary: True +geoserver: + db: + host: localhost + port: 5432 + dbname: + user: + password: + +sources: + bernco: True + bor: True + cabq: True + ebid: False + nmbgmr_amp: False + nmed_dwb: False + nmose_isc_seven_rivers: True + nmose_roswell: False + nwis: False + pvacd: True + wqp: False + nmose_pod: False diff --git a/frontend/cli.py b/frontend/cli.py index 68d8d4a..6dc1bef 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -267,26 +267,27 @@ def weave( config.parameter = parameter - # output type - if output == "summary": - summary = True - timeseries_unified = False - timeseries_separated = False - elif output == "timeseries_unified": - summary = False - timeseries_unified = True - timeseries_separated = False - elif output == "timeseries_separated": - summary = False - timeseries_unified = False - timeseries_separated = True - else: - click.echo(f"Invalid output type: {output}") - return + if config_path is None: + # output type + if output == "summary": + summary = True + timeseries_unified = False + timeseries_separated = False + elif output == "timeseries_unified": + summary = False + timeseries_unified = True + timeseries_separated = False + elif output == "timeseries_separated": + summary = False + timeseries_unified = False + timeseries_separated = True + else: + click.echo(f"Invalid output type: {output}") + return - config.output_summary = summary - config.output_timeseries_unified = timeseries_unified - config.output_timeseries_separated = timeseries_separated + # config.output_summary = summary + # config.output_timeseries_unified = timeseries_unified + # config.output_timeseries_separated = timeseries_separated config_agencies, false_agencies = config.get_config_and_false_agencies() From 0cddc4615ef141e83b5e4f09d06aeb547258f995 Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 5 May 2025 09:49:45 -0600 Subject: [PATCH 2/3] refactor --- .gitignore | 1 + backend/persisters/geoserver.py | 173 ++-------------------- backend/persisters/geoserver_db_models.py | 100 +++++++++++++ dagsterio/assets.py | 102 ++++++++++--- dagsterio/config/source_constants.py | 38 +++++ requirements.txt | 4 +- 6 files changed, 235 insertions(+), 183 deletions(-) create mode 100644 backend/persisters/geoserver_db_models.py create mode 100644 dagsterio/config/source_constants.py diff --git a/.gitignore b/.gitignore index af17ee7..01cbee9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ frontend/output_timeseries backend/test00112233.csv frontend/api/cache dagsterio/config/*.dev.yaml +dagsterio/dagster_home # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index 7ffbb20..f1d27fa 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -5,97 +5,17 @@ # You may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # =============================================================================== -import json -import os import time from itertools import groupby +from typing import Callable -import psycopg2 from shapely.geometry.multipoint import MultiPoint from shapely.geometry.point import Point -from sqlalchemy.dialects.postgresql import JSONB, insert -from sqlalchemy.orm import declarative_base, sessionmaker, relationship +from sqlalchemy.dialects.postgresql import insert from backend.persister import BasePersister -from sqlalchemy import Column, ForeignKey, create_engine, UUID, String, Integer, Float, Date, Time -from geoalchemy2 import Geometry - -Base = declarative_base() -# dbname=db.get('dbname'), -# user=db.get('user'), -# password=db.get('password'), -# host=db.get('host'), -# port=db.get('port'), -def session_factory(connection: dict): - user = connection.get("user", "postgres") - password = connection.get("password", "") - host = connection.get("host", "localhost") - port = connection.get("port", 5432) - database = connection.get("dbname", "gis") - - url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" - engine = create_engine(url) - - SessionFactory = sessionmaker(autocommit=False, autoflush=False, bind=engine) - return SessionFactory - - -class Location(Base): - __tablename__ = "tbl_location" - - id = Column(Integer, primary_key=True, index=True) - name = Column(String) - data_source_uid = Column(String, index=True) - - properties = Column(JSONB) - geometry = Column(Geometry(geometry_type="POINT", srid=4326)) - source_slug = Column(String, ForeignKey("tbl_sources.name")) - - source = relationship("Sources", backref="locations") - - -class Summary(Base): - __tablename__ = "tbl_summary" - - id = Column(Integer, primary_key=True, index=True) - name = Column(String) - data_source_uid = Column(String, index=True) - - properties = Column(JSONB) - geometry = Column(Geometry(geometry_type="POINT", srid=4326)) - source_slug = Column(String, ForeignKey("tbl_sources.name")) - parameter_slug = Column(String, ForeignKey("tbl_parameters.name")) - - source = relationship("Sources", backref="summaries") - - value = Column(Float) - nrecords = Column(Integer) - min = Column(Float) - max = Column(Float) - mean = Column(Float) - - latest_value = Column(Float) - latest_date = Column(Date) - latest_time = Column(Time) - - earliest_value = Column(Float) - earliest_date = Column(Date) - earliest_time = Column(Time) - - -class Parameters(Base): - __tablename__ = "tbl_parameters" - name = Column(String, primary_key=True, index=True) - units = Column(String) - - -class Sources(Base): - __tablename__ = "tbl_sources" - id = Column(Integer) - name = Column(String, primary_key=True, index=True) - convex_hull = Column(Geometry(geometry_type="POLYGON", srid=4326)) - +from backend.persisters.geoserver_db_models import session_factory, Location, Summary, Parameters, Sources class GeoServerPersister(BasePersister): def __init__(self, *args, **kwargs): @@ -167,7 +87,6 @@ def _write_parameters(self): sql = insert(Parameters).values([{"name": self.config.parameter, "units": self.config.analyte_output_units}]).on_conflict_do_nothing( index_elements=[Parameters.name],) - print(sql) conn.execute(sql) conn.commit() @@ -208,19 +127,6 @@ def make_stmt(chunk): self._chunk_insert(make_stmt, records) - def _chunk_insert(self, make_stmt, records: list, chunk_size: int = 10): - for i in range(0, len(records), chunk_size): - chunk = records[i:i + chunk_size] - print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") - st = time.time() - - stmt = make_stmt(chunk) - with self._connection as conn: - conn.execute(stmt) - conn.commit() - - print('Chunk write time:', time.time() - st) - def _write_to_sites(self, records: list): """ Write records to a PostgreSQL database in optimized chunks. @@ -251,66 +157,17 @@ def make_stmt(chunk): self._chunk_insert(make_stmt, records, chunk_size) - # - # newrecords = [] - # records = sorted(records, key=lambda r: str(r.id)) - # for name, gs in groupby(records, lambda r: str(r.id)): - # gs = list(gs) - # n = len(gs) - # # print(f"Writing {n} records for {name}") - # if n>1: - # if n > len({r.source for r in gs}): - # print("Duplicate source name found. Skipping...", name, [(r.name, r.source) for r in gs]) - # continue - # newrecords.extend(gs) - # # break - # # pass - # # print("Duplicate source name found. Skipping...", name, [r.source for r in gs]) - # # break - # - # - # for i in range(0, len(newrecords), chunk_size): - # chunk = newrecords[i:i + chunk_size] - # print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") - # st = time.time() - # - # values = [ - # { - # "name": record.name, - # "data_source_uid": record.id, - # "properties": record.to_dict(keys), - # "geometry": f"SRID=4326;POINT({record.longitude} {record.latitude})", - # "source_slug": record.source, - # } - # for record in chunk - # ] - # - # # stmt = insert(Location).values(values).on_conflict_do_nothing() - # linsert = insert(Location) - # stmt = linsert.values(values).on_conflict_do_update( - # index_elements=[Location.data_source_uid], - # set_={"properties": linsert.excluded.properties} - # ) - # - # with self._connection as conn: - # conn.execute(stmt) - # conn.commit() - # - # print('Chunk write time:', time.time() - st) + def _chunk_insert(self, make_stmt: Callable, records: list, chunk_size: int = 10): + for i in range(0, len(records), chunk_size): + chunk = records[i:i + chunk_size] + print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") + st = time.time() + + stmt = make_stmt(chunk) + with self._connection as conn: + conn.execute(stmt) + conn.commit() + + print('Chunk write time:', time.time() - st) - # # Pre-serialize properties to reduce processing time - # values = [ - # (record.name, json.dumps(record.to_dict(keys)), record.longitude, record.latitude, record.source) - # for record in chunk - # ] - # - # with self._connection.cursor() as cursor: - # sql = """INSERT INTO public.tbl_location (name, properties, geometry, source_slug) - # VALUES (%s, %s, public.ST_SetSRID(public.ST_MakePoint(%s, %s), 4326), %s) - # ON CONFLICT (name) DO UPDATE SET properties = EXCLUDED.properties;""" - # cursor.executemany(sql, values) - # - # self._connection.commit() # Commit once per chunk - # print('Chunk write time:', time.time() - st) - # break # ============= EOF ============================================= diff --git a/backend/persisters/geoserver_db_models.py b/backend/persisters/geoserver_db_models.py new file mode 100644 index 0000000..acab86b --- /dev/null +++ b/backend/persisters/geoserver_db_models.py @@ -0,0 +1,100 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== +from geoalchemy2 import Geometry +from google.cloud.sql.connector import Connector +from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Float, Date, Time +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import declarative_base, sessionmaker, relationship + +Base = declarative_base() + + +def session_factory(connection: dict): + user = connection.get("user", "postgres") + password = connection.get("password", "") + port = connection.get("port", 5432) + database = connection.get("dbname", "gis") + driver = connection.get("driver", "pg8000") + + url = f'postgresql+{driver}://' + if connection.get("cloud_sql"): + connector= Connector() + instance_connection_name = connection.get("instance_connection_name") + print("Connecting to Cloud SQL instance:", instance_connection_name) + def get_conn(): + return connector.connect( + instance_connection_name, + 'pg8000', + user=user, + password=password, + db=database, + ) + engine = create_engine(url, creator=get_conn) + else: + host = connection.get("host", "localhost") + url = f"{url}{user}:{password}@{host}:{port}/{database}" + engine = create_engine(url) + + return sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +class Location(Base): + __tablename__ = "tbl_location" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String) + data_source_uid = Column(String, index=True) + + properties = Column(JSONB) + geometry = Column(Geometry(geometry_type="POINT", srid=4326)) + source_slug = Column(String, ForeignKey("tbl_sources.name")) + + source = relationship("Sources", backref="locations") + + +class Summary(Base): + __tablename__ = "tbl_summary" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String) + data_source_uid = Column(String, index=True) + + properties = Column(JSONB) + geometry = Column(Geometry(geometry_type="POINT", srid=4326)) + source_slug = Column(String, ForeignKey("tbl_sources.name")) + parameter_slug = Column(String, ForeignKey("tbl_parameters.name")) + + source = relationship("Sources", backref="summaries") + + value = Column(Float) + nrecords = Column(Integer) + min = Column(Float) + max = Column(Float) + mean = Column(Float) + + latest_value = Column(Float) + latest_date = Column(Date) + latest_time = Column(Time) + + earliest_value = Column(Float) + earliest_date = Column(Date) + earliest_time = Column(Time) + + +class Parameters(Base): + __tablename__ = "tbl_parameters" + name = Column(String, primary_key=True, index=True) + units = Column(String) + + +class Sources(Base): + __tablename__ = "tbl_sources" + id = Column(Integer) + name = Column(String, primary_key=True, index=True) + convex_hull = Column(Geometry(geometry_type="POLYGON", srid=4326)) +# ============= EOF ============================================= diff --git a/dagsterio/assets.py b/dagsterio/assets.py index f569d2b..311218c 100644 --- a/dagsterio/assets.py +++ b/dagsterio/assets.py @@ -6,59 +6,113 @@ # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # =============================================================================== import os +from os import getenv +from typing import Callable + import dagster as dg from backend.config import Config from backend.logger import setup_logging from backend.unifier import unify_analytes, unify_waterlevels +from dagsterio.config.source_constants import ALL_SOURCES, NMBGMR_SOURCES @dg.asset -def tds(): +def all_tds(): """TDS asset""" - config_path = os.path.join( - os.path.dirname(__file__), 'config', 'tds_config.dev.yaml' + + _analyte( + 'tds', + sources=ALL_SOURCES ) - config = Config(path=config_path) - config.parameter = 'tds' - config.finalize() - # setup logging here so that the path can be set to config.output_path - #setup_logging(path=config.output_path) +@dg.asset +def all_waterlevels(): + """Summary water levels asset""" + _waterlevels( + sources=ALL_SOURCES, + ) + - # with geoserver.get_connection() as conn: - unify_analytes(config) +@dg.asset +def nmbgmr_waterlevels(): + """NMBGMR water levels asset""" + + _waterlevels( + sources=NMBGMR_SOURCES, + ) @dg.asset -def summary_waterlevels(): - """Summary water levels asset""" - config_path = os.path.join( - os.path.dirname(__file__), 'config', 'summary_waterlevels_config.dev.yaml' +def nmbgmr_tds(): + """NMBGMR TDS asset""" + _analyte( + 'tds', + sources=NMBGMR_SOURCES, ) - config = Config(path=config_path) - config.parameter = 'waterlevels' + +def _get_geoserver_connection(): + return { 'db': + { + 'dbname': getenv('GEOSERVER_DBNAME'), + 'user': getenv('GEOSERVER_USER'), + 'password': getenv('GEOSERVER_PASSWORD'), + 'instance_connection_name': getenv('GEOSERVER_INSTANCE_CONNECTION_NAME'), + 'cloud_sql': True + } + } + + +def _waterlevels(**payload): + _unify(unify_waterlevels, 'waterlevels', payload) + + +def _analyte(param: str, **payload): + _unify(unify_analytes, param, payload) + + +def _unify(func: Callable[[Config,], None], parameter: str, payload: dict): + payload['yes'] = True + payload['geoserver'] = _get_geoserver_connection() + payload['output_summary'] = True + payload['output_format']= 'geoserver' + config = Config(payload=payload) + config.parameter = parameter config.finalize() - # setup logging here so that the path can be set to config.output_path - #setup_logging(path=config.output_path) + func(config) - # with geoserver.get_connection() as conn: - unify_waterlevels(config) defs = dg.Definitions( - assets=[tds, summary_waterlevels], + assets=[all_tds, all_waterlevels, nmbgmr_tds, nmbgmr_waterlevels], schedules=[ dg.ScheduleDefinition( name='tds_schedule', - # job=tds.to_job(), - target=dg.AssetSelection.keys("tds"), + target=dg.AssetSelection.keys("all_tds"), + cron_schedule='0 0 * * *', + execution_timezone='America/Denver', + ), + dg.ScheduleDefinition( + name='waterlevels_schedule', + target=dg.AssetSelection.keys("all_waterlevels"), + cron_schedule='0 0 * * *', + execution_timezone='America/Denver', + ), + dg.ScheduleDefinition( + name='nmbgmr_tds_schedule', + target=dg.AssetSelection.keys("nmbgmr_tds"), + cron_schedule='0 0 * * *', + execution_timezone='America/Denver', + ), + dg.ScheduleDefinition( + name='nmbgmr_waterlevels_schedule', + target=dg.AssetSelection.keys("nmbgmr_waterlevels"), cron_schedule='0 0 * * *', execution_timezone='America/Denver', - ) + ), ], # resources={ # 'config': dg.ResourceDefinition.hardcoded_resource(Config()), diff --git a/dagsterio/config/source_constants.py b/dagsterio/config/source_constants.py new file mode 100644 index 0000000..95ccce7 --- /dev/null +++ b/dagsterio/config/source_constants.py @@ -0,0 +1,38 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== + +ALL_SOURCES = { + 'bernco': True, + 'bor': True, + 'cabq': True, + 'ebid': True, + 'nmbgmr_amp': True, + 'nmed_dwb': True, + 'nmose_isc_seven_rivers': True, + 'nmose_roswell': True, + 'nwis': True, + 'pvacd': True, + 'wqp': True, + 'nmose_pod': False, +} + +NMBGMR_SOURCES = { + 'bernco': False, + 'bor': False, + 'cabq': False, + 'ebid': False, + 'nmbgmr_amp': True, + 'nmed_dwb': False, + 'nmose_isc_seven_rivers': False, + 'nmose_roswell': False, + 'nwis': False, + 'pvacd': False, + 'wqp': False, + 'nmose_pod': False, +} +# ============= EOF ============================================= diff --git a/requirements.txt b/requirements.txt index 4e9f7c5..f6d013e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,6 @@ geopandas frost_sta_client google-cloud-storage pytest -urllib3>=2.2.0,<3.0.0 \ No newline at end of file +urllib3>=2.2.2,<3.0.0 +cloud-sql-python-connector[pg8000] +pg8000 \ No newline at end of file From de1afc6d519fbf14143f4a9ba1694620370823ce Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 16 May 2025 13:27:01 -0600 Subject: [PATCH 3/3] refactored --- dagsterio/__init__.py | 34 ++++++++++++++ dagsterio/assets.py | 101 ++++++++++-------------------------------- dagsterio/nmbgmr.py | 71 +++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 77 deletions(-) create mode 100644 dagsterio/nmbgmr.py diff --git a/dagsterio/__init__.py b/dagsterio/__init__.py index 28a0970..3ce460e 100644 --- a/dagsterio/__init__.py +++ b/dagsterio/__init__.py @@ -5,6 +5,40 @@ # You may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # =============================================================================== +from os import getenv +from typing import Callable +from backend.config import Config +from backend.unifier import unify_waterlevels, unify_analytes + +def base_waterlevels_asset(**payload): + _unify(unify_waterlevels, 'waterlevels', payload) + + +def base_analyte_asset(param: str, **payload: object) -> None: + _unify(unify_analytes, param, payload) + + +def _get_geoserver_connection(): + return { 'db': + { + 'dbname': getenv('GEOSERVER_DBNAME'), + 'user': getenv('GEOSERVER_USER'), + 'password': getenv('GEOSERVER_PASSWORD'), + 'instance_connection_name': getenv('GEOSERVER_INSTANCE_CONNECTION_NAME'), + 'cloud_sql': True + } + } + +def _unify(func: Callable[[Config,], None], parameter: str, payload: dict): + payload['yes'] = True + payload['geoserver'] = _get_geoserver_connection() + payload['output_summary'] = True + payload['output_format']= 'geoserver' + config = Config(payload=payload) + config.parameter = parameter + config.finalize() + + func(config) # ============= EOF ============================================= diff --git a/dagsterio/assets.py b/dagsterio/assets.py index 311218c..6456af9 100644 --- a/dagsterio/assets.py +++ b/dagsterio/assets.py @@ -5,23 +5,20 @@ # You may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # =============================================================================== -import os -from os import getenv -from typing import Callable + import dagster as dg -from backend.config import Config -from backend.logger import setup_logging -from backend.unifier import unify_analytes, unify_waterlevels -from dagsterio.config.source_constants import ALL_SOURCES, NMBGMR_SOURCES +from dagsterio import base_analyte_asset, base_waterlevels_asset +from dagsterio.config.source_constants import ALL_SOURCES +from dagsterio.nmbgmr import tds_request_sensor, nmbgmr_tds, nmbgmr_waterlevels @dg.asset def all_tds(): """TDS asset""" - _analyte( + base_analyte_asset( 'tds', sources=ALL_SOURCES ) @@ -30,89 +27,39 @@ def all_tds(): @dg.asset def all_waterlevels(): """Summary water levels asset""" - _waterlevels( + base_waterlevels_asset( sources=ALL_SOURCES, ) - -@dg.asset -def nmbgmr_waterlevels(): - """NMBGMR water levels asset""" - - _waterlevels( - sources=NMBGMR_SOURCES, - ) - - -@dg.asset -def nmbgmr_tds(): - """NMBGMR TDS asset""" - _analyte( - 'tds', - sources=NMBGMR_SOURCES, - ) - - -def _get_geoserver_connection(): - return { 'db': - { - 'dbname': getenv('GEOSERVER_DBNAME'), - 'user': getenv('GEOSERVER_USER'), - 'password': getenv('GEOSERVER_PASSWORD'), - 'instance_connection_name': getenv('GEOSERVER_INSTANCE_CONNECTION_NAME'), - 'cloud_sql': True - } - } - - -def _waterlevels(**payload): - _unify(unify_waterlevels, 'waterlevels', payload) - - -def _analyte(param: str, **payload): - _unify(unify_analytes, param, payload) - - -def _unify(func: Callable[[Config,], None], parameter: str, payload: dict): - payload['yes'] = True - payload['geoserver'] = _get_geoserver_connection() - payload['output_summary'] = True - payload['output_format']= 'geoserver' - config = Config(payload=payload) - config.parameter = parameter - config.finalize() - - func(config) - - - defs = dg.Definitions( + sensors=[tds_request_sensor], assets=[all_tds, all_waterlevels, nmbgmr_tds, nmbgmr_waterlevels], schedules=[ dg.ScheduleDefinition( - name='tds_schedule', + name='all_tds', target=dg.AssetSelection.keys("all_tds"), - cron_schedule='0 0 * * *', + cron_schedule='0 11 * * *', execution_timezone='America/Denver', ), dg.ScheduleDefinition( - name='waterlevels_schedule', + name='all_waterlevels', target=dg.AssetSelection.keys("all_waterlevels"), - cron_schedule='0 0 * * *', - execution_timezone='America/Denver', - ), - dg.ScheduleDefinition( - name='nmbgmr_tds_schedule', - target=dg.AssetSelection.keys("nmbgmr_tds"), - cron_schedule='0 0 * * *', - execution_timezone='America/Denver', - ), - dg.ScheduleDefinition( - name='nmbgmr_waterlevels_schedule', - target=dg.AssetSelection.keys("nmbgmr_waterlevels"), - cron_schedule='0 0 * * *', + cron_schedule='0 12 * * *', execution_timezone='America/Denver', ), + + # dg.ScheduleDefinition( + # name='nmbgmr_tds', + # target=dg.AssetSelection.keys("nmbgmr_tds"), + # cron_schedule='0 3 * * *', + # execution_timezone='America/Denver', + # ), + # dg.ScheduleDefinition( + # name='nmbgmr_waterlevels', + # target=dg.AssetSelection.keys("nmbgmr_waterlevels"), + # cron_schedule='0 4 * * *', + # execution_timezone='America/Denver', + # ), ], # resources={ # 'config': dg.ResourceDefinition.hardcoded_resource(Config()), diff --git a/dagsterio/nmbgmr.py b/dagsterio/nmbgmr.py new file mode 100644 index 0000000..45b300e --- /dev/null +++ b/dagsterio/nmbgmr.py @@ -0,0 +1,71 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== +import json + +import dagster as dg +import httpx + +from dagsterio import base_waterlevels_asset, base_analyte_asset +from dagsterio.config.source_constants import NMBGMR_SOURCES + + +@dg.asset +def nmbgmr_waterlevels(): + """NMBGMR water levels asset""" + + base_waterlevels_asset( + sources=NMBGMR_SOURCES, + ) + + +@dg.asset +def nmbgmr_tds(): + """NMBGMR TDS asset""" + base_analyte_asset( + 'tds', + sources=NMBGMR_SOURCES, + ) + + +def get_latest_analyte(param: str, state: dict): + url = 'http://localhost:8009/latest/stats/majorchemistry' + queryparams = {'analyte': param} + resp = httpx.get(url, params=queryparams) + return resp.json().get('count', 0) + + +request_job = dg.define_asset_job( + name='nmbgmr_tds_job', + selection=dg.AssetSelection.assets("nmbgmr_tds"), +) + +@dg.sensor(job=request_job, minimum_interval_seconds=3600) +def tds_request_sensor(context: dg.SensorEvaluationContext): + return analyte_sensor('tds', context) + + +def analyte_sensor(param, context: dg.SensorEvaluationContext): + if context.cursor: + return None + + previous_state = json.loads(context.cursor) if context.cursor else {} + current_state = {} + runs = [] + + latest = get_latest_analyte(param, previous_state) + if latest: + key = f'latest_{param}' + current_state[key] = latest + if latest > previous_state.get(key, 0): + runs.append(dg.RunRequest(run_key=param)) + + return dg.SensorResult( + run_requests=runs, cursor=json.dumps(current_state) + ) + +# ============= EOF =============================================