diff --git a/.env.example b/.env.example index 1329b7af..7334c6fd 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,7 @@ # .env used for port and service confiuration # for service specific environment variables, see ./env_files/*.env +# ports API_PORT=8080 TELEMETRY_PORT=9090 LOCALSTACK_GATEWAY_PORT=9000 @@ -8,12 +9,24 @@ LOCALSTACK_UI_PORT=9001 KEYCLOAK_PORT=8090 RIVER_QUEUE_UI_PORT=9326 -INSTRUMENTATION_AUTH_JWT_MOCKED= -INSTRUMENTATION_SURVEY123_IP_WHITELIST= +# api +INSTRUMENTATION_AUTH_JWT_MOCKED=false + +# sl-client SLCLIENT_SEEDLINK_SERVER_URI= + +# task TASK_THINGLOGIX_COGNITO_POOL= TASK_THINGLOGIX_PROVIDER_NAME= TASK_THINGLOGIX_API_GATEWAY_ENDPOINT= TASK_THINGLOGIX_USER= TASK_THINGLOGIX_PASSWORD= TASK_THINGLOGIX_ACCOUNT_ID= + +# opendcs +CDADATA_USERNAME= +CDADATA_PASSWORD= +CDABACKUP_USERNAME= +CDABACKUP_PASSWORD= +EDDN1_USERNAME= +EDDN1_PASSWORD= diff --git a/.gitignore b/.gitignore index dde8a666..8184aa10 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ test.log **/dist go.work.sum + +**/.settings diff --git a/api/internal/db/batch.go b/api/internal/db/batch.go index 8bac8a3d..d4e6174c 100644 --- a/api/internal/db/batch.go +++ b/api/internal/db/batch.go @@ -658,6 +658,111 @@ func (b *EvaluationInstrumentCreateBatchBatchResults) Close() error { return b.br.Close() } +const goesPlatformConfigFileCommit = `-- name: GoesPlatformConfigFileCommit :batchexec +update goes_platform_config_file set + committed=true, + committed_at=$2 +where id=$1 +` + +type GoesPlatformConfigFileCommitBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +type GoesPlatformConfigFileCommitParams struct { + ID uuid.UUID `json:"id"` + CommittedAt *time.Time `json:"committed_at"` +} + +func (q *Queries) GoesPlatformConfigFileCommit(ctx context.Context, arg []GoesPlatformConfigFileCommitParams) *GoesPlatformConfigFileCommitBatchResults { + batch := &pgx.Batch{} + for _, a := range arg { + vals := []interface{}{ + a.ID, + a.CommittedAt, + } + batch.Queue(goesPlatformConfigFileCommit, vals...) + } + br := q.db.SendBatch(ctx, batch) + return &GoesPlatformConfigFileCommitBatchResults{br, len(arg), false} +} + +func (b *GoesPlatformConfigFileCommitBatchResults) Exec(f func(int, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + if b.closed { + if f != nil { + f(t, ErrBatchAlreadyClosed) + } + continue + } + _, err := b.br.Exec() + if f != nil { + f(t, err) + } + } +} + +func (b *GoesPlatformConfigFileCommitBatchResults) Close() error { + b.closed = true + return b.br.Close() +} + +const goesTelemetryConfigMappingsCreateBatch = `-- name: GoesTelemetryConfigMappingsCreateBatch :batchexec +insert into goes_telemetry_config_mappings (goes_platform_config_file_id, platform_sensor_key, timeseries_id) +values ($1, $2, $3) +on conflict on constraint unique_goes_platform_config_file_id_platform_sensor_key do nothing +` + +type GoesTelemetryConfigMappingsCreateBatchBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +type GoesTelemetryConfigMappingsCreateBatchParams struct { + GoesPlatformConfigFileID uuid.UUID `json:"goes_platform_config_file_id"` + PlatformSensorKey string `json:"platform_sensor_key"` + TimeseriesID *uuid.UUID `json:"timeseries_id"` +} + +func (q *Queries) GoesTelemetryConfigMappingsCreateBatch(ctx context.Context, arg []GoesTelemetryConfigMappingsCreateBatchParams) *GoesTelemetryConfigMappingsCreateBatchBatchResults { + batch := &pgx.Batch{} + for _, a := range arg { + vals := []interface{}{ + a.GoesPlatformConfigFileID, + a.PlatformSensorKey, + a.TimeseriesID, + } + batch.Queue(goesTelemetryConfigMappingsCreateBatch, vals...) + } + br := q.db.SendBatch(ctx, batch) + return &GoesTelemetryConfigMappingsCreateBatchBatchResults{br, len(arg), false} +} + +func (b *GoesTelemetryConfigMappingsCreateBatchBatchResults) Exec(f func(int, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + if b.closed { + if f != nil { + f(t, ErrBatchAlreadyClosed) + } + continue + } + _, err := b.br.Exec() + if f != nil { + f(t, err) + } + } +} + +func (b *GoesTelemetryConfigMappingsCreateBatchBatchResults) Close() error { + b.closed = true + return b.br.Close() +} + const inclOptsCreateBatch = `-- name: InclOptsCreateBatch :batchexec insert into incl_opts (instrument_id, num_segments, bottom_elevation_timeseries_id, initial_time) values ($1, $2, $3, $4) diff --git a/api/internal/db/goes.sql_gen.go b/api/internal/db/goes.sql_gen.go new file mode 100644 index 00000000..4349372c --- /dev/null +++ b/api/internal/db/goes.sql_gen.go @@ -0,0 +1,170 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: goes.sql + +package db + +import ( + "context" + "time" + + "github.com/google/uuid" +) + +const goesPlatformConfigFileCreate = `-- name: GoesPlatformConfigFileCreate :one +insert into goes_platform_config_file (goes_telemetry_source_id, project_id, name, alias, size_bytes, content, created_by) +values ($1, $2, $3, $4, $5, $6::xml, $7) +returning id +` + +type GoesPlatformConfigFileCreateParams struct { + GoesTelemetrySourceID uuid.UUID `json:"goes_telemetry_source_id"` + ProjectID uuid.UUID `json:"project_id"` + Name string `json:"name"` + Alias string `json:"alias"` + SizeBytes int64 `json:"size_bytes"` + Content string `json:"content"` + CreatedBy uuid.UUID `json:"created_by"` +} + +func (q *Queries) GoesPlatformConfigFileCreate(ctx context.Context, arg GoesPlatformConfigFileCreateParams) (uuid.UUID, error) { + row := q.db.QueryRow(ctx, goesPlatformConfigFileCreate, + arg.GoesTelemetrySourceID, + arg.ProjectID, + arg.Name, + arg.Alias, + arg.SizeBytes, + arg.Content, + arg.CreatedBy, + ) + var id uuid.UUID + err := row.Scan(&id) + return id, err +} + +const goesPlatformConfigFileDelete = `-- name: GoesPlatformConfigFileDelete :exec +delete from goes_platform_config_file where id=$1 +` + +func (q *Queries) GoesPlatformConfigFileDelete(ctx context.Context, id uuid.UUID) error { + _, err := q.db.Exec(ctx, goesPlatformConfigFileDelete, id) + return err +} + +const goesPlatformConfigFileGet = `-- name: GoesPlatformConfigFileGet :one +select id, goes_telemetry_source_id, project_id, name, alias, size_bytes, content, committed, committed_at, created_at, created_by, updated_at, updated_by from goes_platform_config_file where id=$1 +` + +func (q *Queries) GoesPlatformConfigFileGet(ctx context.Context, id uuid.UUID) (GoesPlatformConfigFile, error) { + row := q.db.QueryRow(ctx, goesPlatformConfigFileGet, id) + var i GoesPlatformConfigFile + err := row.Scan( + &i.ID, + &i.GoesTelemetrySourceID, + &i.ProjectID, + &i.Name, + &i.Alias, + &i.SizeBytes, + &i.Content, + &i.Committed, + &i.CommittedAt, + &i.CreatedAt, + &i.CreatedBy, + &i.UpdatedAt, + &i.UpdatedBy, + ) + return i, err +} + +const goesPlatformConfigFileUpdate = `-- name: GoesPlatformConfigFileUpdate :exec +update goes_platform_config_file set + name=$1, + alias=$2, + size_bytes=$3, + content=$4::xml, + committed=false, + updated_at=$5, + updated_by=$6 +where id=$7 +` + +type GoesPlatformConfigFileUpdateParams struct { + Name string `json:"name"` + Alias string `json:"alias"` + SizeBytes int64 `json:"size_bytes"` + Content string `json:"content"` + UpdatedAt *time.Time `json:"updated_at"` + UpdatedBy *uuid.UUID `json:"updated_by"` + ID uuid.UUID `json:"id"` +} + +func (q *Queries) GoesPlatformConfigFileUpdate(ctx context.Context, arg GoesPlatformConfigFileUpdateParams) error { + _, err := q.db.Exec(ctx, goesPlatformConfigFileUpdate, + arg.Name, + arg.Alias, + arg.SizeBytes, + arg.Content, + arg.UpdatedAt, + arg.UpdatedBy, + arg.ID, + ) + return err +} + +const goesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile = `-- name: GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile :exec +delete from goes_telemetry_config_mappings where goes_platform_config_file_id=$1 +` + +func (q *Queries) GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile(ctx context.Context, goesPlatformConfigFileID uuid.UUID) error { + _, err := q.db.Exec(ctx, goesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile, goesPlatformConfigFileID) + return err +} + +const goesTelemetryConfigMappingsList = `-- name: GoesTelemetryConfigMappingsList :many +select goes_platform_config_file_id, platform_sensor_key, timeseries_id from goes_telemetry_config_mappings where goes_platform_config_file_id=$1 +` + +func (q *Queries) GoesTelemetryConfigMappingsList(ctx context.Context, goesPlatformConfigFileID uuid.UUID) ([]GoesTelemetryConfigMappings, error) { + rows, err := q.db.Query(ctx, goesTelemetryConfigMappingsList, goesPlatformConfigFileID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []GoesTelemetryConfigMappings{} + for rows.Next() { + var i GoesTelemetryConfigMappings + if err := rows.Scan(&i.GoesPlatformConfigFileID, &i.PlatformSensorKey, &i.TimeseriesID); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const goesTelemetrySourceList = `-- name: GoesTelemetrySourceList :many +select id, name, files from v_goes_telemetry_source +` + +func (q *Queries) GoesTelemetrySourceList(ctx context.Context) ([]VGoesTelemetrySource, error) { + rows, err := q.db.Query(ctx, goesTelemetrySourceList) + if err != nil { + return nil, err + } + defer rows.Close() + items := []VGoesTelemetrySource{} + for rows.Next() { + var i VGoesTelemetrySource + if err := rows.Scan(&i.ID, &i.Name, &i.Files); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/api/internal/db/models.go b/api/internal/db/models.go index 9280dab3..1558b940 100644 --- a/api/internal/db/models.go +++ b/api/internal/db/models.go @@ -643,6 +643,33 @@ type EvaluationInstrument struct { InstrumentID *uuid.UUID `json:"instrument_id"` } +type GoesPlatformConfigFile struct { + ID uuid.UUID `json:"id"` + GoesTelemetrySourceID uuid.UUID `json:"goes_telemetry_source_id"` + ProjectID uuid.UUID `json:"project_id"` + Name string `json:"name"` + Alias string `json:"alias"` + SizeBytes int64 `json:"size_bytes"` + Content string `json:"content"` + Committed bool `json:"committed"` + CommittedAt *time.Time `json:"committed_at"` + CreatedAt time.Time `json:"created_at"` + CreatedBy uuid.UUID `json:"created_by"` + UpdatedAt *time.Time `json:"updated_at"` + UpdatedBy *uuid.UUID `json:"updated_by"` +} + +type GoesTelemetryConfigMappings struct { + GoesPlatformConfigFileID uuid.UUID `json:"goes_platform_config_file_id"` + PlatformSensorKey string `json:"platform_sensor_key"` + TimeseriesID *uuid.UUID `json:"timeseries_id"` +} + +type GoesTelemetrySource struct { + ID uuid.UUID `json:"id"` + Name string `json:"name"` +} + type Heartbeat struct { Time time.Time `json:"time"` } @@ -1414,6 +1441,12 @@ type VEvaluation struct { Instruments []InstrumentIDName `json:"instruments"` } +type VGoesTelemetrySource struct { + ID uuid.UUID `json:"id"` + Name string `json:"name"` + Files []VGoesTelemetrySourceFiles `json:"files"` +} + type VInclMeasurement struct { InstrumentID uuid.UUID `json:"instrument_id"` Time time.Time `json:"time"` diff --git a/api/internal/db/overrides.go b/api/internal/db/overrides.go index 56c1ef66..9981831c 100644 --- a/api/internal/db/overrides.go +++ b/api/internal/db/overrides.go @@ -90,6 +90,19 @@ type IDSlugName struct { Name string `json:"name"` } +type IDName struct { + ID uuid.UUID `json:"id"` + Name string `json:"name"` +} + +type VGoesTelemetrySourceFiles struct { + IDName + ProjectID uuid.UUID `json:"project_id"` + Alias string `json:"alias"` + SizeBytes int64 `json:"size_bytes"` + Committed bool `json:"committed"` +} + type InstrumentIDName struct { InstrumentID uuid.UUID `json:"instrument_id"` InstrumentName string `json:"instrument_name"` diff --git a/api/internal/db/querier.go b/api/internal/db/querier.go index bc84971f..7add110e 100644 --- a/api/internal/db/querier.go +++ b/api/internal/db/querier.go @@ -116,6 +116,15 @@ type Querier interface { EvaluationListForProject(ctx context.Context, projectID uuid.UUID) ([]VEvaluation, error) EvaluationListForProjectAlertConfig(ctx context.Context, arg EvaluationListForProjectAlertConfigParams) ([]VEvaluation, error) EvaluationUpdate(ctx context.Context, arg EvaluationUpdateParams) error + GoesPlatformConfigFileCommit(ctx context.Context, arg []GoesPlatformConfigFileCommitParams) *GoesPlatformConfigFileCommitBatchResults + GoesPlatformConfigFileCreate(ctx context.Context, arg GoesPlatformConfigFileCreateParams) (uuid.UUID, error) + GoesPlatformConfigFileDelete(ctx context.Context, id uuid.UUID) error + GoesPlatformConfigFileGet(ctx context.Context, id uuid.UUID) (GoesPlatformConfigFile, error) + GoesPlatformConfigFileUpdate(ctx context.Context, arg GoesPlatformConfigFileUpdateParams) error + GoesTelemetryConfigMappingsCreateBatch(ctx context.Context, arg []GoesTelemetryConfigMappingsCreateBatchParams) *GoesTelemetryConfigMappingsCreateBatchBatchResults + GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile(ctx context.Context, goesPlatformConfigFileID uuid.UUID) error + GoesTelemetryConfigMappingsList(ctx context.Context, goesPlatformConfigFileID uuid.UUID) ([]GoesTelemetryConfigMappings, error) + GoesTelemetrySourceList(ctx context.Context) ([]VGoesTelemetrySource, error) HeartbeatCreate(ctx context.Context, argTime time.Time) (time.Time, error) HeartbeatGetLatest(ctx context.Context) (time.Time, error) HeartbeatList(ctx context.Context, resultLimit int32) ([]time.Time, error) diff --git a/api/internal/dto/goes.go b/api/internal/dto/goes.go new file mode 100644 index 00000000..79e3df69 --- /dev/null +++ b/api/internal/dto/goes.go @@ -0,0 +1,17 @@ +package dto + +import ( + "time" + + "github.com/google/uuid" +) + +type GoesTelemetryConfigMappingDTO struct { + PlatformSensorKey string `json:"platform_sensor_key"` + TimeseriesID *uuid.UUID `json:"timeseries_id" required:"false"` +} + +type GoesPlatformConfigFileCommitDTO struct { + ID uuid.UUID `json:"id"` + CommittedAt time.Time `json:"committed_at"` +} diff --git a/api/internal/handler/goes.go b/api/internal/handler/goes.go new file mode 100644 index 00000000..0872f393 --- /dev/null +++ b/api/internal/handler/goes.go @@ -0,0 +1,255 @@ +package handler + +import ( + "context" + "errors" + "io" + "net/http" + "time" + + "github.com/USACE/instrumentation-api/api/v4/internal/ctxkey" + "github.com/USACE/instrumentation-api/api/v4/internal/db" + "github.com/USACE/instrumentation-api/api/v4/internal/dto" + "github.com/USACE/instrumentation-api/api/v4/internal/httperr" + "github.com/USACE/instrumentation-api/api/v4/internal/service" + "github.com/danielgtaylor/huma/v2" +) + +var goesTags = []string{"GOES Telemetry"} + +type TelemetrySourceIDParam struct { + TelemetrySourceID UUID `path:"telemetry_source_id"` +} + +type TelemetryConfigIDParam struct { + TelemetryConfigID UUID `path:"telemetry_config_id"` +} + +func (h *ApiHandler) RegisterGoesTelemetry(api huma.API) { + huma.Register(api, huma.Operation{ + Middlewares: h.Public, + OperationID: "goes-telemetry-client-list", + Method: http.MethodGet, + Path: "/domains/goes", + Description: "list of goes client instances (opendcs)", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + }) (*Response[[]db.VGoesTelemetrySource], error) { + aa, err := h.DBService.GoesTelemetrySourceList(ctx) + if err != nil { + return nil, httperr.InternalServerError(err) + } + return NewResponse(aa), nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.Public, + OperationID: "goes-telemetry-config-get", + Method: http.MethodGet, + Path: "/projects/{project_id}/goes/{telemetry_source_id}/configs/{telemetry_config_id}", + Description: "gets a platform configuraiton xml file", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + TelemetryConfigIDParam + }) (*Response[db.GoesPlatformConfigFile], error) { + a, err := h.DBService.GoesPlatformConfigFileGet(ctx, input.TelemetryConfigID.UUID) + if err != nil { + return nil, httperr.InternalServerError(err) + } + return NewResponse(a), nil + }) + + type XmlPlatformConfig struct { + PlatformConfig huma.FormFile `form:"file" contentType:"text/xml" required:"true"` + Alias string `form:"alias"` + DryRun bool `form:"dry_run"` + DeleteOldMappings bool `form:"delete_old_mappings"` + } + + huma.Register(api, huma.Operation{ + Middlewares: h.ProjectAdmin, + OperationID: "goes-telemetry-config-create", + Method: http.MethodPost, + Path: "/projects/{project_id}/goes/{telemetry_source_id}", + Description: "create a goes telemetry configuration", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + RawBody huma.MultipartFormFiles[XmlPlatformConfig] + }) (*Response[service.DbImportResponse], error) { + p := ctx.Value(ctxkey.Profile).(db.VProfile) + formData := input.RawBody.Data() + xmlDoc, err := io.ReadAll(formData.PlatformConfig) + if err != nil { + return nil, httperr.BadRequest(err) + } + if xmlDoc == nil { + return nil, httperr.BadRequest(errors.New("uploaded file is empty")) + } + alias := formData.Alias + if alias == "" { + alias = formData.PlatformConfig.Filename + } + a, err := h.DBService.GoesPlatformConfigFileCreate(ctx, db.GoesPlatformConfigFileCreateParams{ + GoesTelemetrySourceID: input.TelemetrySourceID.UUID, + ProjectID: input.ProjectID.UUID, + Name: formData.PlatformConfig.Filename, + SizeBytes: formData.PlatformConfig.Size, + Alias: alias, + Content: string(xmlDoc), + CreatedBy: p.ID, + }, formData.DryRun) + if err != nil { + return nil, httperr.InternalServerError(err) + } + return NewResponse(a), nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.ProjectAdmin, + OperationID: "goes-telemetry-config-update", + Method: http.MethodPut, + Path: "/projects/{project_id}/goes/{telemetry_source_id}/configs/{telemetry_config_id}", + Description: "lists goes telemetry configurations", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + TelemetryConfigIDParam + RawBody huma.MultipartFormFiles[XmlPlatformConfig] + }) (*Response[service.DbImportResponse], error) { + p := ctx.Value(ctxkey.Profile).(db.VProfile) + formData := input.RawBody.Data() + xmlDoc, err := io.ReadAll(formData.PlatformConfig) + if err != nil { + return nil, httperr.BadRequest(err) + } + if xmlDoc == nil { + return nil, httperr.BadRequest(errors.New("uploaded file is empty")) + } + now := time.Now().UTC() + alias := formData.Alias + if alias == "" { + alias = formData.PlatformConfig.Filename + } + a, err := h.DBService.GoesPlatformConfigFileUpdate(ctx, db.GoesPlatformConfigFileUpdateParams{ + ID: input.TelemetryConfigID.UUID, + Name: formData.PlatformConfig.Filename, + Alias: alias, + SizeBytes: formData.PlatformConfig.Size, + Content: string(xmlDoc), + UpdatedBy: &p.ID, + UpdatedAt: &now, + }, formData.DryRun, formData.DeleteOldMappings) + if err != nil { + return nil, httperr.InternalServerError(err) + } + return NewResponse(a), nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.ProjectAdmin, + OperationID: "goes-telemetry-config-delete", + Method: http.MethodDelete, + Path: "/projects/{project_id}/goes/{telemetry_source_id}/configs/{telemetry_config_id}", + Description: "delete a goes telemetry configuration", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + TelemetryConfigIDParam + }) (*Response[struct{}], error) { + if err := h.DBService.GoesPlatformConfigFileDelete(ctx, input.TelemetryConfigID.UUID); err != nil { + return nil, httperr.InternalServerError(err) + } + return nil, nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.Public, + OperationID: "goes-telemetry-config-mapping-list", + Method: http.MethodGet, + Path: "/projects/{project_id}/goes/{telemetry_source_id}/configs/{telemetry_config_id}/mappings", + Description: "lists goes telemetry timeseries mappings", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + TelemetryConfigIDParam + }) (*Response[[]db.GoesTelemetryConfigMappings], error) { + aa, err := h.DBService.GoesTelemetryConfigMappingsList(ctx, input.TelemetryConfigID.UUID) + if err != nil { + return nil, httperr.InternalServerError(err) + } + return NewResponse(aa), nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.ProjectAdmin, + OperationID: "goes-update-mappings", + Method: http.MethodPut, + Path: "/projects/{project_id}/goes/{telemetry_source_id}/configs/{telemetry_config_id}/mappings", + Description: "updates goes telemetry timeseries mappings", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + TelemetryConfigIDParam + Body []dto.GoesTelemetryConfigMappingDTO + }) (*Response[struct{}], error) { + if err := h.DBService.GoesTelemetryConfigMappingsUpdate(ctx, input.TelemetryConfigID.UUID, input.Body); err != nil { + return nil, httperr.InternalServerError(err) + } + return nil, nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.ProjectAdmin, + OperationID: "goes-telemetry-validate", + Method: http.MethodPut, + Path: "/projects/{project_id}/goes/{telemetry_source_id}/validate", + Description: "updates goes telemetry timeseries mappings", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + }) (*Response[service.DbImportResponse], error) { + // TODO + return NewResponse(service.DbImportResponse{}), nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.ProjectAdmin, + OperationID: "goes-telemetry-commit", + Method: http.MethodPost, + Path: "/projects/{project_id}/goes/{telemetry_source_id}/commit", + Description: "starts a commit action to update an opendcs routescheduler", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + ProjectIDParam + TelemetrySourceIDParam + }) (*Response[struct{}], error) { + // TODO + return nil, nil + }) + + huma.Register(api, huma.Operation{ + Middlewares: h.InternalApp, + OperationID: "goes-telemetry-commit-callback", + Method: http.MethodPost, + Path: "/callback/goes/{telemetry_source_id}/commit", + Description: "callback to update API DB state after OpenDCS wrapper commit completes", + Tags: goesTags, + }, func(ctx context.Context, input *struct { + TelemetrySourceIDParam + Body []dto.GoesPlatformConfigFileCommitDTO + }) (*struct{}, error) { + if err := h.DBService.GoesPlatformConfigCommit(ctx, input.Body); err != nil { + return nil, httperr.InternalServerError(err) + } + return nil, nil + }) +} diff --git a/api/internal/service/goes.go b/api/internal/service/goes.go new file mode 100644 index 00000000..b1542039 --- /dev/null +++ b/api/internal/service/goes.go @@ -0,0 +1,284 @@ +package service + +import ( + "bytes" + "context" + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/USACE/instrumentation-api/api/v4/internal/db" + "github.com/USACE/instrumentation-api/api/v4/internal/dto" + "github.com/google/uuid" +) + +type Platform struct { + XMLName xml.Name `xml:"Platform"` + PlatformConfig PlatformConfig `xml:"PlatformConfig"` +} + +type PlatformConfig struct { + ConfigSensors []ConfigSensor `xml:"ConfigSensor"` +} + +type ConfigSensor struct { + SensorName string `xml:"SensorName"` + SensorNumber string `xml:"SensorNumber"` +} + +type DbImportCommandType string + +type DbImportResponse struct { + PlatformFileID *uuid.UUID `json:"platform_file_id,omitempty"` + Response json.RawMessage `json:"response"` +} + +// GoesPlatformConfigFileCreate validates and creates a platform configuration file for a given MIDAS project +func (s *DBService) GoesPlatformConfigFileCreate(ctx context.Context, arg db.GoesPlatformConfigFileCreateParams, dryRun bool) (DbImportResponse, error) { + names, err := extractSensorNames(arg.Content) + if err != nil { + return DbImportResponse{}, err + } + + // TODO: proxy request to opendcs service to validate dbimport + var a DbImportResponse + // http.Get... + + if dryRun { + return a, nil + } + + tx, err := s.db.Begin(ctx) + if err != nil { + return a, err + } + defer s.TxDo(ctx, tx.Rollback) + qtx := s.WithTx(tx) + + newID, err := qtx.GoesPlatformConfigFileCreate(ctx, arg) + if err != nil { + return a, fmt.Errorf("GoesPlatformConfigFileCreate %w", err) + } + a.PlatformFileID = &newID + + mm := make([]db.GoesTelemetryConfigMappingsCreateBatchParams, 0, len(names)) + for _, n := range names { + mm = append(mm, db.GoesTelemetryConfigMappingsCreateBatchParams{ + GoesPlatformConfigFileID: newID, + PlatformSensorKey: n, + TimeseriesID: nil, + }) + } + + qtx.GoesTelemetryConfigMappingsCreateBatch(ctx, mm).Exec(batchExecErr(&err)) + if err != nil { + return a, fmt.Errorf("GoesTelemetryConfigMappingsCreateBatch %w", err) + } + + return a, tx.Commit(ctx) +} + +// TODO: return validation results +func (s *DBService) GoesPlatformConfigFileUpdate(ctx context.Context, arg db.GoesPlatformConfigFileUpdateParams, dryRun, deleteOldMappings bool) (DbImportResponse, error) { + var a DbImportResponse + names, err := extractSensorNames(arg.Content) + if err != nil { + return a, err + } + + // TODO: proxy request to opendcs service to validate dbimport + + if dryRun { + // TODO: respond with validation result / error + return a, errors.New("TODO") + } + + tx, err := s.db.Begin(ctx) + if err != nil { + return a, err + } + defer s.TxDo(ctx, tx.Rollback) + qtx := s.WithTx(tx) + + if err := qtx.GoesPlatformConfigFileUpdate(ctx, arg); err != nil { + return a, fmt.Errorf("GoesPlatformConfigFileUpdate %w", err) + } + + if deleteOldMappings { + if err := qtx.GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile(ctx, arg.ID); err != nil { + return a, fmt.Errorf("GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile %w", err) + } + } + + mm := make([]db.GoesTelemetryConfigMappingsCreateBatchParams, 0, len(names)) + for _, n := range names { + mm = append(mm, db.GoesTelemetryConfigMappingsCreateBatchParams{ + GoesPlatformConfigFileID: arg.ID, + PlatformSensorKey: n, + TimeseriesID: nil, + }) + } + + qtx.GoesTelemetryConfigMappingsCreateBatch(ctx, mm).Exec(batchExecErr(&err)) + if err != nil { + return a, fmt.Errorf("GoesTelemetryConfigMappingsCreateBatch %w", err) + } + + return a, tx.Commit(ctx) +} + +func (s *DBService) GoesTelemetryConfigMappingsUpdate(ctx context.Context, cfgID uuid.UUID, mappings []dto.GoesTelemetryConfigMappingDTO) error { + mm := make([]db.GoesTelemetryConfigMappingsCreateBatchParams, len(mappings)) + for i, m := range mappings { + mm[i] = db.GoesTelemetryConfigMappingsCreateBatchParams{ + GoesPlatformConfigFileID: cfgID, + PlatformSensorKey: m.PlatformSensorKey, + TimeseriesID: m.TimeseriesID, + } + } + tx, err := s.db.Begin(ctx) + if err != nil { + return err + } + defer s.TxDo(ctx, tx.Rollback) + qtx := s.WithTx(tx) + + if err := qtx.GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile(ctx, cfgID); err != nil { + return fmt.Errorf("GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile %w", err) + } + + qtx.GoesTelemetryConfigMappingsCreateBatch(ctx, mm).Exec(batchExecErr(&err)) + if err != nil { + return fmt.Errorf("GoesTelemetryConfigMappingsCreateBatch %w", err) + } + + return tx.Commit(ctx) +} + +func extractSensorNames(xmlStr string) ([]string, error) { + dec := xml.NewDecoder(strings.NewReader(xmlStr)) + for { + tok, err := dec.Token() + if err != nil { + return nil, fmt.Errorf("failed to read xml token: %w", err) + } + + start, ok := tok.(xml.StartElement) + if !ok { + continue + } + + switch start.Name.Local { + case "Platform": + var p Platform + if err := dec.DecodeElement(&p, &start); err != nil { + return nil, fmt.Errorf("failed to decode Platform: %w", err) + } + return extractFromPlatforms([]Platform{p}), nil + case "Database": + var wrapper struct { + Platforms []Platform `xml:"Platform"` + } + if err := dec.DecodeElement(&wrapper, &start); err != nil { + return nil, fmt.Errorf("failed to decode Database: %w", err) + } + return extractFromPlatforms(wrapper.Platforms), nil + + default: + return nil, fmt.Errorf("unexpected root element <%s>", start.Name.Local) + } + } +} + +func extractFromPlatforms(platforms []Platform) []string { + var result []string + for _, platform := range platforms { + for _, sensor := range platform.PlatformConfig.ConfigSensors { + result = append(result, sensor.SensorName+"."+sensor.SensorNumber) + } + } + return result +} + +type OpendcsImportResponse struct { + Status string `json:"status"` + ValidateLog string `json:"validate_log,omitempty"` + ImportLog string `json:"import_log,omitempty"` + CommandOutput string `json:"command_output,omitempty"` + Error string `json:"error,omitempty"` +} + +type opendcsImportRequest struct { + Files []string `json:"files"` + ValidateOnly bool `json:"validate_only"` +} + +type OpendcsImportParams struct { + OpendcsBaseURL string + OpendcsAuthToken string + opendcsImportRequest +} + +func (s *DBService) OpendcsImport(ctx context.Context, arg OpendcsImportParams) (json.RawMessage, error) { + if arg.OpendcsBaseURL == "" { + return nil, fmt.Errorf("opendcsBaseURL not configured") + } + if arg.OpendcsAuthToken == "" { + return nil, fmt.Errorf("opendcsAuthToken not configured") + } + + reqBody, err := json.Marshal(opendcsImportRequest{ + Files: arg.Files, + ValidateOnly: arg.ValidateOnly, + }) + if err != nil { + return nil, err + } + + u := strings.TrimRight(arg.OpendcsBaseURL, "/") + "/import" + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, u, bytes.NewReader(reqBody)) + if err != nil { + return nil, err + } + + httpReq.Header.Set("content-type", "application/json") + q := httpReq.URL.Query() + q.Set("key", arg.OpendcsAuthToken) + httpReq.URL.RawQuery = q.Encode() + + client := &http.Client{Timeout: 5 * time.Minute} + resp, err := client.Do(httpReq) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return json.RawMessage(bodyBytes), fmt.Errorf("opendcs wrapper /import failed: status=%d body=%s", resp.StatusCode, string(bodyBytes)) + } + + return json.RawMessage(bodyBytes), nil +} + +func (s *DBService) GoesPlatformConfigCommit(ctx context.Context, arg []dto.GoesPlatformConfigFileCommitDTO) error { + bb := make([]db.GoesPlatformConfigFileCommitParams, len(arg)) + for idx, b := range arg { + bb[idx] = db.GoesPlatformConfigFileCommitParams{ + ID: b.ID, + CommittedAt: &b.CommittedAt, + } + } + var err error + s.Queries.GoesPlatformConfigFileCommit(ctx, bb).Exec(batchExecErr(&err)) + return err +} diff --git a/api/migrations/repeat/0190__views_telemetry.sql b/api/migrations/repeat/0190__views_telemetry.sql new file mode 100644 index 00000000..ca3f7908 --- /dev/null +++ b/api/migrations/repeat/0190__views_telemetry.sql @@ -0,0 +1,16 @@ +create or replace view v_goes_telemetry_source as +select + s.*, + f.files +from goes_telemetry_source s +left join ( + select coalesce(jsonb_agg(jsonb_build_object( + 'id', cf.id, + 'name', cf.name, + 'project_id', cf.project_id, + 'alias', cf.alias, + 'size_bytes', cf.size_bytes, + 'committed', cf.committed + )), '[]'::jsonb) as files + from goes_platform_config_file cf +) f on true; diff --git a/api/migrations/schema/V1.58.00__goes.sql b/api/migrations/schema/V1.58.00__goes.sql new file mode 100644 index 00000000..ff9975bb --- /dev/null +++ b/api/migrations/schema/V1.58.00__goes.sql @@ -0,0 +1,32 @@ +create table goes_telemetry_source ( + id uuid primary key default uuid_generate_v4(), + name text unique not null +); + + +create table goes_platform_config_file ( + id uuid primary key default uuid_generate_v4(), + goes_telemetry_source_id uuid not null references goes_telemetry_source(id), + project_id uuid not null references project(id), + name text not null, + alias text not null, + size_bytes bigint not null, + content xml not null, + committed boolean not null default false, + committed_at timestamptz, + created_at timestamptz not null default now(), + created_by uuid not null references profile(id), + updated_at timestamptz, + updated_by uuid references profile(id) +); + + +create table goes_telemetry_config_mappings ( + goes_platform_config_file_id uuid not null references goes_platform_config_file(id) on delete cascade, + platform_sensor_key text not null, + timeseries_id uuid unique references timeseries(id), + constraint unique_goes_platform_config_file_id_platform_sensor_key unique (goes_platform_config_file_id, platform_sensor_key) +); + + +insert into goes_telemetry_source (id, name) values ('666e60ec-2c0a-4446-9eda-6f45cbcd0a60', 'OpenDCS #1'); diff --git a/api/queries/goes.sql b/api/queries/goes.sql new file mode 100644 index 00000000..93ebf80a --- /dev/null +++ b/api/queries/goes.sql @@ -0,0 +1,49 @@ +-- name: GoesTelemetrySourceList :many +select * from v_goes_telemetry_source; + + +-- name: GoesPlatformConfigFileCreate :one +insert into goes_platform_config_file (goes_telemetry_source_id, project_id, name, alias, size_bytes, content, created_by) +values (sqlc.arg(goes_telemetry_source_id), sqlc.arg(project_id), sqlc.arg(name), sqlc.arg(alias), sqlc.arg(size_bytes), sqlc.arg(content)::xml, sqlc.arg(created_by)) +returning id; + + +-- name: GoesPlatformConfigFileGet :one +select * from goes_platform_config_file where id=$1; + + +-- name: GoesPlatformConfigFileUpdate :exec +update goes_platform_config_file set + name=sqlc.arg(name), + alias=sqlc.arg(alias), + size_bytes=sqlc.arg(size_bytes), + content=sqlc.arg(content)::xml, + committed=false, + updated_at=sqlc.arg(updated_at), + updated_by=sqlc.arg(updated_by) +where id=sqlc.arg(id); + + +-- name: GoesPlatformConfigFileCommit :batchexec +update goes_platform_config_file set + committed=true, + committed_at=$2 +where id=$1; + + +-- name: GoesPlatformConfigFileDelete :exec +delete from goes_platform_config_file where id=$1; + + +-- name: GoesTelemetryConfigMappingsCreateBatch :batchexec +insert into goes_telemetry_config_mappings (goes_platform_config_file_id, platform_sensor_key, timeseries_id) +values ($1, $2, $3) +on conflict on constraint unique_goes_platform_config_file_id_platform_sensor_key do nothing; + + +-- name: GoesTelemetryConfigMappingsDeleteForGoesPlatformConfigFile :exec +delete from goes_telemetry_config_mappings where goes_platform_config_file_id=$1; + + +-- name: GoesTelemetryConfigMappingsList :many +select * from goes_telemetry_config_mappings where goes_platform_config_file_id=$1; diff --git a/compose.sh b/compose.sh index be3523ff..af99e49d 100755 --- a/compose.sh +++ b/compose.sh @@ -180,6 +180,12 @@ elif [ "$1" = "test" ]; then elif [ "$1" = "mkdocs" ]; then mkdocs +elif [ "$1" = "opendcs-dep" ]; then + cid=$(docker create ghcr.io/opendcs/routingscheduler:7.0-nightly) && + mkdir -p "${parent_path}/opendcs/rsgis/src/main/resources" && + docker cp "$cid:/opt/opendcs/bin/opendcs.jar" "${parent_path}/opendcs/rsgis/src/main/resources/opendcs.jar" && + docker rm "$cid" + else echo -e "usage:\n\t./compose.sh watch\n\t./compose.sh up\n\t./compose.sh down\n\t./compose.sh clean\n\t./compose.sh test\n\t./compose.sh mkdocs" fi diff --git a/docker-compose.yaml b/docker-compose.yaml index 8e7a679a..ab55ced9 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -180,6 +180,16 @@ services: localstack-init: condition: service_completed_successfully + opendcs: + build: + context: ./opendcs + image: opendcs + env_file: + - path: ./env_files/opendcs.env + required: true + - path: .env + required: true + localstack: image: localstack/localstack:4 ports: diff --git a/env_files/opendcs.env b/env_files/opendcs.env new file mode 100644 index 00000000..a4b77198 --- /dev/null +++ b/env_files/opendcs.env @@ -0,0 +1,9 @@ +AWS_ENDPOINT_URL="http://localstack:4566" +DATALOAD_S3_ROOT="s3://corpsmap-data-incoming/instrumentation" +OPENDCS_IMPORT_TOKEN="appkey" +CDADATA_USERNAME= +CDADATA_PASSWORD= +CDABACKUP_USERNAME= +CDABACKUP_PASSWORD= +EDDN1_USERNAME= +EDDN1_PASSWORD= diff --git a/go.work b/go.work index 9671ce28..8fbc0b78 100644 --- a/go.work +++ b/go.work @@ -1,3 +1,5 @@ -go 1.25 +go 1.25.5 use ./api + +use ./opendcs diff --git a/opendcs/.gitignore b/opendcs/.gitignore new file mode 100644 index 00000000..641c73d1 --- /dev/null +++ b/opendcs/.gitignore @@ -0,0 +1,19 @@ +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +# https://maven.apache.org/wrapper/#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar + +# Eclipse m2e generated files +# Eclipse Core +.project +# JDT-specific (Eclipse Java Development Tools) +.classpath + +rsgis/src/main/resources diff --git a/opendcs/Dockerfile b/opendcs/Dockerfile new file mode 100644 index 00000000..8b3fc773 --- /dev/null +++ b/opendcs/Dockerfile @@ -0,0 +1,80 @@ +ARG OPENDCS_BASE_IMAGE=ghcr.io/opendcs/routingscheduler:7.0-nightly +ARG MAVEN_BUILD_IMAGE=maven:3-eclipse-temurin-17-noble +ARG GO_BUILD_IMAGE=golang:1.25-alpine +ARG OPENDCS_VERSION=7.0-nightly + +FROM ${OPENDCS_BASE_IMAGE} AS opendcs_patched + +USER root + +COPY patch_opendcs.sh /patch_opendcs.sh +RUN /patch_opendcs.sh && rm /patch_opendcs.sh + +FROM opendcs_patched AS opendcs_base + +FROM ${MAVEN_BUILD_IMAGE} AS maven_builder + +ARG OPENDCS_VERSION + +# workaround for issues with cross-compilation +ENV JAVA_TOOL_OPTIONS="-XX:TieredStopAtLevel=1" + +COPY ./rsgis /opt/rsgis +RUN mkdir -p /opt/rsgis/src/main/resources +COPY --from=opendcs_base /opt/opendcs/bin/opendcs.jar /opt/rsgis/src/main/resources/opendcs.jar + +RUN --mount=type=cache,target=/root/.m2 \ + mvn -f /opt/rsgis/pom.xml -q install:install-file \ + -Dfile=/opt/rsgis/src/main/resources/opendcs.jar \ + -DgroupId=org.opendcs \ + -DartifactId=opendcs \ + -Dversion=${OPENDCS_VERSION} \ + -Dpackaging=jar \ + -DgeneratePom=true + +RUN --mount=type=cache,target=/root/.m2 \ + mvn -f /opt/rsgis/pom.xml -Dopendcs.version=${OPENDCS_VERSION} clean package + +FROM ${GO_BUILD_IMAGE} AS go_builder + +WORKDIR /src + +COPY go.mod go.sum ./ +RUN --mount=type=cache,target=/go/pkg/mod \ + go mod download + +COPY . . + +RUN --mount=type=cache,target=/root/.cache/go-build \ + go build -o /opendcs-wrapper . + +FROM opendcs_patched + +USER root + +RUN rm -rf /opt/java/openjdk/release + +RUN apk add --no-cache coreutils ca-certificates + +ENV INSTRUMENTATION_DCS_CONFIG=${HOME}/midas_config +ENV DCSTOOL_USERDIR=/opt/opendcs +ENV DATABASE_URL=/opt/opendcs/edit-db +ENV OPENDCS_IMPORT_DIR=/opt/opendcs/import +ENV OPENDCS_HTTP_ADDR=:8080 +ENV OPENDCS_LOG_DIR=/opendcs_output + +RUN mkdir -p -m 775 ${DCSTOOL_HOME} /opendcs_output ${OPENDCS_IMPORT_DIR} && \ + chown -R opendcs:opendcs ${DCSTOOL_HOME} /opendcs_output ${OPENDCS_IMPORT_DIR} + +RUN apk del py3-cryptography || true + +COPY --chown=opendcs:opendcs --from=maven_builder /opt/rsgis/target/rsgis.jar ${DCSTOOL_HOME}/dep + +COPY --chown=opendcs:opendcs ./logback.xml ${DCSTOOL_HOME}/logback.xml +COPY --chown=opendcs:opendcs ./decodes.properties ${DCSTOOL_HOME}/decodes.properties +COPY --chown=opendcs:opendcs ./midas_config ${INSTRUMENTATION_DCS_CONFIG} +COPY --chown=opendcs:opendcs --from=go_builder /opendcs-wrapper /usr/local/bin/opendcs-wrapper + +USER opendcs + +CMD ["/usr/local/bin/opendcs-wrapper"] diff --git a/opendcs/decodes.properties b/opendcs/decodes.properties new file mode 100644 index 00000000..e422df70 --- /dev/null +++ b/opendcs/decodes.properties @@ -0,0 +1,47 @@ +# +# The 'EditDatabase' is the provisional working database. +# The default installation is set up for a local XML database. +# +EditDatabaseType=XML +EditDatabaseLocation=/opt/opendcs/edit-db + +# +# For SQL Editable Database, change EditDatabaseType to sql +# Then... +# Format for EditDatabaseLocation is a JDBC Database URL: +# +# jdbc:protocol:[//host[:port]]/databasename +# +# where +# protocol is usually the DB product name like 'postgresql' +# host and port are optional. If not supplied, a local database is assumed. +# databasename is the database name - required. +# +# example: +# EditDatabaseLocation=jdbc:postgresql://mylrgs/decodesedit +# + +# Settings for the dbedit GUI: +EditPresentationGroup=CWMS-English + +# Various agency-specific preferences: +SiteNameTypePreference=CWMS +EditTimeZone=UTC +#EditOutputFormat=Human-Readable + +jdbcDriverClass=org.postgresql.Driver + +SqlKeyGenerator=decodes.sql.SequenceKeyGenerator +#sqlDateFormat= +#sqlTimeZone= + +transportMediumTypePreference=goes + +#defaultDataSource= +#routingStatusDir= +dataTypeStdPreference=CWMS +#decwizTimeZone= +#decwizOutputFormat= +#decwizDebugLevel= +#decwizDecodedDataDir= +#decwizSummaryLog= diff --git a/opendcs/go.mod b/opendcs/go.mod new file mode 100644 index 00000000..1e7e5c72 --- /dev/null +++ b/opendcs/go.mod @@ -0,0 +1,49 @@ +module github.com/USACE/instrumentation-api/opendcs + +go 1.25.5 + +require ( + github.com/danielgtaylor/huma/v2 v2.34.1 + gocloud.dev v0.44.0 +) + +require ( + github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect + github.com/aws/aws-sdk-go-v2/config v1.31.17 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.21 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.13 // indirect + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.13 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.89.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.1 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.39.1 // indirect + github.com/aws/smithy-go v1.23.2 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/google/wire v0.7.0 // indirect + github.com/googleapis/gax-go/v2 v2.15.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/sdk v1.37.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect + go.opentelemetry.io/otel/trace v1.37.0 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + google.golang.org/api v0.247.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect + google.golang.org/grpc v1.74.2 // indirect + google.golang.org/protobuf v1.36.7 // indirect +) diff --git a/opendcs/go.sum b/opendcs/go.sum new file mode 100644 index 00000000..bfa4b0ca --- /dev/null +++ b/opendcs/go.sum @@ -0,0 +1,162 @@ +cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY= +cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= +cloud.google.com/go v0.121.6 h1:waZiuajrI28iAf40cWgycWNgaXPO06dupuS+sgibK6c= +cloud.google.com/go v0.121.6/go.mod h1:coChdst4Ea5vUpiALcYKXEpR1S9ZgXbhEzzMcMR66vI= +cloud.google.com/go/auth v0.16.4 h1:fXOAIQmkApVvcIn7Pc2+5J8QTMVbUGLscnSVNl11su8= +cloud.google.com/go/auth v0.16.4/go.mod h1:j10ncYwjX/g3cdX7GpEzsdM+d+ZNsXAbb6qXA7p1Y5M= +cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc= +cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c= +cloud.google.com/go/compute/metadata v0.8.0 h1:HxMRIbao8w17ZX6wBnjhcDkW6lTFpgcaobyVfZWqRLA= +cloud.google.com/go/compute/metadata v0.8.0/go.mod h1:sYOGTp851OV9bOFJ9CH7elVvyzopvWQFNNghtDQ/Biw= +cloud.google.com/go/iam v1.5.2 h1:qgFRAGEmd8z6dJ/qyEchAuL9jpswyODjA2lS+w234g8= +cloud.google.com/go/iam v1.5.2/go.mod h1:SE1vg0N81zQqLzQEwxL2WI6yhetBdbNQuTvIKCSkUHE= +cloud.google.com/go/monitoring v1.24.2 h1:5OTsoJ1dXYIiMiuL+sYscLc9BumrL3CarVLL7dd7lHM= +cloud.google.com/go/monitoring v1.24.2/go.mod h1:x7yzPWcgDRnPEv3sI+jJGBkwl5qINf+6qY4eq0I9B4U= +cloud.google.com/go/storage v1.56.0 h1:iixmq2Fse2tqxMbWhLWC9HfBj1qdxqAmiK8/eqtsLxI= +cloud.google.com/go/storage v1.56.0/go.mod h1:Tpuj6t4NweCLzlNbw9Z9iwxEkrSem20AetIeH/shgVU= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 h1:UQUsRi8WTzhZntp5313l+CHIAT95ojUI2lpP/ExlZa4= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0/go.mod h1:Cz6ft6Dkn3Et6l2v2a9/RpN7epQ1GtDlO6lj8bEcOvw= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 h1:owcC2UnmsZycprQ5RfRgjydWhuoxg71LUfyiQdijZuM= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0/go.mod h1:ZPpqegjbE99EPKsu3iUWV22A04wzGPcAY/ziSIQEEgs= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 h1:Ron4zCA/yk6U7WOBXhTJcDpsUBG9npumK6xw2auFltQ= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0/go.mod h1:cSgYe11MCNYunTnRXrKiR/tHc0eoKjICUuWpNZoVCOo= +github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= +github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= +github.com/aws/aws-sdk-go-v2/config v1.31.17 h1:QFl8lL6RgakNK86vusim14P2k8BFSxjvUkcWLDjgz9Y= +github.com/aws/aws-sdk-go-v2/config v1.31.17/go.mod h1:V8P7ILjp/Uef/aX8TjGk6OHZN6IKPM5YW6S78QnRD5c= +github.com/aws/aws-sdk-go-v2/credentials v1.18.21 h1:56HGpsgnmD+2/KpG0ikvvR8+3v3COCwaF4r+oWwOeNA= +github.com/aws/aws-sdk-go-v2/credentials v1.18.21/go.mod h1:3YELwedmQbw7cXNaII2Wywd+YY58AmLPwX4LzARgmmA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.13 h1:T1brd5dR3/fzNFAQch/iBKeX07/ffu/cLu+q+RuzEWk= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.13/go.mod h1:Peg/GBAQ6JDt+RoBf4meB1wylmAipb7Kg2ZFakZTlwk= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.3 h1:4GNV1lhyELGjMz5ILMRxDvxvOaeo3Ux9Z69S1EgVMMQ= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.3/go.mod h1:br7KA6edAAqDGUYJ+zVVPAyMrPhnN+zdt17yTUT6FPw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.13 h1:eg/WYAa12vqTphzIdWMzqYRVKKnCboVPRlvaybNCqPA= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.13/go.mod h1:/FDdxWhz1486obGrKKC1HONd7krpk38LBt+dutLcN9k= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.3 h1:x2Ibm/Af8Fi+BH+Hsn9TXGdT+hKbDd5XOTZxTMxDk7o= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.3/go.mod h1:IW1jwyrQgMdhisceG8fQLmQIydcT/jWY21rFhzgaKwo= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.4 h1:NvMjwvv8hpGUILarKw7Z4Q0w1H9anXKsesMxtw++MA4= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.4/go.mod h1:455WPHSwaGj2waRSpQp7TsnpOnBfw8iDfPfbwl7KPJE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13 h1:kDqdFvMY4AtKoACfzIGD8A0+hbT41KTKF//gq7jITfM= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13/go.mod h1:lmKuogqSU3HzQCwZ9ZtcqOc5XGMqtDK7OIc2+DxiUEg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.13 h1:zhBJXdhWIFZ1acfDYIhu4+LCzdUS2Vbcum7D01dXlHQ= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.13/go.mod h1:JaaOeCE368qn2Hzi3sEzY6FgAZVCIYcC2nwbro2QCh8= +github.com/aws/aws-sdk-go-v2/service/s3 v1.89.2 h1:xgBWsgaeUESl8A8k80p6yBdexMWDVeiDmJ/pkjohJ7c= +github.com/aws/aws-sdk-go-v2/service/s3 v1.89.2/go.mod h1:+wArOOrcHUevqdto9k1tKOF5++YTe9JEcPSc9Tx2ZSw= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.1 h1:0JPwLz1J+5lEOfy/g0SURC9cxhbQ1lIMHMa+AHZSzz0= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.1/go.mod h1:fKvyjJcz63iL/ftA6RaM8sRCtN4r4zl4tjL3qw5ec7k= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.5 h1:OWs0/j2UYR5LOGi88sD5/lhN6TDLG6SfA7CqsQO9zF0= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.5/go.mod h1:klO+ejMvYsB4QATfEOIXk8WAEwN4N0aBfJpvC+5SZBo= +github.com/aws/aws-sdk-go-v2/service/sts v1.39.1 h1:mLlUgHn02ue8whiR4BmxxGJLR2gwU6s6ZzJ5wDamBUs= +github.com/aws/aws-sdk-go-v2/service/sts v1.39.1/go.mod h1:E19xDjpzPZC7LS2knI9E6BaRFDK43Eul7vd6rSq2HWk= +github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= +github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls= +github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/danielgtaylor/huma/v2 v2.34.1 h1:EmOJAbzEGfy0wAq/QMQ1YKfEMBEfE94xdBRLPBP0gwQ= +github.com/danielgtaylor/huma/v2 v2.34.1/go.mod h1:ynwJgLk8iGVgoaipi5tgwIQ5yoFNmiu+QdhU7CEEmhk= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M= +github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A= +github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw= +github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= +github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-jose/go-jose/v4 v4.1.1 h1:JYhSgy4mXXzAdF3nUx3ygx347LRXJRrpgyU3adRmkAI= +github.com/go-jose/go-jose/v4 v4.1.1/go.mod h1:BdsZGqgdO3b6tTc6LSE56wcDbMMLuPsw5d4ZD5f94kA= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/go-replayers/grpcreplay v1.3.0 h1:1Keyy0m1sIpqstQmgz307zhiJ1pV4uIlFds5weTmxbo= +github.com/google/go-replayers/grpcreplay v1.3.0/go.mod h1:v6NgKtkijC0d3e3RW8il6Sy5sqRVUwoQa4mHOGEy8DI= +github.com/google/go-replayers/httpreplay v1.2.0 h1:VM1wEyyjaoU53BwrOnaf9VhAyQQEEioJvFYxYcLRKzk= +github.com/google/go-replayers/httpreplay v1.2.0/go.mod h1:WahEFFZZ7a1P4VM1qEeHy+tME4bwyqPcwWbNlUI1Mcg= +github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= +github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= +github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= +github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/wire v0.7.0 h1:JxUKI6+CVBgCO2WToKy/nQk0sS+amI9z9EjVmdaocj4= +github.com/google/wire v0.7.0/go.mod h1:n6YbUQD9cPKTnHXEBN2DXlOp/mVADhVErcMFb0v3J18= +github.com/googleapis/enterprise-certificate-proxy v0.3.6 h1:GW/XbdyBFQ8Qe+YAmFU9uHLo7OnF5tL52HFAgMmyrf4= +github.com/googleapis/enterprise-certificate-proxy v0.3.6/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= +github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo= +github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE= +github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM= +github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/detectors/gcp v1.37.0 h1:B+WbN9RPsvobe6q4vP6KgM8/9plR/HNjgGBrfcOlweA= +go.opentelemetry.io/contrib/detectors/gcp v1.37.0/go.mod h1:K5zQ3TT7p2ru9Qkzk0bKtCql0RGkPj9pRjpXgZJZ+rU= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.62.0 h1:rbRJ8BBoVMsQShESYZ0FkvcITu8X8QNwJogcLUmDNNw= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.62.0/go.mod h1:ru6KHrNtNHxM4nD/vd6QrLVWgKhxPYgblq4VAtNawTQ= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 h1:Hf9xI/XLML9ElpiHVDNwvqI0hIFlzV8dgIr35kV1kRU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:NfchwuyNoMcZ5MLHwPrODwUF1HWCXWrL31s8gSAdIKY= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +gocloud.dev v0.44.0 h1:iVyMAqFl2r6xUy7M4mfqwlN+21UpJoEtgHEcfiLMUXs= +gocloud.dev v0.44.0/go.mod h1:ZmjROXGdC/eKZLF1N+RujDlFRx3D+4Av2thREKDMVxY= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +google.golang.org/api v0.247.0 h1:tSd/e0QrUlLsrwMKmkbQhYVa109qIintOls2Wh6bngc= +google.golang.org/api v0.247.0/go.mod h1:r1qZOPmxXffXg6xS5uhx16Fa/UFY8QU/K4bfKrnvovM= +google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 h1:Nt6z9UHqSlIdIGJdz6KhTIs2VRx/iOsA5iE8bmQNcxs= +google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79/go.mod h1:kTmlBHMPqR5uCZPBvwa2B18mvubkjyY3CRLI0c6fj0s= +google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c h1:AtEkQdl5b6zsybXcbz00j1LwNodDuH6hVifIaNqk7NQ= +google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c/go.mod h1:ea2MjsO70ssTfCjiwHgI0ZFqcw45Ksuk2ckf9G468GA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a h1:tPE/Kp+x9dMSwUm/uM0JKK0IfdiJkwAbSMSeZBXXJXc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= +google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= +google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/opendcs/logback.xml b/opendcs/logback.xml new file mode 100644 index 00000000..ec3699f7 --- /dev/null +++ b/opendcs/logback.xml @@ -0,0 +1,30 @@ + + + + + + + + time + yyyy-MM-dd'T'HH:mm:ss.SSS'Z' + + + + { + "level": "%level", + "thread": "%thread", + "msg": "%message" + } + + + + + + + + + + + + + diff --git a/opendcs/main.go b/opendcs/main.go new file mode 100644 index 00000000..86d9ffd5 --- /dev/null +++ b/opendcs/main.go @@ -0,0 +1,930 @@ +package main + +import ( + "bytes" + "context" + "crypto/subtle" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "os/exec" + "os/signal" + "path" + "path/filepath" + "sort" + "strings" + "sync" + "syscall" + "time" + + "github.com/danielgtaylor/huma/v2" + "github.com/danielgtaylor/huma/v2/adapters/humago" + + "gocloud.dev/blob" + _ "gocloud.dev/blob/s3blob" +) + +const ( + VERSION = "1.0.0" +) + +type Config struct { + InstrConfigDir string + ImportDir string + RoutingSpec string + AuthToken string + ListenAddr string + LogDir string + + DcsToolUserDir string + DecodesProps string + + DataloadS3Root string + AWSEndpointURL string +} + +type Response[T any] struct { + Body T +} + +func NewResponse[T any](body T) *Response[T] { return &Response[T]{Body: body} } + +type KeyQueryParam struct { + Key string `query:"key" required:"true" doc:"API key for authentication"` +} + +type ImportRequest struct { + Files []string `json:"files"` + ValidateOnly bool `json:"validate_only"` +} + +type ImportResponse struct { + Status string `json:"status"` + ValidateLog string `json:"validate_log,omitempty"` + ImportLog string `json:"import_log,omitempty"` + CommandOutput string `json:"command_output,omitempty"` + Error string `json:"error,omitempty"` +} + +type TryMutex struct { + ch chan struct{} +} + +func NewTryMutex() *TryMutex { + m := &TryMutex{ch: make(chan struct{}, 1)} + m.ch <- struct{}{} + return m +} + +func (m *TryMutex) TryLock() bool { + select { + case <-m.ch: + return true + default: + return false + } +} + +func (m *TryMutex) Unlock() { + select { + case m.ch <- struct{}{}: + default: + } +} + +type CommandRunner struct{} + +func (r *CommandRunner) Run(ctx context.Context, name string, args []string, env []string) ([]byte, error) { + slog.Info("exec command", "name", name, "args", strings.Join(args, " ")) + cmd := exec.CommandContext(ctx, name, args...) + if env != nil { + cmd.Env = env + } + + var buf bytes.Buffer + cmd.Stdout = &buf + cmd.Stderr = &buf + + err := cmd.Run() + out := buf.Bytes() + + if ctx.Err() != nil { + return out, fmt.Errorf("command canceled: %w", ctx.Err()) + } + if err != nil { + return out, fmt.Errorf("%s failed: %w", name, err) + } + return out, nil +} + +type RouterScheduler struct { + mu sync.Mutex + cmd *exec.Cmd + + logDir string + routingSpec string +} + +func NewRouterScheduler(logDir, routingSpec string) *RouterScheduler { + return &RouterScheduler{ + logDir: logDir, + routingSpec: routingSpec, + } +} + +func (s *RouterScheduler) Start() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cmd != nil { + return errors.New("routing scheduler already running") + } + + runtimeLogPath := filepath.Join(s.logDir, "runtime.log") + slog.Info("starting routing scheduler 'rs'", "spec", s.routingSpec, "runtimeLogPath", runtimeLogPath) + + cmd := exec.Command("rs", "-l", runtimeLogPath, s.routingSpec) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to start rs: %w", err) + } + s.cmd = cmd + return nil +} + +func (s *RouterScheduler) Stop(ctx context.Context) error { + s.mu.Lock() + cmd := s.cmd + s.mu.Unlock() + + if cmd == nil || cmd.Process == nil { + return nil + } + + slog.Info("stopping routing scheduler") + _ = cmd.Process.Signal(syscall.SIGTERM) + + done := make(chan error, 1) + go func() { done <- cmd.Wait() }() + + select { + case err := <-done: + s.mu.Lock() + s.cmd = nil + s.mu.Unlock() + + if err != nil { + slog.Warn("routing scheduler exited with error during stop", "err", err) + } + return nil + case <-ctx.Done(): + slog.Warn("routing scheduler did not stop in time; sending SIGKILL") + _ = cmd.Process.Kill() + <-done + + s.mu.Lock() + s.cmd = nil + s.mu.Unlock() + + return fmt.Errorf("rs stop timeout: %w", ctx.Err()) + } +} + +func (s *RouterScheduler) ForwardSignal(sig os.Signal) { + s.mu.Lock() + defer s.mu.Unlock() + if s.cmd != nil && s.cmd.Process != nil { + _ = s.cmd.Process.Signal(sig.(syscall.Signal)) + } +} + +type Importer struct { + cfg Config + rs *RouterScheduler + runner CommandRunner +} + +func NewImporter(cfg Config, rs *RouterScheduler) *Importer { + return &Importer{cfg: cfg, rs: rs, runner: CommandRunner{}} +} + +func (i *Importer) InitialImport(ctx context.Context) error { + slog.Info("performing initial import", "dir", i.cfg.InstrConfigDir) + + files, err := i.findInitialXMLFiles() + if err != nil { + return fmt.Errorf("find initial xml files: %w", err) + } + if len(files) == 0 { + slog.Info("no initial XML files found") + return nil + } + + args := make([]string, 0, len(files)+2) + args = append(args, "-l", "/proc/self/fd/1") + args = append(args, files...) + + slog.Info("initial dbimport", "command", "dbimport", "args", strings.Join(args, " ")) + _, err = i.runner.Run(ctx, "dbimport", args, nil) + if err != nil { + return fmt.Errorf("dbimport command failed: %w", err) + } + return nil +} + +func (i *Importer) ProcessAtomic(ctx context.Context, req ImportRequest) ImportResponse { + ctx, cancel := context.WithTimeout(ctx, 15*time.Minute) + defer cancel() + + files, err := i.resolveImportFiles(req.Files) + if err != nil { + return ImportResponse{Status: "error", Error: err.Error()} + } + if len(files) == 0 { + return ImportResponse{Status: "error", Error: "no xml files found to import"} + } + + basePropsBytes, err := os.ReadFile(i.cfg.DecodesProps) + if err != nil { + return ImportResponse{Status: "error", Error: fmt.Sprintf("failed to read decodes.properties: %v", err)} + } + baseProps := string(basePropsBytes) + + liveEditDB, err := parseEditDBLocation(baseProps) + if err != nil { + return ImportResponse{Status: "error", Error: err.Error()} + } + + ts := time.Now().UTC().Format("20060102-150405") + + // NOTE: stage on the same filesystem as liveEditDB for atomic rename semantics + stageRoot := filepath.Dir(liveEditDB) + stageUserDir := filepath.Join(stageRoot, ".opendcs-userdir-stage-"+ts) + stageEditDB := filepath.Join(stageRoot, ".opendcs-edit-db-stage-"+ts) + backupEditDB := liveEditDB + ".prev-" + ts + + if err := os.MkdirAll(stageUserDir, 0o775); err != nil { + return ImportResponse{Status: "error", Error: fmt.Sprintf("failed to create staging userdir: %v", err)} + } + defer func() { _ = os.RemoveAll(stageUserDir) }() + + if err := copyDir(liveEditDB, stageEditDB); err != nil { + return ImportResponse{Status: "error", Error: fmt.Sprintf("failed to stage edit-db copy: %v", err)} + } + defer func() { _ = os.RemoveAll(stageEditDB) }() + + stageProps := rewriteEditDBLocation(baseProps, stageEditDB) + stagePropsPath := filepath.Join(stageUserDir, "decodes.properties") + if err := os.WriteFile(stagePropsPath, []byte(stageProps), 0o664); err != nil { + return ImportResponse{Status: "error", Error: fmt.Sprintf("failed to write staging decodes.properties: %v", err)} + } + + env := append(os.Environ(), "DCSTOOL_USERDIR="+stageUserDir) + + validateLogPath := filepath.Join(i.cfg.LogDir, fmt.Sprintf("dbimport-validate-%s.log", ts)) + importLogPath := filepath.Join(i.cfg.LogDir, fmt.Sprintf("dbimport-import-%s.log", ts)) + + valArgs := append([]string{"-v", "-l", validateLogPath}, files...) + valOut, valErr := i.runner.Run(ctx, "dbimport", valArgs, env) + + resp := ImportResponse{ + ValidateLog: readFileOrEmpty(validateLogPath), + CommandOutput: string(valOut), + } + + if valErr != nil { + resp.Status = "validation_failed" + resp.Error = valErr.Error() + return resp + } + + if req.ValidateOnly { + resp.Status = "validation_ok" + return resp + } + + impArgs := append([]string{"-l", importLogPath}, files...) + impOut, impErr := i.runner.Run(ctx, "dbimport", impArgs, env) + resp.ImportLog = readFileOrEmpty(importLogPath) + resp.CommandOutput = string(impOut) + + if impErr != nil { + resp.Status = "import_failed" + resp.Error = impErr.Error() + return resp + } + + stopCtx, stopCancel := context.WithTimeout(ctx, 30*time.Second) + defer stopCancel() + if err := i.rs.Stop(stopCtx); err != nil { + return ImportResponse{Status: "error", Error: fmt.Sprintf("failed stopping rs: %v", err)} + } + + if err := atomicSwapDir(liveEditDB, stageEditDB, backupEditDB); err != nil { + _ = i.rs.Start() + return ImportResponse{Status: "error", Error: fmt.Sprintf("failed swapping edit-db: %v", err)} + } + + if err := i.rs.Start(); err != nil { + slog.Error("rs failed to start after commit; rolling back", "err", err) + + if rbErr := atomicRollbackDir(liveEditDB, backupEditDB); rbErr != nil { + return ImportResponse{ + Status: "error", + Error: fmt.Sprintf("rs restart failed (%v) and rollback failed (%v)", err, rbErr), + } + } + + _ = i.rs.Start() + return ImportResponse{Status: "error", Error: fmt.Sprintf("rs restart failed; rolled back to previous db: %v", err)} + } + + resp.Status = "success" + return resp +} + +func (i *Importer) findInitialXMLFiles() ([]string, error) { + var results []string + root := i.cfg.InstrConfigDir + + err := filepath.WalkDir(root, func(pathStr string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + if strings.EqualFold(filepath.Ext(pathStr), ".xml") { + results = append(results, pathStr) + } + return nil + }) + if err != nil { + return nil, err + } + + sort.Strings(results) + return results, nil +} + +func (i *Importer) resolveImportFiles(files []string) ([]string, error) { + var resolved []string + + if len(files) == 0 { + err := filepath.WalkDir(i.cfg.ImportDir, func(pathStr string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + if strings.EqualFold(filepath.Ext(pathStr), ".xml") { + resolved = append(resolved, pathStr) + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("walk import dir: %w", err) + } + sort.Strings(resolved) + return resolved, nil + } + + baseImport := filepath.Clean(i.cfg.ImportDir) + + for _, f := range files { + if !filepath.IsAbs(f) { + f = filepath.Join(i.cfg.ImportDir, f) + } + clean := filepath.Clean(f) + if !strings.HasPrefix(clean, baseImport+string(os.PathSeparator)) && clean != baseImport { + return nil, fmt.Errorf("file %q is outside allowed import dir %q", clean, i.cfg.ImportDir) + } + resolved = append(resolved, clean) + } + + sort.Strings(resolved) + return resolved, nil +} + +type Uploader struct { + root string + awsEndpoint string + + once sync.Once + b *blob.Bucket + err error +} + +func NewUploader(root, endpoint string) *Uploader { + return &Uploader{root: root, awsEndpoint: endpoint} +} + +func (u *Uploader) Bucket(ctx context.Context) (*blob.Bucket, error) { + u.once.Do(func() { + if u.root == "" { + u.err = fmt.Errorf("DATALOAD_S3_ROOT is not set; cannot upload") + return + } + bucketURL, _, err := buildBucketURLFromRoot(u.root, u.awsEndpoint) + if err != nil { + u.err = err + return + } + b, err := blob.OpenBucket(ctx, bucketURL) + if err != nil { + u.err = fmt.Errorf("failed to open bucket %q: %w", bucketURL, err) + return + } + u.b = b + }) + return u.b, u.err +} + +func (u *Uploader) Close() { + if u.b != nil { + if err := u.b.Close(); err != nil { + slog.Warn("failed to close bucket", "err", err) + } + } +} + +func (u *Uploader) RunUploadCLI(ctx context.Context, filePath string) error { + if u.root == "" { + return fmt.Errorf("DATALOAD_S3_ROOT is not set; cannot upload") + } + + const app = "goes" + + stat, err := os.Stat(filePath) + if err != nil { + return fmt.Errorf("input file %q does not exist: %w", filePath, err) + } + if stat.Size() == 0 { + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + slog.Warn("failed to remove empty file", "file", filePath, "err", err) + } + return nil + } + + bucketURL, prefix, err := buildBucketURLFromRoot(u.root, u.awsEndpoint) + if err != nil { + return err + } + + b, err := u.Bucket(ctx) + if err != nil { + return err + } + + base := filepath.Base(filePath) + platform := derivePlatformFromFilename(base) + key := path.Join(prefix, app, platform, base) + + slog.Info("uploading file to bucket", "file", filePath, "bucketURL", bucketURL, "key", key) + + f, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file %q: %w", filePath, err) + } + defer f.Close() + + w, err := b.NewWriter(ctx, key, nil) + if err != nil { + return fmt.Errorf("failed to create blob writer: %w", err) + } + + if _, err := io.Copy(w, f); err != nil { + _ = w.Close() + return fmt.Errorf("failed to stream file to bucket: %w", err) + } + if err := w.Close(); err != nil { + return fmt.Errorf("failed to finalize blob write: %w", err) + } + + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + slog.Warn("failed to remove local file after upload", "file", filePath, "err", err) + } + return nil +} + +type App struct { + cfg Config + importMu *TryMutex + + rs *RouterScheduler + importer *Importer + uploader *Uploader + + httpServer *http.Server +} + +func NewApp(cfg Config) *App { + rs := NewRouterScheduler(cfg.LogDir, cfg.RoutingSpec) + return &App{ + cfg: cfg, + importMu: NewTryMutex(), + rs: rs, + importer: NewImporter(cfg, rs), + uploader: NewUploader(cfg.DataloadS3Root, cfg.AWSEndpointURL), + } +} + +func (a *App) Close() { + a.uploader.Close() +} + +func (a *App) RunServer(ctx context.Context) error { + router := http.NewServeMux() + api := humago.New(router, huma.DefaultConfig("OpenDCS Wrapper", VERSION)) + + huma.Post(api, "/import", func(ctx context.Context, input *struct { + KeyQueryParam + Body ImportRequest `contentType:"application/json"` + }) (*Response[ImportResponse], error) { + if err := a.checkKey(input.Key); err != nil { + return nil, err + } + + if !a.importMu.TryLock() { + return nil, huma.NewError(http.StatusConflict, "import already in progress") + } + defer a.importMu.Unlock() + + resp := a.importer.ProcessAtomic(ctx, input.Body) + if resp.Status == "conflict" { + return nil, huma.NewError(http.StatusConflict, resp.Error) + } + return NewResponse(resp), nil + }) + + type RuntimeLogs struct { + Log string `json:"log" doc:"Contents of routing scheduler runtime log"` + } + huma.Get(api, "/logs/runtime", func(ctx context.Context, input *struct { + KeyQueryParam + }) (*Response[RuntimeLogs], error) { + if err := a.checkKey(input.Key); err != nil { + return nil, err + } + runtimeLogPath := filepath.Join(a.cfg.LogDir, "runtime.log") + data, err := os.ReadFile(runtimeLogPath) + if err != nil { + return nil, huma.NewError(http.StatusInternalServerError, fmt.Sprintf("failed to read runtime log: %v", err)) + } + return NewResponse(RuntimeLogs{Log: string(data)}), nil + }) + + router.HandleFunc("/healthz", handleHealth) + + a.httpServer = &http.Server{ + Addr: a.cfg.ListenAddr, + Handler: router, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 2 * time.Minute, + MaxHeaderBytes: 1 << 20, + } + + errCh := make(chan error, 1) + go func() { + slog.Info("http api listening", "addr", a.cfg.ListenAddr) + if err := a.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- err + return + } + errCh <- nil + }() + + select { + case <-ctx.Done(): + shutdownCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + _ = a.httpServer.Shutdown(shutdownCtx) + return ctx.Err() + case err := <-errCh: + return err + } +} + +func (a *App) checkKey(key string) error { + if subtle.ConstantTimeCompare([]byte(key), []byte(a.cfg.AuthToken)) != 1 { + return huma.NewError(http.StatusUnauthorized, "invalid key") + } + return nil +} + +func (a *App) Run(ctx context.Context) error { + if err := a.importer.InitialImport(ctx); err != nil { + return err + } + + if err := a.rs.Start(); err != nil { + return err + } + + go func() { + if err := a.RunServer(ctx); err != nil && !errors.Is(err, context.Canceled) { + slog.Error("http server failed", "err", err) + } + }() + + <-ctx.Done() + + stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = a.rs.Stop(stopCtx) + return ctx.Err() +} + +func main() { + initLogger() + + cfg, err := loadConfig() + if err != nil { + slog.Error("config error", "err", err) + os.Exit(1) + } + + app := NewApp(cfg) + defer app.Close() + + if len(os.Args) > 1 && os.Args[1] == "upload" { + if len(os.Args) < 3 { + slog.Error("usage: upload requires file path argument", "argv", os.Args) + os.Exit(1) + } + if err := app.uploader.RunUploadCLI(context.Background(), os.Args[2]); err != nil { + slog.Error("upload failed", "err", err) + os.Exit(1) + } + return + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + slog.Info("starting opendcs wrapper (server mode)", "version", VERSION) + + if err := app.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { + slog.Error("app exited with error", "err", err) + os.Exit(1) + } +} + +func initLogger() { + var level slog.Level + levelText := getenvDefault("LOGLEVEL", "INFO") + if err := level.UnmarshalText([]byte(levelText)); err != nil { + panic(err) + } + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})) + slog.SetDefault(logger) +} + +func loadConfig() (Config, error) { + userDir := getenvDefault("DCSTOOL_USERDIR", "/opt/opendcs") + props := getenvDefault("DECODES_PROPERTIES_PATH", filepath.Join(userDir, "decodes.properties")) + + c := Config{ + InstrConfigDir: os.Getenv("INSTRUMENTATION_DCS_CONFIG"), + ImportDir: getenvDefault("OPENDCS_IMPORT_DIR", "/opt/opendcs/import"), + RoutingSpec: getenvDefault("ROUTING_SPEC", "goes"), + AuthToken: os.Getenv("OPENDCS_IMPORT_TOKEN"), + ListenAddr: getenvDefault("OPENDCS_HTTP_ADDR", ":8080"), + LogDir: getenvDefault("OPENDCS_LOG_DIR", "/opendcs_output"), + + DcsToolUserDir: userDir, + DecodesProps: props, + + DataloadS3Root: os.Getenv("DATALOAD_S3_ROOT"), + AWSEndpointURL: os.Getenv("AWS_ENDPOINT_URL"), + } + + if c.InstrConfigDir == "" { + return Config{}, fmt.Errorf("INSTRUMENTATION_DCS_CONFIG must be set") + } + if c.AuthToken == "" { + return Config{}, fmt.Errorf("OPENDCS_IMPORT_TOKEN must be set for secure access") + } + if err := os.MkdirAll(c.ImportDir, 0o775); err != nil { + return Config{}, fmt.Errorf("failed to ensure import dir exists (%s): %w", c.ImportDir, err) + } + if err := os.MkdirAll(c.LogDir, 0o775); err != nil { + return Config{}, fmt.Errorf("failed to ensure log dir exists (%s): %w", c.LogDir, err) + } + return c, nil +} + +func getenvDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func handleHealth(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, "ok\n") +} + +func readFileOrEmpty(pathStr string) string { + data, err := os.ReadFile(pathStr) + if err != nil { + return "" + } + return string(data) +} + +func parseEditDBLocation(props string) (string, error) { + lines := strings.SplitSeq(props, "\n") + for line := range lines { + trim := strings.TrimSpace(line) + if trim == "" || strings.HasPrefix(trim, "#") { + continue + } + if v, found := strings.CutPrefix(trim, "EditDatabaseLocation="); found { + v = strings.TrimSpace(v) + if v == "" { + return "", fmt.Errorf("EditDatabaseLocation present but empty in decodes.properties") + } + return v, nil + } + } + return "", fmt.Errorf("EditDatabaseLocation not found in decodes.properties") +} + +func rewriteEditDBLocation(props, newPath string) string { + lines := strings.Split(props, "\n") + out := make([]string, 0, len(lines)+1) + replaced := false + + for _, line := range lines { + trim := strings.TrimSpace(line) + if strings.HasPrefix(trim, "EditDatabaseLocation=") { + out = append(out, "EditDatabaseLocation="+newPath) + replaced = true + continue + } + out = append(out, line) + } + if !replaced { + out = append(out, "EditDatabaseLocation="+newPath) + } + return strings.Join(out, "\n") +} + +func atomicSwapDir(liveDir, stagedDir, backupDir string) error { + if _, err := os.Stat(stagedDir); err != nil { + return fmt.Errorf("staged dir missing: %w", err) + } + + if _, err := os.Stat(liveDir); err == nil { + if err := os.Rename(liveDir, backupDir); err != nil { + return fmt.Errorf("failed to move live->backup: %w", err) + } + } + + if err := os.Rename(stagedDir, liveDir); err != nil { + _ = os.Rename(backupDir, liveDir) + return fmt.Errorf("failed to move staged->live: %w", err) + } + return nil +} + +func atomicRollbackDir(liveDir, backupDir string) error { + if _, err := os.Stat(backupDir); err != nil { + return fmt.Errorf("backup dir missing: %w", err) + } + + badDir := liveDir + ".bad-" + time.Now().UTC().Format("20060102-150405") + if _, err := os.Stat(liveDir); err == nil { + _ = os.Rename(liveDir, badDir) + } + + if err := os.Rename(backupDir, liveDir); err != nil { + return fmt.Errorf("failed to restore backup->live: %w", err) + } + return nil +} + +func copyDir(src, dst string) error { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + info, err := os.Stat(src) + if err != nil { + return err + } + if !info.IsDir() { + return fmt.Errorf("source is not a directory: %s", src) + } + + if err := os.MkdirAll(dst, info.Mode().Perm()); err != nil { + return err + } + + return filepath.WalkDir(src, func(p string, d os.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + rel, err := filepath.Rel(src, p) + if err != nil { + return err + } + if rel == "." { + return nil + } + target := filepath.Join(dst, rel) + + if d.IsDir() { + di, err := d.Info() + if err != nil { + return err + } + return os.MkdirAll(target, di.Mode().Perm()) + } + + fi, err := d.Info() + if err != nil { + return err + } + + if fi.Mode()&os.ModeSymlink != 0 { + linkTarget, err := os.Readlink(p) + if err != nil { + return err + } + return os.Symlink(linkTarget, target) + } + + return copyFile(p, target, fi.Mode().Perm()) + }) +} + +func copyFile(src, dst string, perm os.FileMode) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + if err := os.MkdirAll(filepath.Dir(dst), 0o775); err != nil { + return err + } + + out, err := os.OpenFile(dst, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, perm) + if err != nil { + return err + } + defer func() { _ = out.Close() }() + + if _, err := io.Copy(out, in); err != nil { + return err + } + return out.Close() +} + +func buildBucketURLFromRoot(root, awsEndpoint string) (bucketURL string, prefix string, err error) { + u, err := url.Parse(root) + if err != nil { + return "", "", fmt.Errorf("invalid DATALOAD_S3_ROOT %q: %w", root, err) + } + if u.Scheme != "s3" { + return "", "", fmt.Errorf("DATALOAD_S3_ROOT %q must use s3:// scheme", root) + } + if u.Host == "" { + return "", "", fmt.Errorf("DATALOAD_S3_ROOT %q missing bucket name", root) + } + + prefix = strings.TrimPrefix(u.Path, "/") + + v := u.Query() + if awsEndpoint != "" { + v.Set("endpoint", awsEndpoint) + } + u.Path = "" + u.RawQuery = v.Encode() + + bucketURL = u.Scheme + "://" + u.Host + if u.RawQuery != "" { + bucketURL += "?" + u.RawQuery + } + + return bucketURL, prefix, nil +} + +func derivePlatformFromFilename(filename string) string { + name := strings.TrimSuffix(filename, filepath.Ext(filename)) + if idx := strings.LastIndex(name, "-"); idx > 0 { + return name[:idx] + } + return name +} diff --git a/opendcs/midas_config/datasource/hotbackup.xml b/opendcs/midas_config/datasource/hotbackup.xml new file mode 100644 index 00000000..2753d808 --- /dev/null +++ b/opendcs/midas_config/datasource/hotbackup.xml @@ -0,0 +1,25 @@ + + + + + + + hostname=cdadata.wcda.noaa.gov, port=16003, password=${env.CDADATA_PASSWORD}, username=${env.CDADATA_USERNAME} + + + + + + + hostname=cdabackup.wcda.noaa.gov, port=16003, password=${env.CDABACKUP_PASSWORD}, username=${env.CDABACKUP_USERNAME} + + + + + + + hostname=lrgseddn1.cr.usgs.gov, port=16003, password=${env.EDDN1_PASSWORD}, username=${env.EDDN1_USERNAME} + + + + diff --git a/opendcs/midas_config/reference/DataTypeEquivalenceList.xml b/opendcs/midas_config/reference/DataTypeEquivalenceList.xml new file mode 100644 index 00000000..371ba059 --- /dev/null +++ b/opendcs/midas_config/reference/DataTypeEquivalenceList.xml @@ -0,0 +1,590 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/opendcs/midas_config/reference/EnumList.xml b/opendcs/midas_config/reference/EnumList.xml new file mode 100644 index 00000000..ec9cb693 --- /dev/null +++ b/opendcs/midas_config/reference/EnumList.xml @@ -0,0 +1,402 @@ + + + + + Read messages from LRGS data server + decodes.datasource.LrgsDataSource + 1 + + + Read messages from one source in a group + decodes.datasource.HotBackupGroup + 2 + + + Read message(s) from specified file + decodes.datasource.FileDataSource + 3 + + + + Read messages from files in a specified directory + + decodes.datasource.DirectoryDataSource + 4 + + + Read messages a socket stream + decodes.datasource.SocketStreamDataSource + 5 + + + Read messages from all sources in a group + decodes.datasource.RoundRobinGroup + 6 + + + + + + National Data Buoy Center Context-Sensitive Parser + + NDBCMessageParser + 1 + + + Hard-coded NOS data parser + NOSMessageParser + 2 + + + DECODES Format Statements and Unit Conversions + DecodesScript + 3 + + + + + National Weather Service Handbook 5 Name + 1 + + + Local Site Naming Convention + 2 + + + USGS Numeric Station ID + 3 + + + Columbia Basin TeleType + 4 + + + CWMS Name + + + 5 + + + UUID4 identifier + + + 6 + + + + + Pipe data to standard output. + decodes.consumer.PipeConsumer + 1 + + + Save data in specified file + decodes.consumer.FileConsumer + 2 + + + Append data to file in a specified directory. + decodes.consumer.FileAppendConsumer + 3 + + + + Save message data in files in a directory, then optionally run a trigger + script. + + decodes.consumer.DirectoryConsumer + 4 + + + + + degrees or radians + 1 + + + Area + 2 + + + Volume / Time + 3 + + + Length or distance + 4 + + + Ratio + 5 + + + Temperature + 6 + + + Time + 7 + + + Velocity + 8 + + + voltage + 9 + + + Volume + 10 + + + + + Y = Ax + B + LinearConverter + 1 + + + No Conversion (output = input) + NoConversion + 2 + + + Y = Ax5 + Bx4 + Cx3 + Dx2 + Ex + F + Poly5Converter + 3 + + + Y = A * (B + x)^C + D + UsgsStdConverter + 4 + + + + + Descending + 1 + + + Ascending + 2 + + + + + + Input must match table value to produce an output + + ExactMatchLookup + + + Exponential interpolation between table values + ExponentialInterpLookup + + + Linear interpolation between table values + LinearInterpLookup + + + Logarithmic interpolation between table values + LogarithmicInterpLookup + + + Inputs are rounded to nearest table value + RoundingLookup + + + Inputs are truncated to lower table value + TruncatingLookup + + + + + Apply to all platforms + + + + Apply to platforms sharing a given configuration + + + + Apply to specific platform(s) + + + Apply to platforms in a network list + + + Apply to platform at a given site + + + + + English Measurements + 1 + + + International Metric System + 2 + + + + + Display Format + decodes.consumer.HumanReadableFormatter + 1 + + + Standard Hydrometerologic Exchange Format + decodes.consumer.ShefFormatter + 2 + + + USACE HEC Intermediate SHEF Format + decodes.consumer.ShefitFormatter + 3 + + + USGS Standard Message Format + decodes.consumer.StdmsgFormatter + 4 + + + Compatible with EMIT ASCII format + decodes.consumer.EmitAsciiFormatter + 5 + + + Compatible with EMIT Oracle format + decodes.consumer.EmitOracleFormatter + 6 + + + Dump Format for testing and trouble-shooting + decodes.consumer.DumpFormatter + 7 + + + Transmission Monitor + decodes.consumer.TransmitMonitorFormatter + 8 + + + Delimited row-column format + decodes.consumer.TableFormatter + 9 + + + Hydstra Format. + decodes.consumer.HydstraFormatter + 10 + + + HTML Report Format + decodes.consumer.HtmlFormatter + 11 + + + CWMS Oracle with TSID format + rsgis.consumer.CwmsOracleFormatter + + 12 + + + CWMS Oracle Output Formatter + rsgis.consumer.CwmsOutputFormatter + + 13 + + + CWMS Oracle with TSID format + rsgis.consumer.MidasOutputFormatter + + 14 + + + + shef-pe + + + Standard Hydrometeorologic Exchange Format Physical Element Code + + 1 + + + Environmental Protection Agency Parameter Code + 2 + + + U.S. Bureau of Reclamations Hydrologic Database + 3 + + + Hydstra Data Code + 4 + + + CWMS parameters + + + 5 + + + UUID4 + + + 6 + + + + + Electronic Data Logger File + 1 + + + GOES DCP + 2 + + + GOES DCP Random Message + 3 + + + GOES DCP Self-Timed Message + 4 + + + LRGS Archive File + 5 + + + Data collected via telephone telementry + 6 + + + + + + + Data Collection Platform + 1 + + + Transmitter, data logger, modem, etc. + 2 + + + Environmental Sensor + 3 + + + + + Fixed Regular Interval + 1 + + + Variable, Triggered or Random + 2 + + + diff --git a/opendcs/midas_config/reference/MIDAS-English.xml b/opendcs/midas_config/reference/MIDAS-English.xml new file mode 100644 index 00000000..270fb68d --- /dev/null +++ b/opendcs/midas_config/reference/MIDAS-English.xml @@ -0,0 +1,268 @@ + + + true + + + + in + 2 + + + + + in + 2 + + + + + ft + 2 + + + + + ft + 2 + + + + + ft + 2 + + + + + cfs + 2 + + + + + cfs + 2 + + + + + cfs + 2 + + + + + ft + 2 + + + + + W/m2 + 3 + + + + + ft + 2 + + + + + ft + 2 + + + + + ft + 2 + + + + + ft + 2 + + + + + ft + 2 + + + + + ft + 2 + + + + + kW + 3 + + + + + in + 2 + + + + + mb + 2 + + + + + J/m2 + 3 + + + + + % + 3 + + + + + rev + 3 + + + + + mph + 3 + + + + + mph + 2 + + + + + mph + 2 + + + + + rpm + 3 + + + + + ft + 2 + + + + + ft + 2 + + + + + ft + 2 + + + + + ac-ft + 3 + + + + + F + 3 + + + + + F + 2 + + + + + F + 2 + + + + + in + 2 + + + + + in + 2 + + + + + hr + 3 + + + + + ft + 2 + + + + + JTU + 3 + + + + + FNU + 3 + + + + + JTU + 3 + + + + + NTU + 3 + + + + + Volts + 3 + + + + + ac-ft + 3 + + + + + su + 2 + + diff --git a/opendcs/midas_config/reference/MIDAS-Metric.xml b/opendcs/midas_config/reference/MIDAS-Metric.xml new file mode 100644 index 00000000..5f548c3a --- /dev/null +++ b/opendcs/midas_config/reference/MIDAS-Metric.xml @@ -0,0 +1,274 @@ + + + false + + + + m + 3 + + + + + m + 3 + + + + + W/m2 + 3 + + + + + m + 3 + + + + + su + 3 + + + + + kW + 3 + + + + + mm + 3 + + + + + mb + 3 + + + + + J/m2 + 3 + + + + + % + 3 + + + + + rev + 3 + + + + + kph + 3 + + + + + rpm + 3 + + + + + m3 + 3 + + + + + C + 3 + + + + + cm + 3 + + + + + hr + 3 + + + + + m + 3 + + + + + JTU + 3 + + + + + FNU + 3 + + + + + JTU + 3 + + + + + NTU + 3 + + + + + v + 3 + + + + + m3 + 3 + + + + + mm + 3 + + + + + mm + 3 + + + + + m + 3 + + + + + m + 3 + + + + + m + 3 + + + + + cms + 3 + + + + + cms + 3 + + + + + cms + 3 + + + + + m + 3 + + + + + m + 3 + + + + + m + 3 + + + + + m + 3 + + + + + m + 3 + + + + + kph + 3 + + + + + kph + 3 + + + + + m + 3 + + + + + m + 3 + + + + + m + 3 + + + + + C + 3 + + + + + C + 3 + + + + + cm + 3 + + diff --git a/opendcs/midas_config/routing/goes.xml b/opendcs/midas_config/routing/goes.xml new file mode 100644 index 00000000..c2b4bddb --- /dev/null +++ b/opendcs/midas_config/routing/goes.xml @@ -0,0 +1,55 @@ + + + true + + + + + + hostname=cdadata.wcda.noaa.gov, port=16003, password=${env.CDADATA_PASSWORD}, username=${env.CDADATA_USERNAME} + + + + + + + hostname=cdabackup.wcda.noaa.gov, port=16003, password=${env.CDABACKUP_PASSWORD}, username=${env.CDABACKUP_USERNAME} + + + + + + + hostname=lrgseddn1.cr.usgs.gov, port=16003, password=${env.EDDN1_PASSWORD}, username=${env.EDDN1_USERNAME} + + + + + false + false + midas-formatter + UTC + MIDAS-English + directory + /opendcs_output + now - 2 hours + + + yyyy-MM-dd'T'HH:mm:ss'Z' + + + ${java.TRANSPORTID}-$DATE(yyyyMMddHHmmss) + + + False + + + , + + + l + + + /opendcs-wrapper upload ${java.FILENAME} + + diff --git a/opendcs/midas_config/routing/monitor.xml b/opendcs/midas_config/routing/monitor.xml new file mode 100644 index 00000000..efa0e196 --- /dev/null +++ b/opendcs/midas_config/routing/monitor.xml @@ -0,0 +1,51 @@ + + + true + + + + + + hostname=cdadata.wcda.noaa.gov, port=16003, password=${env.CDADATA_PASSWORD}, username=${env.CDADATA_USERNAME} + + + + + + + hostname=cdabackup.wcda.noaa.gov, port=16003, password=${env.CDABACKUP_PASSWORD}, username=${env.CDABACKUP_USERNAME} + + + + + + + hostname=lrgseddn1.cr.usgs.gov, port=16003, password=${env.EDDN1_PASSWORD}, username=${env.EDDN1_USERNAME} + + + + + false + false + transmit-monitor + UTC + directory + /opendcs_output + now - 2 hours + + + ${java.TRANSPORTID}-$DATE(yyyyMMddHHmmss) + + + , + + + False + + + l + + + /opendcs-wrapper upload ${java.FILENAME} + + diff --git a/opendcs/patch_opendcs.sh b/opendcs/patch_opendcs.sh new file mode 100755 index 00000000..aaf49e6f --- /dev/null +++ b/opendcs/patch_opendcs.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +set -eu + +rm -f /opt/opendcs/dep/commons-net-*.jar \ + /opt/opendcs/dep/jackson-core*.jar \ + /opt/opendcs/dep/jackson-dataformat-toml*.jar \ + /opt/opendcs/dep/commons-vfs2-*.jar \ + /opt/opendcs/dep/javax.el-*.jar \ + /opt/opendcs/dep/jdom-*.jar \ + /opt/opendcs/dep/poi-*.jar \ + /opt/opendcs/dep/postgresql-*.jar \ + /opt/opendcs/dep/jetty-*.jar || true + +wget -qO /opt/opendcs/dep/commons-net-3.11.1.jar \ + "https://repo1.maven.org/maven2/commons-net/commons-net/3.11.1/commons-net-3.11.1.jar" && + wget -qO /opt/opendcs/dep/jackson-dataformat-toml-2.18.2.jar \ + "https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-toml/2.18.2/jackson-dataformat-toml-2.18.2.jar" && + wget -qO /opt/opendcs/dep/jackson-core-2.19.2.jar \ + "https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.19.2/jackson-core-2.19.2.jar" && + wget -qO /opt/opendcs/dep/commons-vfs2-2.10.0.jar \ + "https://repo1.maven.org/maven2/org/apache/commons/commons-vfs2/2.10.0/commons-vfs2-2.10.0.jar" && + wget -qO /opt/opendcs/dep/jdom2-2.0.6.1.jar \ + "https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar" && + wget -qO /opt/opendcs/dep/poi-5.4.1.jar \ + "https://repo1.maven.org/maven2/org/apache/poi/poi/5.4.1/poi-5.4.1.jar" && + wget -qO /opt/opendcs/dep/postgresql-42.7.7.jar \ + "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.7/postgresql-42.7.7.jar" && + wget -qO /opt/opendcs/dep/logstash-logback-encoder.jar \ + "https://repo1.maven.org/maven2/net/logstash/logback/logstash-logback-encoder/8.0/logstash-logback-encoder-8.0.jar" && + chown opendcs:opendcs /opt/opendcs/dep/*.jar diff --git a/opendcs/rsgis/.mvn/jvm.config b/opendcs/rsgis/.mvn/jvm.config new file mode 100644 index 00000000..e69de29b diff --git a/opendcs/rsgis/.mvn/maven.config b/opendcs/rsgis/.mvn/maven.config new file mode 100644 index 00000000..e69de29b diff --git a/opendcs/rsgis/pom.xml b/opendcs/rsgis/pom.xml new file mode 100644 index 00000000..0bb326ed --- /dev/null +++ b/opendcs/rsgis/pom.xml @@ -0,0 +1,67 @@ + + + 4.0.0 + + rsgis.consumer + rsgis + 1.0-SNAPSHOT + rsgis + jar + + + UTF-8 + 17 + 7.0-nightly + + + + + org.opendcs + opendcs + ${opendcs.version} + provided + + + + org.slf4j + slf4j-api + 2.0.16 + provided + + + + + rsgis + + + + + maven-clean-plugin + 3.4.0 + + + maven-resources-plugin + 3.3.1 + + + maven-compiler-plugin + 3.13.0 + + + maven-surefire-plugin + 3.3.0 + + + maven-jar-plugin + 3.4.2 + + + maven-install-plugin + 3.1.2 + + + + + diff --git a/opendcs/rsgis/src/main/java/rsgis/consumer/MidasOutputFormatter.java b/opendcs/rsgis/src/main/java/rsgis/consumer/MidasOutputFormatter.java new file mode 100644 index 00000000..fb21bf73 --- /dev/null +++ b/opendcs/rsgis/src/main/java/rsgis/consumer/MidasOutputFormatter.java @@ -0,0 +1,124 @@ +package rsgis.consumer; + +import decodes.consumer.DataConsumer; +import decodes.consumer.DataConsumerException; +import decodes.consumer.OutputFormatter; +import decodes.consumer.OutputFormatterException; +import decodes.datasource.RawMessage; +import decodes.datasource.UnknownPlatformException; +import decodes.db.Platform; +import decodes.db.PresentationGroup; +import decodes.decoder.DecodedMessage; +import decodes.decoder.Sensor; +import decodes.decoder.TimeSeries; +import decodes.util.PropertySpec; +import ilex.util.PropertiesUtil; +import ilex.var.TimedVariable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.SimpleDateFormat; +import java.util.Iterator; +import java.util.Properties; +import java.util.TimeZone; + +public class MidasOutputFormatter extends OutputFormatter { + private static final Logger log = LoggerFactory.getLogger(MidasOutputFormatter.class); + + private String delimiter = " "; + private String dateFormat = "yyyy-MM-dd'T'HH:mmZ"; + private PropertySpec[] propSpecs = new PropertySpec[] { + new PropertySpec("cwmsOfficeID", "s", ""), + new PropertySpec("delimiter", "s", ""), + new PropertySpec("justify", "b", ""), + new PropertySpec("dateFormat", "s", "") + }; + private SimpleDateFormat sdf; + + protected void initFormatter( + String type, + TimeZone timeZone, + PresentationGroup presentationGroup, + Properties properties) throws OutputFormatterException { + + String s; + if ((s = PropertiesUtil.getIgnoreCase(properties, "delimiter")) != null) { + delimiter = s; + } + if ((s = PropertiesUtil.getIgnoreCase(properties, "dateFormat")) != null) { + dateFormat = s; + } + sdf = new SimpleDateFormat(dateFormat); + sdf.setTimeZone(timeZone); + } + + public void shutdown() { + } + + public void formatMessage(DecodedMessage decodedMessage, DataConsumer consumer) + throws DataConsumerException, OutputFormatterException { + + consumer.startMessage(decodedMessage); + RawMessage raw = decodedMessage.getRawMessage(); + + Platform platform; + try { + platform = raw.getPlatform(); + } catch (UnknownPlatformException e) { + throw new OutputFormatterException(e.toString()); + } + + String platformName = platform.getDisplayName(); + String platformFileId = platform.getProperty("fileId"); + + Iterator it = decodedMessage.getAllTimeSeries(); + while (it.hasNext()) { + TimeSeries ts = it.next(); + Sensor sensor = ts.getSensor(); + + if (sensor == null) { + log.warn("sensor_null platform={} timeseries={}", platformName, ts.getDisplayName()); + continue; + } + if (ts.size() == 0) { + log.warn("timeseries_empty platform={} timeseries={}", platformName, ts.getDisplayName()); + continue; + } + + String sensorNameNumber = sensor.getName() + "." + sensor.getNumber(); + processDataOutput(consumer, ts, platformFileId, sensorNameNumber); + + log.info( + "measurements_written platform={} timeseries={} count={}", + platformName, + ts.getDisplayName(), + ts.size()); + } + + consumer.endMessage(); + } + + public void processDataOutput( + DataConsumer consumer, + TimeSeries ts, + String platformFileId, + String sensorNameNumber) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < ts.size(); i++) { + TimedVariable tv = ts.sampleAt(i); + if ((tv.getFlags() & 0x60000000) != 0) { + continue; + } + sb.setLength(0); + sb.append(platformFileId).append(delimiter) + .append(sensorNameNumber).append(delimiter) + .append(sdf.format(tv.getTime())).append(delimiter) + .append(ts.formattedSampleAt(i)); + consumer.println(sb.toString()); + } + } + + public PropertySpec[] getSupportedProps() { + return propSpecs; + } +} diff --git a/sqlc.generate.yaml b/sqlc.generate.yaml index e41574a3..0c7ec2ce 100644 --- a/sqlc.generate.yaml +++ b/sqlc.generate.yaml @@ -41,6 +41,10 @@ sql: type: uuid.UUID pointer: true + # xml + - db_type: xml + go_type: string + # timestamptz - db_type: timestamptz go_type: time.Time @@ -149,6 +153,12 @@ sql: type: InstrumentIDName slice: true + # v_goes_telemetry + - column: v_goes_telemetry_source.files + go_type: + type: VGoesTelemetrySourceFiles + slice: true + # v_incl_measurement - column: v_incl_measurement.measurements go_type: