diff --git a/external-import/vmray-platform/Dockerfile b/external-import/vmray-platform/Dockerfile new file mode 100644 index 00000000000..7fe96565b8a --- /dev/null +++ b/external-import/vmray-platform/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-alpine +ENV CONNECTOR_TYPE=EXTERNAL_IMPORT + +# Copy the connector +COPY src /opt/opencti-connector-vmray-platform + +# Install Python modules +# hadolint ignore=DL3003 +RUN apk update && apk upgrade && \ + apk --no-cache add git build-base libmagic libffi-dev libxml2-dev libxslt-dev + +RUN cd /opt/opencti-connector-vmray-platform && \ + pip3 install --no-cache-dir -r requirements.txt && \ + apk del git build-base && \ + rm -rf /var/cache/apk/* + +# Expose and entrypoint +COPY entrypoint.sh / +RUN chmod +x /entrypoint.sh +ENTRYPOINT ["/entrypoint.sh"] diff --git a/external-import/vmray-platform/README.md b/external-import/vmray-platform/README.md new file mode 100644 index 00000000000..e74c5c72480 --- /dev/null +++ b/external-import/vmray-platform/README.md @@ -0,0 +1,138 @@ +# OpenCTI VMRay Platform Connector + +Table of Contents + +- [OpenCTI VMRay Platform Connector](#opencti-vmray-platform-connector) + - [Introduction](#introduction) + - [Installation](#installation) + - [Requirements](#requirements) + - [Configuration variables](#configuration-variables) + - [OpenCTI environment variables](#opencti-environment-variables) + - [Base connector environment variables](#base-connector-environment-variables) + - [VMRay Platform environment variables](#vmray-platform-environment-variables) + - [Deployment](#deployment) + - [Docker Deployment](#docker-deployment) + - [Manual Deployment](#manual-deployment) + - [Usage](#usage) + - [Behavior](#behavior) + - [Debugging](#debugging) + + +## Introduction +VMRay is an advanced malware sandbox and threat analysis platform used by hundreds of leading security teams worldwide, including Fortune 100 enterprises, government agencies, financial institutions, and MSSPs. By combining dynamic, evasion-resistant sandboxing with rich, reusable output, VMRay enables security teams to investigate unknown, advanced, and targeted threats, reduce analysis time, and build reliable, independent threat intelligence on the attacks that actually target their environment. + +This connector continuously ingests high-quality IOCs and analysis context from VMRay Platform into OpenCTI, including classifications, threat names, and other enriched observables derived from in-depth malware and phishing analysis. By bringing VMRay’s definitive verdicts and noise-free data into OpenCTI, security, IR, and threat intel teams can better correlate suspicious activity, prioritize investigations, and strengthen their overall detection and response workflows. + +## Installation + +### Requirements + +- OpenCTI Platform >= 6.9.0 + +## Configuration variables + +There are a number of configuration options, which are set either in `docker-compose.yml` (for Docker) or +in `config.yml` (for manual deployment). + +### OpenCTI environment variables + +Below are the parameters you'll need to set for OpenCTI: + +| Parameter | config.yml | Docker environment variable | Mandatory | Description | +|---------------|------------|-----------------------------|-----------|------------------------------------------------------| +| OpenCTI URL | url | `OPENCTI_URL` | Yes | The URL of the OpenCTI platform. | +| OpenCTI Token | token | `OPENCTI_TOKEN` | Yes | The default admin token set in the OpenCTI platform. | + +### Base connector environment variables + +Below are the parameters you'll need to set for running the connector properly: + +| Parameter | config.yml | Docker environment variable | Default | Mandatory | Description | +|-------------------|-----------------|-----------------------------|-----------------|-----------|---------------------------------------------------------------------------------------------| +| Connector ID | id | `CONNECTOR_ID` | / | Yes | A unique `UUIDv4` identifier for this connector instance. | | +| Connector Name | name | `CONNECTOR_NAME` | | Yes | Name of the connector. | +| Connector Scope | scope | `CONNECTOR_SCOPE` | | Yes | The scope or type of data the connector is importing, either a MIME type or Stix Object. | +| Log Level | log_level | `CONNECTOR_LOG_LEVEL` | info | Yes | Determines the verbosity of the logs. Options are `debug`, `info`, `warn`, or `error`. | +| Duration Period | duration_period | `CONNECTOR_DURATION_PERIOD` | PT1D | No | Determines the time interval between each launch of the connector in ISO 8601, ex: `PT30M`. | + +### VMRay Platform environment variables + +Below are the parameters you'll need to set for the connector: + +| Parameter | config.yml | Docker environment variable | Default | Mandatory | Description | +|----------------------------|----------------------------|---------------------------------------------|---------|-----------|---------------------------------------------------------------------------------------------------------------| +| VMRay Server | server | `VMRAY_SERVER` | https://cloud.vmray.com | Yes | VMRay Server URL | +| VMRay API Key | api_key | `VMRAY_API_KEY` | / | Yes | VMRay API Key | +| Inititla Fetch Date | initial_fetch_date | `VMRAY_INITIAL_FETCH_DATE` | YYYY-MM-DD | Yes | Fetch feeds from date (ex: 2025-09-09) | +| VMRay Sample Verdict | sample_verdict | `VMRAY_SAMPLE_VERDICT` | malicious | Yes | Samples can be pulled based on verdict. Supported values include malicious, suspicious +| VMRay IOCs Verdict | iocs_verdict | `VMRAY_IOCS_VERDICT` | malicious | Yes | IOCs can be pulled based on their verdict. Supported values include malicious, suspicious | +| VMRay Default TLP | default_tlp | `VMRAY_DEFAULT_TLP` | TLP:AMBER | Yes | TLP markings can be assigned in OpenCTI platform. Supported values include TLP:AMBER, TLP:RED, TLP:WHITE, TLP:GREEN | +| VMRay Threat Names color | threat_names_color | `VMRAY_THREAT_NAMES_COLOR` | #d60904 | Yes | Configurable color for threat names labels +| VMRay Classifications color | classifications_color | `VMRAY_CLASSIFICATIONS_COLOR` | #fa560a | Yes | Configurable color for family classifications labels +| VMRay VTI color | vti_color | `VMRAY_VTI_COLOR` | #40f5ef | Yes | Configurable color for VMRay Threat Identifier labels +| VMRay MITRE color | mitre_color | `VMRAY_MITRE_COLOR` | #a9f723 | Yes | Configurable color for MITRE Technique ID labels + +## Deployment + +### Docker Deployment + +Before building the Docker container, you need to set the version of pycti in `requirements.txt` equal to whatever +version of OpenCTI you're running. Example, `pycti==6.9.0`. If you don't, it will take the latest version, but +sometimes the OpenCTI SDK fails to initialize. + +Build a Docker Image using the provided `Dockerfile`. + +Example: + +```shell +# Replace the IMAGE NAME with the appropriate value +docker build . -t [IMAGE NAME]:latest +``` + +Make sure to replace the environment variables in `docker-compose.yml` with the appropriate configurations for your +environment. Then, start the docker container with the provided docker-compose.yml + +```shell +docker compose up -d +# -d for detached +``` + +### Manual Deployment + +Create a file `config.yml` based on the provided `config.yml.sample`. + +Replace the configuration variables (especially the "**ChangeMe**" variables) with the appropriate configurations for +you environment. + +Install the required python dependencies (preferably in a virtual environment): + +```shell +pip3 install -r requirements.txt +``` + +Then, start the connector from vmray-platform/src: + +```shell +python3 main.py +``` + +## Usage + +After Installation, the connector should require minimal interaction to use, and should update automatically at a regular interval specified in your `docker-compose.yml` or `config.yml` in `duration_period`. + +However, if you would like to force an immediate download of a new batch of entities, navigate to: + +`Data management` -> `Ingestion` -> `Connectors` in the OpenCTI platform. + +Find the connector, and click on the refresh button to reset the connector's state and force a new +download of data by re-running the connector. + +## Behavior + +The connector pulls feeds from VMRay Platform and ingests into OpenCTI. + + +## Debugging + +The connector can be debugged by setting the appropriate log level. +Note that logging messages can be added using `self.helper.connector_logger,{LOG_LEVEL}("Sample message")`, i.e., `self.helper.connector_logger.error("An error message")`. \ No newline at end of file diff --git a/external-import/vmray-platform/__metadata__/connector_manifest.json b/external-import/vmray-platform/__metadata__/connector_manifest.json new file mode 100644 index 00000000000..82a8af3b28a --- /dev/null +++ b/external-import/vmray-platform/__metadata__/connector_manifest.json @@ -0,0 +1,21 @@ +{ + "title": "VMRay Platform", + "slug": "vmray-platform", + "description": "VMRay provides advanced threat analysis and detection by integrating its unique agentless hypervisor-based sandbox with a real-time analysis. \n\nThis connector enables the ingestion and correlation of VMRay analysis data including IOCs, VTIs, and MITRE ATT&CK techniques for faster detection and response.", + "short_description": "VMRay provides advanced threat analysis and detection by integrating its unique agentless hypervisor-based sandbox with a real-time analysis.", + "logo": "external-import/vmray-platform/__metadata__/logo.png", + "use_cases": [ + "Commercial Threat Intel" + ], + "verified": false, + "last_verified_date": null, + "playbook_supported": false, + "max_confidence_level": 100, + "support_version": ">= 6.9.0", + "subscription_link": "https://www.vmray.com/vmray-platform/", + "source_code": "https://github.com/OpenCTI-Platform/connectors/tree/master/external-import/vmray-platform", + "manager_supported": false, + "container_version": "rolling", + "container_image": "opencti/connector-vmray-platform", + "container_type": "EXTERNAL_IMPORT" +} diff --git a/external-import/vmray-platform/__metadata__/logo.png b/external-import/vmray-platform/__metadata__/logo.png new file mode 100644 index 00000000000..659473247b3 Binary files /dev/null and b/external-import/vmray-platform/__metadata__/logo.png differ diff --git a/external-import/vmray-platform/docker-compose.yml b/external-import/vmray-platform/docker-compose.yml new file mode 100644 index 00000000000..b297cd9eec5 --- /dev/null +++ b/external-import/vmray-platform/docker-compose.yml @@ -0,0 +1,24 @@ +version: "3" + +services: + connector-vmray-platform: + image: opencti/connector-vmray-platform:latest + environment: + - OPENCTI_URL=http://opencti:8080 + - OPENCTI_TOKEN=ChangeMe(UUIDv4 token) + - CONNECTOR_ID=VMRay + - CONNECTOR_NAME=VMRay Platform + - CONNECTOR_SCOPE=VMRay + - CONNECTOR_LOG_LEVEL=info + - CONNECTOR_DURATION_PERIOD=P1D + - VMRAY_SERVER=https://cloud.vmray.com + - VMRAY_API_KEY=ChangeMe + - VMRAY_INITIAL_FETCH_DATE=2025-09-09 + - VMRAY_SAMPLE_VERDICT='malicious' + - VMRAY_IOCS_VERDICT='malicious' + - VMRAY_DEFAULT_TLP=TLP:AMBER + - VMRAY_THREAT_NAMES_COLOR=#d60904 + - VMRAY_CLASSIFICATIONS_COLOR=#fa560a + - VMRAY_VTI_COLOR=#40f5ef + - VMRAY_MITRE_COLOR=#a9f723 + restart: always \ No newline at end of file diff --git a/external-import/vmray-platform/entrypoint.sh b/external-import/vmray-platform/entrypoint.sh new file mode 100644 index 00000000000..12fc228b47f --- /dev/null +++ b/external-import/vmray-platform/entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +# Correct working directory +cd /opt/opencti-connector-vmray-platform + +# Start the connector +python3 main.py \ No newline at end of file diff --git a/external-import/vmray-platform/src/config.yml.sample b/external-import/vmray-platform/src/config.yml.sample new file mode 100644 index 00000000000..95a4aeba005 --- /dev/null +++ b/external-import/vmray-platform/src/config.yml.sample @@ -0,0 +1,23 @@ +opencti: + url: 'http://localhost:8080' + token: 'ChangeMe' + +connector: + id: 'VMRay' + type: 'EXTERNAL_IMPORT' + name: 'VMRay Platform' + scope: 'report, malware, indicator, attack-pattern, location' + log_level: 'info' + duration_period: 'P1D' # Interval given for scheduler process in ISO-8601 format + +vmray: + server: 'https://cloud.vmray.com' + api_key: 'ChangeMe' + sample_verdict: 'malicious' + iocs_verdict: 'malicious' + threat_names_color: '#d60904' + classifications_color: '#f76928' + vti_color: '#40f5ef' + mitre_color: '#a9f723' + default_tlp: 'TLP:AMBER' + initial_fetch_date: '2025-09-09' \ No newline at end of file diff --git a/external-import/vmray-platform/src/main.py b/external-import/vmray-platform/src/main.py new file mode 100644 index 00000000000..fe0ad339f95 --- /dev/null +++ b/external-import/vmray-platform/src/main.py @@ -0,0 +1,20 @@ +""" +Main entry point for the VMRay connector script. +Initializes the connector and runs it. +""" + +from sys import exit +from traceback import print_exc + +from vmray_connector import VMRayConnector + +if __name__ == "__main__": + # Entry point of the script + # print_exc(): Prints the exception traceback to stderr + # exit(1): Signals an error to the operating system + try: + connector = VMRayConnector() + connector.run() + except Exception: + print_exc() + exit(1) diff --git a/external-import/vmray-platform/src/requirements.txt b/external-import/vmray-platform/src/requirements.txt new file mode 100644 index 00000000000..95d36133aa4 --- /dev/null +++ b/external-import/vmray-platform/src/requirements.txt @@ -0,0 +1,3 @@ +pycti==6.9.0 +urllib3==2.5.0 +vmray-rest-api==6.0.0 diff --git a/external-import/vmray-platform/src/vmray_connector/__init__.py b/external-import/vmray-platform/src/vmray_connector/__init__.py new file mode 100644 index 00000000000..79210bcd94a --- /dev/null +++ b/external-import/vmray-platform/src/vmray_connector/__init__.py @@ -0,0 +1,9 @@ +""" +VMRay Connector package. + +Exports the main `VMRayConnector` class for use by external modules. +""" + +from .connector import VMRayConnector + +__all__ = ["VMRayConnector"] diff --git a/external-import/vmray-platform/src/vmray_connector/config_loader.py b/external-import/vmray-platform/src/vmray_connector/config_loader.py new file mode 100644 index 00000000000..276c89ad268 --- /dev/null +++ b/external-import/vmray-platform/src/vmray_connector/config_loader.py @@ -0,0 +1,101 @@ +""" +Config File +""" + +from pathlib import Path + +from pycti import get_config_variable +from yaml import safe_load + + +class ConfigConnector: + """ + Loads and initializes configuration settings for the VMRay connector. + Handles reading from a YAML config file and environment variables. + """ + + def __init__(self) -> None: + """ + Initialize the connector with necessary configurations + """ + # Load configuration file + self.load = self._load_config() + self._initialize_configurations() + + @staticmethod + def _load_config() -> dict: + """ + Load the configuration from the YAML file + :return: Configuration dictionary + """ + config_file_path = Path(__file__).parents[1].joinpath("config.yml") + if config_file_path.is_file(): + with open(config_file_path, encoding="utf-8") as f: + config = safe_load(f) or {} + else: + config = {} + return config + + def _initialize_configurations(self) -> None: + """ + Connector configuration variables + :return: None + """ + # OpenCTI configurations + self.duration_period = get_config_variable( + "CONNECTOR_DURATION_PERIOD", + ["connector", "duration_period"], + self.load, + ) + + # Connector extra parameters + self.vmray_base_url = get_config_variable( + "VMRAY_SERVER", + ["vmray", "server"], + self.load, + ) + self.vmray_api_key = get_config_variable( + "VMRAY_API_KEY", + ["vmray", "api_key"], + self.load, + ) + self.sample_verdict = get_config_variable( + "VMRAY_SAMPLE_VERDICT", + ["vmray", "sample_verdict"], + self.load, + ) + self.iocs_verdict = get_config_variable( + "VMRAY_IOCS_VERDICT", + ["vmray", "iocs_verdict"], + self.load, + ) + self.initial_fetch_date = get_config_variable( + "VMRAY_INITIAL_FETCH_DATE", + ["vmray", "initial_fetch_date"], + self.load, + ) + self.default_tlp = get_config_variable( + "VMRAY_DEFAULT_TLP", + ["vmray", "default_tlp"], + self.load, + ) + self.classifications_color = get_config_variable( + "VMRAY_MALICIO", + ["vmray", "classifications_color"], + self.load, + ) + self.threat_names_color = get_config_variable( + "VMRAY_THREAT_NAMES_COLOR", + ["vmray", "threat_names_color"], + self.load, + ) + self.vti_color = get_config_variable( + "VMRAY_VTI_COLOR", + ["vmray", "vti_color"], + self.load, + ) + self.mitre_color = get_config_variable( + "VMRAY_MITRE_COLOR", + ["vmray", "mitre_color"], + self.load, + ) diff --git a/external-import/vmray-platform/src/vmray_connector/connector.py b/external-import/vmray-platform/src/vmray_connector/connector.py new file mode 100644 index 00000000000..d34ede3f936 --- /dev/null +++ b/external-import/vmray-platform/src/vmray_connector/connector.py @@ -0,0 +1,1753 @@ +""" +Connect to VMRay and ingest feeds into OpenCTI. +""" + +from datetime import datetime, timezone +from logging import WARNING, getLogger +from re import match as re_match +from re import search +from sys import exit +from typing import Any, Callable, Dict, List, Optional, Set, Tuple +from uuid import uuid4 + +from pycti import AttackPattern as PyctiAttackPattern +from pycti import OpenCTIConnectorHelper +from pycti import Report as PyctiReport +from pycti import StixCoreRelationship +from requests.exceptions import ( + ConnectionError, + ProxyError, + RequestException, + SSLError, + Timeout, + TooManyRedirects, +) +from stix2 import AttackPattern, Relationship, Report +from vmray.rest_api import VMRayRESTAPI, VMRayRESTAPIError + +from .config_loader import ConfigConnector +from .utils import parse_to_vmray_datetime +from .vmray_observable_transform import VMRayObservableTransform +from .vmray_stix_builder import VMRaySTIXBuilder + +THREAT_NAMES_REGEX = r"^[a-zA-Z0-9\s]+$" + + +def build_vtis_lookup(threat_indicators: List[Dict[str, Any]]) -> Dict[int, dict]: + """ + Build a lookup dictionary for VTI threat indicators by analysis ID. + + Args: + threat_indicators (List[Dict[str, Any]]): List of threat indicator objects. + + Returns: + Dict[int, dict]: Lookup dictionary keyed by analysis IDs. + """ + lookup_dict: Dict[int, dict] = {} + + for threat_indicator in threat_indicators: + analysis_ids = threat_indicator.get("analysis_ids", []) + category = threat_indicator.get("category") + operation = threat_indicator.get("operation") + score = threat_indicator.get("score", 0) + + for analysis_id in analysis_ids: + entry = lookup_dict.setdefault( + analysis_id, + {"category_operation": {}, "operations": set(), "score": 0}, + ) + + if category and operation: + entry["category_operation"].setdefault(category, set()).add(operation) + entry["operations"].add(operation) + + if score > entry["score"]: + entry["score"] = score + + return lookup_dict + + +def build_mitre_lookup( + mitre_attack_techniques: List[Dict[str, Any]], +) -> Dict[int, dict]: + """ + Build a lookup dictionary for MITRE techniques by analysis ID. + + Args: + mitre_attack_techniques (List[Dict[str, Any]]): List of MITRE attack techniques. + + Returns: + Dict[int, dict]: Lookup dictionary keyed by analysis IDs. + """ + lookup_dict: Dict[int, dict] = {} + + for mitre_attack_technique in mitre_attack_techniques: + analysis_ids = mitre_attack_technique.get("analysis_ids", []) + technique_id = mitre_attack_technique.get("technique_id") + technique = mitre_attack_technique.get("technique") + tactics = mitre_attack_technique.get("tactics", []) + score = mitre_attack_technique.get("score", 0) + + for aid in analysis_ids: + entry = lookup_dict.setdefault( + aid, + { + "techniques": set(), + "technique_ids": set(), + "tactics": set(), + "score": 0, + }, + ) + + if technique: + entry["techniques"].add(technique) + + if technique_id: + entry["technique_ids"].add(technique_id) + + if tactics: + entry["tactics"].update(t.lower() for t in tactics) + + if score > entry["score"]: + entry["score"] = score + + return lookup_dict + + +def extract_vti_mitre_labels_by_analysis( + analysis_ids: List[int], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], +) -> Tuple[List[str], List[str]]: + """ + Extract VTI and MITRE labels for a list of analysis IDs. + + Args: + analysis_ids (List[int]): List of VMRay analysis IDs. + vti_lookup (Dict[int, dict]): Lookup dict containing VTI info for each analysis ID. + mitre_lookup (Dict[int, dict]): Lookup dict containing MITRE technique info for each analysis ID. + + Returns: + tuple[list[str], list[str]]: A tuple containing List of VTI labels and MITRE labels. + """ + + mitre_labels, vti_labels = [], [] + unique_mitre_labels = set() + unique_vti_labels = set() + + for analysis_id in analysis_ids: + vti_entry = vti_lookup.get(analysis_id) + if not vti_entry: + continue + + for category, operations in vti_entry.get("category_operation", {}).items(): + for operation in operations: + lbl = f"{category}:{operation}".replace(" ", "_") + if lbl in unique_vti_labels: + continue + unique_vti_labels.add(lbl) + vti_labels.append(lbl) + + mitre_entry = mitre_lookup.get(analysis_id) + if not mitre_entry: + continue + + for technique_id in mitre_entry.get("technique_ids", []): + lbl = f"mitre:{technique_id}" + if lbl in unique_mitre_labels: + continue + unique_mitre_labels.add(lbl) + mitre_labels.append(lbl) + + return vti_labels, mitre_labels + + +def combine_labels( + vti: List[str], + mitre: List[str], + threat_names: Optional[List[str]] = None, + classifications: Optional[List[str]] = None, +) -> List[str]: + """ + Combine VTI, MITRE, threat names, and classifications into a single list of labels. + + Args: + vti (list[str]): VTI labels. + mitre (list[str]): MITRE labels. + threat_names (list[str], optional): Threat name labels. + classifications (list[str], optional): Classification labels. + + Returns: + list[str]: Combined list of labels. + """ + labels = [] + if threat_names: + labels += threat_names + if classifications: + labels += classifications + labels += vti or [] + labels += mitre or [] + return labels + + +def build_indicator_description( + analysis_ids: List[int], + vti_lookup: Dict[int, dict], + classifications: Optional[List[str]] = None, +) -> str: + """ + Build a textual description for an indicator based on classifications and VTI operations. + + Args: + classifications (List[str]): List of malware classifications associated with the IOC. + analysis_ids (List[int]): List of VMRay analysis IDs linked to this IOC. + vti_lookup (Dict[int, dict]): Lookup dict containing VTI operation info per analysis. + + Returns: + str: A human-readable description summarizing classifications and VTI operations. + """ + description = "This indicator originates from VMRay Platform." + if classifications: + description += ( + f" It was observed with a classification of {', '.join(classifications)}." + ) + description += "Following detections and observations were recorded:\n" + + operations_list: List[str] = [] + + if vti_lookup: + for aid in analysis_ids: + vti_entry = vti_lookup.get(aid) + if vti_entry: + operations_list.extend(vti_entry.get("operations", [])) + + if operations_list: + bullet_points = "\n".join( + f"* {op.lower()}" for op in dict.fromkeys(operations_list) + ) + return description + bullet_points + + return description + "* No VTI operations observed" + + +def build_killchain_and_confidence( + analysis_ids: List[int], + mitre_lookup: Dict[int, Dict], + vti_lookup: Dict[int, Dict], +) -> Tuple[List[Dict[str, str]], int]: + kill_chain_phases: List[Dict[str, str]] = [] + max_confidence_score: int = 0 + + if not analysis_ids: + return [], 0 + + for aid in analysis_ids: + mitre_entry = mitre_lookup.get(aid) if mitre_lookup else None + if mitre_entry: + for tactic in mitre_entry.get("tactics", []): + kill_chain_phases.append( + { + "kill_chain_name": "mitre-attack", + "phase_name": tactic.lower().replace(" ", "-"), + } + ) + + vti_entry = vti_lookup.get(aid) if vti_lookup else None + if vti_entry: + max_confidence_score = max(max_confidence_score, vti_entry.get("score", 0)) + + return kill_chain_phases, max_confidence_score * 20 + + +def build_parent_child_map(samples: List[Dict[str, Any]]) -> Dict[int, List[int]]: + """ + Build a mapping of parent sample IDs to their child sample IDs. + + Args: + samples (List[Dict[str, Any]]): List of sample dictionaries. + + Returns: + Dict[int, List[int]]: Mapping from parent sample ID to list of child sample IDs. + """ + mapping = {} + for sample in samples: + parent_id = sample.get("sample_id") + if parent_id is None: + continue + child_ids = sample.get("sample_child_sample_ids") or [] + if child_ids: + mapping[parent_id] = [c for c in child_ids if c is not None] + return mapping + + +class VMRayConnector: + """ + Class to manage VMRay interactions. + """ + + def __init__(self) -> None: + """Initialize connector and load configuration.""" + + self.config = ConfigConnector() + self.helper = OpenCTIConnectorHelper(self.config.load) + self.logger = self.helper.connector_logger + getLogger("api").setLevel(WARNING) + + # vmray configurations + self.vmray_base_url = self.config.vmray_base_url + self.vmray_api_key = self.config.vmray_api_key + self.vmray_initial_fetch_date = self.config.initial_fetch_date + self.duration_period = self.config.duration_period + self.sample_verdict = [ + v.strip() for v in self.config.sample_verdict.split(",") if v.strip() + ] + self.iocs_verdict = [ + v.strip() for v in self.config.iocs_verdict.split(",") if v.strip() + ] + default_tlp = self.config.default_tlp.strip().upper() + tlp_filter = { + "mode": "and", + "filters": [ + {"key": "definition", "operator": "eq", "values": [default_tlp]} + ], + "filterGroups": [], + } + marking_definition = self.helper.api.marking_definition.read(filters=tlp_filter) + if not marking_definition: + raise ValueError(f"TLP marking not found for: {default_tlp}") + self.default_markings = [marking_definition["standard_id"]] + self.threat_names_color = self.config.threat_names_color + self.classifications_color = self.config.classifications_color + self.vti_color = self.config.vti_color + self.mitre_color = self.config.mitre_color + + self.identity = self.helper.api.identity.create( + type="Organization", + name="VMRay", + description="Threat intelligence from VMRay Platform", + x_opencti_reliability="A - Completely reliable", + )["standard_id"] + + self.vmray_analyzer_client = VMRayRESTAPI( + server=self.vmray_base_url, + api_key=self.vmray_api_key, + connector_name=self.helper.connect_name, + ) + self.vmray_headers = { + "Authorization": f"api_key {self.vmray_api_key}", + "User-Agent": "OpenCTI", + "Accept": "application/json", + } + + self.stix_builder = VMRaySTIXBuilder( + identity=self.identity, + default_markings=self.default_markings, + helper=self.helper, + threat_names_color=self.threat_names_color, + classifications_color=self.classifications_color, + vti_color=self.vti_color, + mitre_color=self.mitre_color, + ) + + def get_submissions_by_timestamp(self) -> List[Dict]: + """ + Fetch all submissions from VMRay within the configured time window. + + Returns: + List[Dict]: List of submissions. + """ + all_submissions: List[Dict] = [] + params = {"submission_finish_time": f"{self.from_date}~{self.to_date}"} + + try: + submission_data = self.vmray_analyzer_client.call( + "GET", "/rest/submission", params=params + ) + if submission_data: + all_submissions.extend(submission_data) + except VMRayRESTAPIError as e: + self.logger.error( + f"[VMRay] Submission API error: '{e}' (HTTP {e.status_code})" + ) + except (Timeout, ConnectionError, SSLError, ProxyError, TooManyRedirects) as e: + self.logger.error( + f"[VMRay] Network error while fetching submissions: {type(e).__name__}: {e}" + ) + except RequestException as e: + self.logger.error(f"[VMRay] Request error while fetching submissions: {e}") + except Exception as e: + self.logger.error(f"[VMRay] Unexpected error fetching submissions: {e}") + + return all_submissions + + def get_sample(self, sample_id: int) -> Optional[Dict]: + """ + Fetch details of a sample by its ID. + + Args: + sample_id (int): The unique identifier of the sample. + + Returns: + Optional[Dict]: Sample details if found, otherwise None. + """ + try: + sample_data = self.vmray_analyzer_client.call( + "GET", f"/rest/sample/{sample_id}" + ) + return sample_data + except VMRayRESTAPIError as e: + self.logger.error( + f"[VMRay] Error fetching sample {sample_id}: '{e}' (HTTP {e.status_code})" + ) + except (Timeout, ConnectionError, SSLError, ProxyError, TooManyRedirects) as e: + self.logger.error( + f"[VMRay] Network error fetching sample {sample_id}: {type(e).__name__}: {e}" + ) + except RequestException as e: + self.logger.error(f"[VMRay] Request error fetching sample {sample_id}: {e}") + except Exception as e: + self.logger.error( + f"[VMRay] Unexpected error fetching sample {sample_id}: {e}" + ) + return None + + def get_samples_by_verdict(self, sample_ids: Set[int]) -> List[Dict]: + """ + Fetch samples and filter them based on the configured sample verdicts. + + Args: + sample_ids (Set[int]): Set of sample IDs to fetch. + + Returns: + List[Dict]: List of samples that match the configured verdicts. + """ + self.logger.info( + f"[VMRay] Fetching samples for verdict filtering: {sample_ids}" + ) + + samples_by_verdict = [] + for sample_id in sample_ids: + sample = self.get_sample(sample_id) + if sample is None: + continue + + verdict = sample.get("sample_verdict") or sample.get("verdict") + + if verdict in self.sample_verdict: + samples_by_verdict.append(sample) + + self.logger.info( + f"[VMRay] Total samples passing verdict: {len(samples_by_verdict)}" + ) + + return samples_by_verdict + + def get_sample_iocs(self, sample_id: int) -> Optional[Dict]: + """ + Fetch all IOCs for a given sample ID. + + Args: + sample_id (int): The sample ID to fetch IOCs for. + + Returns: + Optional[Dict]: Dictionary of IOCs grouped by type or None. + """ + try: + sample_iocs_data = self.vmray_analyzer_client.call( + "GET", f"/rest/sample/{sample_id}/iocs" + ) + return sample_iocs_data + except VMRayRESTAPIError as e: + self.logger.error( + f"[VMRay] Error fetching IOCs for sample {sample_id}: '{e}' (HTTP {e.status_code})" + ) + except (Timeout, ConnectionError, SSLError, ProxyError, TooManyRedirects) as e: + self.logger.error( + f"[VMRay] Network error fetching IOCs for sample {sample_id}: {type(e).__name__}: {e}" + ) + except RequestException as e: + self.logger.error( + f"[VMRay] Request error fetching IOCs for sample {sample_id}: {e}" + ) + except Exception as e: + self.logger.error( + f"[VMRay] Unexpected error fetching IOCs for sample {sample_id}: {e}" + ) + return None + + def get_sample_iocs_by_verdict(self, sample_id: int) -> Dict[str, List[Dict]]: + """ + Fetch IOCs for a sample and filter them by configured IOC verdicts. + + Args: + sample_id (int): The sample ID. + + Returns: + Dict[str, List[Dict]]: Filtered IOCs grouped by IOC type. + """ + self.logger.info(f"[VMRay] Fetching IOCs for sample {sample_id}") + + ioc_response = self.get_sample_iocs(sample_id) + if not ioc_response: + self.logger.warning(f"[VMRay] IOC API returned EMPTY for {sample_id}") + return {} + + raw_iocs = ioc_response.get("iocs", {}) + + if not raw_iocs: + return {} + + filtered_iocs = {} + allowed_verdicts = {v.lower() for v in self.iocs_verdict} + for ioc_type, ioc_list in raw_iocs.items(): + ioc_list[:] = [ + ioc + for ioc in ioc_list + if ioc.get("verdict", "").lower() in allowed_verdicts + ] + if ioc_list: + filtered_iocs[ioc_type] = ioc_list + + return filtered_iocs + + def fetch_sample_vtis(self, sample_id: int) -> List[Dict[str, Any]]: + """ + Fetch VMRay Threat Indicators (VTIs) for a sample. + + Args: + sample_id (int): The sample ID. + + Returns: + List[Dict[str, Any]]: List of threat indicators. + """ + try: + vtis_data = self.vmray_analyzer_client.call( + "GET", f"/rest/sample/{sample_id}/vtis" + ) + threat_indicators = vtis_data.get("threat_indicators", []) + self.logger.info( + f"[VMRay] Retrieved {len(threat_indicators)} VTIs for sample {sample_id}" + ) + return threat_indicators or [] + + except VMRayRESTAPIError as e: + self.logger.error( + f"[VMRay] No VTIs for sample {sample_id}: '{e}' (HTTP {e.status_code})" + ) + except (Timeout, ConnectionError, SSLError, ProxyError, TooManyRedirects) as e: + self.logger.error( + f"[VMRay] Network error fetching VTIs for sample {sample_id}: {type(e).__name__}: {e}" + ) + except RequestException as e: + self.logger.error( + f"[VMRay] Request error fetching VTIs for sample {sample_id}: {e}" + ) + except Exception as e: + self.logger.error( + f"[VMRay] Unexpected error fetching VTIs for sample {sample_id}: {e}" + ) + return [] + + def fetch_sample_mitre_attacks(self, sample_id: int) -> List[Dict[str, Any]]: + """ + Fetch MITRE ATT&CK techniques for a sample. + + Args: + sample_id (int): The sample ID. + + Returns: + List[Dict[str, Any]]: List of MITRE techniques. + """ + try: + vtis_data = self.vmray_analyzer_client.call( + "GET", f"/rest/sample/{sample_id}/mitre_attack" + ) + mitre_attack_techniques = vtis_data.get("mitre_attack_techniques", []) + self.logger.info( + f"[VMRay] Retrieved {len(mitre_attack_techniques)} MITRE ATT&CK techniques for sample {sample_id}" + ) + return mitre_attack_techniques or [] + + except VMRayRESTAPIError as e: + self.logger.error( + f"[VMRay] No MITRE ATT&CK techniques for sample {sample_id}: '{e}' (HTTP {e.status_code})" + ) + except (Timeout, ConnectionError, SSLError, ProxyError, TooManyRedirects) as e: + self.logger.error( + f"[VMRay] Network error fetching MITRE ATT&CK for sample {sample_id}: {type(e).__name__}: {e}" + ) + except RequestException as e: + self.logger.error( + f"[VMRay] Request error fetching MITRE ATT&CK for sample {sample_id}: {e}" + ) + except Exception as e: + self.logger.error( + f"[VMRay] Unexpected error fetching MITRE ATT&CK for sample {sample_id}: {e}" + ) + return [] + + def build_colored_labels( + self, + *, + threat_names: Optional[List[str]] = None, + classifications: Optional[List[str]] = None, + mitre_labels: Optional[List[str]] = None, + vti_labels: Optional[List[str]] = None, + ) -> Dict[str, List[str]]: + """ + Build colored labels for threat names, classifications, MITRE, and VTI. + + Args: + threat_names (List[str] | None): List of threat names. + classifications (List[str] | None): List of classifications. + mitre_labels (List[str] | None): List of MITRE labels. + vti_labels (List[str] | None): List of VTI labels. + + Returns: + Dict[str, List[str]]: Dictionary containing colored labels. + """ + threat_names = threat_names or [] + classifications = classifications or [] + mitre_labels = mitre_labels or [] + vti_labels = vti_labels or [] + colored_labels: Dict[str, List[str]] = { + "threat_names": [], + "classifications": [], + "vti": [], + "mitre": [], + } + + colored_labels["threat_names"] = [ + self.helper.api.label.create( + value=threat_name, color=self.threat_names_color + ).get("value", threat_name) + for threat_name in threat_names + ] + colored_labels["classifications"] = [ + self.helper.api.label.create( + value=classification, color=self.classifications_color + ).get("value", classification) + for classification in classifications + ] + + colored_labels["vti"] = [ + self.helper.api.label.create(value=vti_label, color=self.vti_color).get( + "value", vti_label + ) + for vti_label in vti_labels + ] + + colored_labels["mitre"] = [ + self.helper.api.label.create(value=mitre_label, color=self.mitre_color).get( + "value", mitre_label + ) + for mitre_label in mitre_labels + ] + + return colored_labels + + def handle_file_iocs( + self, + file_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Process file IOCs and create corresponding STIX objects. + + Args: + file_iocs (List[Dict[str, Any]]): List of file IOC entries retrieved from VMRay. + vti_lookup (Dict[int, dict]): Lookup table for VTI data mapped by analysis ID. + mitre_lookup (Dict[int, dict]): Lookup table for MITRE ATT&CK technique mappings. + + Returns: + list[Any]: A list containing File Observables, + Indicators, Relationships, Malware SDOs and their relationships + """ + observables: List[Any] = [] + for file_ioc in file_iocs: + analysis_ids = file_ioc.get("analysis_ids", []) + all_threat_names = file_ioc.get("threat_names", []) + threat_names = [ + t + for t in all_threat_names + if isinstance(t, str) and re_match(THREAT_NAMES_REGEX, t) + ] + classifications = file_ioc.get("classifications", []) + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + threat_names=threat_names, + classifications=classifications, + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels( + vti=colored["vti"], + mitre=colored["mitre"], + threat_names=colored.get("threat_names"), + classifications=colored.get("classifications"), + ) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + + vmray_hashes = {} + if file_ioc.get("hashes"): + hash_obj = file_ioc["hashes"][0] + for vmray_field, stix_key in [ + ("md5_hash", "MD5"), + ("sha1_hash", "SHA1"), + ("sha256_hash", "SHA256"), + ]: + if hash_obj.get(vmray_field): + vmray_hashes[stix_key] = hash_obj[vmray_field] + file_obs = VMRayObservableTransform( + observable_type="file", + observable_value=file_ioc.get("filename", "unknown"), + labels=colored["threat_names"] + colored["classifications"], + created_by_ref=self.identity, + score=confidence, + description="Primary File IOC from VMRay", + observable={ + "hashes": vmray_hashes, + "filename": file_ioc.get("filename", "unknown"), + "size": file_ioc.get("file_size"), + "mime_type": file_ioc.get("mime_type"), + }, + markings=self.default_markings, + ) + observable_obj = file_obs.stix_observable + if not observable_obj: + continue + observables.append(observable_obj) + description = build_indicator_description( + analysis_ids, vti_lookup, classifications + ) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + observables += self.stix_builder.create_malware_objects_for_threat_names( + threat_names, + classifications, + indicator, + file_obs, + labels=colored["classifications"], + ) + return observables + + def handle_processes_iocs( + self, + process_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Handle process IOCs and build all associated STIX objects. + """ + observables: List[Any] = [] + + for process_ioc in process_iocs: + cmd_line = process_ioc.get("cmd_line") + all_threat_names = process_ioc.get("threat_names", []) + analysis_ids = process_ioc.get("analysis_ids", []) + threat_names = [ + t + for t in all_threat_names + if isinstance(t, str) and re_match(THREAT_NAMES_REGEX, t) + ] + classifications = process_ioc.get("classifications", []) + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + threat_names=threat_names, + classifications=classifications, + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels( + vti=colored["vti"], + mitre=colored["mitre"], + threat_names=colored.get("threat_names"), + classifications=colored.get("classifications"), + ) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + + # Create process observable + process_obs = VMRayObservableTransform( + observable_type="process", + observable_value=cmd_line, + labels=colored["threat_names"] + colored["classifications"], + description="Primary Process IOC from VMRay", + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + observable=process_ioc, + ) + + observable_obj = process_obs.stix_observable + if not observable_obj: + continue + observables.append(observable_obj) + + description = build_indicator_description( + analysis_ids, vti_lookup, classifications + ) + + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + + observables += self.stix_builder.create_malware_objects_for_threat_names( + threat_names, + classifications, + indicator, + process_obs, + labels=colored["classifications"], + ) + + return observables + + def handle_domain_iocs( + self, + domain_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Process Domain IOCs and build all associated STIX objects. + """ + observables: List[Any] = [] + + for domain_ioc in domain_iocs: + domain = domain_ioc.get("domain") + analysis_ids = domain_ioc.get("analysis_ids", []) + + labels = [] + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels(vti=colored["vti"], mitre=colored["mitre"]) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + for protocol in domain_ioc.get("protocols", []): + labels.append(f"protocol: {protocol}") + + domain_obs = VMRayObservableTransform( + observable_type="domain", + observable_value=domain_ioc["domain"], + description="Primary Domain IOC from VMRay", + labels=labels, + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + ) + if domain: + labels.append(domain) + observable_obj = domain_obs.stix_observable + if not observable_obj: + continue + observables.append(observable_obj) + + description = build_indicator_description( + analysis_ids=analysis_ids, vti_lookup=vti_lookup + ) + + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + + observables += ( + self.stix_builder.create_related_obs_for_domain_url_originals( + indicator, + domain_obs, + domain_ioc.get("original_domains", []), + "domain", + labels, + score=confidence, + ) + ) + + # IP addresses + for ip in domain_ioc.get("ip_addresses", []): + ip_obs = VMRayObservableTransform( + observable_type="ip", + observable_value=ip, + description="IP IOC from VMRay", + labels=labels, + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + ) + ip_obj = ip_obs.stix_observable + if ip_obj: + observables.append(ip_obj) + rel_ip = ip_obs.create_relationship( + src_id=indicator.id, + tgt_id=ip_obj.id, + markings=self.default_markings, + rel_type="based-on", + ) + observables.append(rel_ip) + + observables += self.stix_builder.create_location_objects( + indicator, + domain_obs, + domain_ioc.get("countries", []), + domain_ioc.get("country_codes", []), + labels, + ) + + return observables + + def handle_url_iocs( + self, + url_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Process URL IOCs and build all associated STIX objects. + """ + observables: List[Any] = [] + + for url_ioc in url_iocs: + url = url_ioc.get("url") + + analysis_ids = url_ioc.get("analysis_ids", []) + labels = [] + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels(vti=colored["vti"], mitre=colored["mitre"]) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + + url_obs = VMRayObservableTransform( + observable_type="url", + observable_value=url_ioc["url"], + description="Primary URL IOC from VMRay", + labels=None, + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + ) + observable_obj = url_obs.stix_observable + if not observable_obj: + continue + + observables.append(observable_obj) + description = build_indicator_description( + analysis_ids=analysis_ids, vti_lookup=vti_lookup + ) + + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + if url: + labels.append(url) + observables += ( + self.stix_builder.create_related_obs_for_domain_url_originals( + indicator, + url_obs, + url_ioc.get("original_urls", []), + "url", + labels, + score=confidence, + ) + ) + + # IP addresses + for ip in url_ioc.get("ip_addresses", []): + ip_obs = VMRayObservableTransform( + observable_type="ip", + observable_value=ip, + description="IP IOC from VMRay", + labels=labels, + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + ) + ip_obj = ip_obs.stix_observable + if ip_obj: + observables.append(ip_obj) + rel_ip = ip_obs.create_relationship( + src_id=indicator.id, + tgt_id=ip_obj.id, + markings=self.default_markings, + rel_type="based-on", + ) + observables.append(rel_ip) + + # Countries + observables += self.stix_builder.create_location_objects( + indicator, + url_obs, + url_ioc.get("countries", []), + url_ioc.get("country_codes", []), + labels, + ) + + return observables + + def handle_mutexes_iocs( + self, + mutex_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Process Mutex IOCs and build all associated STIX objects. + """ + observables: List[Any] = [] + + for mutex_ioc in mutex_iocs: + analysis_ids = mutex_ioc.get("analysis_ids", []) + all_threat_names = mutex_ioc.get("threat_names", []) + threat_names = [ + t + for t in all_threat_names + if isinstance(t, str) and re_match(THREAT_NAMES_REGEX, t) + ] + classifications = mutex_ioc.get("classifications", []) + + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + threat_names=threat_names, + classifications=classifications, + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels( + vti=colored["vti"], + mitre=colored["mitre"], + threat_names=colored.get("threat_names"), + classifications=colored.get("classifications"), + ) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + + mutex_obs = VMRayObservableTransform( + observable_type="mutex", + observable_value=mutex_ioc["mutex_name"], + labels=colored["threat_names"] + colored["classifications"], + description="Primary Mutex IOC from VMRay", + created_by_ref=self.identity, + score=confidence, + observable=mutex_ioc, + markings=self.default_markings, + ) + observable_obj = mutex_obs.stix_observable + if not observable_obj: + continue + observables.append(observable_obj) + + description = build_indicator_description( + analysis_ids, vti_lookup, classifications + ) + + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + + # Threat Names + observables += self.stix_builder.create_malware_objects_for_threat_names( + threat_names, + classifications, + indicator, + mutex_obs, + labels=colored["classifications"], + ) + + return observables + + def handle_registry_iocs( + self, + reg_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Process Registry Key IOCs and build all associated STIX objects. + """ + observables: List[Any] = [] + + for reg_ioc in reg_iocs: + + analysis_ids = reg_ioc.get("analysis_ids", []) + all_threat_names = reg_ioc.get("threat_names", []) + threat_names = [ + t + for t in all_threat_names + if isinstance(t, str) and re_match(THREAT_NAMES_REGEX, t) + ] + classifications = reg_ioc.get("classifications", []) + + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + threat_names=threat_names, + classifications=classifications, + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels( + vti=colored["vti"], + mitre=colored["mitre"], + threat_names=colored.get("threat_names"), + classifications=colored.get("classifications"), + ) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + + reg_obs = VMRayObservableTransform( + observable_type="registry", + observable_value=reg_ioc["reg_key_name"], + description="Primary Registry Key IOC from VMRay", + labels=colored["threat_names"] + colored["classifications"], + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + observable=reg_ioc, + ) + observable_obj = reg_obs.stix_observable + if not observable_obj: + continue + observables.append(observable_obj) + + description = build_indicator_description( + analysis_ids, vti_lookup, classifications + ) + + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + + # Threat Names + observables += self.stix_builder.create_malware_objects_for_threat_names( + threat_names, + classifications, + indicator, + reg_obs, + labels=colored["classifications"], + ) + + return observables + + def handle_email_iocs( + self, + email_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Process Email Address IOCs and build all associated STIX objects. + """ + observables: List[Any] = [] + + for email_ioc in email_iocs: + analysis_ids = email_ioc.get("analysis_ids", []) + all_threat_names = email_ioc.get("threat_names", []) + threat_names = [ + t + for t in all_threat_names + if isinstance(t, str) and re_match(THREAT_NAMES_REGEX, t) + ] + classifications = email_ioc.get("classifications", []) + + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + threat_names=threat_names, + classifications=classifications, + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels( + vti=colored["vti"], + mitre=colored["mitre"], + threat_names=colored.get("threat_names"), + classifications=colored.get("classifications"), + ) + subject_label = ( + f"subject:{email_ioc['subject']}" if email_ioc.get("subject") else None + ) + email_labels = colored["threat_names"] + colored["classifications"] + if subject_label: + email_labels.append(subject_label) + email_address = search(r"<(.+?)>", email_ioc["sender"]) + observable_value = ( + email_address.group(1) if email_address else email_ioc["sender"] + ) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + + email_obs = VMRayObservableTransform( + observable_type="email_address", + observable_value=observable_value, + description="Primary Email IOC from VMRay", + labels=email_labels, + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + observable=email_ioc, + ) + observable_obj = email_obs.stix_observable + if not observable_obj: + continue + observables.append(observable_obj) + + description = build_indicator_description( + analysis_ids, vti_lookup, classifications + ) + + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + + # Threat Names + observables += self.stix_builder.create_malware_objects_for_threat_names( + threat_names, + classifications, + indicator, + email_obs, + labels=colored["classifications"], + ) + + return observables + + def handle_ip_iocs( + self, + ip_iocs: List[Dict[str, Any]], + vti_lookup: Dict[int, dict], + mitre_lookup: Dict[int, dict], + ) -> List[Any]: + """ + Process IP IOCs and build all associated STIX objects. + """ + observables: List[Any] = [] + + for ip_ioc in ip_iocs: + ip = ip_ioc.get("ip_address") + analysis_ids = ip_ioc.get("analysis_ids", []) + labels = [] + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels(vti=colored["vti"], mitre=colored["mitre"]) + kill_chain_phases, confidence = build_killchain_and_confidence( + analysis_ids, mitre_lookup, vti_lookup + ) + + for protocol in ip_ioc.get("protocols", []): + labels.append(f"protocol: {protocol}") + + ip_obs = VMRayObservableTransform( + observable_type="ip", + observable_value=ip_ioc["ip_address"], + description="Primary IP IOC from VMRay", + labels=labels, + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + ) + observable_obj = ip_obs.stix_observable + if not observable_obj: + continue + if ip: + labels.append(ip) + observables.append(observable_obj) + + description = build_indicator_description( + analysis_ids=analysis_ids, vti_lookup=vti_lookup + ) + indicator, rel = self.stix_builder.create_indicator_from_observable( + observable=observable_obj, + labels=all_labels, + created_by_ref=self.identity, + kill_chain_phases=kill_chain_phases, + confidence=confidence, + description=description, + score=confidence, + ) + observables.append(indicator) + observables.append(rel) + + # Domain + for domain in ip_ioc.get("domains", []): + domain_obs = VMRayObservableTransform( + observable_type="domain", + observable_value=domain, + description="Domain IOC from VMRay", + labels=labels, + created_by_ref=self.identity, + score=confidence, + markings=self.default_markings, + ) + domain_obj = domain_obs.stix_observable + if domain_obj: + observables.append(domain_obj) + rel_domain = ip_obs.create_relationship( + src_id=indicator.id, + tgt_id=domain_obj.id, + markings=self.default_markings, + rel_type="based-on", + ) + observables.append(rel_domain) + + # Countries + observables += self.stix_builder.create_location_objects( + indicator, + ip_obs, + ip_ioc.get("countries", []), + ip_ioc.get("country_codes", []), + labels, + ) + + return observables + + def create_ioc(self, ioc_name: str) -> Callable[[Any, Any, Any], List[Any]]: + """ + Return the IOC handler function associated with the given IOC type. + + Args: + ioc_name (str): The name of the IOC category to handle + + Returns: + Callable[[Any, Any, Any], List[Any]]: The corresponding IOC handler function, + or a fallback function that returns an empty list if the IOC type is unknown. + """ + indicator_mapping = { + "files": self.handle_file_iocs, + "processes": self.handle_processes_iocs, + "domains": self.handle_domain_iocs, + "urls": self.handle_url_iocs, + "mutexes": self.handle_mutexes_iocs, + "registry": self.handle_registry_iocs, + "ips": self.handle_ip_iocs, + "emails": self.handle_email_iocs, + } + return indicator_mapping.get(ioc_name, lambda *args, **kwargs: []) + + def build_attack_patterns_and_refs( + self, mitre_data: List[Dict[str, Any]] + ) -> Tuple[List[Any], List[Dict[str, Any]]]: + """ + Build STIX AttackPattern objects and external references from MITRE data. + + Args: + mitre_data (list[dict]): MITRE technique entries. + + Returns: + tuple[list, list]: AttackPattern objects and external reference dicts. + """ + attack_patterns = [] + external_refs = [] + + for mitre in mitre_data: + tid = mitre.get("technique_id") + tname = mitre.get("technique") + tactics = mitre.get("tactics", []) + + if not tid or not tname: + continue + + ext_ref = { + "source_name": "mitre-attack", + "external_id": tid, + "url": f"https://attack.mitre.org/techniques/{tid}", + } + external_refs.append(ext_ref) + + kill_chain_phases = [ + { + "kill_chain_name": "mitre-attack", + "phase_name": tactic.lower().replace(" ", "-"), + } + for tactic in tactics + ] + + attack_pattern = AttackPattern( + id=PyctiAttackPattern.generate_id(tname), + name=tname, + description="Attack Pattern identified VMRay", + created_by_ref=self.identity, + object_marking_refs=self.default_markings, + allow_custom=True, + external_references=[ext_ref], + kill_chain_phases=kill_chain_phases, + ) + attack_patterns.append(attack_pattern) + + return attack_patterns, external_refs + + def build_sample_stix_objects( + self, sample: Dict[str, Any] + ) -> Tuple[Optional[Report], List[Any]]: + """ + Build all STIX objects for a VMRay sample. + + Args: + sample (Dict[str, Any]): Raw VMRay sample object. + + Returns: + Tuple[Optional[Report], List[Any]]: A STIX Report and related STIX objects. + """ + sample_id = sample.get("sample_id") + if not sample_id: + return None, [] + + threat_names = [ + t + for t in sample.get("sample_threat_names", []) + if isinstance(t, str) and re_match(THREAT_NAMES_REGEX, t) + ] + classifications = list(sample.get("sample_classifications", [])) + + iocs_by_type = self.get_sample_iocs_by_verdict(sample_id) + if not iocs_by_type: + return None, [] + + vti_data = self.fetch_sample_vtis(sample_id) + vti_lookup = build_vtis_lookup(vti_data) + mitre_data = self.fetch_sample_mitre_attacks(sample_id) + mitre_lookup = build_mitre_lookup(mitre_data) + analysis_ids = [ + aid + for iocs in iocs_by_type.values() + for ioc in iocs + for aid in ioc.get("analysis_ids", []) + ] + + vti_labels, mitre_labels = extract_vti_mitre_labels_by_analysis( + analysis_ids, vti_lookup, mitre_lookup + ) + colored = self.build_colored_labels( + threat_names=threat_names, + classifications=classifications, + mitre_labels=mitre_labels, + vti_labels=vti_labels, + ) + all_labels = combine_labels( + vti=colored["vti"], + mitre=colored["mitre"], + threat_names=colored.get("threat_names"), + classifications=colored.get("classifications"), + ) + + stix_objects: List[Any] = [] + for ioc_type, ioc_list in iocs_by_type.items(): + filtered_ioc_list = [ + ioc + for ioc in ioc_list + if "Memory Dump" not in ioc.get("categories", []) + and ioc.get("ioc", False) + ] + if not filtered_ioc_list: + continue + handler = self.create_ioc(ioc_type) + ioc_objects = handler(filtered_ioc_list, vti_lookup, mitre_lookup) + if ioc_objects: + stix_objects.extend(ioc_objects) + + if not stix_objects: + return None, [] + + attack_patterns, external_refs = self.build_attack_patterns_and_refs(mitre_data) + stix_objects.extend(attack_patterns) + created_date = sample.get("sample_created") + published_date = ( + datetime.fromisoformat(created_date).replace(tzinfo=timezone.utc) + if created_date + else datetime.now(timezone.utc) + ) + web_url = sample.get("sample_webif_url") + if web_url: + external_refs.append( + { + "source_name": "vmray-sample", + "url": web_url, + "description": "VMRay Sample Web Interface URL", + } + ) + self.helper.connector_logger.info( + f"Creating Stix objects and report for sample {sample_id}" + ) + description = ( + f"Report for Sample ID {sample_id}. " + "Marks one or more indicators and cyber observables that " + "originate from a common analysis such as a detonation." + ) + rep_name = f"VMRay Platform STIX 2.1 Analysis Report - report--{uuid4()}" + report = Report( + id=PyctiReport.generate_id(rep_name, published_date), + name=rep_name, + description=description, + published=published_date, + labels=all_labels, + report_types=["observed-data"], + object_marking_refs=self.default_markings, + x_opencti_reliability="A - Completely reliable", + created_by_ref=self.identity, + object_refs=[obj.id for obj in stix_objects], + external_references=external_refs, + allow_custom=True, + ) + + return report, stix_objects + + def create_parent_child_relationships( + self, sample_reports: Dict[int, Report], parent_child_map: Dict[int, List[int]] + ) -> List[Relationship]: + """ + Create relationships between parent and child sample reports. + + Args: + sample_reports (Dict[int, Report]): Mapping of sample IDs to Report objects. + parent_child_map (Dict[int, List[int]]): Mapping of parent IDs to list of child IDs. + + Returns: + List[Relationship]: List of Relationship objects representing parent-child links. + """ + relationships = [] + for parent_id, child_ids in parent_child_map.items(): + parent_report = sample_reports.get(parent_id) + if not parent_report: + continue + for child_id in child_ids: + child_report = sample_reports.get(child_id) + if not child_report: + continue + if child_report: + relationships.append( + Relationship( + id=StixCoreRelationship.generate_id( + "related-to", parent_report.id, child_report.id + ), + source_ref=parent_report.id, + target_ref=child_report.id, + relationship_type="related-to", + created_by_ref=self.identity, + object_marking_refs=self.default_markings, + allow_custom=True, + ) + ) + return relationships + + def process_samples(self, samples: List[Dict[str, Any]]) -> List[Any]: + """ + Process multiple VMRay samples into full STIX objects with reports and relationships. + + Args: + samples (List[Dict[str, Any]]): List of VMRay samples. + + Returns: + List[Any]: All generated STIX objects including reports and parent–child relationships. + """ + all_objects = [] + sample_reports = {} + + for sample in samples: + report, stix_objects = self.build_sample_stix_objects(sample) + if not report: + continue + sample_id = sample.get("sample_id") + if sample_id is None: + continue + sample_reports[sample_id] = report + all_objects.extend(stix_objects) + all_objects.append(report) + + parent_child_map = build_parent_child_map(samples) + relationships = self.create_parent_child_relationships( + sample_reports, parent_child_map + ) + all_objects.extend(relationships) + + return all_objects + + def process_message(self) -> None: + """ + Connector main process to collect intelligence from VMRay. + """ + self.helper.connector_logger.info( + "[CONNECTOR] Starting connector...", + {"connector_name": self.helper.connect_name}, + ) + try: + # Load last run timestamp + current_state = self.helper.get_state() + last_run = current_state.get("last_run") if current_state else None + next_checkpoint = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + if last_run: + self.from_date = parse_to_vmray_datetime(last_run) + self.helper.connector_logger.info( + f"[VMRay] Using last_run as from_date: {self.from_date}" + ) + else: + self.from_date = parse_to_vmray_datetime(self.vmray_initial_fetch_date) + self.helper.connector_logger.info( + f"[VMRay] Using initial_fetch_date as from_date: {self.from_date}" + ) + + # Always set the new end time + self.to_date = parse_to_vmray_datetime(datetime.now(timezone.utc)) + + self.helper.connector_logger.info("Connecting to VMRay...") + + # Fetch submissions + vmray_submissions = self.get_submissions_by_timestamp() + self.helper.connector_logger.info( + f"[VMRay] Fetched {len(vmray_submissions)} submissions " + f"from {self.from_date} to {self.to_date}" + ) + + # Friendly name for OpenCTI + friendly_name = self.helper.connect_name + + # Initiate work in OpenCTI + work_id = self.helper.api.work.initiate_work( + self.helper.connect_id, friendly_name + ) + + # Process submissions + unique_sample_ids = set( + [x["submission_sample_id"] for x in vmray_submissions] + ) + vmray_samples_by_verdict = self.get_samples_by_verdict(unique_sample_ids) + processed_objects = self.process_samples(vmray_samples_by_verdict) + + self.logger.info( + f"[VMRay] Total processed objects: {len(processed_objects)}" + ) + + if processed_objects: + self.logger.info("[VMRay] Sending STIX bundle to OpenCTI...") + all_bundle = self.helper.stix2_create_bundle(processed_objects) + self.helper.send_stix2_bundle( + bundle=all_bundle, update=True, work_id=work_id + ) + else: + self.logger.info("[VMRay] FINAL RESULT: No new data to process.") + + self.helper.set_state({"last_run": next_checkpoint}) + message = ( + f"{self.helper.connect_name} connector successfully run, storing last_run as " + f"{next_checkpoint}" + ) + self.helper.api.work.to_processed(work_id, message) + self.helper.connector_logger.info(message) + + except (KeyboardInterrupt, SystemExit): + self.helper.connector_logger.info( + "[CONNECTOR] Connector stopped...", + {"connector_name": self.helper.connect_name}, + ) + exit(0) + except Exception as err: + self.helper.connector_logger.error(str(err)) + + def run(self) -> None: + """ + Run the main process encapsulated in a scheduler. + + This scheduler allows you to run the process at certain intervals. It also checks the + connector queue size. If `CONNECTOR_QUEUE_THRESHOLD` is set and the queue size exceeds + the threshold, the main process will not run until the queue is sufficiently reduced, + allowing it to restart during the next scheduler check (default threshold is 500MB). + + Requires the `duration_period` connector variable in ISO-8601 format. Example: + `CONNECTOR_DURATION_PERIOD=PT5M` will run the process every 5 minutes. + + Returns: + None + """ + + self.helper.schedule_iso( + message_callback=self.process_message, + duration_period=self.duration_period, + ) diff --git a/external-import/vmray-platform/src/vmray_connector/utils.py b/external-import/vmray-platform/src/vmray_connector/utils.py new file mode 100644 index 00000000000..93d021cebe6 --- /dev/null +++ b/external-import/vmray-platform/src/vmray_connector/utils.py @@ -0,0 +1,94 @@ +""" +Utility functions for the VMRay connector. + +Includes functions for date formatting, IP address validation, and hash type checking. +""" + +from datetime import datetime, timezone +from ipaddress import ( + AddressValueError, + IPv4Address, + IPv4Network, + IPv6Address, + IPv6Network, + NetmaskValueError, +) +from typing import Union + + +def parse_to_vmray_datetime(value: Union[str, datetime]) -> str: + """ + Convert a date value into a UTC datetime string formatted for VMRay API. + + Supported input formats: + - datetime object + - ISO 8601 string + - Short date string + + Returns: + str: UTC datetime string in the format 'YYYY-MM-DDTHH:MM:SS'. + + Raises: + ValueError: If the input value is not in a supported format. + """ + dt = None + + if isinstance(value, datetime): + dt = value.astimezone(timezone.utc) + elif isinstance(value, str): + try: + dt = datetime.fromisoformat(value.replace("Z", "")).replace( + tzinfo=timezone.utc + ) + except ValueError: + try: + dt = datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc) + except ValueError: + pass + + if dt is None: + raise ValueError(f"Unsupported date format: {value}") + + return dt.strftime("%Y-%m-%dT%H:%M:%S") + + +def validate_ip_or_network( + parse_ip: callable, parse_ip_network: callable, value: str +) -> bool: + """ + Helper function to validate whether a string is a valid IP address + or a valid IP network (range in CIDR notation). + """ + try: + parse_ip(value) + return True + except AddressValueError: + try: + parse_ip_network(value, strict=False) + return True + except (AddressValueError, NetmaskValueError): + return False + + +def is_ipv4(ip_str: str) -> bool: + """Check whether a string is a valid IPv4 address or IPv4 network.""" + return validate_ip_or_network(IPv4Address, IPv4Network, ip_str) + + +def is_ipv6(ip_str: str) -> bool: + """Determine whether the provided string is an IPv6 address or valid IPv6 CIDR.""" + return validate_ip_or_network(IPv6Address, IPv6Network, ip_str) + + +def check_hash_type(value: str) -> str: + """ + Check hash type based on length. + """ + hash_length_map = { + 32: "MD5", + 40: "SHA-1", + 64: "SHA-256", + 128: "SHA-512", + } + + return hash_length_map.get(len(value), "unknown-hash") diff --git a/external-import/vmray-platform/src/vmray_connector/vmray_observable_transform.py b/external-import/vmray-platform/src/vmray_connector/vmray_observable_transform.py new file mode 100644 index 00000000000..dae40934153 --- /dev/null +++ b/external-import/vmray-platform/src/vmray_connector/vmray_observable_transform.py @@ -0,0 +1,186 @@ +from pycti import StixCoreRelationship +from stix2 import ( + URL, + DomainName, + EmailAddress, + File, + IPv4Address, + IPv6Address, + Mutex, + Process, + Relationship, + WindowsRegistryKey, +) + +from .utils import is_ipv4, is_ipv6 + + +class VMRayObservableTransform: + """Class to support the transformation of VMRay Observables to STIX observables.""" + + def __init__( + self, + observable_type, + observable_value, + labels, + created_by_ref, + score, + create_indicator=False, + observable=None, + description=None, + markings=None, + ): + self.created_by_ref = created_by_ref + self.observable_type = observable_type + self.observable_value = observable_value + self.labels = labels + self.score = score + self.create_indicator = create_indicator + self.observable = observable + self.description = description + self.markings = markings or [] + + # self.data_type = self.standardize_data_type() + if self.observable_type == "ip": + self.observable_type = self.standardize_data_type() + self.stix_observable = self.create_stix_observable() + + def standardize_data_type(self): + """Standardize a set of observables like hash, file, and ip.""" + data_type = None + + if self.observable_type in ["ip", "ipv4"] and is_ipv4(self.observable_value): + data_type = "ipv4" + if self.observable_type in ["ip", "ipv6"] and is_ipv6(self.observable_value): + data_type = "ipv6" + return data_type + + def create_custom_properties(self, default_desc="Imported from VMRay"): + """Create standard custom properties as it's repeated.""" + desc = self.description if self.description else default_desc + return { + "description": desc, + "labels": self.labels, + "x_opencti_score": self.score, + "created_by_ref": self.created_by_ref, + "x_opencti_create_indicator": self.create_indicator, + } + + def create_stix_observable(self): + """ + Create stix observable if it exists in the provided map, + if not, return the default observable. + """ + data_type_to_observable_map = { + "domain": self.create_domain_name, + "email_address": self.create_email_address, + "file": self.create_file_by_hashes, + "ipv4": self.create_ipv4_address, + "ipv6": self.create_ipv6_address, + "url": self.create_url, + "process": self.create_process, + "mutex": self.create_mutex, + "registry": self.create_registry, + } + + # Return the results from the function in the MAP, if it's not in the MAP return the default_observable + return data_type_to_observable_map.get(self.observable_type)() + + def create_domain_name(self): + """Create a STIX DomainName observable.""" + return DomainName( + value=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_email_address(self): + """Create a STIX EmailAddress observable.""" + return EmailAddress( + value=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_file_by_hashes(self): + """Create a STIX File observable using provided hashes.""" + return File( + hashes=self.observable.get("hashes", {}), + name=self.observable.get("filename", ""), + size=self.observable.get("size"), + mime_type=self.observable.get("mime_type"), + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_ipv4_address(self): + """Create a STIX IPv4Address observable.""" + return IPv4Address( + value=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_ipv6_address(self): + """Create a STIX IPv6Address observable.""" + return IPv6Address( + value=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_registry(self): + """Create a STIX WindowsRegistryKey observable.""" + return WindowsRegistryKey( + key=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_url(self): + """Create a STIX URL observable.""" + return URL( + value=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_process(self): + """Create a STIX Process observable.""" + return Process( + command_line=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_mutex(self): + """Create a STIX Mutex observable.""" + return Mutex( + name=self.observable_value, + object_marking_refs=self.markings, + custom_properties=self.create_custom_properties(), + ) + + def create_relationship( + self, + src_id, + tgt_id, + markings, + rel_type="related-to", + description="Imported from VMRay", + ): + """Create a STIX Relationship between two STIX objects.""" + return Relationship( + id=StixCoreRelationship.generate_id( + rel_type, + src_id, + tgt_id, + ), + relationship_type=rel_type, + created_by_ref=self.created_by_ref, + source_ref=src_id, + target_ref=tgt_id, + description=description, + object_marking_refs=markings, + allow_custom=True, + ) diff --git a/external-import/vmray-platform/src/vmray_connector/vmray_stix_builder.py b/external-import/vmray-platform/src/vmray_connector/vmray_stix_builder.py new file mode 100644 index 00000000000..86d67d902ed --- /dev/null +++ b/external-import/vmray-platform/src/vmray_connector/vmray_stix_builder.py @@ -0,0 +1,339 @@ +""" +VMRay STIX Builder. + +Provides the VMRaySTIXBuilder class to convert VMRay data into STIX 2.1 objects +for ingestion into OpenCTI, including indicators, malware, locations, and relationships. +""" + +from datetime import datetime, timezone +from typing import Dict, List, Optional, Tuple, Union + +from pycti import Indicator as PyctiIndicator +from pycti import Location as PyctiLocation +from pycti import Malware as PyctiMalware +from pycti import StixCoreRelationship +from stix2 import ( + URL, + DomainName, + EmailAddress, + EmailMessage, + File, + Indicator, + IPv4Address, + IPv6Address, + Location, + Malware, + Mutex, + Process, + Relationship, + WindowsRegistryKey, +) + +from .vmray_observable_transform import VMRayObservableTransform + +STIXObservable = Union[ + URL, + DomainName, + EmailAddress, + EmailMessage, + File, + IPv4Address, + IPv6Address, + WindowsRegistryKey, + Mutex, + Process, +] + +THREAT_NAMES_REGEX = r"^[a-zA-Z0-9\s]+$" + + +class VMRaySTIXBuilder: + """ + Unified builder for creating STIX 2.1 objects from VMRay. + Handles Indicators, Malware, Locations, and Relationships. + """ + + def __init__( + self, + identity: str, + default_markings: List[str], + helper, + threat_names_color: str, + classifications_color: str, + vti_color: str, + mitre_color: str, + ): + self.identity = identity + self.default_markings = default_markings + self.helper = helper + + self.threat_names_color = threat_names_color + self.classifications_color = classifications_color + self.vti_color = vti_color + self.mitre_color = mitre_color + + def create_indicator_from_observable( + self, + observable: STIXObservable, + labels: Optional[List[str]] = None, + created_by_ref: Optional[str] = None, + kill_chain_phases: Optional[List[Dict[str, str]]] = None, + confidence: Optional[int] = None, + description: Optional[str] = None, + score: Optional[int] = None, + ) -> Tuple[Indicator, Relationship]: + """ + Create a STIX 2.1 Indicator object from a STIX Observable. + + Args: + observable (Observable): The observable object. + labels (Optional[List[str]]): List of labels to attach. + created_by_ref (Optional[str]): Reference ID for the creator. + kill_chain_phases (Optional[List[Dict[str, str]]]): + Kill-chain phase objects. + confidence (Optional[int]): Confidence value (0–100). + description (Optional[str]): Description for the indicator. + score (Optional[int]): Score of the indicator. + + Returns: + Tuple[Indicator, Relationship]: Indicator and its relationship to the observable. + """ + pattern = self._generate_pattern(observable) + name = self._get_observable_name(observable) + OPENCTI_MAIN_TYPE_MAP = { + "ipv4-addr": "IPv4-Addr", + "ipv6-addr": "IPv6-Addr", + "domain-name": "Domain-Name", + "url": "Url", + "file": "File", + "windows-registry-key": "Windows-Registry-Key", + "process": "Process", + "mutex": "Mutex", + "email-addr": "Email-Addr", + } + indicator = Indicator( + id=PyctiIndicator.generate_id(pattern), + name=name, + description=description, + pattern_type="stix", + pattern=pattern, + created_by_ref=created_by_ref or self.identity, + labels=labels, + valid_from=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + kill_chain_phases=kill_chain_phases, + confidence=confidence, + object_marking_refs=self.default_markings, + x_opencti_score=score, + x_opencti_main_observable_type=OPENCTI_MAIN_TYPE_MAP.get( + getattr(observable, "type") + ), + allow_custom=True, + ) + relationship = Relationship( + id=StixCoreRelationship.generate_id( + "based-on", indicator.id, observable.id + ), + relationship_type="based-on", + source_ref=indicator.id, + target_ref=observable.id, + created_by_ref=created_by_ref or self.identity, + object_marking_refs=self.default_markings, + allow_custom=True, + ) + return indicator, relationship + + def create_malware_objects_for_threat_names( + self, + threat_names: List[str], + classifications: List[str], + indicator: Indicator, + obs: STIXObservable, + labels: Optional[List[str]] = None, + ) -> List[Union[Malware, Relationship]]: + """ + Create malware objects for given threat names + and relate them to an indicator. + + Args: + threat_names (List[str]): List of threat names. + classifications (List[str]): Malware classifications. + indicator (Indicator): The associated indicator. + obs (STIXObservable): The originating observable. + labels (Optional[List[str]]): Labels for the malware objects. + + Returns: + List[Union[Malware, Relationship]]: List of malware objects and relationships. + """ + observables = [] + labels = labels or [] + for name in threat_names: + malware_obj = Malware( + id=PyctiMalware.generate_id(name), + name=name, + description="Malware created from VMRay IOC", + malware_types=classifications, + is_family=True, + created_by_ref=self.identity, + labels=labels, + object_marking_refs=self.default_markings, + allow_custom=True, + ) + observables.append(malware_obj) + rel = obs.create_relationship( + src_id=indicator.id, + tgt_id=malware_obj.id, + markings=self.default_markings, + rel_type="indicates", + description="Indicator is related to target malware", + ) + observables.append(rel) + return observables + + def create_location_objects( + self, + indicator: Indicator, + obs: STIXObservable, + countries: List[str], + country_codes: Optional[List[str]] = None, + labels: Optional[List[str]] = None, + ) -> List[Union[Location, Relationship]]: + """ + Create Location objects from country information. + + Args: + indicator(Indicator): Parent Indicator object. + obs(STIXObservable): Primary Observable object. + countries (List[str]): List of country names. + country_codes (Optional[List[str]]): + Optional list of ISO country codes. + labels (Optional[List[str]]): Optional labels. + + Returns: + List[Union[Location, Relationship]]: List of location objects and relationships. + """ + observables = [] + labels = labels or [] + country_codes = country_codes or [] + for i, country in enumerate(countries): + code = country_codes[i] if i < len(country_codes) else country + loc_obj = Location( + id=PyctiLocation.generate_id(country, "country"), + name=country, + description="Country created from VMRay IOC", + country=code, + created_by_ref=self.identity, + labels=labels, + object_marking_refs=self.default_markings, + allow_custom=True, + ) + observables.append(loc_obj) + rel = obs.create_relationship( + src_id=indicator.id, + tgt_id=loc_obj.id, + markings=self.default_markings, + rel_type="related-to", + ) + observables.append(rel) + return observables + + def create_related_obs_for_domain_url_originals( + self, + indicator: Indicator, + obs: STIXObservable, + originals: List[str], + obs_type: str, + labels: List[str], + score: int, + rel_type: str = "based-on", + ) -> List[Union[STIXObservable, Relationship]]: + """ + Create related observables for original domains or URLs + and relate them to the indicator. + + Args: + indicator (Indicator): The primary indicator. + obs (STIXObservable): The originating observable. + originals (List[str]): List of original domains or URLs. + obs_type (str): Type of the observable (domain/url). + labels (List[str]): Labels to attach. + rel_type (str): Relationship type. + + Returns: + List[Union[STIXObservable, Relationship]]: + List of created observables and relationships. + """ + observables = [] + for original in originals: + orig_obs = VMRayObservableTransform( + observable_type=obs_type, + observable_value=original, + labels=labels, + description=f"{obs_type.capitalize()} IOC from VMRay", + created_by_ref=self.identity, + score=score, + markings=self.default_markings, + ) + orig_obj = orig_obs.stix_observable + if orig_obj: + observables.append(orig_obj) + rel = orig_obs.create_relationship( + src_id=indicator.id, + tgt_id=orig_obj.id, + markings=self.default_markings, + rel_type=rel_type, + ) + observables.append(rel) + return observables + + def _generate_pattern(self, observable: STIXObservable) -> str: + """Generate STIX pattern for observable.""" + + # Helper functions return safe values only + def safe_registry_key(obs): + return ( + "".join(c for c in getattr(obs, "key", "") if c.isprintable()) + .replace("\\", "\\\\") + .replace("'", "\\'") + ) + + def safe_process_cmd(obs): + return ( + getattr(obs, "command_line", "") + .replace("\\", "\\\\") + .replace("'", "\\'") + ) + + type_map = { + "ipv4-addr": lambda obs: f"[ipv4-addr:value = '{obs.value}']", + "ipv6-addr": lambda obs: f"[ipv6-addr:value = '{obs.value}']", + "domain-name": lambda obs: f"[domain-name:value = '{obs.value}']", + "url": lambda obs: f"[url:value = '{obs.value}']", + "file": lambda obs: " OR ".join( + f"[file:hashes.'{k}' = '{v}']" + for k, v in getattr(obs, "hashes", {}).items() + ), + "process": lambda obs: f"[process:command_line = '{safe_process_cmd(obs)}']", + "mutex": lambda obs: f"[mutex:name = '{getattr(obs, 'name', '')}']", + "windows-registry-key": lambda obs: f"[windows-registry-key:key = '{safe_registry_key(obs)}']", + "email-addr": lambda obs: f"[email-addr:value = '{getattr(obs, 'value', '')}']", + } + + obs_type = getattr(observable, "type", None) + if obs_type not in type_map: + raise ValueError(f"Unsupported observable type: {obs_type}") + + return type_map[obs_type](observable) + + def _get_observable_name(self, obs: STIXObservable) -> str: + """Return human-readable name of observable.""" + if obs.type == "file": + for algo in ["SHA-256", "SHA1", "MD5"]: + if hasattr(obs, "hashes") and algo in obs.hashes: + return obs.hashes[algo] + if hasattr(obs, "name") and obs.name: + return obs.name + return "file" + for attr in ["value", "name", "key", "command_line"]: + if hasattr(obs, attr): + return getattr(obs, attr) + return obs.type