Skip to content

add support for get_detections and get_detection_events #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
)
from secops.chronicle.rule_detection import (
list_detections as _list_detections,
list_errors as _list_errors
list_errors as _list_errors,
get_detection as _get_detection,
get_detection_events as _get_detection_events
)
from secops.chronicle.rule_retrohunt import (
create_retrohunt as _create_retrohunt,
Expand Down Expand Up @@ -945,6 +947,24 @@ def list_errors(self, rule_id: str) -> Dict[str, Any]:
"""
return _list_errors(self, rule_id)


def get_detection(
self,
rule_id: str,
detection_id: str,
) -> dict:

return _get_detection(self, rule_id, detection_id)

def get_detection_events(
self,
rule_id: str,
detection_id: str,
max_events: int = 100,
) -> dict:

return _get_detection_events(self, rule_id, detection_id, max_events)

# Rule Retrohunt methods

def create_retrohunt(
Expand Down
94 changes: 93 additions & 1 deletion src/secops/chronicle/rule_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from typing import Dict, Any, Optional
from secops.exceptions import APIError
#from datetime import datetime
import datetime as dt


def list_detections(
Expand Down Expand Up @@ -111,4 +113,94 @@ def list_errors(
if response.status_code != 200:
raise APIError(f"Failed to list rule errors: {response.text}")

return response.json()
return response.json()


def get_detection(
client,
rule_id: str,
detection_id: str,
) -> dict:
"""Get details on a specific detection.

Args:
client: ChronicleClient instance
rule_id: Rule ID associated with a detection.
detection_id: Detection ID.

Returns:
Dictionary containing detection information

Raises:
APIError: If the API request fails
"""
url = f"{client.base_url}/{client.instance_id}/legacy:legacyGetDetection"

params = {
"ruleId": rule_id,
'detectionId': detection_id,
}

response = client.session.get(url, params=params, stream=True)

if response.status_code != 200:
raise APIError(f"Failed to get alerts: {response.text}")

return response.json()

def get_detection_events(
client,
rule_id: str,
detection_id: str,
max_events: int = 100,
) -> dict:
"""Get (search) for all the events associated with a detection

Args:
client: ChronicleClient instance
rule_id: Rule ID associated with a detection.
detection_id: Detection ID.
max_events: The maximun number of events to return. Default value is 100. Maximum
value is 10,000.

Returns:
Dictionary containing event samples associated with a detection.

Raises:
APIError: If the API request fails
"""

url = f"{client.base_url}/{client.instance_id}/legacy:legacySearchRuleDetectionEvents"


#
# Note: The API requires a rule version, but this can be determined from a call to get_detection (legacyGetDetection)
# so in all cases we'll derive the rule version from the response
#
d = get_detection(client, rule_id, detection_id)
if d is None or d.get('detection', None) is None or len(d['detection']) == 0:
raise APIError(f"Failed to get detection: {d}")

version = d['detection'][0]['ruleVersion']

# hacky parsing rule version is in the form <rule_id>@v_<seconds>_<nanos>
timestamp_parts = version.split('@')[-1]
v, seconds, ms = timestamp_parts.split('_')
ts = dt.datetime.fromtimestamp(int(seconds), dt.UTC).strftime('%Y-%m-%dT%H:%M:%S')

# note: precision of version timestamp is only microseconds
ts = "{}.{}Z".format(ts, ms[:6])

params = {
"ruleId": rule_id,
'versionTimestamp': ts,
'detectionId': detection_id,
'maxEvents': max_events,
}

response = client.session.get(url, params=params, stream=True)

if response.status_code != 200:
raise APIError(f"Failed to get alerts: {response.text}")

return response.json()