Skip to content

[WIP] jir-dagster #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: pre-production
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ frontend/requirements.txt
frontend/output_timeseries
backend/test00112233.csv
frontend/api/cache
dagsterio/config/*.dev.yaml
dagsterio/dagster_home

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
2 changes: 2 additions & 0 deletions backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ def _update_output_units(self):
parameter = self.parameter.lower()
if parameter == "ph":
self.analyte_output_units = ""
elif parameter == "waterlevels":
self.analyte_output_units = "dtwbgs (ft)"

@property
def start_dt(self):
Expand Down
176 changes: 17 additions & 159 deletions backend/persisters/geoserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,104 +5,25 @@
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# ===============================================================================
import json
import os
import time
from itertools import groupby
from typing import Callable

import psycopg2
from shapely.geometry.multipoint import MultiPoint
from shapely.geometry.point import Point
from sqlalchemy.dialects.postgresql import JSONB, insert
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.dialects.postgresql import insert

from backend.persister import BasePersister

from sqlalchemy import Column, ForeignKey, create_engine, UUID, String, Integer, Float, Date, Time
from geoalchemy2 import Geometry

Base = declarative_base()
# dbname=db.get('dbname'),
# user=db.get('user'),
# password=db.get('password'),
# host=db.get('host'),
# port=db.get('port'),
def session_factory(connection: dict):
user = connection.get("user", "postgres")
password = connection.get("password", "")
host = connection.get("host", "localhost")
port = connection.get("port", 5432)
database = connection.get("dbname", "gis")

url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
engine = create_engine(url)
SessionFactory = sessionmaker(autocommit=False, autoflush=False, bind=engine)
return SessionFactory


class Location(Base):
__tablename__ = "tbl_location"

id = Column(Integer, primary_key=True, index=True)
name = Column(String)
data_source_uid = Column(String, index=True)

properties = Column(JSONB)
geometry = Column(Geometry(geometry_type="POINT", srid=4326))
source_slug = Column(String, ForeignKey("tbl_sources.name"))

source = relationship("Sources", backref="locations")


class Summary(Base):
__tablename__ = "tbl_summary"

id = Column(Integer, primary_key=True, index=True)
name = Column(String)
data_source_uid = Column(String, index=True)

properties = Column(JSONB)
geometry = Column(Geometry(geometry_type="POINT", srid=4326))
source_slug = Column(String, ForeignKey("tbl_sources.name"))
parameter_slug = Column(String, ForeignKey("tbl_parameters.name"))

source = relationship("Sources", backref="summaries")

value = Column(Float)
nrecords = Column(Integer)
min = Column(Float)
max = Column(Float)
mean = Column(Float)

latest_value = Column(Float)
latest_date = Column(Date)
latest_time = Column(Time)

earliest_value = Column(Float)
earliest_date = Column(Date)
earliest_time = Column(Time)


class Parameters(Base):
__tablename__ = "tbl_parameters"
name = Column(String, primary_key=True, index=True)
units = Column(String)


class Sources(Base):
__tablename__ = "tbl_sources"
id = Column(Integer)
name = Column(String, primary_key=True, index=True)
convex_hull = Column(Geometry(geometry_type="POLYGON", srid=4326))

from backend.persisters.geoserver_db_models import session_factory, Location, Summary, Parameters, Sources

class GeoServerPersister(BasePersister):
def __init__(self, *args, **kwargs):
super(GeoServerPersister, self).__init__(*args, **kwargs)
self._connection = None
self._connect()

def dump_sites(self, path: str):
def dump_sites(self, path: str = None):
if self.sites:
db = self.config.get('geoserver').get('db')
dbname = db.get('db_name')
Expand All @@ -111,7 +32,7 @@ def dump_sites(self, path: str):
else:
self.log("no sites to dump", fg="red")

def dump_summary(self, path: str):
def dump_summary(self, path: str = None):
if self.records:
db = self.config.get('geoserver').get('db')
dbname = db.get('db_name')
Expand Down Expand Up @@ -166,7 +87,6 @@ def _write_parameters(self):
sql = insert(Parameters).values([{"name": self.config.parameter,
"units": self.config.analyte_output_units}]).on_conflict_do_nothing(
index_elements=[Parameters.name],)
print(sql)
conn.execute(sql)
conn.commit()

Expand Down Expand Up @@ -207,19 +127,6 @@ def make_stmt(chunk):

self._chunk_insert(make_stmt, records)

def _chunk_insert(self, make_stmt, records: list, chunk_size: int = 10):
for i in range(0, len(records), chunk_size):
chunk = records[i:i + chunk_size]
print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}")
st = time.time()

stmt = make_stmt(chunk)
with self._connection as conn:
conn.execute(stmt)
conn.commit()

print('Chunk write time:', time.time() - st)

def _write_to_sites(self, records: list):
"""
Write records to a PostgreSQL database in optimized chunks.
Expand Down Expand Up @@ -250,66 +157,17 @@ def make_stmt(chunk):

self._chunk_insert(make_stmt, records, chunk_size)

#
# newrecords = []
# records = sorted(records, key=lambda r: str(r.id))
# for name, gs in groupby(records, lambda r: str(r.id)):
# gs = list(gs)
# n = len(gs)
# # print(f"Writing {n} records for {name}")
# if n>1:
# if n > len({r.source for r in gs}):
# print("Duplicate source name found. Skipping...", name, [(r.name, r.source) for r in gs])
# continue
# newrecords.extend(gs)
# # break
# # pass
# # print("Duplicate source name found. Skipping...", name, [r.source for r in gs])
# # break
#
#
# for i in range(0, len(newrecords), chunk_size):
# chunk = newrecords[i:i + chunk_size]
# print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}")
# st = time.time()
#
# values = [
# {
# "name": record.name,
# "data_source_uid": record.id,
# "properties": record.to_dict(keys),
# "geometry": f"SRID=4326;POINT({record.longitude} {record.latitude})",
# "source_slug": record.source,
# }
# for record in chunk
# ]
#
# # stmt = insert(Location).values(values).on_conflict_do_nothing()
# linsert = insert(Location)
# stmt = linsert.values(values).on_conflict_do_update(
# index_elements=[Location.data_source_uid],
# set_={"properties": linsert.excluded.properties}
# )
#
# with self._connection as conn:
# conn.execute(stmt)
# conn.commit()
#
# print('Chunk write time:', time.time() - st)
def _chunk_insert(self, make_stmt: Callable, records: list, chunk_size: int = 10):
    """Execute an insert statement over *records* in fixed-size chunks.

    :param make_stmt: callable that receives one chunk (a list slice of
        *records*) and returns an executable SQLAlchemy statement for it.
    :param records: the full list of records to persist.
    :param chunk_size: number of records per statement; each chunk is
        committed independently, so a failure loses at most one chunk.
    """
    for i in range(0, len(records), chunk_size):
        chunk = records[i:i + chunk_size]
        # progress report: chunk counts are 1-based for readability
        print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}")
        st = time.time()  # wall-clock start for the per-chunk timing below

        stmt = make_stmt(chunk)
        # NOTE(review): self._connection is reused across chunks inside a
        # `with` block each iteration — assumes the context manager does not
        # close the underlying connection on exit; confirm against _connect().
        with self._connection as conn:
            conn.execute(stmt)
            conn.commit()  # commit once per chunk

        print('Chunk write time:', time.time() - st)

# # Pre-serialize properties to reduce processing time
# values = [
# (record.name, json.dumps(record.to_dict(keys)), record.longitude, record.latitude, record.source)
# for record in chunk
# ]
#
# with self._connection.cursor() as cursor:
# sql = """INSERT INTO public.tbl_location (name, properties, geometry, source_slug)
# VALUES (%s, %s, public.ST_SetSRID(public.ST_MakePoint(%s, %s), 4326), %s)
# ON CONFLICT (name) DO UPDATE SET properties = EXCLUDED.properties;"""
# cursor.executemany(sql, values)
#
# self._connection.commit() # Commit once per chunk
# print('Chunk write time:', time.time() - st)
# break
# ============= EOF =============================================
100 changes: 100 additions & 0 deletions backend/persisters/geoserver_db_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# ===============================================================================
# Author: Jake Ross
# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# ===============================================================================
from geoalchemy2 import Geometry
from google.cloud.sql.connector import Connector
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Float, Date, Time
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

Base = declarative_base()


def session_factory(connection: dict):
    """Create a SQLAlchemy ``sessionmaker`` bound to the GeoServer database.

    :param connection: connection settings. Recognized keys:
        ``user`` (default "postgres"), ``password`` (default ""),
        ``port`` (default 5432, direct connections only),
        ``dbname`` (default "gis"), ``driver`` (DBAPI driver name,
        default "pg8000"), ``cloud_sql`` (truthy to route through the
        Google Cloud SQL connector), ``instance_connection_name``
        (required when ``cloud_sql`` is set), ``host`` (default
        "localhost", direct connections only).
    :return: a ``sessionmaker`` factory (autocommit and autoflush off).
    """
    user = connection.get("user", "postgres")
    password = connection.get("password", "")
    port = connection.get("port", 5432)
    database = connection.get("dbname", "gis")
    driver = connection.get("driver", "pg8000")

    url = f'postgresql+{driver}://'
    if connection.get("cloud_sql"):
        connector = Connector()
        instance_connection_name = connection.get("instance_connection_name")
        print("Connecting to Cloud SQL instance:", instance_connection_name)

        def get_conn():
            # FIX: the driver passed to the connector must match the dialect
            # in `url` above; previously 'pg8000' was hardcoded here while
            # the URL used the configurable `driver`, so any non-default
            # driver produced a mismatched engine.
            return connector.connect(
                instance_connection_name,
                driver,
                user=user,
                password=password,
                db=database,
            )

        # `creator` hands socket setup to the Cloud SQL connector; the URL
        # only supplies the dialect/driver name in this mode.
        engine = create_engine(url, creator=get_conn)
    else:
        host = connection.get("host", "localhost")
        url = f"{url}{user}:{password}@{host}:{port}/{database}"
        engine = create_engine(url)

    return sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Location(Base):
    """A point site (monitoring location) harvested from an upstream source."""

    __tablename__ = "tbl_location"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String)
    # identifier assigned by the upstream data source; indexed — presumably
    # used as the natural key for upserts (TODO confirm against persister)
    data_source_uid = Column(String, index=True)

    # arbitrary source-specific attributes, stored as JSONB
    properties = Column(JSONB)
    # WGS84 point geometry (SRID 4326)
    geometry = Column(Geometry(geometry_type="POINT", srid=4326))
    source_slug = Column(String, ForeignKey("tbl_sources.name"))

    source = relationship("Sources", backref="locations")


class Summary(Base):
    """Per-location, per-parameter summary statistics for a data source."""

    __tablename__ = "tbl_summary"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String)
    # identifier assigned by the upstream data source
    data_source_uid = Column(String, index=True)

    # arbitrary source-specific attributes, stored as JSONB
    properties = Column(JSONB)
    # WGS84 point geometry (SRID 4326)
    geometry = Column(Geometry(geometry_type="POINT", srid=4326))
    source_slug = Column(String, ForeignKey("tbl_sources.name"))
    parameter_slug = Column(String, ForeignKey("tbl_parameters.name"))

    source = relationship("Sources", backref="summaries")

    # aggregate statistics over the location's records
    value = Column(Float)
    nrecords = Column(Integer)
    min = Column(Float)
    max = Column(Float)
    mean = Column(Float)

    # most recent observation
    latest_value = Column(Float)
    latest_date = Column(Date)
    latest_time = Column(Time)

    # oldest observation
    earliest_value = Column(Float)
    earliest_date = Column(Date)
    earliest_time = Column(Time)


class Parameters(Base):
    """A measurable parameter (e.g. an analyte) and its output units."""

    __tablename__ = "tbl_parameters"
    name = Column(String, primary_key=True, index=True)
    units = Column(String)


class Sources(Base):
    """An upstream data source; `name` is the primary key referenced by
    Location.source_slug and Summary.source_slug."""

    __tablename__ = "tbl_sources"
    # NOTE(review): `id` is a plain Integer with no primary_key/autoincrement
    # flag — intentional? `name` serves as the primary key here.
    id = Column(Integer)
    name = Column(String, primary_key=True, index=True)
    # polygon enclosing all of this source's locations (SRID 4326)
    convex_hull = Column(Geometry(geometry_type="POLYGON", srid=4326))
# ============= EOF =============================================
44 changes: 44 additions & 0 deletions dagsterio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# ===============================================================================
# Author: Jake Ross
# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# ===============================================================================
from os import getenv
from typing import Callable

from backend.config import Config
from backend.unifier import unify_waterlevels, unify_analytes


def base_waterlevels_asset(**payload):
    """Run the water-levels unification pipeline, treating *payload* as
    Config overrides forwarded to the shared _unify() driver."""
    parameter = 'waterlevels'
    _unify(unify_waterlevels, parameter, payload)


def base_analyte_asset(param: str, **payload: object) -> None:
    """Run the analyte unification pipeline for the given *param*, with
    *payload* forwarded as Config overrides to the shared _unify() driver."""
    _unify(unify_analytes, parameter=param, payload=payload)


def _get_geoserver_connection():
return { 'db':
{
'dbname': getenv('GEOSERVER_DBNAME'),
'user': getenv('GEOSERVER_USER'),
'password': getenv('GEOSERVER_PASSWORD'),
'instance_connection_name': getenv('GEOSERVER_INSTANCE_CONNECTION_NAME'),
'cloud_sql': True
}
}

def _unify(func: Callable[[Config,], None], parameter: str, payload: dict):
    """Finalize a Config from *payload* (forcing geoserver output and a
    summary) and invoke the unifier *func* with it."""
    # force the settings every dagster asset run needs
    payload.update(
        yes=True,
        geoserver=_get_geoserver_connection(),
        output_summary=True,
        output_format='geoserver',
    )
    cfg = Config(payload=payload)
    cfg.parameter = parameter
    cfg.finalize()

    func(cfg)
# ============= EOF =============================================
Loading