Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker-compose.spiffe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,6 @@ volumes:
driver: local
datasvc-data:
driver: local
driver: local
otel-data:
driver: local
flowgger-data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,47 +149,46 @@ defmodule ServiceRadarAgentGateway.StatusProcessor do
end
end

# Forward to distributed core process
# Forward to distributed core process via RPC
defp forward_distributed(status) do
# Use Horde registry to find the appropriate core handler
partition = status[:partition]
tenant_slug = status[:tenant_slug] || "default"

case find_core_handler(tenant_slug, partition) do
{:ok, pid} ->
case find_status_handler_node() do
{:ok, node} ->
try do
GenServer.cast(pid, {:status_update, status})
# Cast to StatusHandler on the remote node
GenServer.cast({ServiceRadar.StatusHandler, node}, {:status_update, status})
Logger.debug("Forwarded status to StatusHandler on #{node} for tenant=#{tenant_slug}")
:ok
catch
:exit, reason ->
Logger.warning("Failed to forward status to core: #{inspect(reason)}")
Logger.warning("Failed to forward status to core on #{node}: #{inspect(reason)}")
{:error, :forward_failed}
end

{:error, :not_found} ->
# Log but don't fail - core may not be available yet
Logger.debug("No core handler found for tenant=#{tenant_slug} partition=#{partition}, queuing for retry")
Logger.debug("No StatusHandler found on any node, queuing for retry")
queue_for_retry(status)
end
end

# Find the core handler for a tenant/partition
defp find_core_handler(tenant_slug, partition) do
# Try to find in Horde registry with tenant-scoped key
registry_key = {tenant_slug, partition, :status_handler}

case Horde.Registry.lookup(ServiceRadar.Registry, registry_key) do
[{pid, _}] -> {:ok, pid}
[] ->
# Fall back to partition-only key for backwards compatibility
case Horde.Registry.lookup(ServiceRadar.Registry, {:status_handler, partition}) do
[{pid, _}] -> {:ok, pid}
[] -> {:error, :not_found}
# Find a node that has StatusHandler running
defp find_status_handler_node do
nodes = [Node.self() | Node.list()] |> Enum.uniq()

# First, try to find nodes with StatusHandler
handler_nodes =
Enum.filter(nodes, fn node ->
case :rpc.call(node, Process, :whereis, [ServiceRadar.StatusHandler], 5_000) do
pid when is_pid(pid) -> true
_ -> false
end
end)

case handler_nodes do
[node | _] -> {:ok, node}
[] -> {:error, :not_found}
end
rescue
# Horde may not be available
ArgumentError -> {:error, :not_found}
end

# Queue status for retry when core is not available
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
defmodule ServiceRadar.Identity.Changes.AssignFirstUserRole do
@moduledoc """
Assigns super_admin role to the first user registered for a tenant.

When a user registers and is the first user for their tenant, they are
automatically granted super_admin role. Subsequent users get the default
viewer role.

This ensures every tenant has at least one super_admin who can manage
the tenant's users and settings.
"""

use Ash.Resource.Change

alias ServiceRadar.Cluster.TenantSchemas

require Logger

@impl true
def change(changeset, _opts, _context) do
# Only apply during create actions
if changeset.action_type != :create do
changeset
else
# Check if role is already explicitly set
case Ash.Changeset.get_attribute(changeset, :role) do
nil ->
maybe_assign_super_admin(changeset)

:viewer ->
# Default was applied, check if we should override
maybe_assign_super_admin(changeset)

_other_role ->
# Role was explicitly set, don't override
changeset
end
end
end

defp maybe_assign_super_admin(changeset) do
tenant_id = Ash.Changeset.get_attribute(changeset, :tenant_id)

if is_nil(tenant_id) do
# No tenant yet, can't check - leave default
changeset
else
if first_user_for_tenant?(tenant_id) do
Logger.info("Assigning super_admin role to first user for tenant #{tenant_id}")
Ash.Changeset.force_change_attribute(changeset, :role, :super_admin)
else
changeset
end
end
end

defp first_user_for_tenant?(tenant_id) do
schema = TenantSchemas.schema_for_id(tenant_id)

if is_nil(schema) do
# Can't determine schema, assume not first user for safety
false
else
count = count_users_in_tenant(schema)
count == 0
end
end

defp count_users_in_tenant(schema) do
import Ecto.Query

query =
from(u in {"ng_users", ServiceRadar.Identity.User},
select: count(u.id)
)

case ServiceRadar.Repo.one(query, prefix: schema) do
nil -> 0
count -> count
end
rescue
_ -> 0
end
end
3 changes: 3 additions & 0 deletions elixir/serviceradar_core/lib/serviceradar/identity/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ defmodule ServiceRadar.Identity.User do
description "Register a new user with email only (magic link flow)"
accept [:email, :display_name, :tenant_id, :role]
change ServiceRadar.Identity.Changes.AssignDefaultTenant
change ServiceRadar.Identity.Changes.AssignFirstUserRole
primary? true
end

Expand All @@ -119,6 +120,7 @@ defmodule ServiceRadar.Identity.User do
upsert_fields [:email]

change ServiceRadar.Identity.Changes.AssignDefaultTenant
change ServiceRadar.Identity.Changes.AssignFirstUserRole
change AshAuthentication.Strategy.MagicLink.SignInChange

change {AshAuthentication.Strategy.RememberMe.MaybeGenerateTokenChange,
Expand Down Expand Up @@ -166,6 +168,7 @@ defmodule ServiceRadar.Identity.User do
accept [:email, :display_name, :tenant_id]

change ServiceRadar.Identity.Changes.AssignDefaultTenant
change ServiceRadar.Identity.Changes.AssignFirstUserRole

argument :password, :string do
allow_nil? false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ defmodule ServiceRadar.Integrations.IntegrationSource do
case record.credentials_encrypted do
nil -> nil
"" -> %{}
json -> Jason.decode!(json)
%Ash.NotLoaded{} -> nil
json when is_binary(json) -> Jason.decode!(json)
_ -> nil
end
end)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ defmodule ServiceRadar.Integrations.SyncConfigGenerator do
defp load_sources(agent_id, tenant_id) do
tenant_schema = TenantSchemas.schema_for_tenant(tenant_id)

# Include sources assigned to this specific agent OR sources with no agent (auto-assign)
# Load credentials_encrypted first (so AshCloak can decrypt it), then the credentials calculation
query =
IntegrationSource
|> Ash.Query.for_read(:read, %{}, tenant: tenant_schema, authorize?: false)
|> Ash.Query.filter(enabled == true and agent_id == ^agent_id)
|> Ash.Query.load(:credentials)
|> Ash.Query.filter(enabled == true and (agent_id == ^agent_id or is_nil(agent_id)))
|> Ash.Query.load([:credentials_encrypted, :credentials])
|> Ash.Query.sort(name: :asc)

case Ash.read(query, authorize?: false) do
Expand Down Expand Up @@ -88,6 +90,7 @@ defmodule ServiceRadar.Integrations.SyncConfigGenerator do
"credentials" => credentials,
"queries" => source.queries,
"poll_interval" => format_duration(source.poll_interval_seconds),
"discovery_interval" => format_duration(source.discovery_interval_seconds),
"sweep_interval" => format_duration(source.sweep_interval_seconds),
"agent_id" => source.agent_id,
"gateway_id" => source.gateway_id,
Expand All @@ -97,7 +100,8 @@ defmodule ServiceRadar.Integrations.SyncConfigGenerator do
"batch_size" => get_setting(source.settings, "batch_size"),
"insecure_skip_verify" => get_setting(source.settings, "insecure_skip_verify"),
"tenant_id" => to_string(source.tenant_id),
"tenant_slug" => tenant_slug
"tenant_slug" => tenant_slug,
"sync_service_id" => to_string(source.id)
}
|> compact_map()
end
Expand Down
35 changes: 10 additions & 25 deletions elixir/serviceradar_core/lib/serviceradar/inventory/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,34 +225,26 @@ defmodule ServiceRadar.Inventory.Device do
authorize_if actor_attribute_equals(:role, :super_admin)
end

# Read access: authenticated users in same tenant
# Read access: authenticated users (tenant isolation via context multitenancy)
policy action_type(:read) do
authorize_if expr(
^actor(:role) in [:viewer, :operator, :admin] and
tenant_id == ^actor(:tenant_id)
)
authorize_if actor_attribute_equals(:role, :viewer)
authorize_if actor_attribute_equals(:role, :operator)
authorize_if actor_attribute_equals(:role, :admin)
end

# Create devices: operators/admins in same tenant
# Create devices: operators/admins (tenant isolation via context multitenancy)
policy action_type(:create) do
authorize_if expr(
^actor(:role) in [:operator, :admin] and
tenant_id == ^actor(:tenant_id)
)
authorize_if actor_attribute_equals(:role, :operator)
authorize_if actor_attribute_equals(:role, :admin)
end

# Update devices: operators/admins in same tenant
# Update devices: operators/admins (tenant isolation via context multitenancy)
policy action_type(:update) do
authorize_if expr(
^actor(:role) in [:operator, :admin] and
tenant_id == ^actor(:tenant_id)
)
authorize_if actor_attribute_equals(:role, :operator)
authorize_if actor_attribute_equals(:role, :admin)
end
end

changes do
change ServiceRadar.Changes.AssignTenantId
end

attributes do
# OCSF Core Identity - uid is the primary key
Expand Down Expand Up @@ -461,13 +453,6 @@ defmodule ServiceRadar.Inventory.Device do
description "Additional metadata"
end

# Multi-tenancy
attribute :tenant_id, :uuid do
allow_nil? false
public? false
description "Tenant this device belongs to"
end

# Group assignment
attribute :group_id, :uuid do
public? true
Expand Down
64 changes: 63 additions & 1 deletion elixir/serviceradar_core/lib/serviceradar/status_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule ServiceRadar.StatusHandler do

require Logger

alias ServiceRadar.Cluster.TenantSchemas
alias ServiceRadar.Integrations.IntegrationSource
alias ServiceRadar.Inventory.SyncIngestor

def start_link(_opts) do
Expand Down Expand Up @@ -40,7 +42,12 @@ defmodule ServiceRadar.StatusHandler do
case decode_results(status[:message]) do
{:ok, updates} ->
actor = system_actor(tenant_id)
sync_ingestor().ingest_updates(updates, tenant_id, actor: actor)
result = sync_ingestor().ingest_updates(updates, tenant_id, actor: actor)

# Record sync status on the IntegrationSource
record_sync_status(updates, tenant_id, actor, result)

result

{:error, reason} ->
{:error, {:invalid_sync_results, reason}}
Expand All @@ -56,6 +63,61 @@ defmodule ServiceRadar.StatusHandler do

defp process(_status), do: :ok

# Record sync status on the IntegrationSource if we have a sync_service_id
defp record_sync_status(updates, tenant_id, actor, ingest_result) do
# Extract sync_service_id from the first update's metadata
sync_service_id = extract_sync_service_id(updates)

if sync_service_id do
tenant_schema = TenantSchemas.schema_for_tenant(tenant_id)

sync_result =
case ingest_result do
:ok -> :success
{:error, _} -> :failed
end

case IntegrationSource.get_by_id(sync_service_id,
tenant: tenant_schema,
actor: actor,
authorize?: false
) do
{:ok, source} ->
source
|> Ash.Changeset.for_update(:record_sync, %{
result: sync_result,
device_count: length(updates)
})
|> Ash.update(tenant: tenant_schema, actor: actor, authorize?: false)
|> case do
{:ok, _} ->
Logger.debug("Recorded sync status for IntegrationSource #{sync_service_id}")

{:error, reason} ->
Logger.warning(
"Failed to record sync status for #{sync_service_id}: #{inspect(reason)}"
)
end

{:error, reason} ->
Logger.debug(
"Could not find IntegrationSource #{sync_service_id} to record sync: #{inspect(reason)}"
)
end
end
rescue
error ->
Logger.warning("Error recording sync status: #{inspect(error)}")
end

defp extract_sync_service_id([update | _]) when is_map(update) do
# Check both string and atom keys
metadata = update["metadata"] || update[:metadata] || %{}
metadata["sync_service_id"] || metadata[:sync_service_id]
end

defp extract_sync_service_id(_), do: nil

defp decode_results(nil), do: {:ok, []}

defp decode_results(message) when is_binary(message) do
Expand Down
Loading
Loading