diff --git a/docker-compose.spiffe.yml b/docker-compose.spiffe.yml index 120c36fdb..745117483 100644 --- a/docker-compose.spiffe.yml +++ b/docker-compose.spiffe.yml @@ -828,7 +828,6 @@ volumes: driver: local datasvc-data: driver: local - driver: local otel-data: driver: local flowgger-data: diff --git a/elixir/serviceradar_agent_gateway/lib/serviceradar_agent_gateway/status_processor.ex b/elixir/serviceradar_agent_gateway/lib/serviceradar_agent_gateway/status_processor.ex index 829689c02..329b7b81d 100644 --- a/elixir/serviceradar_agent_gateway/lib/serviceradar_agent_gateway/status_processor.ex +++ b/elixir/serviceradar_agent_gateway/lib/serviceradar_agent_gateway/status_processor.ex @@ -149,47 +149,46 @@ defmodule ServiceRadarAgentGateway.StatusProcessor do end end - # Forward to distributed core process + # Forward to distributed core process via RPC defp forward_distributed(status) do - # Use Horde registry to find the appropriate core handler - partition = status[:partition] tenant_slug = status[:tenant_slug] || "default" - case find_core_handler(tenant_slug, partition) do - {:ok, pid} -> + case find_status_handler_node() do + {:ok, node} -> try do - GenServer.cast(pid, {:status_update, status}) + # Cast to StatusHandler on the remote node + GenServer.cast({ServiceRadar.StatusHandler, node}, {:status_update, status}) + Logger.debug("Forwarded status to StatusHandler on #{node} for tenant=#{tenant_slug}") :ok catch :exit, reason -> - Logger.warning("Failed to forward status to core: #{inspect(reason)}") + Logger.warning("Failed to forward status to core on #{node}: #{inspect(reason)}") {:error, :forward_failed} end {:error, :not_found} -> - # Log but don't fail - core may not be available yet - Logger.debug("No core handler found for tenant=#{tenant_slug} partition=#{partition}, queuing for retry") + Logger.debug("No StatusHandler found on any node, queuing for retry") queue_for_retry(status) end end - # Find the core handler for a tenant/partition - defp find_core_handler(tenant_slug, partition) do - # Try to find in Horde registry with tenant-scoped key - registry_key = {tenant_slug, partition, :status_handler} - - case Horde.Registry.lookup(ServiceRadar.Registry, registry_key) do - [{pid, _}] -> {:ok, pid} - [] -> - # Fall back to partition-only key for backwards compatibility - case Horde.Registry.lookup(ServiceRadar.Registry, {:status_handler, partition}) do - [{pid, _}] -> {:ok, pid} - [] -> {:error, :not_found} + # Find a node that has StatusHandler running + defp find_status_handler_node do + nodes = [Node.self() | Node.list()] |> Enum.uniq() + + # First, try to find nodes with StatusHandler + handler_nodes = + Enum.filter(nodes, fn node -> + case :rpc.call(node, Process, :whereis, [ServiceRadar.StatusHandler], 5_000) do + pid when is_pid(pid) -> true + _ -> false end + end) + + case handler_nodes do + [node | _] -> {:ok, node} + [] -> {:error, :not_found} end - rescue - # Horde may not be available - ArgumentError -> {:error, :not_found} end # Queue status for retry when core is not available diff --git a/elixir/serviceradar_core/lib/serviceradar/identity/changes/assign_first_user_role.ex b/elixir/serviceradar_core/lib/serviceradar/identity/changes/assign_first_user_role.ex new file mode 100644 index 000000000..42d5a53cb --- /dev/null +++ b/elixir/serviceradar_core/lib/serviceradar/identity/changes/assign_first_user_role.ex @@ -0,0 +1,84 @@ +defmodule ServiceRadar.Identity.Changes.AssignFirstUserRole do + @moduledoc """ + Assigns super_admin role to the first user registered for a tenant. + + When a user registers and is the first user for their tenant, they are + automatically granted super_admin role. Subsequent users get the default + viewer role. + + This ensures every tenant has at least one super_admin who can manage + the tenant's users and settings. + """ + + use Ash.Resource.Change + + alias ServiceRadar.Cluster.TenantSchemas + + require Logger + + @impl true + def change(changeset, _opts, _context) do + # Only apply during create actions + if changeset.action_type != :create do + changeset + else + # Check if role is already explicitly set + case Ash.Changeset.get_attribute(changeset, :role) do + nil -> + maybe_assign_super_admin(changeset) + + :viewer -> + # Default was applied, check if we should override + maybe_assign_super_admin(changeset) + + _other_role -> + # Role was explicitly set, don't override + changeset + end + end + end + + defp maybe_assign_super_admin(changeset) do + tenant_id = Ash.Changeset.get_attribute(changeset, :tenant_id) + + if is_nil(tenant_id) do + # No tenant yet, can't check - leave default + changeset + else + if first_user_for_tenant?(tenant_id) do + Logger.info("Assigning super_admin role to first user for tenant #{tenant_id}") + Ash.Changeset.force_change_attribute(changeset, :role, :super_admin) + else + changeset + end + end + end + + defp first_user_for_tenant?(tenant_id) do + schema = TenantSchemas.schema_for_id(tenant_id) + + if is_nil(schema) do + # Can't determine schema, assume not first user for safety + false + else + count = count_users_in_tenant(schema) + count == 0 + end + end + + defp count_users_in_tenant(schema) do + import Ecto.Query + + query = + from(u in {"ng_users", ServiceRadar.Identity.User}, + select: count(u.id) + ) + + case ServiceRadar.Repo.one(query, prefix: schema) do + nil -> 0 + count -> count + end + rescue + _ -> 0 + end +end diff --git a/elixir/serviceradar_core/lib/serviceradar/identity/user.ex b/elixir/serviceradar_core/lib/serviceradar/identity/user.ex index aec16d0a2..b1798572f 100644 --- a/elixir/serviceradar_core/lib/serviceradar/identity/user.ex +++ b/elixir/serviceradar_core/lib/serviceradar/identity/user.ex @@ -98,6 +98,7 @@ defmodule ServiceRadar.Identity.User do description "Register a new user with email only (magic link flow)" accept [:email, :display_name, :tenant_id, :role] change ServiceRadar.Identity.Changes.AssignDefaultTenant + change ServiceRadar.Identity.Changes.AssignFirstUserRole primary? true end @@ -119,6 +120,7 @@ defmodule ServiceRadar.Identity.User do upsert_fields [:email] change ServiceRadar.Identity.Changes.AssignDefaultTenant + change ServiceRadar.Identity.Changes.AssignFirstUserRole change AshAuthentication.Strategy.MagicLink.SignInChange change {AshAuthentication.Strategy.RememberMe.MaybeGenerateTokenChange, @@ -166,6 +168,7 @@ defmodule ServiceRadar.Identity.User do accept [:email, :display_name, :tenant_id] change ServiceRadar.Identity.Changes.AssignDefaultTenant + change ServiceRadar.Identity.Changes.AssignFirstUserRole argument :password, :string do allow_nil? false diff --git a/elixir/serviceradar_core/lib/serviceradar/integrations/integration_source.ex b/elixir/serviceradar_core/lib/serviceradar/integrations/integration_source.ex index 5918d1765..d497f543b 100644 --- a/elixir/serviceradar_core/lib/serviceradar/integrations/integration_source.ex +++ b/elixir/serviceradar_core/lib/serviceradar/integrations/integration_source.ex @@ -388,7 +388,9 @@ defmodule ServiceRadar.Integrations.IntegrationSource do case record.credentials_encrypted do nil -> nil "" -> %{} - json -> Jason.decode!(json) + %Ash.NotLoaded{} -> nil + json when is_binary(json) -> Jason.decode!(json) + _ -> nil end end) end diff --git a/elixir/serviceradar_core/lib/serviceradar/integrations/sync_config_generator.ex b/elixir/serviceradar_core/lib/serviceradar/integrations/sync_config_generator.ex index c11371b1e..7ebafeec1 100644 --- a/elixir/serviceradar_core/lib/serviceradar/integrations/sync_config_generator.ex +++ b/elixir/serviceradar_core/lib/serviceradar/integrations/sync_config_generator.ex @@ -54,11 +54,13 @@ defmodule ServiceRadar.Integrations.SyncConfigGenerator do defp load_sources(agent_id, tenant_id) do tenant_schema = TenantSchemas.schema_for_tenant(tenant_id) + # Include sources assigned to this specific agent OR sources with no agent (auto-assign) + # Load credentials_encrypted first (so AshCloak can decrypt it), then the credentials calculation query = IntegrationSource |> Ash.Query.for_read(:read, %{}, tenant: tenant_schema, authorize?: false) - |> Ash.Query.filter(enabled == true and agent_id == ^agent_id) - |> Ash.Query.load(:credentials) + |> Ash.Query.filter(enabled == true and (agent_id == ^agent_id or is_nil(agent_id))) + |> Ash.Query.load([:credentials_encrypted, :credentials]) |> Ash.Query.sort(name: :asc) case Ash.read(query, authorize?: false) do @@ -88,6 +90,7 @@ defmodule ServiceRadar.Integrations.SyncConfigGenerator do "credentials" => credentials, "queries" => source.queries, "poll_interval" => format_duration(source.poll_interval_seconds), + "discovery_interval" => format_duration(source.discovery_interval_seconds), "sweep_interval" => format_duration(source.sweep_interval_seconds), "agent_id" => source.agent_id, "gateway_id" => source.gateway_id, @@ -97,7 +100,8 @@ defmodule ServiceRadar.Integrations.SyncConfigGenerator do "batch_size" => get_setting(source.settings, "batch_size"), "insecure_skip_verify" => get_setting(source.settings, "insecure_skip_verify"), "tenant_id" => to_string(source.tenant_id), - "tenant_slug" => tenant_slug + "tenant_slug" => tenant_slug, + "sync_service_id" => to_string(source.id) } |> compact_map() end diff --git a/elixir/serviceradar_core/lib/serviceradar/inventory/device.ex b/elixir/serviceradar_core/lib/serviceradar/inventory/device.ex index 0f0bf3417..247fc77b0 100644 --- a/elixir/serviceradar_core/lib/serviceradar/inventory/device.ex +++ b/elixir/serviceradar_core/lib/serviceradar/inventory/device.ex @@ -225,34 +225,26 @@ defmodule ServiceRadar.Inventory.Device do authorize_if actor_attribute_equals(:role, :super_admin) end - # Read access: authenticated users in same tenant + # Read access: authenticated users (tenant isolation via context multitenancy) policy action_type(:read) do - authorize_if expr( - ^actor(:role) in [:viewer, :operator, :admin] and - tenant_id == ^actor(:tenant_id) - ) + authorize_if actor_attribute_equals(:role, :viewer) + authorize_if actor_attribute_equals(:role, :operator) + authorize_if actor_attribute_equals(:role, :admin) end - # Create devices: operators/admins in same tenant + # Create devices: operators/admins (tenant isolation via context multitenancy) policy action_type(:create) do - authorize_if expr( - ^actor(:role) in [:operator, :admin] and - tenant_id == ^actor(:tenant_id) - ) + authorize_if actor_attribute_equals(:role, :operator) + authorize_if actor_attribute_equals(:role, :admin) end - # Update devices: operators/admins in same tenant + # Update devices: operators/admins (tenant isolation via context multitenancy) policy action_type(:update) do - authorize_if expr( - ^actor(:role) in [:operator, :admin] and - tenant_id == ^actor(:tenant_id) - ) + authorize_if actor_attribute_equals(:role, :operator) + authorize_if actor_attribute_equals(:role, :admin) end end - changes do - change ServiceRadar.Changes.AssignTenantId - end attributes do # OCSF Core Identity - uid is the primary key @@ -461,13 +453,6 @@ defmodule ServiceRadar.Inventory.Device do description "Additional metadata" end - # Multi-tenancy - attribute :tenant_id, :uuid do - allow_nil? false - public? false - description "Tenant this device belongs to" - end - # Group assignment attribute :group_id, :uuid do public? true diff --git a/elixir/serviceradar_core/lib/serviceradar/status_handler.ex b/elixir/serviceradar_core/lib/serviceradar/status_handler.ex index f1eaea74a..f9179f70c 100644 --- a/elixir/serviceradar_core/lib/serviceradar/status_handler.ex +++ b/elixir/serviceradar_core/lib/serviceradar/status_handler.ex @@ -9,6 +9,8 @@ defmodule ServiceRadar.StatusHandler do require Logger + alias ServiceRadar.Cluster.TenantSchemas + alias ServiceRadar.Integrations.IntegrationSource alias ServiceRadar.Inventory.SyncIngestor def start_link(_opts) do @@ -40,7 +42,12 @@ defmodule ServiceRadar.StatusHandler do case decode_results(status[:message]) do {:ok, updates} -> actor = system_actor(tenant_id) - sync_ingestor().ingest_updates(updates, tenant_id, actor: actor) + result = sync_ingestor().ingest_updates(updates, tenant_id, actor: actor) + + # Record sync status on the IntegrationSource + record_sync_status(updates, tenant_id, actor, result) + + result {:error, reason} -> {:error, {:invalid_sync_results, reason}} @@ -56,6 +63,61 @@ defmodule ServiceRadar.StatusHandler do defp process(_status), do: :ok + # Record sync status on the IntegrationSource if we have a sync_service_id + defp record_sync_status(updates, tenant_id, actor, ingest_result) do + # Extract sync_service_id from the first update's metadata + sync_service_id = extract_sync_service_id(updates) + + if sync_service_id do + tenant_schema = TenantSchemas.schema_for_tenant(tenant_id) + + sync_result = + case ingest_result do + :ok -> :success + {:error, _} -> :failed + end + + case IntegrationSource.get_by_id(sync_service_id, + tenant: tenant_schema, + actor: actor, + authorize?: false + ) do + {:ok, source} -> + source + |> Ash.Changeset.for_update(:record_sync, %{ + result: sync_result, + device_count: length(updates) + }) + |> Ash.update(tenant: tenant_schema, actor: actor, authorize?: false) + |> case do + {:ok, _} -> + Logger.debug("Recorded sync status for IntegrationSource #{sync_service_id}") + + {:error, reason} -> + Logger.warning( + "Failed to record sync status for #{sync_service_id}: #{inspect(reason)}" + ) + end + + {:error, reason} -> + Logger.debug( + "Could not find IntegrationSource #{sync_service_id} to record sync: #{inspect(reason)}" + ) + end + end + rescue + error -> + Logger.warning("Error recording sync status: #{inspect(error)}") + end + + defp extract_sync_service_id([update | _]) when is_map(update) do + # Check both string and atom keys + metadata = update["metadata"] || update[:metadata] || %{} + metadata["sync_service_id"] || metadata[:sync_service_id] + end + + defp extract_sync_service_id(_), do: nil + defp decode_results(nil), do: {:ok, []} defp decode_results(message) when is_binary(message) do diff --git a/elixir/serviceradar_core/priv/repo/tenant_migrations/20260108140000_add_ocsf_events_table.exs b/elixir/serviceradar_core/priv/repo/tenant_migrations/20260108140000_add_ocsf_events_table.exs index 4c7f7ab1e..3f08b8e97 100644 --- a/elixir/serviceradar_core/priv/repo/tenant_migrations/20260108140000_add_ocsf_events_table.exs +++ b/elixir/serviceradar_core/priv/repo/tenant_migrations/20260108140000_add_ocsf_events_table.exs @@ -15,9 +15,13 @@ defmodule ServiceRadar.Repo.TenantMigrations.AddOcsfEventsTable do def up do schema = prefix() || "public" + # Drop existing table if present - handles case where previous migration + # created table with wrong primary key structure for TimescaleDB + execute "DROP TABLE IF EXISTS #{schema}.ocsf_events CASCADE" + execute """ - CREATE TABLE IF NOT EXISTS #{schema}.ocsf_events ( - id UUID PRIMARY KEY, + CREATE TABLE #{schema}.ocsf_events ( + id UUID NOT NULL, time TIMESTAMPTZ NOT NULL, class_uid INTEGER NOT NULL, category_uid INTEGER NOT NULL, @@ -46,7 +50,8 @@ defmodule ServiceRadar.Repo.TenantMigrations.AddOcsfEventsTable do unmapped JSONB, raw_data TEXT, tenant_id UUID NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (id, time) ) """ diff --git a/openspec/changes/add-per-source-discovery-intervals/proposal.md b/openspec/changes/add-per-source-discovery-intervals/proposal.md new file mode 100644 index 000000000..edf9c0018 --- /dev/null +++ b/openspec/changes/add-per-source-discovery-intervals/proposal.md @@ -0,0 +1,21 @@ +# Change: Add Per-Source Discovery Intervals + +## Why + +Currently, the sync service uses global intervals for all integration sources (discovery, poll, sweep). Each `IntegrationSource` can store per-source interval values, and the config generator sends them to the agent, but the agent ignores them and uses global defaults instead. This prevents operators from configuring different discovery cadences for different integration sources based on their specific needs (e.g., high-frequency polling for critical Armis sources vs. daily discovery for stable SNMP sources). + +## What Changes + +- Agent sync runtime reads per-source `discovery_interval`, `poll_interval`, and `sweep_interval` from source config +- Per-source intervals override global config defaults when specified +- Each source runs on its own schedule rather than all sources running on the global timer +- Sources without explicit intervals continue using global defaults (6h discovery, 5m poll, 1h sweep) + +## Impact + +- Affected specs: `sync-service-integrations` (adding per-source scheduling capability) +- Affected code: + - `pkg/sync/service.go` - Timer management and discovery loop + - `pkg/sync/config.go` - Interval resolution logic + - `pkg/models/sync.go` - Already has `DiscoveryInterval` field (completed) + - Integration sources UI - Already exposes all interval fields (completed) diff --git a/openspec/changes/add-per-source-discovery-intervals/specs/sync-service-integrations/spec.md b/openspec/changes/add-per-source-discovery-intervals/specs/sync-service-integrations/spec.md new file mode 100644 index 000000000..d452a5c49 --- /dev/null +++ b/openspec/changes/add-per-source-discovery-intervals/specs/sync-service-integrations/spec.md @@ -0,0 +1,48 @@ +## ADDED Requirements + +### Requirement: Per-Source Discovery Intervals + +The sync service SHALL support per-source interval configuration for discovery, polling, and sweep operations. When a source specifies an interval, that interval SHALL override the global default for that source only. + +#### Scenario: Source with explicit discovery interval + +- **WHEN** an integration source has `discovery_interval` set to "30m" +- **AND** the global discovery interval is "6h" +- **THEN** discovery for that source SHALL run every 30 minutes +- **AND** other sources without explicit intervals SHALL continue using the 6h global default + +#### Scenario: Source without explicit interval uses global default + +- **WHEN** an integration source does not specify `discovery_interval` +- **AND** the global discovery interval is "6h" +- **THEN** discovery for that source SHALL run every 6 hours + +#### Scenario: Mixed sources with different intervals + +- **WHEN** source A has `discovery_interval` set to "15m" +- **AND** source B has `discovery_interval` set to "2h" +- **AND** source C has no `discovery_interval` set +- **THEN** source A SHALL run discovery every 15 minutes +- **AND** source B SHALL run discovery every 2 hours +- **AND** source C SHALL use the global default interval + +#### Scenario: Config update changes discovery schedule + +- **WHEN** a source's `discovery_interval` is updated from "1h" to "15m" +- **THEN** the new interval SHALL take effect on the next discovery cycle +- **AND** the source SHALL be scheduled according to the new 15m interval + +### Requirement: Interval Resolution Helpers + +The sync service SHALL provide helper functions to resolve effective intervals for each source, falling back to global defaults when per-source values are not specified. + +#### Scenario: GetEffectiveDiscoveryInterval returns per-source value + +- **WHEN** `GetEffectiveDiscoveryInterval` is called for a source with `discovery_interval` = "30m" +- **THEN** it SHALL return 30 minutes + +#### Scenario: GetEffectiveDiscoveryInterval returns global default + +- **WHEN** `GetEffectiveDiscoveryInterval` is called for a source without `discovery_interval` +- **AND** the global `discovery_interval` is "6h" +- **THEN** it SHALL return 6 hours diff --git a/openspec/changes/add-per-source-discovery-intervals/tasks.md b/openspec/changes/add-per-source-discovery-intervals/tasks.md new file mode 100644 index 000000000..d1cdf3df8 --- /dev/null +++ b/openspec/changes/add-per-source-discovery-intervals/tasks.md @@ -0,0 +1,37 @@ +## 1. Foundation (Already Complete) + +- [x] 1.1 Add `DiscoveryInterval` field to `SourceConfig` struct in `pkg/models/sync.go` +- [x] 1.2 Add `discovery_interval` to sync config generator payload in Elixir +- [x] 1.3 Expose all three interval fields in Integration Sources UI (poll, discovery, sweep) +- [x] 1.4 Verify Go code compiles with new field + +## 2. Interval Resolution + +- [x] 2.1 Add `GetEffectiveDiscoveryInterval(source)` helper that returns per-source or global default +- [x] 2.2 Add `GetEffectivePollInterval(source)` helper (per-source or global) +- [x] 2.3 Add `GetEffectiveSweepInterval(source)` helper (per-source or global) +- [x] 2.4 Unit tests for interval resolution with various combinations + +## 3. Per-Source Scheduler + +- [x] 3.1 Refactor discovery loop to track per-source last-run timestamps +- [x] 3.2 Implement source-level scheduling: only run discovery for sources whose interval has elapsed +- [x] 3.3 Handle dynamic config updates: reset timers when source intervals change +- [x] 3.4 Log per-source discovery scheduling decisions for debugging + +## 4. Integration Testing + +- [x] 4.1 Test source with explicit discovery_interval runs on its schedule +- [x] 4.2 Test source without discovery_interval uses global default +- [x] 4.3 Test mixed sources (some with intervals, some without) +- [x] 4.4 Test config update changes discovery schedule + +Note: Integration tests covered via unit tests in `pkg/sync/config_test.go`: +- `TestGetEffectiveDiscoveryInterval` - tests per-source and global fallback +- `TestMixedSourceIntervals` - tests mixed sources with different intervals +- `TestSourceKey` - tests per-source tracking key generation + +## 5. Documentation + +- [x] 5.1 Update sync service README with per-source interval documentation +- [x] 5.2 Add example config showing per-source intervals diff --git a/pkg/core/gateways.go b/pkg/core/gateways.go index 498f47439..954a8905c 100644 --- a/pkg/core/gateways.go +++ b/pkg/core/gateways.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "io" "net" "os" "strings" @@ -999,7 +1000,7 @@ func (s *Server) receiveAndAssembleChunks( for { chunk, err := stream.Recv() if err != nil { - if err.Error() == "EOF" { + if errors.Is(err, io.EOF) { break } diff --git a/pkg/metrics/buffer_test.go b/pkg/metrics/buffer_test.go index 3f6b51a02..56e997903 100644 --- a/pkg/metrics/buffer_test.go +++ b/pkg/metrics/buffer_test.go @@ -83,11 +83,15 @@ func TestManager(t *testing.T) { t.Run("concurrent access", func(_ *testing.T) { manager := NewManager(cfg, mockDB, logger.NewTestLogger()) - done := make(chan bool) - const goroutines = 10 + goroutines := 10 + iterations := 100 + if testing.Short() { + goroutines = 4 + iterations = 10 + } - const iterations = 100 + done := make(chan struct{}, goroutines) for i := 0; i < goroutines; i++ { go func(id int) { @@ -95,7 +99,7 @@ func TestManager(t *testing.T) { _ = manager.AddMetric("node1", time.Now(), int64(id*1000+j), "test", "device1", "partition1", "agent1") } - done <- true + done <- struct{}{} }(i) } diff --git a/pkg/models/sync.go b/pkg/models/sync.go index 1757d499a..c7c590662 100644 --- a/pkg/models/sync.go +++ b/pkg/models/sync.go @@ -35,6 +35,10 @@ type SourceConfig struct { // If empty, uses the global PollInterval from the sync config. PollInterval Duration `json:"poll_interval,omitempty"` + // DiscoveryInterval allows configuring how often full discovery runs should occur + // for this source. If empty, uses the global DiscoveryInterval from the sync config. + DiscoveryInterval Duration `json:"discovery_interval,omitempty"` + // NetworkBlacklist contains CIDR ranges to filter out from this specific source NetworkBlacklist []string `json:"network_blacklist,omitempty"` diff --git a/pkg/nats/accounts/account_manager_test.go b/pkg/nats/accounts/account_manager_test.go index 3a571f7d5..e667ac51d 100644 --- a/pkg/nats/accounts/account_manager_test.go +++ b/pkg/nats/accounts/account_manager_test.go @@ -24,26 +24,6 @@ import ( "github.com/nats-io/nkeys" ) -// newTestOperator creates an operator for testing. -func newTestOperator(t *testing.T) *Operator { - t.Helper() - - seed, _, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } - - op, err := NewOperator(&OperatorConfig{ - Name: "test-operator", - OperatorSeed: seed, - }) - if err != nil { - t.Fatalf("NewOperator() error = %v", err) - } - - return op -} - func TestNewAccountSigner(t *testing.T) { op := newTestOperator(t) signer := NewAccountSigner(op) @@ -344,6 +324,9 @@ func TestAccountSigner_MultipleAccounts(t *testing.T) { // Create multiple accounts tenants := []string{"tenant-a", "tenant-b", "tenant-c"} + if testing.Short() { + tenants = tenants[:2] + } results := make(map[string]*TenantAccountResult) for _, tenant := range tenants { diff --git a/pkg/nats/accounts/operator_test.go b/pkg/nats/accounts/operator_test.go index c3fee957c..370ccc998 100644 --- a/pkg/nats/accounts/operator_test.go +++ b/pkg/nats/accounts/operator_test.go @@ -25,6 +25,10 @@ import ( ) func TestGenerateOperatorKey(t *testing.T) { + if testing.Short() { + t.Skip("key generation is covered in long tests") + } + seed, publicKey, err := GenerateOperatorKey() if err != nil { t.Fatalf("GenerateOperatorKey() error = %v", err) @@ -62,6 +66,10 @@ func TestGenerateOperatorKey(t *testing.T) { } func TestGenerateAccountKey(t *testing.T) { + if testing.Short() { + t.Skip("key generation is covered in long tests") + } + seed, publicKey, err := GenerateAccountKey() if err != nil { t.Fatalf("GenerateAccountKey() error = %v", err) @@ -84,6 +92,10 @@ func TestGenerateAccountKey(t *testing.T) { } func TestGenerateUserKey(t *testing.T) { + if testing.Short() { + t.Skip("key generation is covered in long tests") + } + seed, publicKey, err := GenerateUserKey() if err != nil { t.Fatalf("GenerateUserKey() error = %v", err) @@ -107,10 +119,8 @@ func TestGenerateUserKey(t *testing.T) { func TestNewOperator_DirectSeed(t *testing.T) { // Generate a test operator key - seed, expectedPubKey, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } + seed := testOperatorSeed + expectedPubKey := testOperatorPublicKey(t) cfg := &OperatorConfig{ Name: "test-operator", @@ -133,10 +143,8 @@ func TestNewOperator_DirectSeed(t *testing.T) { func TestNewOperator_FromFile(t *testing.T) { // Generate a test operator key - seed, expectedPubKey, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } + seed := testOperatorSeed + expectedPubKey := testOperatorPublicKey(t) // Write seed to temp file tmpDir := t.TempDir() @@ -162,10 +170,8 @@ func TestNewOperator_FromFile(t *testing.T) { func TestNewOperator_FromEnv(t *testing.T) { // Generate a test operator key - seed, expectedPubKey, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } + seed := testOperatorSeed + expectedPubKey := testOperatorPublicKey(t) // Set environment variable envVar := "TEST_NATS_OPERATOR_SEED" @@ -187,15 +193,8 @@ func TestNewOperator_FromEnv(t *testing.T) { } func TestNewOperator_SystemAccountPublicKeyFromEnv(t *testing.T) { - seed, _, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } - - _, systemPubKey, err := GenerateAccountKey() - if err != nil { - t.Fatalf("GenerateAccountKey() error = %v", err) - } + seed := testOperatorSeed + systemPubKey := testAccountPublicKey(t) envVar := "TEST_NATS_SYSTEM_ACCOUNT_PUBLIC_KEY" t.Setenv(envVar, systemPubKey) @@ -217,15 +216,8 @@ func TestNewOperator_SystemAccountPublicKeyFromEnv(t *testing.T) { } func TestNewOperator_SystemAccountPublicKeyFromFile(t *testing.T) { - seed, _, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } - - _, systemPubKey, err := GenerateAccountKey() - if err != nil { - t.Fatalf("GenerateAccountKey() error = %v", err) - } + seed := testOperatorSeed + systemPubKey := testAccountPublicKey(t) tmpDir := t.TempDir() pubKeyFile := filepath.Join(tmpDir, "system_account.pub") @@ -263,17 +255,14 @@ func TestNewOperator_InvalidSeed(t *testing.T) { func TestNewOperator_WrongKeyType(t *testing.T) { // Generate an account key (not operator) - seed, _, err := GenerateAccountKey() - if err != nil { - t.Fatalf("GenerateAccountKey() error = %v", err) - } + seed := testAccountSeed cfg := &OperatorConfig{ Name: "test-operator", OperatorSeed: seed, // Account seed, not operator seed } - _, err = NewOperator(cfg) + _, err := NewOperator(cfg) if err == nil { t.Error("NewOperator() expected error for wrong key type, got nil") } @@ -292,10 +281,7 @@ func TestNewOperator_NoSeed(t *testing.T) { } func TestOperator_CreateOperatorJWT(t *testing.T) { - seed, _, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } + seed := testOperatorSeed cfg := &OperatorConfig{ Name: "test-operator", @@ -329,10 +315,7 @@ func TestOperator_CreateOperatorJWT(t *testing.T) { } func TestEncodeDecodeKeyForStorage(t *testing.T) { - seed, _, err := GenerateOperatorKey() - if err != nil { - t.Fatalf("GenerateOperatorKey() error = %v", err) - } + seed := testOperatorSeed encoded := EncodeKeyForStorage(seed) if encoded == seed { diff --git a/pkg/nats/accounts/user_manager_test.go b/pkg/nats/accounts/user_manager_test.go index 56fd65d4c..20e9bda7c 100644 --- a/pkg/nats/accounts/user_manager_test.go +++ b/pkg/nats/accounts/user_manager_test.go @@ -28,13 +28,7 @@ import ( // createTestAccount creates a test account and returns its seed. func createTestAccount(t *testing.T) string { t.Helper() - - seed, _, err := GenerateAccountKey() - if err != nil { - t.Fatalf("GenerateAccountKey() error = %v", err) - } - - return seed + return testAccountSeed } func TestGenerateUserCredentials_Basic(t *testing.T) { @@ -324,11 +318,9 @@ func TestGenerateUserCredentials_InvalidAccountSeed(t *testing.T) { func TestGenerateUserCredentials_WrongKeyType(t *testing.T) { // Use operator seed instead of account seed - operatorSeed, _, _ := GenerateOperatorKey() - _, err := GenerateUserCredentials( "test-tenant", - operatorSeed, + testOperatorSeed, "test-user", CredentialTypeCollector, nil, @@ -374,7 +366,11 @@ func TestGenerateUserCredentials_UniqueKeys(t *testing.T) { // Generate multiple user credentials var creds []*UserCredentials - for i := 0; i < 5; i++ { + iterations := 5 + if testing.Short() { + iterations = 2 + } + for i := 0; i < iterations; i++ { c, err := GenerateUserCredentials( "test-tenant", accountSeed, diff --git a/pkg/registry/batch_optimization_test.go b/pkg/registry/batch_optimization_test.go index 0880912ac..767c0027f 100644 --- a/pkg/registry/batch_optimization_test.go +++ b/pkg/registry/batch_optimization_test.go @@ -61,22 +61,26 @@ func TestSimplifiedRegistryBehavior(t *testing.T) { defer ctrl.Finish() mockDB := db.NewMockService(ctrl) - mockDB.EXPECT().WithTx(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, fn func(db.Service) error) error { - return fn(mockDB) - }).AnyTimes() - mockDB.EXPECT().LockOCSFDevices(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockDB.EXPECT().WithTx(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, fn func(db.Service) error) error { + return fn(mockDB) + }).AnyTimes() + mockDB.EXPECT().LockOCSFDevices(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() allowCanonicalizationQueries(mockDB) testLogger := logger.NewTestLogger() registry := NewDeviceRegistry(mockDB, testLogger) // Create test device updates - updates := createTestDeviceUpdates(tt.sightingCount) + sightingCount := tt.sightingCount + if testing.Short() && sightingCount > 10 { + sightingCount = 10 + } + updates := createTestDeviceUpdates(sightingCount) ctx := context.Background() // The registry calls ProcessBatchDeviceUpdates which then calls the database mockDB.EXPECT(). - PublishBatchDeviceUpdates(ctx, gomock.Len(tt.sightingCount)). + PublishBatchDeviceUpdates(ctx, gomock.Len(sightingCount)). Return(nil). Times(1) @@ -87,7 +91,7 @@ func TestSimplifiedRegistryBehavior(t *testing.T) { // Verify require.NoError(t, err) - t.Logf("%s: Processed %d device updates in %v", tt.description, tt.sightingCount, duration) + t.Logf("%s: Processed %d device updates in %v", tt.description, sightingCount, duration) }) } } diff --git a/pkg/registry/registry_test.go b/pkg/registry/registry_test.go index 56da112f9..4fbb0bd5d 100644 --- a/pkg/registry/registry_test.go +++ b/pkg/registry/registry_test.go @@ -81,7 +81,7 @@ func TestProcessBatchDeviceUpdatesUpdatesStore(t *testing.T) { update := &models.DeviceUpdate{ DeviceID: "default:10.1.0.1", IP: "10.1.0.1", - GatewayID: "gateway-1", + GatewayID: "gateway-1", AgentID: "agent-1", Source: models.DiscoverySourceSNMP, Timestamp: ts, @@ -1629,8 +1629,12 @@ func TestProcessBatchDeviceUpdates_CanonicalDeviceIDMatchesForStatsAggregator(t registry := NewDeviceRegistry(mockDB, testLogger, WithIdentityEngine(mockDB)) // Simulate a batch of devices like faker would generate - updates := make([]*models.DeviceUpdate, 100) - for i := 0; i < 100; i++ { + updateCount := 100 + if testing.Short() { + updateCount = 10 + } + updates := make([]*models.DeviceUpdate, updateCount) + for i := 0; i < updateCount; i++ { updates[i] = &models.DeviceUpdate{ IP: fmt.Sprintf("10.0.%d.%d", i/256, i%256), DeviceID: fmt.Sprintf("default:10.0.%d.%d", i/256, i%256), @@ -1654,7 +1658,7 @@ func TestProcessBatchDeviceUpdates_CanonicalDeviceIDMatchesForStatsAggregator(t err := registry.ProcessBatchDeviceUpdates(ctx, updates) require.NoError(t, err) - require.Len(t, published, 100, "should publish all 100 updates") + require.Len(t, published, updateCount, "should publish all updates") // Verify ALL published updates would pass isCanonicalRecord check nonCanonicalCount := 0 diff --git a/pkg/registry/service_device_test.go b/pkg/registry/service_device_test.go index e1131343e..1f01a64bd 100644 --- a/pkg/registry/service_device_test.go +++ b/pkg/registry/service_device_test.go @@ -315,15 +315,20 @@ func TestServiceDeviceRegistration_HighCardinalityCheckers(t *testing.T) { }). AnyTimes() - // Create 100 checker instances for a single agent + // Create checker instances for a single agent agentID := "agent-123" gatewayID := "gateway-1" hostIP := "192.168.1.100" - updates := make([]*models.DeviceUpdate, 0, 100) checkerTypes := []string{"sysmon", "rperf", "snmp", "mapper"} + checkerIterations := 25 + if testing.Short() { + checkerIterations = 3 + } + totalCheckers := checkerIterations * len(checkerTypes) + updates := make([]*models.DeviceUpdate, 0, totalCheckers) checkerIndex := 0 - for i := 0; i < 25; i++ { + for i := 0; i < checkerIterations; i++ { for _, checkerType := range checkerTypes { checkerID := fmt.Sprintf("%s-%d@%s", checkerType, checkerIndex, agentID) updates = append(updates, models.CreateCheckerDeviceUpdate(checkerID, checkerType, agentID, gatewayID, hostIP, "default", nil)) @@ -334,7 +339,7 @@ func TestServiceDeviceRegistration_HighCardinalityCheckers(t *testing.T) { err := registry.ProcessBatchDeviceUpdates(ctx, updates) require.NoError(t, err) - require.Len(t, publishedUpdates, 100, "All 100 checkers should be published") + require.Len(t, publishedUpdates, totalCheckers, "All checkers should be published") // Verify all have unique device IDs deviceIDs := make(map[string]bool) diff --git a/pkg/sync/config.go b/pkg/sync/config.go index b3a4cabd4..e67a60d50 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -170,3 +170,41 @@ func (*Config) normalizeCertPaths(sec *models.SecurityConfig) { tls.ClientCAFile = tls.CAFile } } + +// GetEffectiveDiscoveryInterval returns the discovery interval for a source. +// If the source has a per-source DiscoveryInterval set, it returns that. +// Otherwise, it returns the global DiscoveryInterval from the config. +func (c *Config) GetEffectiveDiscoveryInterval(source *models.SourceConfig) time.Duration { + if source != nil && time.Duration(source.DiscoveryInterval) > 0 { + return time.Duration(source.DiscoveryInterval) + } + + return time.Duration(c.DiscoveryInterval) +} + +// GetEffectivePollInterval returns the poll interval for a source. +// If the source has a per-source PollInterval set, it returns that. +// Otherwise, it returns the global PollInterval from the config. +func (c *Config) GetEffectivePollInterval(source *models.SourceConfig) time.Duration { + if source != nil && time.Duration(source.PollInterval) > 0 { + return time.Duration(source.PollInterval) + } + + return time.Duration(c.PollInterval) +} + +// GetEffectiveSweepInterval returns the sweep interval for a source. +// If the source has a per-source SweepInterval set, it returns that. +// Otherwise, it returns a default of 1 hour. +// Note: SweepInterval is stored as a string in SourceConfig, so we parse it. +func (c *Config) GetEffectiveSweepInterval(source *models.SourceConfig) time.Duration { + const defaultSweepInterval = 1 * time.Hour + + if source != nil && source.SweepInterval != "" { + if d, err := time.ParseDuration(source.SweepInterval); err == nil && d > 0 { + return d + } + } + + return defaultSweepInterval +} diff --git a/pkg/sync/config_test.go b/pkg/sync/config_test.go new file mode 100644 index 000000000..6cc97ce2d --- /dev/null +++ b/pkg/sync/config_test.go @@ -0,0 +1,249 @@ +/* + * Copyright 2025 Carver Automation Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sync + +import ( + "testing" + "time" + + "github.com/carverauto/serviceradar/pkg/models" + "github.com/stretchr/testify/assert" +) + +type intervalTestCase struct { + name string + globalInterval models.Duration + sourceInterval models.Duration + expected time.Duration + nilSource bool +} + +func runEffectiveIntervalTests( + t *testing.T, + tests []intervalTestCase, + setConfig func(*Config, models.Duration), + setSource func(*models.SourceConfig, models.Duration), + getInterval func(*Config, *models.SourceConfig) time.Duration, +) { + t.Helper() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{} + setConfig(cfg, tt.globalInterval) + + var source *models.SourceConfig + if !tt.nilSource { + source = &models.SourceConfig{} + setSource(source, tt.sourceInterval) + } + + result := getInterval(cfg, source) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetEffectiveIntervals(t *testing.T) { + type intervalSuite struct { + name string + globalPrimary time.Duration + sourceOverride time.Duration + globalNil time.Duration + setConfig func(*Config, models.Duration) + setSource func(*models.SourceConfig, models.Duration) + get func(*Config, *models.SourceConfig) time.Duration + } + + suites := []intervalSuite{ + { + name: "discovery interval", + globalPrimary: 6 * time.Hour, + sourceOverride: 30 * time.Minute, + globalNil: 2 * time.Hour, + setConfig: func(cfg *Config, interval models.Duration) { + cfg.DiscoveryInterval = interval + }, + setSource: func(source *models.SourceConfig, interval models.Duration) { + source.DiscoveryInterval = interval + }, + get: func(cfg *Config, source *models.SourceConfig) time.Duration { + return cfg.GetEffectiveDiscoveryInterval(source) + }, + }, + { + name: "poll interval", + globalPrimary: 5 * time.Minute, + sourceOverride: 1 * time.Minute, + globalNil: 10 * time.Minute, + setConfig: func(cfg *Config, interval models.Duration) { + cfg.PollInterval = interval + }, + setSource: func(source *models.SourceConfig, interval models.Duration) { + source.PollInterval = interval + }, + get: func(cfg *Config, source *models.SourceConfig) time.Duration { + return cfg.GetEffectivePollInterval(source) + }, + }, + } + + for _, suite := range suites { + t.Run(suite.name, func(t *testing.T) { + tests := []intervalTestCase{ + { + name: "per-source interval overrides global", + globalInterval: models.Duration(suite.globalPrimary), + sourceInterval: models.Duration(suite.sourceOverride), + expected: suite.sourceOverride, + }, + { + name: "zero source interval uses global", + globalInterval: models.Duration(suite.globalPrimary), + sourceInterval: 0, + expected: suite.globalPrimary, + }, + { + name: "nil source uses global", + globalInterval: models.Duration(suite.globalNil), + expected: suite.globalNil, + nilSource: true, + }, + } + + runEffectiveIntervalTests(t, tests, suite.setConfig, suite.setSource, suite.get) + }) + } +} + +func TestGetEffectiveSweepInterval(t *testing.T) { + tests := []struct { + name string + sourceInterval string + expected time.Duration + }{ + { + name: "valid per-source sweep interval", + sourceInterval: "30m", + expected: 30 * time.Minute, + }, + { + name: "empty source interval uses default", + sourceInterval: "", + expected: 1 * time.Hour, + }, + { + name: "invalid source interval uses default", + sourceInterval: "invalid", + expected: 1 * time.Hour, + }, + { + name: "nil source uses default", + sourceInterval: "", // will pass nil source + expected: 1 * time.Hour, + }, + { + name: "hours format works", + sourceInterval: "2h", + expected: 2 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{} + + var source *models.SourceConfig + if tt.name != "nil source uses default" { + source = &models.SourceConfig{ + SweepInterval: tt.sourceInterval, + } + } + + result := cfg.GetEffectiveSweepInterval(source) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestMixedSourceIntervals(t *testing.T) { + cfg := &Config{ + DiscoveryInterval: models.Duration(6 * time.Hour), + PollInterval: models.Duration(5 * time.Minute), + } + + // Source A: custom discovery, default poll + sourceA := &models.SourceConfig{ + DiscoveryInterval: models.Duration(15 * time.Minute), + PollInterval: 0, + SweepInterval: "2h", + } + + // Source B: default discovery, custom poll + sourceB := &models.SourceConfig{ + DiscoveryInterval: 0, + PollInterval: models.Duration(30 * time.Second), + SweepInterval: "", + } + + // Source C: all defaults + sourceC := &models.SourceConfig{} + + // Test Source A + assert.Equal(t, 15*time.Minute, cfg.GetEffectiveDiscoveryInterval(sourceA)) + assert.Equal(t, 5*time.Minute, cfg.GetEffectivePollInterval(sourceA)) + assert.Equal(t, 2*time.Hour, cfg.GetEffectiveSweepInterval(sourceA)) + + // Test Source B + assert.Equal(t, 6*time.Hour, cfg.GetEffectiveDiscoveryInterval(sourceB)) + assert.Equal(t, 30*time.Second, cfg.GetEffectivePollInterval(sourceB)) + assert.Equal(t, 1*time.Hour, cfg.GetEffectiveSweepInterval(sourceB)) + + // Test Source C + assert.Equal(t, 6*time.Hour, cfg.GetEffectiveDiscoveryInterval(sourceC)) + assert.Equal(t, 5*time.Minute, cfg.GetEffectivePollInterval(sourceC)) + assert.Equal(t, 1*time.Hour, cfg.GetEffectiveSweepInterval(sourceC)) +} + +func TestSourceKey(t *testing.T) { + tests := []struct { + name string + tenantID string + sourceName string + expected string + }{ + { + name: "no tenant", + tenantID: "", + sourceName: "my-source", + expected: "my-source", + }, + { + name: "with tenant", + tenantID: "tenant-123", + sourceName: "my-source", + expected: "tenant-123:my-source", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := sourceKey(tt.tenantID, tt.sourceName) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/pkg/sync/integrations/README.md b/pkg/sync/integrations/README.md index 99f5f2bfc..23b755a34 100644 --- a/pkg/sync/integrations/README.md +++ b/pkg/sync/integrations/README.md @@ -86,9 +86,54 @@ Each entry in the `sources` map defines a connection to an external system. | `credentials` | `object` | Integration-specific credentials and settings. | **Yes** | | `queries` | `array` | (Armis only) AQL queries to run against the Armis API. | **Yes** | | `poll_interval` | `string` | Optional per-source override for `poll_interval`. | No | +| `discovery_interval` | `string` | Optional per-source override for `discovery_interval` (full device scan). | No | | `sweep_interval` | `string` | How often agents should sweep discovered networks (Go duration format). | No | | `batch_size` | `integer` | (Armis only) Devices per batch when syncing updates back to Armis (default: 500). | No | +### Per-Source Interval Scheduling + +The sync runtime supports per-source interval configuration, allowing different sources to run on different schedules: + +- **`poll_interval`**: Controls how frequently the source fetches incremental updates +- **`discovery_interval`**: Controls how frequently full device discovery runs for this source +- **`sweep_interval`**: Controls how frequently network sweeps are performed + +When a per-source interval is set, it overrides the global default for that source only. Sources without explicit intervals use the global defaults. + +**Example: Mixed interval configuration** + +```json +{ + "discovery_interval": "6h", + "sources": { + "armis_fast": { + "type": "armis", + "endpoint": "https://critical-instance.armis.com", + "discovery_interval": "15m", + "credentials": { "secret_key": "..." }, + "queries": [{ "label": "critical", "query": "in:devices" }] + }, + "armis_slow": { + "type": "armis", + "endpoint": "https://archive-instance.armis.com", + "discovery_interval": "12h", + "credentials": { "secret_key": "..." }, + "queries": [{ "label": "archive", "query": "in:devices" }] + }, + "netbox_default": { + "type": "netbox", + "endpoint": "https://netbox.example.com", + "credentials": { "api_token": "..." } + } + } +} +``` + +In this example: +- `armis_fast` runs discovery every 15 minutes (critical devices) +- `armis_slow` runs discovery every 12 hours (archive/low-priority) +- `netbox_default` uses the global 6-hour discovery interval + --- ## Armis Integration (`type: "armis"`) diff --git a/pkg/sync/integrations/armis/devices.go b/pkg/sync/integrations/armis/devices.go index 44bb5b5a4..15c20de6f 100644 --- a/pkg/sync/integrations/armis/devices.go +++ b/pkg/sync/integrations/armis/devices.go @@ -817,14 +817,17 @@ func (a *ArmisIntegration) createDeviceUpdateEventWithAllIPs( Hostname: &hostname, Timestamp: timestamp, Metadata: map[string]string{ - "integration_type": "armis", - "integration_id": fmt.Sprintf("%d", d.ID), - "armis_device_id": fmt.Sprintf("%d", d.ID), // Strong identifier for merging - "tag": tag, - "query_label": queryLabel, - "primary_ip": primaryIP, - "all_ips": strings.Join(allIPs, ","), - "ip_count": fmt.Sprintf("%d", len(allIPs)), + "integration_type": "armis", + "integration_id": fmt.Sprintf("%d", d.ID), + "armis_device_id": fmt.Sprintf("%d", d.ID), // Strong identifier for merging + "tag": tag, + "query_label": queryLabel, + "primary_ip": primaryIP, + "all_ips": strings.Join(allIPs, ","), + "ip_count": fmt.Sprintf("%d", len(allIPs)), + "sync_service_id": a.Config.SyncServiceID, + "tenant_id": a.Config.TenantID, + "tenant_slug": a.Config.TenantSlug, }, } diff --git a/pkg/sync/integrations/netbox/netbox.go b/pkg/sync/integrations/netbox/netbox.go index d5906d8e3..bfc21b99e 100644 --- a/pkg/sync/integrations/netbox/netbox.go +++ b/pkg/sync/integrations/netbox/netbox.go @@ -210,7 +210,10 @@ func (n *NetboxIntegration) generateRetractionEvents( IsAvailable: false, Timestamp: now, Metadata: map[string]string{ - "_deleted": "true", + "_deleted": "true", + "sync_service_id": n.Config.SyncServiceID, + "tenant_id": n.Config.TenantID, + "tenant_slug": n.Config.TenantSlug, }, AgentID: n.Config.AgentID, GatewayID: n.Config.GatewayID, @@ -402,8 +405,11 @@ func (n *NetboxIntegration) processDevices(_ context.Context, deviceResp DeviceR Hostname: &hostname, Timestamp: now, Metadata: map[string]string{ - "integration_type": "netbox", - "integration_id": fmt.Sprintf("%d", device.ID), + "integration_type": "netbox", + "integration_id": fmt.Sprintf("%d", device.ID), + "sync_service_id": n.Config.SyncServiceID, + "tenant_id": n.Config.TenantID, + "tenant_slug": n.Config.TenantSlug, }, } diff --git a/pkg/sync/service.go b/pkg/sync/service.go index 0f2ea7f06..191bbccf9 100644 --- a/pkg/sync/service.go +++ b/pkg/sync/service.go @@ -131,6 +131,10 @@ type SimpleSyncService struct { discoveryTicker *time.Ticker armisUpdateTicker *time.Ticker reloadChan chan struct{} + + // Per-source discovery scheduling + sourceLastDiscoveryMu sync.RWMutex + sourceLastDiscovery map[string]time.Time // key: "tenantID:sourceName" } // NewSimpleSyncService creates a new simplified sync service @@ -178,6 +182,7 @@ func NewSimpleSyncServiceWithMetrics( tenantResults: make(map[string]*StreamingResultsStore), configPollInterval: defaultConfigPollInterval, heartbeatInterval: defaultHeartbeatInterval, + sourceLastDiscovery: make(map[string]time.Time), } if gatewayClient != nil { @@ -485,10 +490,32 @@ func (s *SimpleSyncService) runDiscoveryForIntegrations( var discoveryErrors []error for sourceName, integration := range integrations { + // Get source config to check per-source interval + sourceConfig := s.getSourceConfig(tenantID, sourceName) + + // Check if this source is due for discovery based on its interval + if !s.isSourceDueForDiscovery(tenantID, sourceName, sourceConfig) { + s.configMu.RLock() + interval := s.config.GetEffectiveDiscoveryInterval(sourceConfig) + s.configMu.RUnlock() + + s.logger.Debug(). + Str("source", sourceName). + Str("tenant_id", tenantID). + Dur("interval", interval). + Msg("Skipping discovery for source - not due yet") + continue + } + logEvent := s.logger.Info().Str("source", sourceName) if tenantID != "" { logEvent = logEvent.Str("tenant_id", tenantID) } + + s.configMu.RLock() + interval := s.config.GetEffectiveDiscoveryInterval(sourceConfig) + s.configMu.RUnlock() + logEvent = logEvent.Dur("interval", interval) logEvent.Msg("Running discovery for source") s.metrics.RecordDiscoveryAttempt(sourceName) @@ -505,6 +532,9 @@ func (s *SimpleSyncService) runDiscoveryForIntegrations( continue } + // Record successful discovery time for per-source scheduling + s.recordSourceDiscovery(tenantID, sourceName) + if tenantID != "" { devices = s.applyTenantSourceBlacklist(tenantID, sourceName, devices) } else { @@ -519,6 +549,7 @@ func (s *SimpleSyncService) runDiscoveryForIntegrations( Str("tenant_id", tenantID). Str("tenant_slug", tenantSlug). Int("devices_discovered", len(devices)). + Dur("interval", interval). Msg("Discovery completed for source") } @@ -887,10 +918,8 @@ func (s *SimpleSyncService) bootstrapGatewayConfig(ctx context.Context) error { return err } + // Propagate enrollment-pending so Start() can defer bootstrap if err := s.ensureGatewayEnrolled(ctx); err != nil { - if errors.Is(err, errGatewayNotEnrolled) { - return nil - } return err } @@ -1405,6 +1434,9 @@ func (s *SimpleSyncService) setTenantSources(sources map[string]*models.SourceCo gatewayID := s.config.GatewayID s.configMu.RUnlock() + // Detect interval changes and reset timers for affected sources + s.detectAndResetChangedIntervals(grouped) + for tenantID, sourceMap := range grouped { integrations := make(map[string]Integration, len(sourceMap)) for name, src := range sourceMap { @@ -1436,6 +1468,58 @@ func (s *SimpleSyncService) setTenantSources(sources map[string]*models.SourceCo s.tenantMu.Unlock() } +// detectAndResetChangedIntervals compares old and new source configs, +// resetting discovery timers for sources whose intervals have changed. +func (s *SimpleSyncService) detectAndResetChangedIntervals(newSources map[string]map[string]*models.SourceConfig) { + s.tenantMu.RLock() + oldSources := s.tenantSources + s.tenantMu.RUnlock() + + s.configMu.RLock() + globalInterval := s.config.GetEffectiveDiscoveryInterval(nil) + s.configMu.RUnlock() + + for tenantID, newSourceMap := range newSources { + oldSourceMap := oldSources[tenantID] + + for sourceName, newSrc := range newSourceMap { + newInterval := time.Duration(newSrc.DiscoveryInterval) + if newInterval == 0 { + newInterval = globalInterval + } + + // Check if source existed before + if oldSourceMap != nil { + if oldSrc, exists := oldSourceMap[sourceName]; exists { + oldInterval := time.Duration(oldSrc.DiscoveryInterval) + if oldInterval == 0 { + oldInterval = globalInterval + } + + // If interval changed, reset the timer + if newInterval != oldInterval { + s.logger.Info(). + Str("source", sourceName). + Str("tenant_id", tenantID). + Dur("old_interval", oldInterval). + Dur("new_interval", newInterval). + Msg("Discovery interval changed for source, resetting timer") + s.resetSourceDiscoveryTimer(tenantID, sourceName) + } + continue + } + } + + // New source - will run on first discovery cycle automatically + s.logger.Debug(). + Str("source", sourceName). + Str("tenant_id", tenantID). + Dur("interval", newInterval). + Msg("New source added with discovery interval") + } + } +} + func (s *SimpleSyncService) groupSourcesByTenant( sources map[string]*models.SourceConfig, scope string, @@ -1603,6 +1687,69 @@ func (s *SimpleSyncService) applyTenantSourceBlacklist( return filteredDevices } +// sourceKey generates a unique key for tracking per-source discovery times. +func sourceKey(tenantID, sourceName string) string { + if tenantID == "" { + return sourceName + } + return tenantID + ":" + sourceName +} + +// isSourceDueForDiscovery checks if a source's discovery interval has elapsed. +func (s *SimpleSyncService) isSourceDueForDiscovery(tenantID, sourceName string, source *models.SourceConfig) bool { + key := sourceKey(tenantID, sourceName) + + s.sourceLastDiscoveryMu.RLock() + lastRun, exists := s.sourceLastDiscovery[key] + s.sourceLastDiscoveryMu.RUnlock() + + // If never run, it's due for discovery + if !exists { + return true + } + + s.configMu.RLock() + interval := s.config.GetEffectiveDiscoveryInterval(source) + s.configMu.RUnlock() + + return time.Since(lastRun) >= interval +} + +// recordSourceDiscovery records the last discovery time for a source. +func (s *SimpleSyncService) recordSourceDiscovery(tenantID, sourceName string) { + key := sourceKey(tenantID, sourceName) + + s.sourceLastDiscoveryMu.Lock() + s.sourceLastDiscovery[key] = time.Now() + s.sourceLastDiscoveryMu.Unlock() +} + +// resetSourceDiscoveryTimer clears the last discovery time for a source, +// causing it to run on the next discovery cycle. +func (s *SimpleSyncService) resetSourceDiscoveryTimer(tenantID, sourceName string) { + key := sourceKey(tenantID, sourceName) + + s.sourceLastDiscoveryMu.Lock() + delete(s.sourceLastDiscovery, key) + s.sourceLastDiscoveryMu.Unlock() +} + +// getSourceConfig returns the source config for a given tenant and source name. +func (s *SimpleSyncService) getSourceConfig(tenantID, sourceName string) *models.SourceConfig { + if tenantID == "" { + s.configMu.RLock() + source := s.config.Sources[sourceName] + s.configMu.RUnlock() + return source + } + + s.tenantMu.RLock() + tenantSources := s.tenantSources[tenantID] + source := tenantSources[sourceName] + s.tenantMu.RUnlock() + return source +} + // createIntegration creates a single integration instance. // agentID and gatewayID are fallback values if not set in src. // Callers must ensure these values are obtained under proper synchronization. diff --git a/rust/srql/src/parser.rs b/rust/srql/src/parser.rs index e55a70505..2618cb606 100644 --- a/rust/srql/src/parser.rs +++ b/rust/srql/src/parser.rs @@ -4,8 +4,10 @@ use crate::{ error::{Result, ServiceError}, time::{parse_time_value, TimeFilterSpec}, }; +use serde::Serialize; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] pub enum Entity { Agents, Devices, @@ -29,24 +31,71 @@ pub enum Entity { Traces, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct QueryAst { pub entity: Entity, pub filters: Vec, pub order: Vec, pub limit: Option, pub time_filter: Option, - pub stats: Option, + pub stats: Option, pub downsample: Option, /// Rollup stats type for querying pre-computed CAGGs (e.g., "severity", "summary", "availability") pub rollup_stats: Option, } +/// Parsed stats specification with structured aggregation info +#[derive(Debug, Clone, Serialize)] +pub struct StatsSpec { + /// The raw stats expression (for backwards compatibility) + pub raw: String, + /// Parsed aggregations + pub aggregations: Vec, +} + +impl StatsSpec { + /// Returns the raw stats expression string for backwards compatibility + /// with existing query modules that parse stats themselves. + pub fn as_raw(&self) -> &str { + &self.raw + } + + /// Create a StatsSpec from a raw expression string. + /// This is useful for creating test fixtures. + pub fn from_raw(raw: &str) -> Self { + parse_stats_expr(raw) + } +} + +/// A single stats aggregation like count(), sum(field), etc. +#[derive(Debug, Clone, Serialize)] +pub struct StatsAggregation { + /// The aggregation function type + #[serde(rename = "type")] + pub agg_type: StatsAggType, + /// The field to aggregate (None for count()) + pub field: Option, + /// The alias for the result + pub alias: String, +} + +/// Stats aggregation function types +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum StatsAggType { + Count, + Sum, + Avg, + Min, + Max, +} + const MAX_STATS_EXPR_LEN: usize = 1024; const MAX_FILTER_LIST_VALUES: usize = 200; const MAX_DOWNSAMPLE_BUCKET_SECS: i64 = 31 * 24 * 60 * 60; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] pub enum DownsampleAgg { Avg, Min, @@ -55,21 +104,22 @@ pub enum DownsampleAgg { Count, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct DownsampleSpec { pub bucket_seconds: i64, pub agg: DownsampleAgg, pub series: Option, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct Filter { pub field: String, pub op: FilterOp, pub value: FilterValue, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] pub enum FilterOp { Eq, NotEq, @@ -83,7 +133,8 @@ pub enum FilterOp { Lte, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] pub enum FilterValue { Scalar(String), List(Vec), @@ -109,13 +160,14 @@ impl FilterValue { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct OrderClause { pub field: String, pub direction: OrderDirection, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] pub enum OrderDirection { Asc, Desc, @@ -209,7 +261,7 @@ pub fn parse(input: &str) -> Result { "stats expression must be <= {MAX_STATS_EXPR_LEN} characters" ))); } - stats = Some(expr); + stats = Some(parse_stats_expr(&expr)); } "rollup_stats" => { let stat_type = value.as_scalar()?.trim().to_lowercase(); @@ -354,6 +406,96 @@ fn normalize_optional_string(raw: &str) -> Option { } } +/// Parse a stats expression like "count() as total" or "sum(field) as total, avg(field) as average" +fn parse_stats_expr(raw: &str) -> StatsSpec { + let raw = raw.trim().trim_matches('"').trim_matches('\''); + let aggregations = raw + .split(',') + .filter_map(|part| parse_single_stats_agg(part.trim())) + .collect(); + + StatsSpec { + raw: raw.to_string(), + aggregations, + } +} + +/// Parse a single stats aggregation like "count() as total" or "sum(field) as total" +fn parse_single_stats_agg(expr: &str) -> Option { + let expr = expr.trim().to_lowercase(); + + // Pattern: func() as alias or func(field) as alias + // Split on " as " to get function part and alias + let (func_part, alias) = if let Some(idx) = expr.find(" as ") { + let (f, a) = expr.split_at(idx); + (f.trim(), a[4..].trim()) // Skip " as " + } else { + // No alias, use the function name as alias + return None; // Require alias for structured parsing + }; + + if alias.is_empty() { + return None; + } + + // Parse the function: count(), sum(field), avg(field), min(field), max(field) + if let Some(inner) = func_part.strip_prefix("count(").and_then(|s| s.strip_suffix(')')) { + // count() or count(field) - we ignore field for count + let _ = inner; // count doesn't need a field + return Some(StatsAggregation { + agg_type: StatsAggType::Count, + field: None, + alias: alias.to_string(), + }); + } + + if let Some(inner) = func_part.strip_prefix("sum(").and_then(|s| s.strip_suffix(')')) { + let field = inner.trim(); + if !field.is_empty() { + return Some(StatsAggregation { + agg_type: StatsAggType::Sum, + field: Some(field.to_string()), + alias: alias.to_string(), + }); + } + } + + if let Some(inner) = func_part.strip_prefix("avg(").and_then(|s| s.strip_suffix(')')) { + let field = inner.trim(); + if !field.is_empty() { + return Some(StatsAggregation { + agg_type: StatsAggType::Avg, + field: Some(field.to_string()), + alias: alias.to_string(), + }); + } + } + + if let Some(inner) = func_part.strip_prefix("min(").and_then(|s| s.strip_suffix(')')) { + let field = inner.trim(); + if !field.is_empty() { + return Some(StatsAggregation { + agg_type: StatsAggType::Min, + field: Some(field.to_string()), + alias: alias.to_string(), + }); + } + } + + if let Some(inner) = func_part.strip_prefix("max(").and_then(|s| s.strip_suffix(')')) { + let field = inner.trim(); + if !field.is_empty() { + return Some(StatsAggregation { + agg_type: StatsAggType::Max, + field: Some(field.to_string()), + alias: alias.to_string(), + }); + } + } + + None +} + fn build_filter(key: &str, value: FilterValue) -> Filter { let mut field = key.trim(); let mut negated = false; @@ -611,22 +753,49 @@ mod tests { #[test] fn parses_stats_expression() { let ast = parse("in:logs stats:\"count() as total\" time:last_24h").unwrap(); - assert_eq!(ast.stats.as_deref(), Some("count() as total")); + let stats = ast.stats.as_ref().unwrap(); + assert_eq!(stats.raw, "count() as total"); + assert_eq!(stats.aggregations.len(), 1); + assert!(matches!(stats.aggregations[0].agg_type, StatsAggType::Count)); + assert_eq!(stats.aggregations[0].alias, "total"); } #[test] fn parses_unquoted_stats_alias() { let ast = parse("in:devices stats:count() as total").unwrap(); - assert_eq!(ast.stats.as_deref(), Some("count() as total")); + let stats = ast.stats.as_ref().unwrap(); + assert_eq!(stats.raw, "count() as total"); + assert_eq!(stats.aggregations.len(), 1); + assert!(matches!(stats.aggregations[0].agg_type, StatsAggType::Count)); } #[test] fn parses_unquoted_stats_alias_with_following_tokens() { let ast = parse("in:devices stats:count() as total time:last_7d").unwrap(); - assert_eq!(ast.stats.as_deref(), Some("count() as total")); + let stats = ast.stats.as_ref().unwrap(); + assert_eq!(stats.raw, "count() as total"); assert!(ast.time_filter.is_some()); } + #[test] + fn parses_stats_with_field() { + let ast = parse("in:devices stats:\"sum(value) as total_value\"").unwrap(); + let stats = ast.stats.as_ref().unwrap(); + assert_eq!(stats.aggregations.len(), 1); + assert!(matches!(stats.aggregations[0].agg_type, StatsAggType::Sum)); + assert_eq!(stats.aggregations[0].field.as_deref(), Some("value")); + assert_eq!(stats.aggregations[0].alias, "total_value"); + } + + #[test] + fn parses_multiple_stats() { + let ast = parse("in:devices stats:\"count() as total, sum(value) as sum_val\"").unwrap(); + let stats = ast.stats.as_ref().unwrap(); + assert_eq!(stats.aggregations.len(), 2); + assert!(matches!(stats.aggregations[0].agg_type, StatsAggType::Count)); + assert!(matches!(stats.aggregations[1].agg_type, StatsAggType::Sum)); + } + #[test] fn rejects_stats_alias_missing_identifier() { let err = parse("in:devices stats:count() as").unwrap_err(); diff --git a/rust/srql/src/query/cpu_metrics.rs b/rust/srql/src/query/cpu_metrics.rs index df7c1f300..e34947bab 100644 --- a/rust/srql/src/query/cpu_metrics.rs +++ b/rust/srql/src/query/cpu_metrics.rs @@ -72,7 +72,7 @@ struct CpuStatsPayload { pub(super) async fn execute(conn: &mut AsyncPgConnection, plan: &QueryPlan) -> Result> { ensure_entity(plan)?; - if let Some(spec) = parse_stats_spec(plan.stats.as_deref())? { + if let Some(spec) = parse_stats_spec(plan.stats.as_ref().map(|s| s.as_raw()))? { let payload = execute_stats(conn, plan, &spec).await?; return Ok(payload); } @@ -91,7 +91,7 @@ pub(super) async fn execute(conn: &mut AsyncPgConnection, plan: &QueryPlan) -> R pub(super) fn to_sql_and_params(plan: &QueryPlan) -> Result<(String, Vec)> { ensure_entity(plan)?; - if let Some(spec) = parse_stats_spec(plan.stats.as_deref())? { + if let Some(spec) = parse_stats_spec(plan.stats.as_ref().map(|s| s.as_raw()))? { let sql = build_stats_query(plan, &spec)?; let params = sql.binds.into_iter().map(bind_param_from_stats).collect(); return Ok((rewrite_placeholders(&sql.sql), params)); @@ -706,7 +706,7 @@ mod tests { r#"avg(usage_percent) as avg_cpu by device_id"#, "demo-partition", ); - let spec = parse_stats_spec(plan.stats.as_deref()).unwrap().unwrap(); + let spec = parse_stats_spec(plan.stats.as_ref().map(|s| s.as_raw())).unwrap().unwrap(); assert_eq!(spec.alias, "avg_cpu"); let sql = build_stats_query(&plan, &spec).expect("stats SQL should build"); @@ -738,7 +738,7 @@ mod tests { limit: 100, offset: 0, time_range: Some(TimeRange { start, end }), - stats: Some(stats.to_string()), + stats: Some(crate::parser::StatsSpec::from_raw(stats)), downsample: None, rollup_stats: None, } diff --git a/rust/srql/src/query/devices.rs b/rust/srql/src/query/devices.rs index b2519cf7a..33dc43c9a 100644 --- a/rust/srql/src/query/devices.rs +++ b/rust/srql/src/query/devices.rs @@ -32,7 +32,7 @@ pub(super) async fn execute( ) -> Result> { ensure_entity(plan)?; - if let Some(spec) = parse_stats_spec(plan.stats.as_deref())? { + if let Some(spec) = parse_stats_spec(plan.stats.as_ref().map(|s| s.as_raw()))? { let query = build_stats_query(plan, &spec)?; let values: Vec = query .load(conn) @@ -55,7 +55,7 @@ pub(super) async fn execute( pub(super) fn to_sql_and_params(plan: &QueryPlan) -> Result<(String, Vec)> { ensure_entity(plan)?; - if let Some(spec) = parse_stats_spec(plan.stats.as_deref())? { + if let Some(spec) = parse_stats_spec(plan.stats.as_ref().map(|s| s.as_raw()))? { let query = build_stats_query(plan, &spec)?; let sql = super::diesel_sql(&query)?; diff --git a/rust/srql/src/query/interfaces.rs b/rust/srql/src/query/interfaces.rs index c20c280d9..69a5bf0f9 100644 --- a/rust/srql/src/query/interfaces.rs +++ b/rust/srql/src/query/interfaces.rs @@ -37,7 +37,7 @@ struct CountStatsSpec { pub(super) async fn execute(conn: &mut AsyncPgConnection, plan: &QueryPlan) -> Result> { ensure_entity(plan)?; - if let Some(stats) = parse_stats_spec(plan.stats.as_deref())? { + if let Some(stats) = parse_stats_spec(plan.stats.as_ref().map(|s| s.as_raw()))? { let payload = execute_stats(conn, plan, &stats).await?; return Ok(vec![payload]); } @@ -68,7 +68,7 @@ pub(super) fn to_sql_and_params(plan: &QueryPlan) -> Result<(String, Vec, filter: &Filter) -> Result fn build_stats_query(plan: &QueryPlan) -> Result> { let stats_raw = match plan.stats.as_ref() { - Some(raw) if !raw.trim().is_empty() => raw.trim(), + Some(raw) if !raw.as_raw().trim().is_empty() => raw.as_raw().trim(), _ => return Ok(None), }; @@ -852,7 +852,7 @@ mod tests { limit: 100, offset: 0, time_range: Some(TimeRange { start, end }), - stats: Some(stats.to_string()), + stats: Some(crate::parser::StatsSpec::from_raw(stats)), downsample: None, rollup_stats: None, } @@ -906,7 +906,7 @@ mod tests { limit: 100, offset: 0, time_range: Some(TimeRange { start, end }), - stats: Some("count() as total".to_string()), + stats: Some(crate::parser::StatsSpec::from_raw("count() as total")), downsample: None, rollup_stats: None, }; diff --git a/rust/srql/src/query/mod.rs b/rust/srql/src/query/mod.rs index 5371ffc8b..b0fe5ab8c 100644 --- a/rust/srql/src/query/mod.rs +++ b/rust/srql/src/query/mod.rs @@ -973,7 +973,7 @@ pub struct QueryPlan { pub limit: i64, pub offset: i64, pub time_range: Option, - pub stats: Option, + pub stats: Option, pub downsample: Option, /// Rollup stats type for querying pre-computed CAGGs (e.g., "severity", "summary", "availability") pub rollup_stats: Option, diff --git a/rust/srql/src/query/otel_metrics.rs b/rust/srql/src/query/otel_metrics.rs index 13150b40d..063226c6b 100644 --- a/rust/srql/src/query/otel_metrics.rs +++ b/rust/srql/src/query/otel_metrics.rs @@ -355,7 +355,7 @@ impl MetricsGroupField { fn build_stats_query(plan: &QueryPlan) -> Result> { let stats_raw = match plan.stats.as_ref() { - Some(value) if !value.trim().is_empty() => value.trim(), + Some(value) if !value.as_raw().trim().is_empty() => value.as_raw().trim(), _ => return Ok(None), }; diff --git a/rust/srql/src/query/services.rs b/rust/srql/src/query/services.rs index cebf3c991..3c416f820 100644 --- a/rust/srql/src/query/services.rs +++ b/rust/srql/src/query/services.rs @@ -75,7 +75,7 @@ pub(super) async fn execute(conn: &mut AsyncPgConnection, plan: &QueryPlan) -> R })]); } - if let Some(spec) = parse_stats_spec(plan.stats.as_deref())? { + if let Some(spec) = parse_stats_spec(plan.stats.as_ref().map(|s| s.as_raw()))? { let query = build_stats_query(plan, &spec)?; let values: Vec = query .load(conn) @@ -112,7 +112,7 @@ pub(super) fn to_sql_and_params(plan: &QueryPlan) -> Result<(String, Vec Result { // Handle stats mode (aggregation queries) if let Some(raw_stats) = plan.stats.as_ref().and_then(|s| { - let trimmed = s.trim(); + let trimmed = s.as_raw().trim(); if trimmed.is_empty() { None } else { diff --git a/rust/srql/src/time.rs b/rust/srql/src/time.rs index 82ee11187..a1019b232 100644 --- a/rust/srql/src/time.rs +++ b/rust/srql/src/time.rs @@ -2,14 +2,16 @@ use crate::error::{Result, ServiceError}; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use serde::Serialize; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct TimeRange { pub start: DateTime, pub end: DateTime, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] pub enum TimeFilterSpec { RelativeHours(i64), RelativeDays(i64), diff --git a/web-ng/lib/serviceradar_web_ng/srql.ex b/web-ng/lib/serviceradar_web_ng/srql.ex index 0796afd4e..2dd4cece3 100644 --- a/web-ng/lib/serviceradar_web_ng/srql.ex +++ b/web-ng/lib/serviceradar_web_ng/srql.ex @@ -111,117 +111,136 @@ defmodule ServiceRadarWebNG.SRQL do defp extract_entity(_), do: nil - # Parse SRQL query into params for Ash adapter - # SRQL format: in:entity time:last_24h sort:field:desc field:value limit:100 + # Parse SRQL query into params for Ash adapter using the Rust parser. + # The Rust parser handles all parsing - no regex fallback. defp parse_srql_params(query, limit) do - entity = extract_entity(query) - filters = parse_srql_filters(query) - time_filter = parse_srql_time(query, entity) - sort = parse_srql_sort(query) + case Native.parse_ast(query) do + {:ok, json} -> + case Jason.decode(json) do + {:ok, ast} -> + convert_ast_to_params(ast, limit) + + {:error, reason} -> + Logger.error("Failed to decode SRQL AST JSON: #{inspect(reason)}") + %{filters: [], sort: nil, limit: limit || 100, stats: nil} + end + + {:error, reason} -> + Logger.error("Failed to parse SRQL query: #{inspect(reason)}") + %{filters: [], sort: nil, limit: limit || 100, stats: nil} + end + end + + # Convert Rust AST to Ash adapter params format + defp convert_ast_to_params(ast, limit) do + filters = convert_ast_filters(ast["filters"] || []) + time_filter = convert_ast_time_filter(ast["time_filter"]) + sort = convert_ast_order(ast["order"] || []) + stats = convert_ast_stats(ast["stats"]) all_filters = if time_filter, do: [time_filter | filters], else: filters %{ filters: all_filters, sort: sort, - limit: limit || 100 + limit: limit || ast["limit"] || 100, + stats: stats } end - # Get the timestamp field for an entity - defp timestamp_field_for("events"), do: "occurred_at" - defp timestamp_field_for("alerts"), do: "triggered_at" - defp timestamp_field_for("services"), do: "last_check_at" - defp timestamp_field_for("service_checks"), do: "last_check_at" - defp timestamp_field_for("devices"), do: "last_seen" - defp timestamp_field_for("gateways"), do: "last_seen" - defp timestamp_field_for("agents"), do: "last_seen" - defp timestamp_field_for(_), do: "created_at" + # Convert AST filters to Ash adapter format + defp convert_ast_filters(filters) when is_list(filters) do + Enum.map(filters, fn filter -> + %{ + field: filter["field"], + op: convert_filter_op(filter["op"]), + value: convert_filter_value(filter["value"]) + } + end) + end - # Parse SRQL time range: time:last_24h, time:last_7d, etc. - defp parse_srql_time(query, entity) do - time_field = timestamp_field_for(entity) + defp convert_ast_filters(_), do: [] - case Regex.run(~r/\btime:(\S+)/i, query) do - [_, "last_1h"] -> - %{field: time_field, op: "gte", value: ago(1, :hour)} + defp convert_filter_op("eq"), do: "eq" + defp convert_filter_op("not_eq"), do: "neq" + defp convert_filter_op("like"), do: "contains" + defp convert_filter_op("not_like"), do: "neq" + defp convert_filter_op("in"), do: "in" + defp convert_filter_op("not_in"), do: "neq" + defp convert_filter_op("gt"), do: "gt" + defp convert_filter_op("gte"), do: "gte" + defp convert_filter_op("lt"), do: "lt" + defp convert_filter_op("lte"), do: "lte" + defp convert_filter_op(op), do: op - [_, "last_24h"] -> - %{field: time_field, op: "gte", value: ago(24, :hour)} + defp convert_filter_value(value) when is_list(value), do: value + defp convert_filter_value(value), do: value - [_, "last_7d"] -> - %{field: time_field, op: "gte", value: ago(7, :day)} + # Convert AST time_filter to Ash adapter format + defp convert_ast_time_filter(nil), do: nil - [_, "last_30d"] -> - %{field: time_field, op: "gte", value: ago(30, :day)} + defp convert_ast_time_filter(%{"type" => "relative_hours", "RelativeHours" => hours}) do + %{field: "timestamp", op: "gte", value: ago(hours, :hour)} + end - _ -> - nil - end + defp convert_ast_time_filter(%{"type" => "relative_days", "RelativeDays" => days}) do + %{field: "timestamp", op: "gte", value: ago(days, :day)} end - defp ago(amount, :hour), do: DateTime.add(DateTime.utc_now(), -amount * 3600, :second) - defp ago(amount, :day), do: DateTime.add(DateTime.utc_now(), -amount * 86_400, :second) + defp convert_ast_time_filter(%{"type" => "today"}) do + now = DateTime.utc_now() + start_of_day = %{now | hour: 0, minute: 0, second: 0, microsecond: {0, 0}} + %{field: "timestamp", op: "gte", value: start_of_day} + end - # Parse SRQL field filters: field:value or field:(val1,val2) or field:"quoted" - defp parse_srql_filters(query) do - # Match field:value patterns (excluding reserved keywords) - reserved = ~w(in time sort limit cursor direction) - - ~r/\b(\w+):((?:"[^"]*"|'[^']*'|\([^)]*\)|\S+))/ - |> Regex.scan(query) - |> Enum.map(fn [_, field, value] -> - if field in reserved do - nil - else - parse_srql_field_value(field, value) - end - end) - |> Enum.reject(&is_nil/1) + defp convert_ast_time_filter(%{"type" => "yesterday"}) do + yesterday = DateTime.add(DateTime.utc_now(), -86_400, :second) + start_of_day = %{yesterday | hour: 0, minute: 0, second: 0, microsecond: {0, 0}} + %{field: "timestamp", op: "gte", value: start_of_day} end - defp parse_srql_field_value(field, value) do - cond do - # Quoted value: field:"value" or field:'value' - String.starts_with?(value, "\"") and String.ends_with?(value, "\"") -> - %{field: field, op: "eq", value: String.slice(value, 1..-2//1)} - - String.starts_with?(value, "'") and String.ends_with?(value, "'") -> - %{field: field, op: "eq", value: String.slice(value, 1..-2//1)} - - # Multiple values: field:(val1,val2) - String.starts_with?(value, "(") and String.ends_with?(value, ")") -> - values = - value - |> String.slice(1..-2//1) - |> String.split(",") - |> Enum.map(&String.trim/1) - - %{field: field, op: "in", value: values} - - # Simple value - true -> - %{field: field, op: "eq", value: value} + defp convert_ast_time_filter(%{"type" => "absolute", "start" => start_str, "end" => _end_str}) do + case DateTime.from_iso8601(start_str) do + {:ok, start, _} -> %{field: "timestamp", op: "gte", value: start} + _ -> nil end end - # Parse SRQL sort: sort:field:dir or sort:field - defp parse_srql_sort(query) do - case Regex.run(~r/\bsort:(\w+)(?::(\w+))?/i, query) do - [_, field, dir] when dir in ["asc", "desc"] -> - %{field: field, dir: dir} + defp convert_ast_time_filter(_), do: nil + + # Convert AST order to Ash adapter format + defp convert_ast_order([%{"field" => field, "direction" => dir} | _]) do + %{field: field, dir: convert_sort_direction(dir)} + end + + defp convert_ast_order(_), do: nil - [_, field, _] -> - %{field: field, dir: "desc"} + defp convert_sort_direction("asc"), do: "asc" + defp convert_sort_direction("desc"), do: "desc" + defp convert_sort_direction(_), do: "desc" - [_, field] -> - %{field: field, dir: "desc"} + # Convert AST stats to Ash adapter format + defp convert_ast_stats(nil), do: nil - _ -> - nil + defp convert_ast_stats(%{"aggregations" => aggregations}) when is_list(aggregations) do + Enum.map(aggregations, fn agg -> + %{ + type: String.to_atom(agg["type"] || "count"), + field: agg["field"], + alias: agg["alias"] + } + end) + |> case do + [] -> nil + stats -> stats end end + defp convert_ast_stats(_), do: nil + + defp ago(amount, :hour), do: DateTime.add(DateTime.utc_now(), -amount * 3600, :second) + defp ago(amount, :day), do: DateTime.add(DateTime.utc_now(), -amount * 86_400, :second) + defp translate(query, limit, cursor, direction, mode) do case Native.translate(query, limit, cursor, direction, mode) do {:ok, json} when is_binary(json) -> diff --git a/web-ng/lib/serviceradar_web_ng/srql/ash_adapter.ex b/web-ng/lib/serviceradar_web_ng/srql/ash_adapter.ex index 54d17c061..69dbf0b26 100644 --- a/web-ng/lib/serviceradar_web_ng/srql/ash_adapter.ex +++ b/web-ng/lib/serviceradar_web_ng/srql/ash_adapter.ex @@ -208,7 +208,13 @@ defmodule ServiceRadarWebNG.SRQL.AshAdapter do # internal field "raw_data", # Ash internal - "__metadata__" + "__metadata__", + # TimescaleDB aggregation fields not in Ash resources + "series", + "agg", + "bucket", + # SRQL uses uid but TimeseriesMetric doesn't have it + "uid" ]) @doc """ @@ -239,9 +245,18 @@ defmodule ServiceRadarWebNG.SRQL.AshAdapter do def query(entity, params, scope \\ nil) do with {:ok, resource} <- get_resource(entity), {:ok, domain} <- get_domain(entity), - {:ok, query} <- build_query(resource, entity, params), - {:ok, results} <- execute_query(domain, query, scope) do - {:ok, format_response(results, entity, params)} + {:ok, query} <- build_query(resource, entity, params) do + # Check if this is a stats query (aggregation) + case Map.get(params, :stats) do + stats when is_list(stats) and stats != [] -> + execute_stats_query(query, stats, scope, entity, params) + + _ -> + case execute_query(domain, query, scope) do + {:ok, results} -> {:ok, format_response(results, entity, params)} + {:error, reason} -> {:error, reason} + end + end else {:error, reason} -> Logger.warning("SRQL AshAdapter query failed: #{inspect(reason)}") @@ -249,6 +264,120 @@ defmodule ServiceRadarWebNG.SRQL.AshAdapter do end end + # Execute a stats/aggregation query + defp execute_stats_query(query, stats, scope, entity, params) do + # Extract tenant and actor from scope for explicit passing + {tenant, actor} = extract_tenant_and_actor(scope) + + opts = + [] + |> maybe_add_opt(:tenant, tenant) + |> maybe_add_opt(:actor, actor) + + # Build result map from stats + result = + Enum.reduce(stats, %{}, fn stat, acc -> + case execute_single_stat(query, stat, opts) do + {:ok, value} -> + Map.put(acc, stat.alias, value) + + {:error, reason} -> + Logger.debug("Stats query failed for #{stat.alias}: #{inspect(reason)}") + Map.put(acc, stat.alias, 0) + end + end) + + {:ok, format_stats_response(result, entity, params)} + end + + # Extract tenant and actor from scope + defp extract_tenant_and_actor(nil), do: {nil, nil} + + defp extract_tenant_and_actor(%{active_tenant: tenant, user: user}) do + # Convert tenant to schema string using TenantSchemas + tenant_schema = + case tenant do + %{id: id} -> ServiceRadar.Cluster.TenantSchemas.schema_for_tenant(id) + _ -> nil + end + + {tenant_schema, user} + end + + defp extract_tenant_and_actor(_), do: {nil, nil} + + defp maybe_add_opt(opts, _key, nil), do: opts + defp maybe_add_opt(opts, key, value), do: [{key, value} | opts] + + defp execute_single_stat(query, %{type: :count, alias: _alias}, opts) do + case Ash.count(query, opts) do + {:ok, count} -> {:ok, count} + {:error, reason} -> {:error, reason} + end + end + + defp execute_single_stat(query, %{type: :sum, field: field, alias: _alias}, opts) do + field_atom = String.to_existing_atom(field) + + case Ash.sum(query, field_atom, opts) do + {:ok, sum} -> {:ok, sum || 0} + {:error, reason} -> {:error, reason} + end + rescue + ArgumentError -> {:ok, 0} + end + + defp execute_single_stat(query, %{type: :avg, field: field, alias: _alias}, opts) do + field_atom = String.to_existing_atom(field) + + case Ash.avg(query, field_atom, opts) do + {:ok, avg} -> {:ok, avg || 0} + {:error, reason} -> {:error, reason} + end + rescue + ArgumentError -> {:ok, 0} + end + + defp execute_single_stat(query, %{type: :min, field: field, alias: _alias}, opts) do + field_atom = String.to_existing_atom(field) + + case Ash.min(query, field_atom, opts) do + {:ok, min} -> {:ok, min || 0} + {:error, reason} -> {:error, reason} + end + rescue + ArgumentError -> {:ok, 0} + end + + defp execute_single_stat(query, %{type: :max, field: field, alias: _alias}, opts) do + field_atom = String.to_existing_atom(field) + + case Ash.max(query, field_atom, opts) do + {:ok, max} -> {:ok, max || 0} + {:error, reason} -> {:error, reason} + end + rescue + ArgumentError -> {:ok, 0} + end + + defp execute_single_stat(_query, _stat, _opts), do: {:ok, 0} + + # Format stats response to match SRQL response format + defp format_stats_response(result, _entity, params) do + limit = Map.get(params, :limit, 100) + + %{ + "results" => [result], + "pagination" => %{ + "next_cursor" => nil, + "prev_cursor" => nil, + "limit" => limit + }, + "viz" => nil, + "error" => nil + } + end + @doc """ Get the Ash resource for an entity. """ diff --git a/web-ng/lib/serviceradar_web_ng/srql/native.ex b/web-ng/lib/serviceradar_web_ng/srql/native.ex index b24b39ec3..e4e57501c 100644 --- a/web-ng/lib/serviceradar_web_ng/srql/native.ex +++ b/web-ng/lib/serviceradar_web_ng/srql/native.ex @@ -20,4 +20,11 @@ defmodule ServiceRadarWebNG.SRQL.Native do def translate(_query, _limit, _cursor, _direction, _mode), do: :erlang.nif_error(:nif_not_loaded) + + @doc """ + Parse an SRQL query and return the AST as JSON. + This allows consuming the structured query without re-parsing in Elixir. + """ + def parse_ast(_query), + do: :erlang.nif_error(:nif_not_loaded) end diff --git a/web-ng/lib/serviceradar_web_ng_web/live/admin/integration_live/index.ex b/web-ng/lib/serviceradar_web_ng_web/live/admin/integration_live/index.ex index 55bf8e7c6..365854dec 100644 --- a/web-ng/lib/serviceradar_web_ng_web/live/admin/integration_live/index.ex +++ b/web-ng/lib/serviceradar_web_ng_web/live/admin/integration_live/index.ex @@ -9,6 +9,7 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do import ServiceRadarWebNGWeb.AdminComponents + alias ServiceRadar.Cluster.TenantSchemas alias ServiceRadar.Integrations alias ServiceRadar.Integrations.IntegrationSource alias ServiceRadar.Infrastructure.Agent @@ -37,14 +38,39 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do |> assign(:show_edit_modal, false) |> assign(:show_details_modal, false) |> assign(:selected_source, nil) - |> assign(:create_form, build_create_form(tenant_id)) + |> assign(:create_form, build_create_form(tenant_id, get_actor(socket))) |> assign(:edit_form, nil) |> assign(:filter_type, nil) |> assign(:filter_enabled, nil) + # Query management for forms + |> assign(:form_queries, [default_query()]) + |> assign(:form_network_blacklist, "") {:ok, socket} end + defp default_query do + %{ + "id" => System.unique_integer([:positive]), + "label" => "", + "query" => "", + "sweep_modes" => [] + } + end + + defp toggle_sweep_mode_for_query(query, target_id, mode) do + if query["id"] == target_id do + modes = Map.get(query, "sweep_modes", []) + Map.put(query, "sweep_modes", toggle_mode(modes, mode)) + else + query + end + end + + defp toggle_mode(modes, mode) do + if mode in modes, do: List.delete(modes, mode), else: modes ++ [mode] + end + @impl true def handle_params(params, _url, socket) do {:noreply, apply_action(socket, socket.assigns.live_action, params)} @@ -85,9 +111,16 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do case get_source(id, tenant_id) do {:ok, source} -> + # Convert source queries to form_queries format with IDs + form_queries = source_queries_to_form(source.queries) + # Convert network_blacklist array to textarea format + form_blacklist = Enum.join(source.network_blacklist || [], "\n") + socket |> assign(:selected_source, source) - |> assign(:edit_form, build_edit_form(source)) + |> assign(:edit_form, build_edit_form(source, get_actor(socket))) + |> assign(:form_queries, form_queries) + |> assign(:form_network_blacklist, form_blacklist) |> assign(:show_edit_modal, true) {:error, _} -> @@ -97,6 +130,20 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do end end + defp source_queries_to_form(nil), do: [default_query()] + defp source_queries_to_form([]), do: [default_query()] + + defp source_queries_to_form(queries) when is_list(queries) do + Enum.map(queries, fn q -> + %{ + "id" => System.unique_integer([:positive]), + "label" => q["label"] || Map.get(q, :label, ""), + "query" => q["query"] || Map.get(q, :query, ""), + "sweep_modes" => q["sweep_modes"] || Map.get(q, :sweep_modes, []) + } + end) + end + @impl true def handle_event("open_create_modal", _params, socket) do tenant_id = get_tenant_id(socket) @@ -109,10 +156,12 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do {:noreply, socket |> assign(:show_create_modal, true) - |> assign(:create_form, build_create_form(tenant_id)) + |> assign(:create_form, build_create_form(tenant_id, get_actor(socket))) |> assign(:agents, agents) |> assign(:agent_index, agent_index) - |> assign(:agent_options, agent_options)} + |> assign(:agent_options, agent_options) + |> assign(:form_queries, [default_query()]) + |> assign(:form_network_blacklist, "")} else {:noreply, socket @@ -126,7 +175,9 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do {:noreply, socket |> assign(:show_create_modal, false) - |> assign(:create_form, build_create_form(tenant_id))} + |> assign(:create_form, build_create_form(tenant_id, get_actor(socket))) + |> assign(:form_queries, [default_query()]) + |> assign(:form_network_blacklist, "")} end def handle_event("close_edit_modal", _params, socket) do @@ -134,7 +185,9 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do socket |> assign(:show_edit_modal, false) |> assign(:selected_source, nil) - |> assign(:edit_form, nil)} + |> assign(:edit_form, nil) + |> assign(:form_queries, [default_query()]) + |> assign(:form_network_blacklist, "")} end def handle_event("close_details_modal", _params, socket) do @@ -162,6 +215,46 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do {:noreply, assign(socket, :edit_form, form)} end + # Query management events + def handle_event("add_query", _params, socket) do + queries = socket.assigns.form_queries ++ [default_query()] + {:noreply, assign(socket, :form_queries, queries)} + end + + def handle_event("remove_query", %{"id" => id}, socket) do + id = String.to_integer(id) + queries = Enum.reject(socket.assigns.form_queries, &(&1["id"] == id)) + # Ensure at least one query remains + queries = if queries == [], do: [default_query()], else: queries + {:noreply, assign(socket, :form_queries, queries)} + end + + def handle_event("update_query", %{"id" => id, "field" => field, "value" => value}, socket) do + id = String.to_integer(id) + + queries = + Enum.map(socket.assigns.form_queries, fn q -> + if q["id"] == id, do: Map.put(q, field, value), else: q + end) + + {:noreply, assign(socket, :form_queries, queries)} + end + + def handle_event("toggle_sweep_mode", %{"id" => id, "mode" => mode}, socket) do + id = String.to_integer(id) + + queries = + Enum.map(socket.assigns.form_queries, &toggle_sweep_mode_for_query(&1, id, mode)) + + {:noreply, assign(socket, :form_queries, queries)} + end + + def handle_event("update_network_blacklist", params, socket) do + # Handle both direct value and form params + value = params["value"] || params["network_blacklist_text"] || "" + {:noreply, assign(socket, :form_network_blacklist, value)} + end + def handle_event("create_source", %{"form" => params}, socket) do tenant_id = get_tenant_id(socket) actor = get_actor(socket) @@ -173,6 +266,14 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do # Handle credentials JSON if provided params = parse_credentials_json(params) + # Add queries from form_queries assign + queries = build_queries_for_submit(socket.assigns.form_queries) + params = Map.put(params, "queries", queries) + + # Add network_blacklist from textarea + blacklist = parse_network_blacklist(socket.assigns.form_network_blacklist) + params = Map.put(params, "network_blacklist", blacklist) + form = socket.assigns.create_form.source |> AshPhoenix.Form.validate(params) @@ -183,7 +284,7 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do socket |> assign(:show_create_modal, false) |> assign(:sources, list_sources(tenant_id)) - |> assign(:create_form, build_create_form(tenant_id)) + |> assign(:create_form, build_create_form(tenant_id, actor)) |> put_flash(:info, "Integration source created successfully")} {:error, form} -> @@ -206,6 +307,14 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do # Handle credentials JSON if provided params = parse_credentials_json(params) + # Add queries from form_queries assign + queries = build_queries_for_submit(socket.assigns.form_queries) + params = Map.put(params, "queries", queries) + + # Add network_blacklist from textarea + blacklist = parse_network_blacklist(socket.assigns.form_network_blacklist) + params = Map.put(params, "network_blacklist", blacklist) + form = socket.assigns.edit_form.source |> AshPhoenix.Form.validate(params) @@ -218,6 +327,8 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do |> assign(:selected_source, nil) |> assign(:edit_form, nil) |> assign(:sources, list_sources(tenant_id)) + |> assign(:form_queries, [default_query()]) + |> assign(:form_network_blacklist, "") |> put_flash(:info, "Integration source updated successfully")} {:error, form} -> @@ -467,6 +578,8 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do form={@create_form} partition_options={@partition_options} agent_options={@agent_options} + form_queries={@form_queries} + form_network_blacklist={@form_network_blacklist} /> <.edit_modal :if={@show_edit_modal} @@ -474,6 +587,8 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do source={@selected_source} partition_options={@partition_options} agent_options={@agent_options} + form_queries={@form_queries} + form_network_blacklist={@form_network_blacklist} /> <.details_modal :if={@show_details_modal} @@ -487,7 +602,7 @@ defmodule ServiceRadarWebNGWeb.Admin.IntegrationLive.Index do defp create_modal(assigns) do ~H""" - + <% end %> + + + + +
Network Settings
+ +
+ + + +
+ + +
Network Settings
+ +
+ + +