diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 910f1bf..c10a2d9 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -56,7 +56,9 @@ ) from secops.chronicle.rule_detection import ( list_detections as _list_detections, - list_errors as _list_errors + list_errors as _list_errors, + get_detection as _get_detection, + get_detection_events as _get_detection_events ) from secops.chronicle.rule_retrohunt import ( create_retrohunt as _create_retrohunt, @@ -945,6 +947,24 @@ def list_errors(self, rule_id: str) -> Dict[str, Any]: """ return _list_errors(self, rule_id) + + def get_detection( + self, + rule_id: str, + detection_id: str, + ) -> dict: + + return _get_detection(self, rule_id, detection_id) + + def get_detection_events( + self, + rule_id: str, + detection_id: str, + max_events: int = 100, + ) -> dict: + + return _get_detection_events(self, rule_id, detection_id, max_events) + # Rule Retrohunt methods def create_retrohunt( diff --git a/src/secops/chronicle/rule_detection.py b/src/secops/chronicle/rule_detection.py index 14e4c2e..9db21c8 100644 --- a/src/secops/chronicle/rule_detection.py +++ b/src/secops/chronicle/rule_detection.py @@ -16,6 +16,8 @@ from typing import Dict, Any, Optional from secops.exceptions import APIError +#from datetime import datetime +import datetime as dt def list_detections( @@ -111,4 +113,94 @@ def list_errors( if response.status_code != 200: raise APIError(f"Failed to list rule errors: {response.text}") - return response.json() \ No newline at end of file + return response.json() + + +def get_detection( + client, + rule_id: str, + detection_id: str, +) -> dict: + """Get details on a specific detection. + + Args: + client: ChronicleClient instance + rule_id: Rule ID associated with a detection. + detection_id: Detection ID. + + Returns: + Dictionary containing detection information + + Raises: + APIError: If the API request fails + """ + url = f"{client.base_url}/{client.instance_id}/legacy:legacyGetDetection" + + params = { + "ruleId": rule_id, + 'detectionId': detection_id, + } + + response = client.session.get(url, params=params, stream=True) + + if response.status_code != 200: + raise APIError(f"Failed to get alerts: {response.text}") + + return response.json() + +def get_detection_events( + client, + rule_id: str, + detection_id: str, + max_events: int = 100, +) -> dict: + """Get (search) for all the events associated with a detection + + Args: + client: ChronicleClient instance + rule_id: Rule ID associated with a detection. + detection_id: Detection ID. + max_events: The maximun number of events to return. Default value is 100. Maximum + value is 10,000. + + Returns: + Dictionary containing event samples associated with a detection. + + Raises: + APIError: If the API request fails + """ + + url = f"{client.base_url}/{client.instance_id}/legacy:legacySearchRuleDetectionEvents" + + + # + # Note: The API requires a rule version, but this can be determined from a call to get_detection (legacyGetDetection) + # so in all cases we'll derive the rule version from the response + # + d = get_detection(client, rule_id, detection_id) + if d is None or d.get('detection', None) is None or len(d['detection']) == 0: + raise APIError(f"Failed to get detection: {d}") + + version = d['detection'][0]['ruleVersion'] + + # hacky parsing rule version is in the form @v__ + timestamp_parts = version.split('@')[-1] + v, seconds, ms = timestamp_parts.split('_') + ts = dt.datetime.fromtimestamp(int(seconds), dt.UTC).strftime('%Y-%m-%dT%H:%M:%S') + + # note: precision of version timestamp is only microseconds + ts = "{}.{}Z".format(ts, ms[:6]) + + params = { + "ruleId": rule_id, + 'versionTimestamp': ts, + 'detectionId': detection_id, + 'maxEvents': max_events, + } + + response = client.session.get(url, params=params, stream=True) + + if response.status_code != 200: + raise APIError(f"Failed to get alerts: {response.text}") + + return response.json() \ No newline at end of file