Skip to content

Adding risk data model validation to integration testing #387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions contentctl/objects/base_security_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from abc import ABC, abstractmethod

from pydantic import BaseModel, ConfigDict

from contentctl.objects.detection import Detection


class BaseSecurityEvent(BaseModel, ABC):
    """
    Abstract base model shared by Splunk security events (e.g. risks and notables)

    Concrete event models inherit the common fields below and must provide their own
    `validate_against_detection` implementation.
    """

    # Undeclared fields are accepted, since the fields present on a risk/notable event vary
    # depending on the SPL which generated it
    model_config = ConfigDict(extra="allow")

    # Name of the search tied to this event
    # (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule")
    search_name: str

    # ID of the search that generated this event
    orig_sid: str

    @abstractmethod
    def validate_against_detection(self, detection: Detection) -> None:
        """
        Check this risk/notable event against the provided detection

        :param detection: the detection this event is validated against
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError()
177 changes: 161 additions & 16 deletions contentctl/objects/correlation_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
format_pbar_string, # type: ignore
)
from contentctl.helper.utils import Utils
from contentctl.objects.base_security_event import BaseSecurityEvent
from contentctl.objects.base_test_result import TestResultStatus
from contentctl.objects.detection import Detection
from contentctl.objects.errors import (
Expand Down Expand Up @@ -222,6 +223,9 @@ class CorrelationSearch(BaseModel):
# The list of risk events found
_risk_events: list[RiskEvent] | None = PrivateAttr(default=None)

# The list of risk data model events found
_risk_dm_events: list[BaseSecurityEvent] | None = PrivateAttr(default=None)

# The list of notable events found
_notable_events: list[NotableEvent] | None = PrivateAttr(default=None)

Expand Down Expand Up @@ -554,6 +558,13 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]:
raise
events.append(event)
self.logger.debug(f"Found risk event for '{self.name}': {event}")
else:
msg = (
f"Found event for unexpected index ({result['index']}) in our query "
f"results (expected {Indexes.RISK_INDEX})"
)
self.logger.error(msg)
raise ValueError(msg)
except ServerError as e:
self.logger.error(f"Error returned from Splunk instance: {e}")
raise e
Expand Down Expand Up @@ -623,6 +634,13 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]:
raise
events.append(event)
self.logger.debug(f"Found notable event for '{self.name}': {event}")
else:
msg = (
f"Found event for unexpected index ({result['index']}) in our query "
f"results (expected {Indexes.NOTABLE_INDEX})"
)
self.logger.error(msg)
raise ValueError(msg)
except ServerError as e:
self.logger.error(f"Error returned from Splunk instance: {e}")
raise e
Expand All @@ -637,15 +655,119 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]:

return events

def risk_dm_event_exists(self) -> bool:
    """Report whether the risk data model holds at least one event for this search

    The lookup goes through `get_risk_dm_events` with the cache refresh forced; matching events
    may come from either the risk or the notable index.
    :return: True if one or more matching risk data model events were found; False otherwise
    """
    # An existence check should never be answered from a stale cache, so always re-query
    return bool(self.get_risk_dm_events(force_update=True))

def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEvent]:
    """Get risk data model events from the Splunk instance

    Queries the `risk` data model and returns any matching events (these can come from the risk
    or the notable index). Results are cached in `_risk_dm_events`; an empty result is not
    cached, so a later call queries Splunk again.
    :param force_update: whether the cached _risk_dm_events should be forcibly updated if
        already set
    :return: a list of the risk and notable events found in the risk data model
    :raises ValueError: if a result belongs to an index other than the risk or notable index
    :raises ServerError: if the Splunk instance returns an error while reading the results
    """
    # Reset the list of risk data model events if we're forcing an update
    if force_update:
        self.logger.debug("Resetting risk data model event cache.")
        self._risk_dm_events = None

    # Use the cached risk_dm_events unless we're forcing an update
    if self._risk_dm_events is not None:
        self.logger.debug(
            f"Using cached risk data model events ({len(self._risk_dm_events)} total)."
        )
        return self._risk_dm_events

    # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID
    # Search for all risk data model events from a single scheduled search (indicated by
    # orig_sid)
    query = (
        f'datamodel Risk All_Risk flat | search search_name="{self.name}" [datamodel Risk '
        f'All_Risk flat | search search_name="{self.name}" | tail 1 | fields orig_sid] '
        "| tojson"
    )
    result_iterator = self._search(query)

    # Iterate over the events, storing them in a list and checking for any errors
    events: list[BaseSecurityEvent] = []
    counts = {"risk": 0, "notable": 0}
    try:
        for result in result_iterator:
            # Pick the event model from the index the result came from; anything other than
            # the risk or notable index is treated as an error
            index = result["index"]
            if index == Indexes.RISK_INDEX:
                model, kind = RiskEvent, "risk"
            elif index == Indexes.NOTABLE_INDEX:
                model, kind = NotableEvent, "notable"
            else:
                msg = (
                    f"Found event for unexpected index ({index}) in our query "
                    f"results (expected {Indexes.NOTABLE_INDEX} or {Indexes.RISK_INDEX})"
                )
                self.logger.error(msg)
                raise ValueError(msg)

            # Parsing is identical for both event types; only the target model differs
            try:
                event = model.model_validate(json.loads(result["_raw"]))
            except Exception:
                self.logger.error(
                    f"Failed to parse {model.__name__} from search result: {result}"
                )
                raise
            events.append(event)
            counts[kind] += 1
            self.logger.debug(
                f"Found {kind} event in risk data model for '{self.name}': {event}"
            )
    except ServerError as e:
        self.logger.error(f"Error returned from Splunk instance: {e}")
        raise

    if events:
        # Set the cache if we found events
        self._risk_dm_events = events
        self.logger.debug(
            f"Caching {len(self._risk_dm_events)} risk data model events."
        )
    else:
        # Log if no events were found (the cache is left unset)
        self.logger.debug(f"No events found in risk data model for '{self.name}'")

    # Log counts of risk and notable events found
    self.logger.debug(
        f"Found {counts['risk']} risk events and {counts['notable']} notable events in the "
        "risk data model"
    )

    return events

def validate_risk_events(self) -> None:
"""Validates the existence of any expected risk events

First ensure the risk event exists, and if it does validate its risk message and make sure
any events align with the specified risk object. Also adds the risk index to the purge list
if risk events existed
:param elapsed_sleep_time: an int representing the amount of time slept thus far waiting to
check the risks/notables
:returns: an IntegrationTestResult on failure; None on success
any events align with the specified risk object.
"""
# Ensure the rba object is defined
if self.detection.rba is None:
Expand Down Expand Up @@ -735,13 +857,29 @@ def validate_risk_events(self) -> None:
def validate_notable_events(self) -> None:
    """Validates that no notable for this search landed in the risk data model

    Ensures that notables do not appear in the risk data model, as this is currently undesired
    behavior for ESCU detections.
    :raises ValidationFailed: if one or more notables were found in the risk data model
    """
    if self.notable_in_risk_dm():
        raise ValidationFailed(
            "One or more notables appeared in the risk data model. This could lead to risk "
            "score doubling, and/or notable multiplexing, depending on the detection type "
            "(e.g. TTP), or the number of risk modifiers."
        )

def notable_in_risk_dm(self) -> bool:
    """Check if notables are in the risk data model

    :returns: a bool, True if notables are in the risk data model results; False if not
    """
    # risk_dm_event_exists() forces a fresh query; when it finds events they are cached, so
    # the get_risk_dm_events() call below reads that cache
    if not self.risk_dm_event_exists():
        return False
    return any(isinstance(event, NotableEvent) for event in self.get_risk_dm_events())

# NOTE: it would be better to switch this to a system which gets the handle of the saved search job and polls
# it for completion, but that seems trickier
Expand Down Expand Up @@ -828,8 +966,8 @@ def test(

try:
# Validate risk events
self.logger.debug("Checking for matching risk events")
if self.has_risk_analysis_action:
self.logger.debug("Checking for matching risk events")
if self.risk_event_exists():
# TODO (PEX-435): should this be in the retry loop? or outside it?
# -> I've observed there being a missing risk event (15/16) on
Expand All @@ -846,22 +984,28 @@ def test(
raise ValidationFailed(
f"TEST FAILED: No matching risk event created for: {self.name}"
)
else:
self.logger.debug(
f"No risk action defined for '{self.name}'"
)

# Validate notable events
self.logger.debug("Checking for matching notable events")
if self.has_notable_action:
self.logger.debug("Checking for matching notable events")
# NOTE: because we check this last, if both fail, the error message about notables will
# always be the last to be added and thus the one surfaced to the user
if self.notable_event_exists():
# TODO (PEX-435): should this be in the retry loop? or outside it?
# TODO (PEX-434): implement deeper notable validation (the method
# commented out below is unimplemented)
# self.validate_notable_events(elapsed_sleep_time)
self.validate_notable_events()
pass
else:
raise ValidationFailed(
f"TEST FAILED: No matching notable event created for: {self.name}"
)
else:
self.logger.debug(
f"No notable action defined for '{self.name}'"
)
except ValidationFailed as e:
self.logger.error(f"Risk/notable validation failed: {e}")
result = IntegrationTestResult(
Expand Down Expand Up @@ -1015,6 +1159,7 @@ def cleanup(self, delete_test_index: bool = False) -> None:
# reset caches
self._risk_events = None
self._notable_events = None
self._risk_dm_events = None

def update_pbar(self, state: str) -> str:
"""
Expand Down
19 changes: 6 additions & 13 deletions contentctl/objects/notable_event.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
from pydantic import ConfigDict, BaseModel

from contentctl.objects.base_security_event import BaseSecurityEvent
from contentctl.objects.detection import Detection


# TODO (PEX-434): implement deeper notable validation
class NotableEvent(BaseSecurityEvent):
    """Model for a Splunk notable event

    The `search_name` and `orig_sid` fields, and the allowance for extra fields, are inherited
    from BaseSecurityEvent.
    """

    def validate_against_detection(self, detection: Detection) -> None:
        """
        Validate this notable event against the given detection

        :param detection: the detection this event is validated against
        :raises NotImplementedError: always; notable validation is not implemented yet
        """
        raise NotImplementedError()
22 changes: 3 additions & 19 deletions contentctl/objects/risk_event.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
import re
from functools import cached_property

from pydantic import (
BaseModel,
ConfigDict,
Field,
PrivateAttr,
computed_field,
field_validator,
)
from pydantic import Field, PrivateAttr, computed_field, field_validator

from contentctl.objects.base_security_event import BaseSecurityEvent
from contentctl.objects.detection import Detection
from contentctl.objects.errors import ValidationFailed
from contentctl.objects.rba import RiskObject


class RiskEvent(BaseModel):
class RiskEvent(BaseSecurityEvent):
"""Model for risk event in ES"""

# The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule")
search_name: str

# The subject of the risk event (e.g. a username, process name, system name, account ID, etc.)
# (not to be confused w/ the risk object from the detection)
es_risk_object: int | str = Field(alias="risk_object")
Expand All @@ -32,9 +23,6 @@ class RiskEvent(BaseModel):
# The level of risk associated w/ the risk event
risk_score: int

# The search ID that found that generated this risk event
orig_sid: str

# The message for the risk event
risk_message: str

Expand All @@ -53,10 +41,6 @@ class RiskEvent(BaseModel):
# Private attribute caching the risk object this RiskEvent is mapped to
_matched_risk_object: RiskObject | None = PrivateAttr(default=None)

# Allowing fields that aren't explicitly defined to be passed since some of the risk event's
# fields vary depending on the SPL which generated them
model_config = ConfigDict(extra="allow")

@field_validator("annotations_mitre_attack", "analyticstories", mode="before")
@classmethod
def _convert_str_value_to_singleton(cls, v: str | list[str]) -> list[str]:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "contentctl"

version = "5.3.2"
version = "5.4.0"

description = "Splunk Content Control Tool"
authors = ["STRT <[email protected]>"]
Expand Down
Loading