-
Notifications
You must be signed in to change notification settings - Fork 527
[crowdstrike] Fix/5309 Crowdstrike intrusion set name resolution #5410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
86695b2
795724f
0fdf3e4
ecc1323
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,7 @@ | |
| """OpenCTI CrowdStrike report builder module.""" | ||
|
|
||
| import logging | ||
| from typing import List, Mapping, Optional, Tuple, Union | ||
| from typing import Callable, Dict, List, Mapping, Optional, Tuple, Union | ||
|
|
||
| from crowdstrike_feeds_services.utils import ( | ||
| create_external_reference, | ||
|
|
@@ -52,6 +52,7 @@ def __init__( | |
| related_indicators: Optional = None, | ||
| report_guess_relations: bool = False, | ||
| malwares_from_field: Optional[List[dict]] = None, | ||
| actor_resolver: Optional[Callable[[str], Optional[dict]]] = None, | ||
| ) -> None: | ||
| """Initialize report bundle builder.""" | ||
| self.report = report | ||
|
|
@@ -66,6 +67,11 @@ def __init__( | |
| self.report_guess_relations = report_guess_relations | ||
| self.malwares_from_field = malwares_from_field if malwares_from_field else [] | ||
|
|
||
| # Optional resolver to convert CrowdStrike actor identifiers into actor entities | ||
| # (e.g. "LABYRINTHCHOLLIMA" -> {"id": ..., "name": ..., "url": ...}). | ||
| self.actor_resolver = actor_resolver | ||
| self._actor_cache: Dict[str, Optional[dict]] = {} | ||
|
|
||
| # Use report dates for start time and stop time. | ||
| start_time = timestamp_to_datetime(self.report["created_date"]) | ||
| stop_time = None | ||
|
|
@@ -100,34 +106,59 @@ def _create_malware(self, name: str, is_family: bool = False) -> Malware: | |
| ) | ||
|
|
||
| def _create_intrusion_sets(self) -> List[IntrusionSet]: | ||
| report_actors = self.report["actors"] | ||
| if report_actors is None: | ||
| report_actors = self.report.get("actors") | ||
| if not report_actors: | ||
| return [] | ||
|
|
||
| intrusion_sets = [] | ||
| intrusion_sets: List[IntrusionSet] = [] | ||
|
|
||
| for actor in report_actors: | ||
| intrusion_set = self._create_intrusion_set_from_actor(actor) | ||
| intrusion_sets.append(intrusion_set) | ||
| if intrusion_set is not None: | ||
| intrusion_sets.append(intrusion_set) | ||
|
|
||
| return intrusion_sets | ||
|
|
||
| def _create_intrusion_set_from_actor(self, actor: dict) -> Optional[IntrusionSet]: | ||
| actor_name = actor["name"] | ||
| if actor_name is None or not actor_name: | ||
| def _create_intrusion_set_from_actor( | ||
| self, actor: Union[dict, str] | ||
| ) -> Optional[IntrusionSet]: | ||
| # Reports may provide actors as either full actor entities (dict) or as | ||
| # CrowdStrike actor identifiers (str). For identifiers, resolve via the | ||
| # provided resolver (if any) to get the canonical actor name. | ||
| actor_entity: Optional[dict] = None | ||
|
|
||
| if isinstance(actor, dict): | ||
| actor_entity = actor | ||
| elif isinstance(actor, str) and actor: | ||
| if actor in self._actor_cache: | ||
| actor_entity = self._actor_cache[actor] | ||
| elif self.actor_resolver is not None: | ||
| try: | ||
| actor_entity = self.actor_resolver(actor) | ||
| except Exception: | ||
| logger.exception("Failed to resolve actor identifier '%s'", actor) | ||
| actor_entity = None | ||
| self._actor_cache[actor] = actor_entity | ||
|
Comment on lines
+132
to
+141
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've let the ingestion run for a long time (timestamp at 0) to be able to catch that case. Were you able to trigger it? Any insight on the conf/dataset you are using, so I can replicate that on my side?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree. To me, this part of the code is unnecessary because "actors" always seems to be represented as a dict and to contain all the information we need (especially the name) when retrieving reports. |
||
|
|
||
| if not actor_entity: | ||
| return None | ||
|
|
||
| actor_name = actor_entity.get("name") | ||
| if actor_name is None or not str(actor_name).strip(): | ||
| return None | ||
|
|
||
| external_references = [] | ||
| external_references: List[ExternalReference] = [] | ||
|
|
||
| actor_url = actor["url"] | ||
| if actor_url is not None and actor_url: | ||
| actor_url = actor_entity.get("url") | ||
| actor_id = actor_entity.get("id") | ||
| if actor_url and actor_id: | ||
| external_reference = self._create_external_reference( | ||
| str(actor["id"]), actor_url | ||
| str(actor_id), str(actor_url) | ||
| ) | ||
| external_references.append(external_reference) | ||
|
|
||
| return create_intrusion_set_from_name( | ||
| actor_name, | ||
| str(actor_name), | ||
| self.author, | ||
| self.confidence_level, | ||
| external_references, | ||
|
|
@@ -288,7 +319,7 @@ def build(self) -> Bundle: | |
| bundle_objects.extend(malwares_target_countries) | ||
|
|
||
| # Indicators linked to the report and add to bundle | ||
| indicators_linked = self.related_indicators | ||
| indicators_linked = self.related_indicators or [] | ||
| bundle_objects.extend(indicators_linked) | ||
|
|
||
| # Create object references for the report. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -322,6 +322,7 @@ def _create_report_bundle( | |
| related_indicators_with_related_entities, | ||
| self.report_guess_relations, | ||
| malwares_from_field=malwares_from_field, | ||
| actor_resolver=self.reports_api_cs.get_actor_entity_by_id, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As with all the previous comments, this looks overengineered; I wasn't able to go through it a single time.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree. To me, this part of the code is unnecessary because "actors" always seems to be represented as a dictionary and to contain all the information we need (especially the name) when retrieving reports. |
||
| ) | ||
| return bundle_builder.build() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| from typing import List, Optional | ||
|
|
||
| from .base_api import BaseCrowdstrikeClient | ||
|
|
||
|
|
||
|
|
@@ -43,3 +45,45 @@ def query_mitre_attacks(self, actor_id: int): | |
| ) | ||
|
|
||
| return response["body"] | ||
|
|
||
| def get_actors_by_slugs( | ||
| self, | ||
| slugs: List[str], | ||
| fields: Optional[List[str]] = None, | ||
| ): | ||
| """ | ||
| Resolve one or more threat actors by their slug values as provided | ||
| in indicator/report collections. | ||
| """ | ||
| cleaned_slugs = [s for s in slugs if s] | ||
| if not cleaned_slugs: | ||
| return {"errors": [], "meta": {}, "resources": []} | ||
|
|
||
| if fields is None: | ||
| # Start with basic – can switch to "__full__" if you need more. | ||
| fields = ["__basic__"] | ||
|
Comment on lines
+62
to
+64
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| fql_filter = self.build_slug_filter(cleaned_slugs) | ||
|
|
||
| return self.get_combined_actor_entities( | ||
This comment was marked as outdated.
Sorry, something went wrong.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| limit=len(cleaned_slugs), | ||
| offset=0, | ||
| sort="last_modified_timestamp|desc", | ||
| fql_filter=fql_filter, | ||
| fields=fields, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def build_slug_filter(slugs: List[str]) -> str: | ||
| """ | ||
| Build an FQL filter to match threat actors by slug. | ||
| Uses OR semantics between slugs so that any matching slug is returned. | ||
| Example output: "(name:'LABYRINTHCHOLLIMA',name:'WICKEDPANDA')" | ||
| """ | ||
| cleaned_slugs = [s for s in slugs if s] | ||
| if not cleaned_slugs: | ||
| return "" | ||
|
|
||
| conditions = [f"name:'{slug}'" for slug in cleaned_slugs] | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| # CrowdStrike FQL uses comma as OR between clauses. | ||
| return "(" + ",".join(conditions) + ")" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,7 @@ def get_combined_report_entities( | |
| ) | ||
|
|
||
| self.handle_api_error(response) | ||
| self.helper.connector_logger.info("Getting combined actor entities...") | ||
| self.helper.connector_logger.info("Getting combined report entities...") | ||
|
|
||
| return response["body"] | ||
|
|
||
|
|
@@ -58,3 +58,38 @@ def get_report_pdf(self, report_id: str): | |
| self.helper.connector_logger.info("Getting report PDF...") | ||
|
|
||
| return response | ||
|
|
||
| def get_actor_entity_by_id(self, actor_id: str) -> dict | None: | ||
| """Resolve a CrowdStrike actor identifier into an actor entity. | ||
|
|
||
| Reports/indicators may reference actors as identifiers (e.g. "LABYRINTHCHOLLIMA"). | ||
| This method queries Intel actor entities and returns the first matching resource. | ||
| """ | ||
| if actor_id is None or not str(actor_id).strip(): | ||
| return None | ||
|
|
||
| # NOTE: FalconPy Intel exposes GetIntelActorEntities; the underlying client is `self.cs_intel`. | ||
| # The response is expected to be a dict with a `body` that contains `resources`. | ||
| response = self.cs_intel.get_intel_actor_entities( | ||
|
Comment on lines
+71
to
+73
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As with the previous comment, I never enter the 'actor_resolver', so this method is never called. Also, it looks like you should have used |
||
| ids=[str(actor_id)], fields=["__full__"] | ||
| ) | ||
| self.handle_api_error(response) | ||
|
|
||
| body = response.get("body") or {} | ||
| resources = body.get("resources") or [] | ||
| if not resources: | ||
| return None | ||
|
|
||
| # Normalize: some responses return a list, some return a dict keyed by id. | ||
| if isinstance(resources, dict): | ||
| # Try exact match key first | ||
| actor = resources.get(str(actor_id)) | ||
| if actor: | ||
| return actor | ||
| # Otherwise return first value | ||
| for _, value in resources.items(): | ||
| return value | ||
| return None | ||
|
|
||
| # List case | ||
| return resources[0] | ||






There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was unable to verify that statement, which leaves a lot of 'unused code' in my review.
I need more insight on how to trigger that.
I was also unable to get an actor matching
elif isinstance(actor, str) and actor