diff --git a/internal-enrichment/kaspersky-enrichment/docker-compose.yml b/internal-enrichment/kaspersky-enrichment/docker-compose.yml index 02a5b156a92..c9a8cb15d29 100644 --- a/internal-enrichment/kaspersky-enrichment/docker-compose.yml +++ b/internal-enrichment/kaspersky-enrichment/docker-compose.yml @@ -9,7 +9,7 @@ services: # Common parameters for connectors of type INTERNAL_ENRICHMENT - CONNECTOR_ID=ChangeMe # - CONNECTOR_NAME=Kaspersky Enrichment - # - CONNECTOR_SCOPE=StixFile,IPv4-Addr,Domain-Name,Hostname # Support for additional observable types (URLs) will be added in future releases. + # - CONNECTOR_SCOPE=StixFile,IPv4-Addr,Domain-Name,Hostname,Url # - CONNECTOR_LOG_LEVEL=error # - CONNECTOR_AUTO=true @@ -21,5 +21,6 @@ services: # - KASPERSKY_FILE_SECTIONS=LicenseInfo,Zone,FileGeneralInfo # - KASPERSKY_IPV4_SECTIONS=LicenseInfo,Zone,IpGeneralInfo # - KASPERSKY_DOMAIN_SECTIONS=LicenseInfo,Zone,DomainGeneralInfo + # - KASPERSKY_URL_SECTIONS=LicenseInfo,Zone,UrlGeneralInfo restart: always diff --git a/internal-enrichment/kaspersky-enrichment/src/config.yml.sample b/internal-enrichment/kaspersky-enrichment/src/config.yml.sample index 981f7556c14..b0d6147d9ed 100644 --- a/internal-enrichment/kaspersky-enrichment/src/config.yml.sample +++ b/internal-enrichment/kaspersky-enrichment/src/config.yml.sample @@ -6,7 +6,7 @@ connector: # type: 'INTERNAL_ENRICHMENT' # id: 'ChangeMe' # name: 'Kaspersky Enrichment' -# scope: 'StixFile,IPv4-Addr,Domain-Name,Hostname' # Support for additional observable types (URLs) will be added in future releases. +# scope: 'StixFile,IPv4-Addr,Domain-Name,Hostname,Url' # log_level: 'error' # auto: true # Enable/disable auto-enrichment of observables @@ -18,3 +18,4 @@ kaspersky: # file_sections: 'LicenseInfo,Zone,FileGeneralInfo' # ipv4_sections: 'LicenseInfo,Zone,IpGeneralInfo' # domain_sections: 'LicenseInfo,Zone,DomainGeneralInfo' +# url_sections: 'LicenseInfo,Zone,UrlGeneralInfo' diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/connector.py b/internal-enrichment/kaspersky-enrichment/src/connector/connector.py index 88db8cc3994..66e21380df0 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/connector.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/connector.py @@ -3,6 +3,7 @@ from connector.use_cases.enrich_domain import DomainEnricher from connector.use_cases.enrich_file import FileEnricher from connector.use_cases.enrich_ipv4 import Ipv4Enricher +from connector.use_cases.enrich_url import UrlEnricher from connector.utils import entity_in_scope from kaspersky_client import KasperskyClient from pycti import OpenCTIConnectorHelper @@ -54,6 +55,7 @@ def __init__( file_sections = self.config.kaspersky.file_sections ipv4_sections = self.config.kaspersky.ipv4_sections domain_sections = self.config.kaspersky.domain_sections + url_sections = self.config.kaspersky.url_sections zone_octi_score_mapping = self.config.kaspersky.zone_octi_score_mapping api_key = self.config.kaspersky.api_key.get_secret_value() @@ -70,26 +72,33 @@ def __init__( converter_to_stix = ConverterToStix(self.helper) self.file_enricher = FileEnricher( - helper=self.helper, + connector_logger=self.helper.connector_logger, client=client, sections=file_sections, zone_octi_score_mapping=zone_octi_score_mapping, converter_to_stix=converter_to_stix, ) self.ipv4_enricher = Ipv4Enricher( - helper=self.helper, + connector_logger=self.helper.connector_logger, client=client, sections=ipv4_sections, zone_octi_score_mapping=zone_octi_score_mapping, converter_to_stix=converter_to_stix, ) self.domain_enricher = DomainEnricher( - helper=self.helper, + connector_logger=self.helper.connector_logger, client=client, sections=domain_sections, zone_octi_score_mapping=zone_octi_score_mapping, converter_to_stix=converter_to_stix, ) + self.url_enricher = UrlEnricher( + connector_logger=self.helper.connector_logger, + client=client, + sections=url_sections, + zone_octi_score_mapping=zone_octi_score_mapping, + converter_to_stix=converter_to_stix, + ) # Define variables self.stix_objects = [] @@ -159,10 +168,10 @@ def process_message(self, data: dict) -> str: octi_objects = self.domain_enricher.process_domain_enrichment( observable ) - # case "Url": - # octi_objects = self.url_enricher.process_url_enrichment( - # observable - # ) + case "Url": + octi_objects = self.url_enricher.process_url_enrichment( + observable + ) case _: raise ValueError( "Entity type is not supported", diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/constants.py b/internal-enrichment/kaspersky-enrichment/src/connector/constants.py index a03fed1abd9..61481c81646 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/constants.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/constants.py @@ -36,7 +36,17 @@ "Industries", ], }, - "url_sections": {}, + "url_sections": { + "mandatories_sections": ["LicenseInfo", "Zone", "UrlGeneralInfo"], + "supported_sections": [ + "LicenseInfo", + "Zone", + "UrlGeneralInfo", + "FilesDownloaded", + "FilesAccessed", + "Industries", + ], + }, } DATETIME_FORMAT = "%Y-%m-%dT%H:%MZ" diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/converter_to_stix.py b/internal-enrichment/kaspersky-enrichment/src/connector/converter_to_stix.py index dfbde64b318..f3fbcc636fb 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/converter_to_stix.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/converter_to_stix.py @@ -9,6 +9,7 @@ DomainName, File, IPV4Address, + IPV6Address, Note, OrganizationAuthor, Reference, @@ -65,7 +66,7 @@ def create_country(self, country_name: str) -> Country: """ return Country(name=country_name, author=self.author, markings=[self.tlp_clear]) - def create_domain(self, name: str, score: int) -> DomainName: + def create_domain(self, name: str, score: int = None) -> DomainName: """ Create a Domain object """ @@ -88,6 +89,12 @@ def create_ipv4(self, ip: str) -> IPV4Address: """ return IPV4Address(value=ip, author=self.author, markings=[self.tlp_amber]) + def create_ipv6(self, ip: str) -> IPV4Address: + """ + Create an IPv6 object + """ + return IPV6Address(value=ip, author=self.author, markings=[self.tlp_amber]) + def create_note(self, observable: Reference, content: str) -> Note: """ Create a note associated to the file observable diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/settings.py b/internal-enrichment/kaspersky-enrichment/src/connector/settings.py index 96c1e2cb9ae..988d41f967c 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/settings.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/settings.py @@ -60,7 +60,7 @@ class InternalEnrichmentConnectorConfig(BaseInternalEnrichmentConnectorConfig): description="Name of the connector.", ) scope: ListFromString = Field( - default=["StixFile", "IPv4-Addr", "Domain-Name", "Hostname"], + default=["StixFile", "IPv4-Addr", "Domain-Name", "Hostname", "Url"], description="The scope or type of data the connector is importing, either a MIME type or Stix Object (for information only).", ) auto: bool = Field( @@ -120,11 +120,18 @@ class KasperskyConfig(BaseConfigModel): "LicenseInfo, Zone and DomainGeneralInfo are always set, can't be disabled. " "Only DomainDnsResolutions, FilesDownloaded, FilesAccessed and Industries are currently supported", ) + url_sections: str = Field( + default="LicenseInfo,Zone,UrlGeneralInfo", + description="Sections wanted to investigate for the requested URL. " + "LicenseInfo, Zone and UrlGeneralInfo are always set, can't be disabled. " + "Only FilesDownloaded, FilesAccessed and Industries are currently supported", + ) @field_validator( "file_sections", "ipv4_sections", "domain_sections", + "url_sections", mode="before", ) @classmethod diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/common.py b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/common.py index 3b5f2408751..5aa55b7afd5 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/common.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/common.py @@ -1,15 +1,18 @@ +import logging + from connector.converter_to_stix import ConverterToStix -from connector.utils import is_quota_exceeded -from pycti import STIX_EXT_OCTI_SCO, OpenCTIConnectorHelper, OpenCTIStix2 +from connector.utils import get_first_and_last_seen_datetime, is_quota_exceeded +from connectors_sdk.models import Reference +from pycti import STIX_EXT_OCTI_SCO, OpenCTIStix2 class BaseUseCases: def __init__( self, - helper: OpenCTIConnectorHelper, + connector_logger: logging.Logger, converter_to_stix: ConverterToStix, ): - self.helper = helper + self.connector_logger = connector_logger self.converter_to_stix = converter_to_stix def check_quota(self, license_info: dict) -> None: @@ -17,7 +20,7 @@ def check_quota(self, license_info: dict) -> None: Send a log warning if quota is exceeded """ if is_quota_exceeded(license_info): - self.helper.connector_logger.warning( + self.connector_logger.warning( "[CONNECTOR] The daily quota has been exceeded", { "day_requests": license_info["DayRequests"], @@ -42,7 +45,7 @@ def generate_author_and_tlp_markings(self): return common_objects - def update_observable_score(self, zone: str, observable: dict): + def update_observable_score(self, zone: str, observable: dict) -> dict: """ Update score in observable """ @@ -50,3 +53,54 @@ def update_observable_score(self, zone: str, observable: dict): return OpenCTIStix2.put_attribute_in_extension( observable, STIX_EXT_OCTI_SCO, "score", score ) + + def manage_industries(self, observable_to_ref: Reference, industries: list) -> list: + """ + Create sector and relation for each item in industries + """ + self.connector_logger.info( + "[CONNECTOR] Process enrichment from Industries data..." + ) + + industry_objects = [] + for industry in industries: + industry_object = self.converter_to_stix.create_sector(industry) + + if industry_object: + industry_objects.append(industry_object.to_stix2_object()) + industry_relation = self.converter_to_stix.create_relationship( + relationship_type="related-to", + source_obj=observable_to_ref, + target_obj=industry_object, + ) + industry_objects.append(industry_relation.to_stix2_object()) + return industry_objects + + def manage_files(self, files: list, observable_to_ref: Reference) -> list: + """ + Create file and relation for each item + """ + file_objects = [] + for file in files: + obs_file = self.converter_to_stix.create_file( + hashes={"MD5": file["Md5"]}, + score=self.zone_octi_score_mapping[file["Zone"].lower()], + ) + + if obs_file: + file_objects.append(obs_file.to_stix2_object()) + file_first_seen_datetime, file_last_seen_datetime = ( + get_first_and_last_seen_datetime( + file["FirstSeen"], + file["LastSeen"], + ) + ) + file_relation = self.converter_to_stix.create_relationship( + source_obj=observable_to_ref, + relationship_type="related-to", + target_obj=obs_file, + start_time=file_first_seen_datetime, + stop_time=file_last_seen_datetime, + ) + file_objects.append(file_relation.to_stix2_object()) + return file_objects diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_domain.py b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_domain.py index e66d9e92c74..43919d88a7f 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_domain.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_domain.py @@ -1,21 +1,22 @@ +import logging + from connector.converter_to_stix import ConverterToStix from connector.use_cases.common import BaseUseCases from connector.utils import get_first_and_last_seen_datetime from kaspersky_client import KasperskyClient -from pycti import OpenCTIConnectorHelper class DomainEnricher(BaseUseCases): def __init__( self, - helper: OpenCTIConnectorHelper, + connector_logger: logging.Logger, client: KasperskyClient, sections: str, zone_octi_score_mapping: dict, converter_to_stix: ConverterToStix, ): - BaseUseCases.__init__(self, helper, converter_to_stix) - self.helper = helper + BaseUseCases.__init__(self, connector_logger, converter_to_stix) + self.connector_logger = connector_logger self.client = client self.sections = sections self.zone_octi_score_mapping = zone_octi_score_mapping @@ -29,24 +30,28 @@ def process_domain_enrichment(self, observable: dict) -> list: observable_to_ref = self.converter_to_stix.create_reference( obs_id=observable["id"] ) - self.helper.connector_logger.info("[CONNECTOR] Starting enrichment...") + self.connector_logger.info( + "[ENRICH DOMAIN] Starting enrichment...", + {"observable_id": observable["id"]}, + ) # Retrieve domain obs_domain = observable["value"] # Get entity data from api client - entity_data = self.client.get_domain_info(obs_domain, self.sections) + entity_data = self.client.get_data("domain", obs_domain, self.sections) # Check Quota self.check_quota(entity_data["LicenseInfo"]) # Create and add author, TLP clear and TLP amber to octi_objects - octi_objects += self.generate_author_and_tlp_markings() + octi_objects.extend(self.generate_author_and_tlp_markings()) # Manage DomainGeneralInfo data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from DomainGeneralInfo data..." + self.connector_logger.info( + "[ENRICH DOMAIN] Process enrichment from DomainGeneralInfo data...", + {"observable_id": observable["id"]}, ) # Score @@ -65,11 +70,12 @@ def process_domain_enrichment(self, observable: dict) -> list: # Manage DomainDnsResolutions - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from DomainDnsResolutions data..." - ) - if entity_data.get("DomainDnsResolutions"): + self.connector_logger.info( + "[ENRICH DOMAIN] Process enrichment from DomainDnsResolutions data...", + {"observable_id": observable["id"]}, + ) + ipv4_entities = entity_data["DomainDnsResolutions"] for ipv4_entity in ipv4_entities: obs_ipv4 = self.converter_to_stix.create_ipv4(ipv4_entity["Ip"]) @@ -85,39 +91,19 @@ def process_domain_enrichment(self, observable: dict) -> list: # Manage FilesDownloaded - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from FilesDownloaded data..." - ) - if entity_data.get("FilesDownloaded"): + self.connector_logger.info( + "[ENRICH DOMAIN] Process enrichment from FilesDownloaded data...", + {"observable_id": observable["id"]}, + ) files_downloaded = entity_data["FilesDownloaded"] - for file_downloaded_entity in files_downloaded: - # Create File object and relation - obs_file = self.converter_to_stix.create_file( - hashes={"MD5": file_downloaded_entity["Md5"]}, - score=self.zone_octi_score_mapping[ - file_downloaded_entity["Zone"].lower() - ], - ) - if obs_file: - octi_objects.append(obs_file.to_stix2_object()) - file_first_seen_datetime, file_last_seen_datetime = ( - get_first_and_last_seen_datetime( - file_downloaded_entity["FirstSeen"], - file_downloaded_entity["LastSeen"], - ) - ) - file_relation = self.converter_to_stix.create_relationship( - source_obj=observable_to_ref, - relationship_type="related-to", - target_obj=obs_file, - start_time=file_first_seen_datetime, - stop_time=file_last_seen_datetime, - ) - octi_objects.append(file_relation.to_stix2_object()) + # Create File object and relation + octi_objects.extend(self.manage_files(files_downloaded, observable_to_ref)) + + # Create Url object and relation + for file_downloaded_entity in files_downloaded: - # Create Url object and relation obs_url = self.converter_to_stix.create_url( obs_url_score=self.zone_octi_score_mapping[ file_downloaded_entity["Zone"].lower() @@ -144,11 +130,12 @@ def process_domain_enrichment(self, observable: dict) -> list: # Manage FilesAccessed - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from FilesAccessed data..." - ) - if entity_data.get("FilesAccessed"): + self.connector_logger.info( + "[ENRICH DOMAIN] Process enrichment from FilesAccessed data...", + {"observable_id": observable["id"]}, + ) + files_accessed = entity_data["FilesAccessed"] for file_accessed in files_accessed: obs_file_accessed = self.converter_to_stix.create_file( @@ -180,21 +167,9 @@ def process_domain_enrichment(self, observable: dict) -> list: # Manage Industries data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from Industries data..." - ) - if entity_data.get("Industries"): - for industry in entity_data["Industries"]: - industry_object = self.converter_to_stix.create_sector(industry) - - if industry_object: - octi_objects.append(industry_object.to_stix2_object()) - industry_relation = self.converter_to_stix.create_relationship( - relationship_type="related-to", - source_obj=observable_to_ref, - target_obj=industry_object, - ) - octi_objects.append(industry_relation.to_stix2_object()) + octi_objects.extend( + self.manage_industries(observable_to_ref, entity_data["Industries"]) + ) return octi_objects diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_file.py b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_file.py index 7628201dd00..c489705f1ed 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_file.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_file.py @@ -1,21 +1,22 @@ +import logging + from connector.converter_to_stix import ConverterToStix from connector.use_cases.common import BaseUseCases from connector.utils import resolve_file_hash from kaspersky_client import KasperskyClient -from pycti import OpenCTIConnectorHelper class FileEnricher(BaseUseCases): def __init__( self, - helper: OpenCTIConnectorHelper, + connector_logger: logging.Logger, client: KasperskyClient, sections: str, zone_octi_score_mapping: dict, converter_to_stix: ConverterToStix, ): - BaseUseCases.__init__(self, helper, converter_to_stix) - self.helper = helper + BaseUseCases.__init__(self, connector_logger, converter_to_stix) + self.connector_logger = connector_logger self.client = client self.sections = sections self.zone_octi_score_mapping = zone_octi_score_mapping @@ -29,24 +30,28 @@ def process_file_enrichment(self, observable: dict) -> list: observable_to_ref = self.converter_to_stix.create_reference( obs_id=observable["id"] ) - self.helper.connector_logger.info("[CONNECTOR] Starting enrichment...") + self.connector_logger.info( + "[ENRICH FILE] Starting enrichment...", + {"observable_id": observable["id"]}, + ) # Retrieve file hash obs_hash = resolve_file_hash(observable) # Get entity data from api client - entity_data = self.client.get_file_info(obs_hash, self.sections) + entity_data = self.client.get_data("hash", obs_hash, self.sections) # Check Quota self.check_quota(entity_data["LicenseInfo"]) # Create and add author, TLP clear and TLP amber to octi_objects - octi_objects += self.generate_author_and_tlp_markings() + octi_objects.extend(self.generate_author_and_tlp_markings()) # Manage FileGeneralInfo data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from FileGeneralInfo data..." + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from FileGeneralInfo data...", + {"observable_id": observable["id"]}, ) # Score @@ -73,15 +78,16 @@ def process_file_enrichment(self, observable: dict) -> list: if entity_file_general_info.get("Categories"): observable["labels"] = observable.get("x_opencti_labels", []) for label in entity_file_general_info["Categories"]: - pretty_label = label.replace("CATEGORY_", "").replace("_", "") + pretty_label = label.replace("CATEGORY_", "").replace("_", " ") if pretty_label not in observable["labels"]: observable["labels"].append(pretty_label) # Manage FileNames data if entity_data.get("FileNames"): - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from FileNames data..." + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from FileNames data...", + {"observable_id": observable["id"]}, ) observable["additional_names"] = observable.get( @@ -94,16 +100,17 @@ def process_file_enrichment(self, observable: dict) -> list: # Manage DetectionsInfo data if entity_data.get("DetectionsInfo"): - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from DetectionsInfo data..." + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from DetectionsInfo data...", + {"observable_id": observable["id"]}, ) content = "| Detection Date | Detection Name | Detection Method |\n" content += "|----------------|----------------|------------------|\n" for obs_detection_info in entity_data["DetectionsInfo"]: - detection_name = f"[{obs_detection_info["DetectionName"]}]({obs_detection_info["DescriptionUrl"]})" - content += f"| {obs_detection_info["LastDetectDate"]} | {detection_name} | {obs_detection_info["DetectionMethod"]} |\n" + detection_name = f"[{obs_detection_info['DetectionName']}]({obs_detection_info['DescriptionUrl']})" + content += f"| {obs_detection_info['LastDetectDate']} | {detection_name} | {obs_detection_info['DetectionMethod']} |\n" obs_note = self.converter_to_stix.create_note(observable_to_ref, content) octi_objects.append(obs_note.to_stix2_object()) @@ -111,8 +118,9 @@ def process_file_enrichment(self, observable: dict) -> list: # Manage FileDownloadedFromUrls data if entity_data.get("FileDownloadedFromUrls"): - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from FileDownloadedFromUrls data..." + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from FileDownloadedFromUrls data...", + {"observable_id": observable["id"]}, ) for url_info in entity_data["FileDownloadedFromUrls"]: @@ -133,20 +141,8 @@ def process_file_enrichment(self, observable: dict) -> list: # Manage Industries data if entity_data.get("Industries"): - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from Industries data..." + octi_objects.extend( + self.manage_industries(observable_to_ref, entity_data["Industries"]) ) - for industry in entity_data["Industries"]: - industry_object = self.converter_to_stix.create_sector(industry) - - if industry_object: - octi_objects.append(industry_object.to_stix2_object()) - industry_relation = self.converter_to_stix.create_relationship( - relationship_type="related-to", - source_obj=observable_to_ref, - target_obj=industry_object, - ) - octi_objects.append(industry_relation.to_stix2_object()) - return octi_objects diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_ipv4.py b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_ipv4.py index b72557992a3..f1ba7c71fbb 100644 --- a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_ipv4.py +++ b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_ipv4.py @@ -1,21 +1,22 @@ +import logging + from connector.converter_to_stix import ConverterToStix from connector.use_cases.common import BaseUseCases from connector.utils import get_first_and_last_seen_datetime from kaspersky_client import KasperskyClient -from pycti import OpenCTIConnectorHelper class Ipv4Enricher(BaseUseCases): def __init__( self, - helper: OpenCTIConnectorHelper, + connector_logger: logging.Logger, client: KasperskyClient, sections: str, zone_octi_score_mapping: dict, converter_to_stix: ConverterToStix, ): - BaseUseCases.__init__(self, helper, converter_to_stix) - self.helper = helper + BaseUseCases.__init__(self, connector_logger, converter_to_stix) + self.connector_logger = connector_logger self.client = client self.sections = sections self.zone_octi_score_mapping = zone_octi_score_mapping @@ -29,24 +30,25 @@ def process_ipv4_enrichment(self, observable: dict) -> list: observable_to_ref = self.converter_to_stix.create_reference( obs_id=observable["id"] ) - self.helper.connector_logger.info("[CONNECTOR] Starting enrichment...") + self.connector_logger.info("[ENRICH FILE] Starting enrichment...") # Retrieve ipv4 obs_ipv4 = observable["value"] # Get entity data from api client - entity_data = self.client.get_ipv4_info(obs_ipv4, self.sections) + entity_data = self.client.get_data("ip", obs_ipv4, self.sections) # Check Quota self.check_quota(entity_data["LicenseInfo"]) # Create and add author, TLP clear and TLP amber to octi_objects - octi_objects += self.generate_author_and_tlp_markings() + octi_objects.extend(self.generate_author_and_tlp_markings()) # Manage IpGeneralInfo data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from IpGeneralInfo data..." + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from IpGeneralInfo data...", + {"observable_id": observable["id"]}, ) # Score @@ -59,7 +61,7 @@ def process_ipv4_enrichment(self, observable: dict) -> list: if entity_general_info.get("Categories"): observable["labels"] = observable.get("x_opencti_labels", []) for label in entity_general_info["Categories"]: - pretty_label = label.replace("CATEGORY_", "").replace("_", "") + pretty_label = label.replace("CATEGORY_", "").replace("_", " ") if pretty_label not in observable["labels"]: observable["labels"].append(pretty_label) @@ -80,11 +82,12 @@ def process_ipv4_enrichment(self, observable: dict) -> list: # Manage FilesDownloadedFromIp data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from FilesDownloadedFromIp data..." - ) - if entity_data.get("FilesDownloadedFromIp"): + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from FilesDownloadedFromIp data...", + {"observable_id": observable["id"]}, + ) + for file in entity_data["FilesDownloadedFromIp"]: obs_file = self.converter_to_stix.create_file( hashes={"MD5": file["Md5"]}, @@ -109,11 +112,12 @@ def process_ipv4_enrichment(self, observable: dict) -> list: # Manage HostedUrls data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from HostedUrls data..." - ) - if entity_data.get("HostedUrls"): + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from HostedUrls data...", + {"observable_id": observable["id"]}, + ) + for url_entity in entity_data["HostedUrls"]: obs_url = self.converter_to_stix.create_url( obs_url_score=self.zone_octi_score_mapping[ @@ -140,11 +144,12 @@ def process_ipv4_enrichment(self, observable: dict) -> list: # Manage IpWhoIs data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from IpWhoIs data..." - ) - if entity_data.get("IpWhoIs") and entity_data["IpWhoIs"].get("Asn"): + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from IpWhoIs data...", + {"observable_id": observable["id"]}, + ) + asn_entities = entity_data["IpWhoIs"]["Asn"] for asn_entity in asn_entities: obs_asn = self.converter_to_stix.create_autonomous_system( @@ -162,11 +167,12 @@ def process_ipv4_enrichment(self, observable: dict) -> list: # Manage IpDnsResolutions - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from IpDnsResolutions data..." - ) - if entity_data.get("IpDnsResolutions"): + self.connector_logger.info( + "[ENRICH FILE] Process enrichment from IpDnsResolutions data...", + {"observable_id": observable["id"]}, + ) + for resolution in entity_data["IpDnsResolutions"]: obs_domain = self.converter_to_stix.create_domain( name=resolution["Domain"], @@ -191,21 +197,9 @@ def process_ipv4_enrichment(self, observable: dict) -> list: # Manage Industries data - self.helper.connector_logger.info( - "[CONNECTOR] Process enrichment from Industries data..." - ) - if entity_data.get("Industries"): - for industry in entity_data["Industries"]: - industry_object = self.converter_to_stix.create_sector(industry) - - if industry_object: - octi_objects.append(industry_object.to_stix2_object()) - industry_relation = self.converter_to_stix.create_relationship( - relationship_type="related-to", - source_obj=observable_to_ref, - target_obj=industry_object, - ) - octi_objects.append(industry_relation.to_stix2_object()) + octi_objects.extend( + self.manage_industries(observable_to_ref, entity_data["Industries"]) + ) return octi_objects diff --git a/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_url.py b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_url.py new file mode 100644 index 00000000000..1bb3a17177b --- /dev/null +++ b/internal-enrichment/kaspersky-enrichment/src/connector/use_cases/enrich_url.py @@ -0,0 +1,129 @@ +import logging + +from connector.converter_to_stix import ConverterToStix +from connector.use_cases.common import BaseUseCases +from kaspersky_client import KasperskyClient +from validators import domain, ip_address + + +class UrlEnricher(BaseUseCases): + def __init__( + self, + connector_logger: logging.Logger, + client: KasperskyClient, + sections: str, + zone_octi_score_mapping: dict, + converter_to_stix: ConverterToStix, + ): + BaseUseCases.__init__(self, connector_logger, converter_to_stix) + self.connector_logger = connector_logger + self.client = client + self.sections = sections + self.zone_octi_score_mapping = zone_octi_score_mapping + self.converter_to_stix = converter_to_stix + + def process_url_enrichment(self, observable: dict) -> list: + """ + Collect intelligence from the source for an URL type + """ + octi_objects = [] + observable_to_ref = self.converter_to_stix.create_reference( + obs_id=observable["id"] + ) + self.connector_logger.info( + "[ENRICH URL] Starting enrichment...", + {"observable_id": observable["id"]}, + ) + + # Retrieve url + obs_url = ( + observable["value"] + if observable["value"].startswith("http") + else "http://" + observable["value"] + ) + + # Get entity data from api client + entity_data = self.client.get_data("url", obs_url, self.sections) + + # Check Quota + self.check_quota(entity_data["LicenseInfo"]) + + # Create and add author, TLP clear and TLP amber to octi_objects + octi_objects.extend(self.generate_author_and_tlp_markings()) + + # Manage UrlGeneralInfo data + + self.connector_logger.info( + "[ENRICH URL] Process enrichment from UrlGeneralInfo data...", + {"observable_id": observable["id"]}, + ) + + # Score + if entity_data.get("Zone"): + observable = self.update_observable_score(entity_data["Zone"], observable) + + entity_general_info = entity_data["UrlGeneralInfo"] + + if entity_general_info.get("Categories"): + # Labels + observable["labels"] = observable.get("x_opencti_labels", []) + for label in entity_general_info["Categories"]: + pretty_label = label.replace("CATEGORY_", "").replace("_", " ") + if pretty_label not in observable["labels"]: + observable["labels"].append(pretty_label) + + # Host + if entity_general_info.get("Host"): + host_object = None + if domain(entity_general_info["Host"]): + host_object = self.converter_to_stix.create_domain( + name=entity_general_info["Host"] + ) + elif ip_address.ipv4(entity_general_info["Host"]): + host_object = self.converter_to_stix.create_ipv4( + ip=entity_general_info["Host"] + ) + elif ip_address.ipv6(entity_general_info["Host"]): + host_object = self.converter_to_stix.create_ipv6( + ip=entity_general_info["Host"] + ) + + if host_object: + octi_objects.append(host_object.to_stix2_object()) + domain_relation = self.converter_to_stix.create_relationship( + relationship_type="related-to", + source_obj=observable_to_ref, + target_obj=host_object, + ) + octi_objects.append(domain_relation.to_stix2_object()) + + # Manage FilesDownloaded data + + if entity_data.get("FilesDownloaded"): + self.connector_logger.info( + "[ENRICH URL] Process enrichment from FilesDownloaded data...", + {"observable_id": observable["id"]}, + ) + octi_objects.extend( + self.manage_files(entity_data["FilesDownloaded"], observable_to_ref) + ) + + # Manage FilesAccessed data + + if entity_data.get("FilesAccessed"): + self.connector_logger.info( + "[ENRICH URL] Process enrichment from FilesAccessed data...", + {"observable_id": observable["id"]}, + ) + octi_objects.extend( + self.manage_files(entity_data["FilesAccessed"], observable_to_ref) + ) + + # Manage Industries data + + if entity_data.get("Industries"): + octi_objects.extend( + self.manage_industries(observable_to_ref, entity_data["Industries"]) + ) + + return octi_objects diff --git a/internal-enrichment/kaspersky-enrichment/src/kaspersky_client/api_client.py b/internal-enrichment/kaspersky-enrichment/src/kaspersky_client/api_client.py index 5d80f280d16..97322f07ceb 100644 --- a/internal-enrichment/kaspersky-enrichment/src/kaspersky_client/api_client.py +++ b/internal-enrichment/kaspersky-enrichment/src/kaspersky_client/api_client.py @@ -30,7 +30,7 @@ def __init__( self.session.headers.update(self.headers) self.params = params - def _request_data(self, api_url: str, params: dict) -> requests.Response: + def _request_data(self, api_url: HttpUrl, params: dict) -> requests.Response: """ Internal method to handle API requests :return: Response in JSON format @@ -69,29 +69,11 @@ def _request_data(self, api_url: str, params: dict) -> requests.Response: self.helper.connector_logger.error("Unknown error", {"error": err}) raise - def get_file_info(self, obs_hash: str, sections: str) -> dict: + def get_data(self, type: str, obs: str, sections: str) -> dict: """ - Retrieve file information + Retrieve data for current observable """ - file_url = f"{self.base_url}api/hash/{obs_hash}" + url = HttpUrl(f"{self.base_url}api/{type}/{obs}") self.params["sections"] = sections - response = self._request_data(file_url, params=self.params) - return response.json() - - def get_ipv4_info(self, obs_ipv4: str, sections: str) -> dict: - """ - Retrieve ipv4 information - """ - file_url = f"{self.base_url}api/ip/{obs_ipv4}" - self.params["sections"] = sections - response = self._request_data(file_url, params=self.params) - return response.json() - - def get_domain_info(self, obs_domain: str, sections: str) -> dict: - """ - Retrieve domain information - """ - file_url = f"{self.base_url}api/domain/{obs_domain}" - self.params["sections"] = sections - response = self._request_data(file_url, params=self.params) + response = self._request_data(url, params=self.params) return response.json() diff --git a/internal-enrichment/kaspersky-enrichment/src/requirements.txt b/internal-enrichment/kaspersky-enrichment/src/requirements.txt index 84329aaae8f..7a3cdea2463 100644 --- a/internal-enrichment/kaspersky-enrichment/src/requirements.txt +++ b/internal-enrichment/kaspersky-enrichment/src/requirements.txt @@ -3,3 +3,4 @@ pydantic~= 2.11.3 pydantic-settings==2.11.0 requests~=2.32.3 connectors-sdk @ git+https://github.com/OpenCTI-Platform/connectors.git@6.9.7#subdirectory=connectors-sdk +validators diff --git a/internal-enrichment/kaspersky-enrichment/tests/conftest.py b/internal-enrichment/kaspersky-enrichment/tests/conftest.py new file mode 100644 index 00000000000..b595a4348e3 --- /dev/null +++ b/internal-enrichment/kaspersky-enrichment/tests/conftest.py @@ -0,0 +1,105 @@ +import json +import os +import sys +from typing import Any +from unittest.mock import MagicMock, Mock + +import pytest + +sys.path.append(os.path.join(os.path.dirname(__file__), "..", "src")) +from connector import KasperskyConnector +from kaspersky_client.api_client import KasperskyClient + + +@pytest.fixture(scope="class") +def setup_config(request): + """ + Setup configuration for class method + Create fake pycti OpenCTI helper + """ + request.cls.mock_helper = MagicMock() + request.cls.mock_config = Mock() + request.cls.connector = KasperskyConnector( + config=request.cls.mock_config, helper=request.cls.mock_helper + ) + request.cls.api_client = KasperskyClient( + helper=request.cls.mock_helper, + base_url="https://test.url/", + api_key="key", + params={}, + ) + + yield + + +@pytest.fixture +def enrichment_data(): + with open( + os.path.join( + os.path.join(os.path.dirname(__file__), "fixtures"), "enrichment_data.json" + ) + ) as file: + return json.load(file) + + +@pytest.fixture +def fixture_data() -> dict[str, Any]: + return { + "event_type": "INTERNAL_ENRICHMENT", + "entity_id": "hostname--01a98dfa-fdb3-4da1-be9a-ed06c3f8e940", + "entity_type": "Hostname", + "stix_entity": { + "id": "hostname--01a98dfa-fdb3-4da1-be9a-ed06c3f8e940", + "spec_version": "2.1", + "x_opencti_score": 0, + "value": "hostname-to-enrich.com", + "x_opencti_id": "b2a3351f-df4a-4f7d-bc3e-7732f00e29fc", + "x_opencti_type": "Hostname", + "type": "hostname", + }, + "stix_objects": [ + { + "id": "hostname--01a98dfa-fdb3-4da1-be9a-ed06c3f8e940", + "spec_version": "2.1", + "x_opencti_score": 0, + "value": "hostname-to-enrich.com", + "x_opencti_id": "b2a3351f-df4a-4f7d-bc3e-7732f00e29fc", + "x_opencti_type": "Hostname", + "type": "hostname", + } + ], + "enrichment_entity": { + "id": "b2a3351f-df4a-4f7d-bc3e-7732f00e29fc", + "standard_id": "hostname--01a98dfa-fdb3-4da1-be9a-ed06c3f8e940", + "entity_type": "Hostname", + "parent_types": [ + "Basic-Object", + "Stix-Object", + "Stix-Core-Object", + "Stix-Cyber-Observable", + ], + "spec_version": "2.1", + "created_at": "2025-12-24T08:15:03.998Z", + "updated_at": "2025-12-24T08:15:09.337Z", + "objectOrganization": [], + "creators": [ + {"id": "88ec0c6a-13ce-5e39-b486-354fe4a7084f", "name": "admin"} + ], + "createdBy": None, + "objectMarking": [{"definition_type": "TLP", "definition": "TLP:RED"}], + "objectLabel": [], + "externalReferences": [], + "observable_value": "hostname-to-enrich.com", + "x_opencti_description": None, + "x_opencti_score": 0, + "indicators": [], + "value": "hostname-to-enrich.com", + "importFiles": [], + "createdById": None, + "objectMarkingIds": [], + "objectLabelIds": [], + "externalReferencesIds": [], + "indicatorsIds": [], + "importFilesIds": [], + }, + } diff --git a/internal-enrichment/kaspersky-enrichment/tests/test-requirements.txt b/internal-enrichment/kaspersky-enrichment/tests/test-requirements.txt new file mode 100644 index 00000000000..bdef682113c --- /dev/null +++ b/internal-enrichment/kaspersky-enrichment/tests/test-requirements.txt @@ -0,0 +1,2 @@ +-r ../src/requirements.txt +pytest==8.4.2 diff --git a/internal-enrichment/kaspersky-enrichment/tests/test_main.py b/internal-enrichment/kaspersky-enrichment/tests/test_main.py new file mode 100644 index 00000000000..6d977816e50 --- /dev/null +++ b/internal-enrichment/kaspersky-enrichment/tests/test_main.py @@ -0,0 +1,97 @@ +from typing import Any +from unittest.mock import MagicMock + +import pytest +from connector import ConnectorSettings, KasperskyConnector +from pycti import OpenCTIConnectorHelper + + +@pytest.fixture +def mock_opencti_connector_helper(monkeypatch): + """Mock all heavy dependencies of OpenCTIConnectorHelper, typically API calls to OpenCTI.""" + + module_import_path = "pycti.connector.opencti_connector_helper" + monkeypatch.setattr(f"{module_import_path}.OpenCTIApiClient", MagicMock()) + + +class StubConnectorSettings(ConnectorSettings): + """ + Subclass of `ConnectorSettings` (implementation of `BaseConnectorSettings`) for testing purpose. + It overrides `BaseConnectorSettings._load_config_dict` to return a fake but valid config dict. + """ + + @classmethod + def _load_config_dict(cls, _, handler) -> dict[str, Any]: + return handler( + { + "opencti": { + "url": "http://localhost:8080", + "token": "test-token", + }, + "connector": { + "id": "connector-id", + "name": "Test Connector", + "scope": "test, connector", + "log_level": "error", + "auto": True, + }, + "kaspersky": { + "api_base_url": "https://tip.kaspersky.com", + "api_key": "SecretStr", + "max_tlp": "TLP:AMBER", + "zone_octi_score_mapping": "red:100,orange:80,yellow:60,gray:20,green:0", + "file_sections": "LicenseInfo,Zone,FileGeneralInfo", + "ipv4_sections": "LicenseInfo,Zone,IpGeneralInfo", + "domain_sections": "LicenseInfo,Zone,DomainGeneralInfo", + }, + } + ) + + +def test_connector_settings_is_instantiated(): + """ + Test that the implementation of `BaseConnectorSettings` (from `connectors-sdk`) can be instantiated successfully: + - the implemented class MUST have a method `to_helper_config` (inherited from `BaseConnectorSettings`) + - the method `to_helper_config` MUST return a dict (as in base class) + """ + settings = StubConnectorSettings() + + assert isinstance(settings, ConnectorSettings) + assert isinstance(settings.to_helper_config(), dict) + + +def test_connector_helper_is_instantiated(mock_opencti_connector_helper): + """ + Test that `OpenCTIConnectorHelper` (from `pycti`) can be instantiated successfully: + - the value of `settings.to_helper_config` MUST be the expected dict for `OpenCTIConnectorHelper` + - the helper MUST be able to get its instance's attributes from the config dict + + :param mock_opencti_connector_helper: `OpenCTIConnectorHelper` is mocked during this test to avoid any external calls to OpenCTI API + """ + settings = StubConnectorSettings() + helper = OpenCTIConnectorHelper(config=settings.to_helper_config()) + + assert helper.opencti_url == "http://localhost:8080/" + assert helper.opencti_token == "test-token" + assert helper.connect_id == "connector-id" + assert helper.connect_name == "Test Connector" + assert helper.connect_scope == "test,connector" + assert helper.log_level == "ERROR" + assert helper.connect_auto + + +def test_connector_is_instantiated(mock_opencti_connector_helper): + """ + Test that the connector's main class can be instantiated successfully: + - the connector's main class MUST be able to access env/config vars through `self.config` + - the connector's main class MUST be able to access `pycti` API through `self.helper` + + :param mock_opencti_connector_helper: `OpenCTIConnectorHelper` is mocked during this test to avoid any external calls to OpenCTI API + """ + settings = StubConnectorSettings() + helper = OpenCTIConnectorHelper(config=settings.to_helper_config()) + + connector = KasperskyConnector(config=settings, helper=helper) + + assert connector.config == settings + assert connector.helper == helper diff --git a/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_api_client.py b/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_api_client.py new file mode 100644 index 00000000000..1743ff18264 --- /dev/null +++ b/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_api_client.py @@ -0,0 +1,68 @@ +from unittest import mock + +import pytest +import requests + + +@pytest.mark.usefixtures("setup_config") +class TestApiClient(object): + + @pytest.mark.parametrize( + "exceptions_type, exception_raised, status_code, error_message", + [ + ( + requests.exceptions.HTTPError(), + requests.exceptions.HTTPError, + 401, + "Permissions Error, Kaspersky returned a 401, please check your API key", + ), + ( + requests.exceptions.HTTPError(), + requests.exceptions.HTTPError, + 404, + "File not found on Kaspersky, no enrichment possible", + ), + ( + requests.exceptions.HTTPError(), + requests.exceptions.HTTPError, + None, + "Http error", + ), + ( + requests.exceptions.ConnectionError(), + requests.exceptions.ConnectionError, + None, + "Error connecting", + ), + ( + requests.exceptions.Timeout(), + requests.exceptions.Timeout, + None, + "Timeout error", + ), + ( + requests.exceptions.RequestException(), + requests.exceptions.RequestException, + None, + "Something else happened", + ), + ], + ) + @pytest.mark.usefixtures("fixture_data") + def test_api_errors( + self, exceptions_type, exception_raised, status_code, error_message + ): + mock_response = mock.MagicMock() + mock_response.raise_for_status.side_effect = exceptions_type + with mock.patch.object(self.mock_helper, "connect_scope", "Hostname"): + with mock.patch( + "kaspersky_client.api_client.requests.Session.get" + ) as mock_get: + mock_get.return_value = mock_response + if status_code: + mock_get.return_value.status_code = status_code + with pytest.raises(exception_raised): + self.api_client._request_data("https://test.com", {}) + self.mock_helper.connector_logger.error.assert_called_with( + error_message, {"error": exceptions_type} + ) diff --git a/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_kaspersky_connector.py b/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_kaspersky_connector.py new file mode 100644 index 00000000000..fe07f29a830 --- /dev/null +++ b/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_kaspersky_connector.py @@ -0,0 +1,40 @@ +from unittest import mock + +import pytest + + +@pytest.mark.usefixtures("setup_config") +class TestKasperskyConnector(object): + + def test_entity_is_none(self): + with pytest.raises( + Exception, + match=r"\[Kaspersky Enrichment\] Unexpected Error occurred: 'NoneType' object is not subscriptable", + ): + self.connector.process_message(None) + + @pytest.mark.usefixtures("fixture_data") + def test_entity_is_out_of_scope(self, fixture_data): + self.connector.helper.send_stix2_bundle.reset_mock() + fixture_data["enrichment_entity"][ + "entity_id" + ] = "identity--ae798684-7b0a-4229-928b-4b9480e782d0" + fixture_data["enrichment_entity"]["entity_type"] = "Identity" + + with mock.patch.object(self.mock_helper, "connect_scope", "Hostname"): + with pytest.raises( + Exception, + match=r"\[Kaspersky Enrichment\] Unexpected Error occurred: Failed to process observable, Identity is not a supported entity type.", + ): + self.connector.process_message(fixture_data) + self.connector.helper.send_stix2_bundle.assert_not_called() + + @pytest.mark.usefixtures("fixture_data") + def test_skip_entity_with_lower_tlp(self, fixture_data): + with mock.patch.object(self.mock_config.kaspersky, "max_tlp", "TLP:GREEN"): + self.mock_helper.check_max_tlp.return_value = False + assert ( + self.connector.process_message(fixture_data) + == """Do not send any data, TLP of the entity is (TLP:RED), which + is greater than MAX TLP: (TLP:GREEN)""" + ) diff --git a/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_settings.py b/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_settings.py new file mode 100644 index 00000000000..0fc0105e19c --- /dev/null +++ b/internal-enrichment/kaspersky-enrichment/tests/tests_connector/test_settings.py @@ -0,0 +1,149 @@ +from typing import Any + +import pytest +from connector import ConnectorSettings +from connectors_sdk import BaseConfigModel, ConfigValidationError + + +@pytest.mark.parametrize( + "settings_dict", + [ + pytest.param( + { + "opencti": { + "url": "http://localhost:8080", + "token": "test-token", + }, + "connector": { + "id": "connector-id", + "name": "Test Connector", + "scope": "test, connector", + "log_level": "error", + "auto": True, + }, + "kaspersky": { + "api_base_url": "https://tip.kaspersky.com", + "api_key": "SecretStr", + "max_tlp": "TLP:AMBER", + "zone_octi_score_mapping": "red:100,orange:80,yellow:60,gray:20,green:0", + "file_sections": "LicenseInfo,Zone,FileGeneralInfo", + "ipv4_sections": "LicenseInfo,Zone,IpGeneralInfo", + "domain_sections": "LicenseInfo,Zone,DomainGeneralInfo", + }, + }, + id="full_valid_settings_dict", + ), + pytest.param( + { + "opencti": { + "url": "http://localhost:8080", + "token": "test-token", + }, + "connector": {}, + "kaspersky": {"api_key": "SecretStr"}, + }, + id="minimal_valid_settings_dict", + ), + ], +) +def test_settings_should_accept_valid_input(settings_dict): + """ + Test that `ConnectorSettings` (implementation of `BaseConnectorSettings` from `connectors-sdk`) accepts valid input. + For the test purpose, `BaseConnectorSettings._load_config_dict` is overridden to return + a fake but valid dict (instead of the env/config vars parsed from `config.yml`, `.env` or env vars). + + :param settings_dict: The dict to use as `ConnectorSettings` input + """ + + class FakeConnectorSettings(ConnectorSettings): + """ + Subclass of `ConnectorSettings` (implementation of `BaseConnectorSettings`) for testing purpose. + It overrides `BaseConnectorSettings._load_config_dict` to return a fake but valid config dict. + """ + + @classmethod + def _load_config_dict(cls, _, handler) -> dict[str, Any]: + return handler(settings_dict) + + settings = FakeConnectorSettings() + assert isinstance(settings.opencti, BaseConfigModel) is True + assert isinstance(settings.connector, BaseConfigModel) is True + assert isinstance(settings.kaspersky, BaseConfigModel) is True + + +@pytest.mark.parametrize( + "settings_dict, field_name", + [ + pytest.param({}, "settings", id="empty_settings_dict"), + pytest.param( + { + "opencti": {"url": "http://localhost:PORT", "token": "test-token"}, + "connector": { + "id": "connector-id", + "name": "Test Connector", + "scope": "test, connector", + "log_level": "error", + "auto": True, + }, + "kaspersky": { + "api_base_url": "https://tip.kaspersky.com", + "api_key": "SecretStr", + "max_tlp": "TLP:AMBER", + "zone_octi_score_mapping": "red:100,orange:80,yellow:60,gray:20,green:0", + "file_sections": "LicenseInfo,Zone,FileGeneralInfo", + "ipv4_sections": "LicenseInfo,Zone,IpGeneralInfo", + "domain_sections": "LicenseInfo,Zone,DomainGeneralInfo", + }, + }, + "opencti.url", + id="invalid_opencti_url", + ), + pytest.param( + { + "opencti": { + "url": "http://localhost:8080", + "token": "test-token", + }, + "connector": { + "id": "connector-id", + "name": "Test Connector", + "scope": "test, connector", + "log_level": "error", + "auto": True, + }, + "kaspersky": { + "api_base_url": "https://tip.kaspersky.com", + "max_tlp": "TLP:AMBER", + "zone_octi_score_mapping": "red:100,orange:80,yellow:60,gray:20,green:0", + "file_sections": "LicenseInfo,Zone,FileGeneralInfo", + "ipv4_sections": "LicenseInfo,Zone,IpGeneralInfo", + "domain_sections": "LicenseInfo,Zone,DomainGeneralInfo", + }, + }, + "kaspersky.api_key", + id="missing_kaspersky_api_key", + ), + ], +) +def test_settings_should_raise_when_invalid_input(settings_dict, field_name): + """ + Test that `ConnectorSettings` (implementation of `BaseConnectorSettings` from `connectors-sdk`) raises on invalid input. + For the test purpose, `BaseConnectorSettings._load_config_dict` is overridden to return + a fake and invalid dict (instead of the env/config vars parsed from `config.yml`, `.env` or env vars). + + :param settings_dict: The dict to use as `ConnectorSettings` input + """ + + class FakeConnectorSettings(ConnectorSettings): + """ + Subclass of `ConnectorSettings` (implementation of `BaseConnectorSettings`) for testing purpose. + It overrides `BaseConnectorSettings._load_config_dict` to return a fake but valid config dict. + """ + + @classmethod + def _load_config_dict(cls, _, handler) -> dict[str, Any]: + return handler(settings_dict) + + with pytest.raises(ConfigValidationError) as err: + FakeConnectorSettings() + assert str("Error validating configuration") in str(err)