diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 3c5def00..071642e6 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -435,12 +435,8 @@ def serialize_model(self): "datamodel": self.datamodel, "source": self.source, "nes_fields": self.nes_fields, + "rba": self.rba, } - if self.rba is not None: - model["risk_severity"] = self.rba.severity - model["tags"]["risk_score"] = self.rba.risk_score - else: - model["tags"]["risk_score"] = 0 # Only a subset of macro fields are required: all_macros: list[dict[str, str | list[str]]] = [] diff --git a/contentctl/objects/rba.py b/contentctl/objects/rba.py index a63c043e..5c4d6e6a 100644 --- a/contentctl/objects/rba.py +++ b/contentctl/objects/rba.py @@ -11,6 +11,23 @@ RiskScoreValue_Type = Annotated[int, Field(ge=1, le=100)] +def risk_score_to_severity(num: int) -> RiskSeverity: + if 0 <= num <= 20: + return RiskSeverity.INFORMATIONAL + elif 20 < num <= 40: + return RiskSeverity.LOW + elif 40 < num <= 60: + return RiskSeverity.MEDIUM + elif 60 < num <= 80: + return RiskSeverity.HIGH + elif 80 < num <= 100: + return RiskSeverity.CRITICAL + else: + raise Exception( + f"Error getting severity - risk_score must be between 0-100, but was actually {num}" + ) + + class RiskObjectType(str, Enum): SYSTEM = "system" USER = "user" @@ -62,6 +79,11 @@ def __lt__(self, other: RiskObject) -> bool: return True return False + @computed_field + @property + def severity(self) -> RiskSeverity: + return risk_score_to_severity(self.score) + @model_serializer def serialize_risk_object(self) -> dict[str, str | int]: """ @@ -74,6 +96,7 @@ def serialize_risk_object(self) -> dict[str, str | int]: "risk_object_field": self.field, "risk_object_type": self.type, "risk_score": self.score, + "severity": self.severity, } @@ -123,20 +146,7 @@ def risk_score(self) -> RiskScoreValue_Type: @computed_field @property def severity(self) -> RiskSeverity: - if 0 <= self.risk_score <= 20: - return RiskSeverity.INFORMATIONAL - elif 20 < self.risk_score <= 40: - return RiskSeverity.LOW - elif 40 < self.risk_score <= 60: - return RiskSeverity.MEDIUM - elif 60 < self.risk_score <= 80: - return RiskSeverity.HIGH - elif 80 < self.risk_score <= 100: - return RiskSeverity.CRITICAL - else: - raise Exception( - f"Error getting severity - risk_score must be between 0-100, but was actually {self.risk_score}" - ) + return risk_score_to_severity(self.risk_score) @model_serializer def serialize_rba(self) -> dict[str, str | list[dict[str, str | int]]]: @@ -144,4 +154,5 @@ def serialize_rba(self) -> dict[str, str | list[dict[str, str | int]]]: "message": self.message, "risk_objects": [obj.model_dump() for obj in sorted(self.risk_objects)], "threat_objects": [obj.model_dump() for obj in sorted(self.threat_objects)], + "severity": self.severity, } diff --git a/contentctl/output/api_json_output.py b/contentctl/output/api_json_output.py index 80c66b23..02f0eb59 100644 --- a/contentctl/output/api_json_output.py +++ b/contentctl/output/api_json_output.py @@ -1,14 +1,15 @@ from __future__ import annotations + from typing import TYPE_CHECKING if TYPE_CHECKING: + from contentctl.objects.baseline import Baseline + from contentctl.objects.deployment import Deployment from contentctl.objects.detection import Detection + from contentctl.objects.investigation import Investigation from contentctl.objects.lookup import Lookup from contentctl.objects.macro import Macro from contentctl.objects.story import Story - from contentctl.objects.baseline import Baseline - from contentctl.objects.investigation import Investigation - from contentctl.objects.deployment import Deployment import os import pathlib @@ -39,6 +40,7 @@ def writeDetections( "id", "description", "tags", + "rba", "search", "how_to_implement", "known_false_positives",