Skip to content
Open
3 changes: 2 additions & 1 deletion internal-enrichment/kaspersky-enrichment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
# Common parameters for connectors of type INTERNAL_ENRICHMENT
- CONNECTOR_ID=CHANGEME
# - CONNECTOR_NAME=Kaspersky Enrichment
# - CONNECTOR_SCOPE=StixFile # Support for additional observable types (IP addresses, domains, and URLs) will be added in future releases.
# - CONNECTOR_SCOPE=StixFile,IPv4-Addr # Support for additional observable types (domains, and URLs) will be added in future releases.
# - CONNECTOR_LOG_LEVEL=error
# - CONNECTOR_AUTO=true

Expand All @@ -17,5 +17,6 @@ services:
- KASPERSKY_API_KEY=CHANGEME
# - KASPERSKY_ZONE_OCTI_SCORE_MAPPING=red:100,orange:80,yellow:60,gray:20,green:0
# - KASPERSKY_FILE_SECTIONS=LicenseInfo,Zone,FileGeneralInfo
# - KASPERSKY_IPV4_SECTIONS=LicenseInfo,Zone,IpGeneralInfo

restart: always
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ connector:
# id: 'ChangeMe'
# type: 'INTERNAL_ENRICHMENT'
# name: 'Kaspersky Enrichment'
# scope: 'StixFile' # Support for additional observable types (IP addresses, domains, and URLs) will be added in future releases.
# scope: 'StixFile,IPv4-Addr' # Support for additional observable types (domains, and URLs) will be added in future releases.
# log_level: 'error'
# auto: true # Enable/disable auto-enrichment of observables

Expand All @@ -15,3 +15,4 @@ kaspersky:
api_key: 'ChangeMe'
# zone_octi_score_mapping: 'red:100,orange:80,yellow:60,gray:20,green:0'
# file_sections: 'LicenseInfo,Zone,FileGeneralInfo'
# ipv4_sections: 'LicenseInfo,Zone,IpGeneralInfo'
232 changes: 44 additions & 188 deletions internal-enrichment/kaspersky-enrichment/src/connector/connector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from connector.converter_to_stix import ConverterToStix
from connector.settings import ConnectorSettings
from connector.use_cases.enrich_file import FileEnricher
from connector.use_cases.enrich_ipv4 import Ipv4Enricher
from connector.utils import entity_in_scope
from kaspersky_client import KasperskyClient
from pycti import STIX_EXT_OCTI_SCO, OpenCTIConnectorHelper, OpenCTIStix2

from .converter_to_stix import ConverterToStix
from .settings import ConnectorSettings
from pycti import OpenCTIConnectorHelper


class KasperskyConnector:
Expand Down Expand Up @@ -48,156 +50,40 @@ def __init__(
"""
self.config = config
self.helper = helper
self.file_sections = self.config.kaspersky.file_sections
self.zone_octi_score_mapping = self.config.kaspersky.zone_octi_score_mapping
file_sections = self.config.kaspersky.file_sections
ipv4_sections = self.config.kaspersky.ipv4_sections
zone_octi_score_mapping = self.config.kaspersky.zone_octi_score_mapping
api_key = self.config.kaspersky.api_key.get_secret_value()

self.client = KasperskyClient(
client = KasperskyClient(
self.helper,
base_url=self.config.kaspersky.api_base_url,
api_key=api_key,
params={
"count": 1,
"sections": self.file_sections,
"format": "json",
},
)

self.converter_to_stix = ConverterToStix(self.helper)

# Define variables
self.stix_objects = []

def _process_file(self, observable: dict) -> None:
"""
Collect intelligence from the source for a File type
"""
self.helper.connector_logger.info("[CONNECTOR] Starting enrichment...")

# Retrieve file hash
obs_hash = self.resolve_file_hash(observable)
converter_to_stix = ConverterToStix(self.helper)

# Get entity data from api client
entity_data = self.client.get_file_info(obs_hash)

# Check Quota
self.check_quota(entity_data["LicenseInfo"])

# Manage FileGeneralInfo data

self.helper.connector_logger.info(
"[CONNECTOR] Process enrichment from FileGeneralInfo data..."
self.file_enricher = FileEnricher(
helper=self.helper,
client=client,
sections=file_sections,
zone_octi_score_mapping=zone_octi_score_mapping,
converter_to_stix=converter_to_stix,
)
self.ipv4_enricher = Ipv4Enricher(
helper=self.helper,
client=client,
sections=ipv4_sections,
zone_octi_score_mapping=zone_octi_score_mapping,
converter_to_stix=converter_to_stix,
)

entity_file_general_info = entity_data["FileGeneralInfo"]

# Score
if entity_data.get("Zone"):
score = self.zone_octi_score_mapping[entity_data["Zone"].lower()]
OpenCTIStix2.put_attribute_in_extension(
observable, STIX_EXT_OCTI_SCO, "score", score
)

# Hashes
if entity_file_general_info.get("Md5"):
observable["hashes"]["MD5"] = entity_file_general_info["Md5"]
if entity_file_general_info.get("Sha1"):
observable["hashes"]["SHA-1"] = entity_file_general_info["Sha1"]
if entity_file_general_info.get("Sha256"):
observable["hashes"]["SHA-256"] = entity_file_general_info["Sha256"]

# Size, mime_type
mapping_fields = {"Size": "size", "Type": "mime_type"}
for key, value in mapping_fields.items():
if entity_file_general_info.get(key):
observable[value] = entity_file_general_info[key]

# Labels
if entity_file_general_info.get("Categories"):
observable["labels"] = []
if observable.get("x_opencti_labels"):
observable["labels"] = observable["x_opencti_labels"]
for label in entity_file_general_info["Categories"]:
if label not in observable["labels"]:
observable["labels"].append(label)

# Manage FileNames data

if entity_data.get("FileNames"):
self.helper.connector_logger.info(
"[CONNECTOR] Process enrichment from FileNames data..."
)

observable["additional_names"] = observable.get(
"x_opencti_additional_names", []
)
for filename in entity_data["FileNames"]:
if filename["FileName"] not in observable["additional_names"]:
observable["additional_names"].append(f" {filename["FileName"]}")
else:
observable["additional_names"] = filename["FileName"]

# Prepare author object
author = self.converter_to_stix.create_author()
self.stix_objects.append(author)

# Manage DetectionsInfo data

if entity_data.get("DetectionsInfo"):
self.helper.connector_logger.info(
"[CONNECTOR] Process enrichment from DetectionsInfo data..."
)

content = "| Detection Date | Detection Name | Detection Method |\n"
content += "|----------------|----------------|------------------|\n"

for obs_detection_info in entity_data["DetectionsInfo"]:
detection_name = f"[{obs_detection_info["DetectionName"]}]({obs_detection_info["DescriptionUrl"]})"
content += f"| {obs_detection_info["LastDetectDate"]} | {detection_name} | {obs_detection_info["DetectionMethod"]} |\n"

obs_note = self.converter_to_stix.create_file_note(
observable["id"], content
)
self.stix_objects.append(obs_note)

# Manage FileDownloadedFromUrls data

if entity_data.get("FileDownloadedFromUrls"):
self.helper.connector_logger.info(
"[CONNECTOR] Process enrichment from FileDownloadedFromUrls data..."
)

for url_info in entity_data["FileDownloadedFromUrls"]:
obs_url_score = self.zone_octi_score_mapping[url_info["Zone"].lower()]
url_object = self.converter_to_stix.create_url(obs_url_score, url_info)

if url_object:
self.stix_objects.append(url_object)
url_relation = self.converter_to_stix.create_relationship(
source_id=observable["id"],
relationship_type="related-to",
target_id=url_object.id,
)
self.stix_objects.append(url_relation)

# Manage Industries data

if entity_data.get("Industries"):
self.helper.connector_logger.info(
"[CONNECTOR] Process enrichment from Industries data..."
)

for industry in entity_data["Industries"]:
industry_object = self.converter_to_stix.create_sector(industry)

if industry_object:
self.stix_objects.append(industry_object)
industry_relation = self.converter_to_stix.create_relationship(
source_id=observable["id"],
relationship_type="related-to",
target_id=industry_object.id,
)
self.stix_objects.append(industry_relation)
# Define variables
self.stix_objects = []

def _send_bundle(self, stix_objects: list) -> str:
"""
Expand All @@ -211,46 +97,6 @@ def _send_bundle(self, stix_objects: list) -> str:
)
return info_msg

def check_quota(self, entity_info: dict) -> None:
"""
Check if quota is not exceeded.
Raise a warning otherwise.
"""
if entity_info["DayRequests"] >= entity_info["DayQuota"]:
self.helper.connector_logger.warning(
"[CONNECTOR] The daily quota has been exceeded",
{
"day_requests": entity_info["DayRequests"],
"day_quota": entity_info["DayQuota"],
},
)

def entity_in_scope(self, obs_type: str) -> bool:
"""
Security to limit playbook triggers to something other than the initial scope
:param data: Dictionary of data
:return: boolean
"""
scopes = self.helper.connect_scope.lower().replace(" ", "").split(",")
entity_split = obs_type.split("--")
entity_type = entity_split[0].lower()

if entity_type in scopes:
return True
else:
return False

def resolve_file_hash(self, observable: dict) -> str:
if "hashes" in observable and "SHA-256" in observable["hashes"]:
return observable["hashes"]["SHA-256"]
if "hashes" in observable and "SHA-1" in observable["hashes"]:
return observable["hashes"]["SHA-1"]
if "hashes" in observable and "MD5" in observable["hashes"]:
return observable["hashes"]["MD5"]
raise ValueError(
"Unable to enrich the observable, the observable does not have an SHA256, SHA1, or MD5"
)

def process_message(self, data: dict) -> str:
"""
Get the observable created/modified in OpenCTI and check which type to send for process
Expand All @@ -272,25 +118,35 @@ def process_message(self, data: dict) -> str:
)
self.helper.connector_logger.info(info_msg, {"type": {obs_type}})

if self.entity_in_scope(obs_type):
if entity_in_scope(self.helper.connect_scope, obs_type):
# Performing the collection of intelligence and enrich the entity
match obs_type:
case "StixFile":
self._process_file(observable)
# case "IPv4-Addr":
# self._process_ip(observable)
octi_objects = self.file_enricher.process_file_enrichment(
observable
)
case "IPv4-Addr":
octi_objects = self.ipv4_enricher.process_ipv4_enrichment(
observable
)
# case "Domain-Name" | "Hostname":
# self._process_domain(observable)
# octi_objects = self.domain_enricher.process_domain_enrichment(
# observable
# )
# case "Url":
# self._process_url(observable)
# octi_objects = self.url_enricher.process_url_enrichment(
# observable
# )
case _:
raise ValueError(
"Entity type is not supported",
{"entity_type": obs_type},
)

if self.stix_objects is not None and len(self.stix_objects):
return self._send_bundle(self.stix_objects)
bundle_objects = self.stix_objects + octi_objects

if bundle_objects is not None and len(bundle_objects):
return self._send_bundle(bundle_objects)
else:
info_msg = "[CONNECTOR] No information found"
return info_msg
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
SECTIONS = {
"file_sections": {
"mandatories_sections": ["LicenseInfo", "Zone", "FileGeneralInfo"],
"supported_sections": [
"LicenseInfo",
"Zone",
"FileGeneralInfo",
"DetectionsInfo",
"FileDownloadedFromUrls",
"Industries",
"FileNames",
],
},
"ipv4_sections": {
"mandatories_sections": ["LicenseInfo", "Zone", "IpGeneralInfo"],
"supported_sections": [
"LicenseInfo",
"Zone",
"IpGeneralInfo",
"FilesDownloadedFromIp",
"HostedUrls",
"IpWhoIs",
"IpDnsResolutions",
"Industries",
],
},
"domain_sections": {},
"url_sections": {},
}

DATETIME_FORMAT = "%Y-%m-%dT%H:%MZ"
Loading