Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5af3f0c
chore: update batcher so it doesn't Ack if jobs were not flushed succ…
dennisgsmith Aug 25, 2025
4f74e1a
chore: refactor river queue for greater distinction between inserter …
dennisgsmith Aug 29, 2025
8261a89
Merge branch 'develop' into feat/formula-refactor
dennisgsmith Aug 29, 2025
7bef76c
feat: add custom Point and Window mode functions for new computed tim…
dennisgsmith Aug 29, 2025
76acc2b
Merge branch 'develop' into feat/computed-timeseries-overhaul
dennisgsmith Sep 3, 2025
319f12f
feat: wip; continue to implement chunked expression evaluation
dennisgsmith Sep 5, 2025
db49bff
chore: temporarily remove enqueue jobs to enable basic crud integration
dennisgsmith Sep 5, 2025
2d1439e
feat: create handlers for crud api
dennisgsmith Sep 5, 2025
4be044b
fix: remove body from expression DELETE
dennisgsmith Sep 5, 2025
80b01af
chore: default expression create/update/delete to project admin only
dennisgsmith Sep 5, 2025
861283c
chore: export dbservice logger
dennisgsmith Sep 9, 2025
5bcaa26
feat: wip; implement job processing for full and partial compute even…
dennisgsmith Sep 9, 2025
b52d653
fix: job missing expression id in payload
dennisgsmith Sep 9, 2025
73091ba
fix: timestamping policy name incorrectly referenced "middle", should…
dennisgsmith Sep 9, 2025
9511fa3
fix: incorrectly referenced old package name for custom cel type
dennisgsmith Sep 9, 2025
edff9ae
chore: add oneOf type for ExpressionDTO opts schema
dennisgsmith Sep 9, 2025
01b4536
chore: add completions for available functions and constants
dennisgsmith Sep 10, 2025
faa6f2d
chore: rename endpoint from /autocomplete -> /completions
dennisgsmith Sep 10, 2025
8b0db05
chore: deduplicate info message for reserved keywords
dennisgsmith Sep 10, 2025
c50976e
fix: unneeded slot2var optimiation done prior to compile
dennisgsmith Sep 10, 2025
b4f96a7
fix: incorrect seed data
dennisgsmith Sep 10, 2025
5106316
fix: some incorrect synax in expression completions docs, formatting
dennisgsmith Sep 10, 2025
de04f7b
feat: add expression create/update validation endpoints
dennisgsmith Sep 10, 2025
3fb0429
chore: use protobuf methods for unmarshaling into go type
dennisgsmith Sep 11, 2025
77b3fe5
chore: update completion api to enable easier client-side rendering
dennisgsmith Sep 12, 2025
5ddd83f
Merge branch 'develop' into feat/computed-timeseries-overhaul
dennisgsmith Sep 12, 2025
acb2cc6
chore: add eval package tests for helper functions
dennisgsmith Sep 12, 2025
15a43fb
chore: update logger implementation to use interface
dennisgsmith Sep 12, 2025
6617930
chore!: update dependency injection for EvalSession, use querier and …
dennisgsmith Sep 12, 2025
771939e
chore: fix some logging errors/errors.As issues
dennisgsmith Sep 12, 2025
43d64df
fix: failing tests for expression evaluation caused by incorrect Meas…
dennisgsmith Sep 12, 2025
2482dd8
Merge branch 'develop' into feat/computed-timeseries-overhaul
dennisgsmith Sep 15, 2025
59c07f3
Merge branch 'develop' into feat/computed-timeseries-overhaul
dennisgsmith Sep 15, 2025
2311969
Merge branch 'develop' into feat/computed-timeseries-overhaul
dennisgsmith Sep 19, 2025
6ea3fd5
Merge branch 'develop' into feat/computed-timeseries-overhaul
dennisgsmith Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/cmd/midas-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func main() {
}
}

func run(ctx context.Context, cfg *config.ApiConfig, l *logger.Logger) (err error) {
func run(ctx context.Context, cfg *config.ApiConfig, l logger.Logger) (err error) {
h, err := handler.NewApi(ctx, cfg, l)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion api/cmd/midas-dcs-loader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func main() {
}
}

func run(ctx context.Context, cfg *config.DcsLoaderConfig, l *logger.Logger) (err error) {
func run(ctx context.Context, cfg *config.DcsLoaderConfig, l logger.Logger) (err error) {
h, err := handler.NewDcsLoader(ctx, cfg, l)
if err != nil {
return err
Expand Down
134 changes: 72 additions & 62 deletions api/cmd/midas-task/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (

"github.com/USACE/instrumentation-api/api/v4/internal/cloud"
"github.com/USACE/instrumentation-api/api/v4/internal/config"
"github.com/USACE/instrumentation-api/api/v4/internal/eval"
"github.com/USACE/instrumentation-api/api/v4/internal/logger"
"github.com/USACE/instrumentation-api/api/v4/internal/pgqueue"
"github.com/USACE/instrumentation-api/api/v4/internal/service"
"github.com/USACE/instrumentation-api/api/v4/internal/worker"
"github.com/google/uuid"
"github.com/riverqueue/river"
)

Expand All @@ -28,7 +28,9 @@ func main() {
}
}

func run(ctx context.Context, cfg *config.TaskConfig, l *logger.Logger) (err error) {
var riverTimeout = 15 * time.Second

func run(ctx context.Context, cfg *config.TaskConfig, l logger.Logger) (err error) {
dbpool, err := service.NewDBPool(ctx, cfg.DBConfig)
if err != nil {
return err
Expand All @@ -42,83 +44,91 @@ func run(ctx context.Context, cfg *config.TaskConfig, l *logger.Logger) (err err
if err != nil {
return err
}
defer func() {
err = errors.Join(err, taskServices.Shutdown(ctx))
}()
defer func() { err = errors.Join(err, taskServices.Shutdown(ctx)) }()

workers := river.NewWorkers()
periodicJobs := make([]*river.PeriodicJob, 0)

alertEventWorker := worker.NewAlertEventWorker(dbservice, taskServices)
river.AddWorker(workers, alertEventWorker)

emailEventWorker := worker.NewEamilEventWorker(dbservice, taskServices)
river.AddWorker(workers, emailEventWorker)

// TODO
// fetchThinglogixScheduleWorker := worker.NewFetchThinglogixScheduleWorker(dbservice, cfg)
// river.AddWorker(workers, fetchThinglogixScheduleWorker)
//
// fetchThinglogixEventWorker := worker.NewFetchThinglogixEventWorker(dbservice, cfg)
// river.AddWorker(workers, fetchThinglogixEventWorker)

// V1 batches submittals every 15 minutes, V2 runs dynamic scheduling per-submittal.
// We need to benchmark the performance of V2 before enabling it by default. It allows
// for higher accuracy scheduling of alerts, but may be more resource intensive.
//
// Note that V2 will also require reqorking how measurement submittals are handled.
// While Evaluation submittals are always done manually, some measurement submittals
// are automated via telemetry, while others are done manually. Instead of the user manually
// assigning a submittal to a measurement upload, we simple check if the measurement exists for
// the given instrument/timeseries/interval. Technically, if we wanted the submittals to behave
// sumilarly, we would need to complete and create a new submittal for each measurement upload,
// which would increase database round trips. Maybe instead, we can make a clearer
// differentation between measurement submittals and evaluation submittals, where we create the
// check for existing measurements and create the next submittal during the scheduled alert check job.
//
if cfg.FeatureFlags.DynamicAlertSchedulerDisabled {
river.AddWorker(workers, worker.NewAlertScheduleWorkerV1(dbservice))
periodicJobs = append(periodicJobs, worker.AlertScheduleV1JobOptions)
} else {
river.AddWorker(workers, worker.NewAlertScheduleWorkerV2(dbservice))
// NewEnv loads custom Measurement type from protobuf
baseEnv, err := eval.NewEnv()
if err != nil {
return fmt.Errorf("failed to create program baseEnv: %w", err)
}

tlgxWorker, err := worker.NewFetchThinglogixScheduleWorker(ctx, dbservice, &cfg.ThinglogixConfig, l)
// set up lru program cache
cache, err := eval.NewProgramCache(1024)
if err != nil {
l.Error(ctx, "failed to initialize thinglogix; skipping worker", "error", err)
} else {
river.AddWorker(workers, tlgxWorker)
periodicJobs = append(periodicJobs, worker.FetchThinglogixJobOptions)
return fmt.Errorf("failed to create program cache: %w", err)
}

rivercfg := &river.Config{
ID: "midas-task__" + uuid.New().String() + "__" + time.Now().Format("2006_01_02T15_04_05_000000"),
Logger: l.Slogger(),
// Need references for in-process bus used for batching
alertEventW := worker.NewAlertEventWorker(dbservice)
exprTsComputeFullEventW := worker.NewEvaluationTimeseriesComputeFullEventWorker(dbservice, baseEnv, cache)
exprTsComputePartialEventW := worker.NewEvaluationTimeseriesComputePartialEventWorker(dbservice, baseEnv, cache)

q, err := pgqueue.NewWorkerClient(ctx, dbpool, &pgqueue.Options{
ClientName: "midas-task",
Logger: l.Slogger(),
Schema: cfg.RiverQueueSchema,
DefaultMaxAttempts: 1,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: cfg.MaxWorkers},
// worker.HighPriorityQueue: {MaxWorkers: 100},
},
Schema: cfg.RiverQueueSchema,
Workers: workers,
PeriodicJobs: periodicJobs,
ErrorHandler: pgqueue.NewErrorHandler(l),
MaxAttempts: 1,
}

pgq, err := pgqueue.New(ctx, dbpool, rivercfg)
// Register all River workers
RegisterWorkers: func(ws *river.Workers) error {
// Workers that publish into the in-process bus will be injected after creation
// Used for tasks with high throughput
river.AddWorker(ws, alertEventW)
river.AddWorker(ws, exprTsComputePartialEventW)

// Other event workers
river.AddWorker(ws, worker.NewEmailEventWorker(dbservice, taskServices))

// Periodic workers
river.AddWorker(ws, worker.NewAlertScheduleWorkerV1(dbservice))
if tlgx, e := worker.NewFetchThinglogixScheduleWorker(ctx, dbservice, &cfg.ThinglogixConfig, l); e != nil {
l.Error(ctx, "failed to initialize thinglogix; skipping worker", "error", e)
} else {
river.AddWorker(ws, tlgx)
}
return nil
},
Periodic: func(p *pgqueue.Periodics) error {
worker.RegisterAlertScheduleV1(p)
worker.RegisterFetchThinglogixPeriodic(p)
return nil
},
})
if err != nil {
return err
}
dbservice.PGQueue = pgq

// TODO: This isn't a great pattern, if we forget to inject q into dbservice.PGQueue we will panic at runtime for certain tasks.
// Inject q into workers and injected database
dbservice.PGQueue = q
alertEventW.PGQueue = q
exprTsComputeFullEventW.PGQueue = q
exprTsComputePartialEventW.PGQueue = q

// Start River workers
defer func() {
err = errors.Join(err, pgq.Stop(ctx))
err = errors.Join(err, q.Stop(ctx, riverTimeout))
}()

l.Info(ctx, "Starting worker pool (background)...")
if err := pgq.Start(ctx); err != nil {
if err := q.Start(ctx); err != nil {
return fmt.Errorf("error starting riverqueue client: %w", err)
}

l.Info(ctx, "Starting ListenPoolProcess for AlertEventWorker")
return alertEventWorker.ListenPoolProcess(ctx, cfg, l)
// Build in-process routes and run the aggregator (blocks until ctx done or error)
routes := pgqueue.Routes{}
if alertEventW != nil {
k, r := alertEventW.Route()
routes[k] = r
}
if exprTsComputePartialEventW != nil {
k, r := exprTsComputePartialEventW.Route()
routes[k] = r
}

l.Info(ctx, "Starting in-process batch dispatcher...")
bw := pgqueue.NewBatchWorker(cfg, l, q.WorkerBus.Sub, routes)
return bw.Start(ctx)
}
2 changes: 1 addition & 1 deletion api/cmd/midas-telemetry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
}
}

func run(ctx context.Context, cfg *config.TelemetryConfig, l *logger.Logger) error {
func run(ctx context.Context, cfg *config.TelemetryConfig, l logger.Logger) error {
h, err := handler.NewTelemetry(ctx, cfg, l)
if err != nil {
return err
Expand Down
36 changes: 24 additions & 12 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,41 @@ require (
github.com/danielgtaylor/huma/v2 v2.32.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/golang-jwt/jwt/v5 v5.2.3
github.com/google/cel-go v0.26.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-version v1.7.0
github.com/jackc/pgx/v5 v5.7.4
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jackc/pgx/v5 v5.7.5
github.com/labstack/echo/v4 v4.13.3
github.com/riverqueue/river v0.22.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.22.0
github.com/riverqueue/river/rivertype v0.22.0
github.com/stretchr/testify v1.10.0
github.com/riverqueue/river v0.24.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.24.0
github.com/riverqueue/river/rivertype v0.24.0
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.11.0
github.com/tidwall/btree v1.7.0
github.com/twpayne/go-geom v1.6.1
github.com/twpayne/pgx-geom v1.0.0
github.com/xeipuuv/gojsonschema v1.2.0
github.com/xnacly/go-iso8601-duration v1.1.0
github.com/xuri/excelize/v2 v2.9.0
gocloud.dev v0.43.0
golang.org/x/crypto v0.40.0
golang.org/x/term v0.33.0
golang.org/x/text v0.27.0
golang.org/x/crypto v0.41.0
golang.org/x/term v0.34.0
golang.org/x/text v0.28.0
google.golang.org/protobuf v1.36.6
)

require (
cel.dev/expr v0.24.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/aws/aws-sdk-go v1.55.7 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.70 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.84 // indirect
)

require (
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
Expand Down Expand Up @@ -72,8 +82,9 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
github.com/richardlehane/msoleps v1.0.4 // indirect
github.com/riverqueue/river/riverdriver v0.22.0 // indirect
github.com/riverqueue/river/rivershared v0.22.0 // indirect
github.com/riverqueue/river/riverdriver v0.24.0 // indirect
github.com/riverqueue/river/rivershared v0.24.0 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
Expand All @@ -91,15 +102,16 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/image v0.27.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/api v0.242.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/grpc v1.73.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading