From 26214a406751c41e2dd3eccaa72515447d351b4f Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 12:45:35 -0400 Subject: [PATCH 01/15] Updated example/default feed to use OTX AlienVault/AT&T Cybersecurity, as EclecticIQ OpenTAXII is no longer operational --- README.md | 10 +++++----- threat_intelligence_toolkit.py | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 38dc508..6aaf924 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Automate generating or pulling threat intelligence Structured Threat Information Expression (STIX) files from a flat file or from a TAXII server and uploading a threat collection to an ECA and multiple EDAs via the REST API. By uploading STIX files, you can add a threat collection to your ExtraHop Discover and Command appliances. Threat collections enable you to identify suspicious hosts, IP addresses, and URIs on your network. ## Usage -Specify an output directory, threat collection name, ECA/EDA details, and other optional config via the command line then the script either generates a STIX file or polls the TAXII server (defaults to [EclecticIQ OpenTAXII](https://open.taxiistand.com) intel feed), saves the stix files in a gzipped tar file (tgz), and uploads the threat collection to the specified Reveal(x) ECA/EDAs. +Specify an output directory, threat collection name, ECA/EDA details, and other optional config via the command line then the script either generates a STIX file or polls the TAXII server (defaults to [OTX AlienVault/AT&T Cybersecurity](https://otx.alienvault.com) intel feed), saves the stix files in a gzipped tar file (tgz), and uploads the threat collection to the specified Reveal(x) ECA/EDAs. 
**This script solely serves as example code and is made available without any support or warranty.** @@ -21,16 +21,16 @@ You will need to update the example output paths, collection names, IP addresses - View usage and all possible command line arguments - `python3 threat_intelligence_toolkit.py -h` -- Download, tgz, and upload all collections from the default EclecticIQ OpenTAXII server to an EDA +- Download, tgz, and upload all collections from the default OTX AlienVault/AT&T Cybersecurity server to an EDA - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection --eda 172.16.1.2 3Hb7EpHRqb2EpnS7iweHgR5F3sf False` -- Download, tgz, and upload all collections from the default EclecticIQ OpenTAXII server to an ECA and multiple EDAs +- Download, tgz, and upload all collections from the default OTX AlienVault/AT&T Cybersecurity server to an ECA and multiple EDAs - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection --eca 172.16.1.1 3Hb7EpHRqb2EpnS7iweHgR5F3sg True --eda 172.16.1.2 3Hb7EpHRqb2EpnS7iweHgR5F3sf False --eda 172.16.1.3 3Hb7EpHRqb2EpnS7iweHgR5F3sf False` -- Download and tgz all collections from the default EclecticIQ OpenTAXII server to be uploaded manually +- Download and tgz all collections from the default OTX AlienVault/AT&T Cybersecurity server to be uploaded manually - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection` -- Download and tgz a specific list of collections from the default EclecticIQ OpenTAXII server to be uploaded manually +- Download and tgz a specific list of collections from the default OTX AlienVault/AT&T Cybersecurity server to be uploaded manually - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection --taxii-collections vxvault hailataxii.guest.dataForLast_7daysOnly` - Download, tgz, and upload all collections from a specific TAXII server to an EDA diff --git a/threat_intelligence_toolkit.py 
b/threat_intelligence_toolkit.py index 3744121..2ecb446 100644 --- a/threat_intelligence_toolkit.py +++ b/threat_intelligence_toolkit.py @@ -246,9 +246,9 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp # poll a taxii server for stix files def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, days_to_poll, tmp_dir, verbose): - # if no taxii server details are specified then default to the EclecticIQ OpenTAXII threat intel feed + # if no taxii server details are specified then default to the OTX AlienVault/AT&T Cybersecurity threat intel feed if not taxii_server: - taxii_server = ["open.taxiistand.com", "/services/discovery", "True"] + taxii_server = ["otx.alienvault.com", "/taxii/discovery", "True"] try: # handle taxii server port if supplied @@ -394,4 +394,4 @@ def main(): logging.info('ExtraHop Threat Intelligence Toolkit finished running') if __name__ == '__main__': - main() \ No newline at end of file + main() From 9c9f8167f1516093d34693f22c38700e39fdd7e2 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 13:07:44 -0400 Subject: [PATCH 02/15] Updated dependencies --- requirements.txt | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/requirements.txt b/requirements.txt index 75d6ee5..53f5be2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,21 +1,22 @@ cabby==0.1.23 -certifi==2024.7.4 -charset-normalizer==2.1.1 -colorlog==6.7.0 +certifi==2024.8.30 +charset-normalizer==3.3.2 +colorlog==6.8.2 cybox==2.1.0.21 decorator==5.1.1 furl==2.1.3 -idna==3.7 +idna==3.10 libtaxii==1.1.119 -lxml==4.9.2 +lxml==5.3.0 mixbox==1.0.5 ordered-set==4.1.0 orderedmultidict==1.0.1 -python-dateutil==2.8.2 -pytz==2022.6 -requests==2.32.0 +python-dateutil==2.9.0.post0 +pytz==2024.2 +requests==2.32.3 six==1.16.0 stix==1.2.0.11 -urllib3==1.26.19 -validators==0.20.0 +urllib3==2.2.3 +validators==0.34.0 
weakrefmethod==1.0.3 + From 8bf28135e1433bf1d7eb7ec30e025ff01b8748df Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 13:12:26 -0400 Subject: [PATCH 03/15] Added gitignore file --- .gitignore | 142 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..772951a --- /dev/null +++ b/.gitignore @@ -0,0 +1,142 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
+# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +.idea/ + +docs/_build From 98028fa8308cec0d71dded96d49f4d17d4c60967 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 15:08:00 -0400 Subject: [PATCH 04/15] Added type annotations --- threat_intelligence_toolkit.py | 72 ++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/threat_intelligence_toolkit.py b/threat_intelligence_toolkit.py index 2ecb446..2308a1b 100644 --- a/threat_intelligence_toolkit.py +++ b/threat_intelligence_toolkit.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 # # COPYRIGHT 2022 BY EXTRAHOP NETWORKS, INC. -# +# # This file is subject to the terms and conditions defined in # file 'LICENSE', which is part of this source code package. # -# Description: Threat Intelligence Toolkit - Automate generating or pulling threat intelligence Structured Threat -# Information Expression (STIX) files from a flat file or from a TAXII server and uploading a threat collection to -# an ECA and multiple EDAs via the REST API. By uploading STIX files, you can add a threat collection to your ExtraHop -# Discover and Command appliances. 
Threat collections enable you to identify suspicious hosts, IP addresses, and URIs +# Description: Threat Intelligence Toolkit - Automate generating or pulling threat intelligence Structured Threat +# Information Expression (STIX) files from a flat file or from a TAXII server and uploading a threat collection to +# an ECA and multiple EDAs via the REST API. By uploading STIX files, you can add a threat collection to your ExtraHop +# Discover and Command appliances. Threat collections enable you to identify suspicious hosts, IP addresses, and URIs # on your network # # Usage: Specify an output directory, threat collection name, ECA/EDA details, and other optional config via @@ -20,6 +20,8 @@ # # Version 1.3.6 +from __future__ import annotations +from typing import BinaryIO import cabby import requests import urllib3 @@ -41,8 +43,8 @@ from stix.indicator.indicator import Indicator # parse command line arguments -def parse_command_line_args(): - +def parse_command_line_args() -> argparse.Namespace: + argparser = argparse.ArgumentParser() argparser.add_argument('-o', '--output-dir', action='store', dest='output_dir', help='Existing directory to output tgz containing stix files', required=True, metavar='OUTPUT_DIRECTORY') @@ -76,15 +78,21 @@ def parse_command_line_args(): return args # strip all non alphanumeric chars from a string -def strip_non_alphanum(input_str): +def strip_non_alphanum(input_str: str) -> str: return ''.join(char for char in input_str if char.isalnum()) # convert string to boolean and return True when unsure -def str_to_bool(input_str): +def str_to_bool(input_str: str) -> bool: return input_str.lower() not in ['false', 'f', '0', 'n', 'no'] # send a PUT request to an EDA or ECA threatcollections/{id} endpoint -def threatcollection_api_request(eh_host, eh_apikey, eh_verify_cert, threatcollection_name, file_name, file_path, verbose): +def threatcollection_api_request(eh_host: str, + eh_apikey: str, + eh_verify_cert: bool, + threatcollection_name: str, 
+ file_name: str, + file_path: str, + verbose: bool) -> None: if verbose: logging.info("===============") @@ -98,7 +106,14 @@ def threatcollection_api_request(eh_host, eh_apikey, eh_verify_cert, threatcolle url = "https://{}/api/v1/threatcollections/~{}".format(eh_host, user_key) # configure tgz for multipart file upload - file_body = {'file': (file_name, open(file_path, 'rb')), 'name': threatcollection_name} + # file_body = {'file': (file_name, open(file_path, 'rb')), 'name': threatcollection_name} + with open(file_path, 'rb') as file_handle: # type: BinaryIO + files = { + 'file': (file_name, file_handle) + } + data = { + 'name': threatcollection_name + } # log InsecureRequestWarning if making an unverified https request if not eh_verify_cert: @@ -106,7 +121,8 @@ def threatcollection_api_request(eh_host, eh_apikey, eh_verify_cert, threatcolle try: # send PUT request to create or update - r = requests.put(url, headers=headers, files=file_body, verify=eh_verify_cert) + # r = requests.put(url, headers=headers, files=file_body, verify=eh_verify_cert) + r = requests.put(url, headers=headers, files=files, data=data, verify=eh_verify_cert) except Exception as e: logging.error("Issue encountered while sending an API request to {}. 
Details: {}".format(url, e)) raise @@ -120,7 +136,10 @@ def threatcollection_api_request(eh_host, eh_apikey, eh_verify_cert, threatcolle return # generate stix files from a flat file or URL to a flat file -def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp_dir, validate, verbose): +def generate_stix_file(input_file: str, list_type: str, + delimiter: str, list_name: str, + tc_name: str, tmp_dir: str, + validate: bool, verbose: bool) -> None: # observable limit per generated stix file OBSERVABLES_PER_STIX_FILE = 3000 @@ -128,7 +147,7 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp logging.info("=====================") logging.info("== GENERATING STIX ==") logging.info("=====================") - + # download or open input file if validators.url(input_file): res = requests.get(input_file) @@ -180,12 +199,12 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp except ValueError: # if ip address parsing fails then attempt to parse as an ip network try: - parsed_ip = ipaddress.ip_network(item, strict=False) + parsed_network = ipaddress.ip_network(item, strict=False) indicator_obj.category = Address.CAT_CIDR + indicator_obj.address_value = str(parsed_network) except ValueError: logging.warning("IP Address {} is neither an IPv4, IPv6, nor CIDR - skipping".format(item)) continue - indicator_obj.address_value = str(parsed_ip) indicator_obj.condition = "Equals" indicator_type = "IP Watchlist" # customizable components below @@ -241,21 +260,24 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp # clear cybox cache to prevent an Out of Memory error # https://cybox.readthedocs.io/en/stable/api/cybox/core/object.html#cybox.core.object.Object cache_clear() - - return + + return # poll a taxii server for stix files -def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, days_to_poll, tmp_dir, verbose): +def 
poll_taxii_server(taxii_server: list[str], basic_user: str, + basic_pw: str, taxii_collections: str, + days_to_poll: int, tmp_dir: str, + verbose: bool) -> None: # if no taxii server details are specified then default to the OTX AlienVault/AT&T Cybersecurity threat intel feed if not taxii_server: taxii_server = ["otx.alienvault.com", "/taxii/discovery", "True"] try: # handle taxii server port if supplied - taxii_server_port = None + taxii_server_port: str | None = None if ':' in taxii_server[0]: taxii_server[0], taxii_server_port = taxii_server[0].split(':') - + # setup taxii client taxii_client = cabby.create_client( host=taxii_server[0], @@ -295,7 +317,7 @@ def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, day logging.info("== POLLING ==") logging.info("=============") - # if specified, filter only the supplied collection(s) + # if specified, filter only the supplied collection(s) if taxii_collections: collections = filter(lambda collection: collection.name in taxii_collections, collections) @@ -307,7 +329,7 @@ def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, day i = 0 # save each returned content block in a separate file with the filename ending ..._part_N.stix for i, block in enumerate(content_blocks, 1): - collection_filename = "{}_part_{}.stix".format(collection.name, i) + collection_filename = "{}_part_{}.stix".format(collection.name, i) with open(os.path.join(tmp_dir, collection_filename), 'wb') as f: f.write(block.content) if verbose: @@ -319,10 +341,10 @@ def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, day if verbose: logging.error("Could not download collection: {}. 
Details: {}".format(collection.name, e)) continue - + return -def main(): +def main() -> None: # retrive command line arguments args = parse_command_line_args() From 68896e8311fe63ee1100097c6fa5bfbeaa68bdbd Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 15:08:59 -0400 Subject: [PATCH 05/15] Added mypy configuration file --- .mypy.ini | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .mypy.ini diff --git a/.mypy.ini b/.mypy.ini new file mode 100644 index 0000000..4071a80 --- /dev/null +++ b/.mypy.ini @@ -0,0 +1,6 @@ +# Global options: + +[mypy] +warn_return_any = True +warn_unused_configs = True +ignore_missing_imports = True From 9517706071b253ffb3298b91ea016a6e0ab21b2b Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 16:04:59 -0400 Subject: [PATCH 06/15] Reordered imports, changed string formating to use fstrings, minor formating changes --- threat_intelligence_toolkit.py | 117 ++++++++++++++++----------------- 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/threat_intelligence_toolkit.py b/threat_intelligence_toolkit.py index 2308a1b..5a76be5 100644 --- a/threat_intelligence_toolkit.py +++ b/threat_intelligence_toolkit.py @@ -22,19 +22,19 @@ from __future__ import annotations from typing import BinaryIO -import cabby -import requests -import urllib3 -import validators -import ipaddress -import datetime -import pytz -import argparse import os import sys import tarfile import shutil import logging +import argparse +import datetime +import ipaddress +import cabby +import requests +import urllib3 +import validators +import pytz from cybox.objects.uri_object import URI from cybox.objects.domain_name_object import DomainName from cybox.objects.address_object import Address @@ -42,6 +42,7 @@ from stix.core import STIXPackage, STIXHeader from stix.indicator.indicator import Indicator + # parse 
command line arguments def parse_command_line_args() -> argparse.Namespace: @@ -58,7 +59,7 @@ def parse_command_line_args() -> argparse.Namespace: # options below are for generating a STIX file from a flat file argparser.add_argument('--generate-stix', action='store_true', dest='generate_stix', default=False, help='Create a stix file from a flat file. Requires that --input-file and --type are set.') argparser.add_argument('--input-file', action='store', dest='input_file', help='Full path of delimited list file. Also accepts a URL to a file. Ignored if --generate-stix is not set.', metavar='INPUT_FILE') - argparser.add_argument('--list-type', action='store', dest='list_type', choices=['ip','domain', 'url'], help='Type of the input items in the provided list (list must all be the same type), allowed values: ip, domain, url. Ignored if --generate-stix is not set.', metavar='LIST_TYPE') + argparser.add_argument('--list-type', action='store', dest='list_type', choices=['ip', 'domain', 'url'], help='Type of the input items in the provided list (list must all be the same type), allowed values: ip, domain, url. Ignored if --generate-stix is not set.', metavar='LIST_TYPE') argparser.add_argument('--delimiter', action='store', dest='delimiter', help='Delimiter for the input list file. Ignored if --generate-stix is not set.', default='\n', metavar='INPUT_FILE') argparser.add_argument('--list-name', action='store', dest='list_name', help='Name of the list or provider to be used in the created stix file. Ignored if --generate-stix is not set.', default='Threat Intel List', metavar='LIST_NAME') argparser.add_argument('--validate', action='store_true', dest='validate_input', default=False, help='Validate each Domain/URL before adding to generated stix file (beta). 
Requires that --generate-stix is set.') @@ -77,22 +78,19 @@ def parse_command_line_args() -> argparse.Namespace: return args + # strip all non alphanumeric chars from a string def strip_non_alphanum(input_str: str) -> str: return ''.join(char for char in input_str if char.isalnum()) + # convert string to boolean and return True when unsure def str_to_bool(input_str: str) -> bool: return input_str.lower() not in ['false', 'f', '0', 'n', 'no'] + # send a PUT request to an EDA or ECA threatcollections/{id} endpoint -def threatcollection_api_request(eh_host: str, - eh_apikey: str, - eh_verify_cert: bool, - threatcollection_name: str, - file_name: str, - file_path: str, - verbose: bool) -> None: +def threatcollection_api_request(eh_host: str, eh_apikey: str, eh_verify_cert: bool, threatcollection_name: str, file_name: str, file_path: str, verbose: bool) -> None: if verbose: logging.info("===============") @@ -101,19 +99,16 @@ def threatcollection_api_request(eh_host: str, user_key = strip_non_alphanum(threatcollection_name) - headers = {'Accept': 'application/json', 'Authorization': "ExtraHop apikey={}".format(eh_apikey)} + headers = {'Accept': 'application/json', 'Authorization': f"ExtraHop apikey={eh_apikey}"} - url = "https://{}/api/v1/threatcollections/~{}".format(eh_host, user_key) + url = f"https://{eh_host}/api/v1/threatcollections/~{user_key}" # configure tgz for multipart file upload # file_body = {'file': (file_name, open(file_path, 'rb')), 'name': threatcollection_name} - with open(file_path, 'rb') as file_handle: # type: BinaryIO - files = { - 'file': (file_name, file_handle) - } - data = { - 'name': threatcollection_name - } + file_handle: BinaryIO + with open(file_path, 'rb') as file_handle: + files = {'file': (file_name, file_handle)} + data = {'name': threatcollection_name} # log InsecureRequestWarning if making an unverified https request if not eh_verify_cert: @@ -124,22 +119,20 @@ def threatcollection_api_request(eh_host: str, # r = requests.put(url, 
headers=headers, files=file_body, verify=eh_verify_cert) r = requests.put(url, headers=headers, files=files, data=data, verify=eh_verify_cert) except Exception as e: - logging.error("Issue encountered while sending an API request to {}. Details: {}".format(url, e)) + logging.error(f"Issue encountered while sending an API request to {url}. Details: {e}") raise # handle non 200 response if r.status_code >= 200 and r.status_code < 300: - logging.info("Successfully uploaded {} to {} as threatcollection named {} with user_key {}".format(file_name, eh_host, threatcollection_name, user_key)) + logging.info(f"Successfully uploaded {file_name} to {eh_host} as threatcollection named {threatcollection_name} with user_key {user_key}") else: - logging.error(("Non-200 status code from ExtraHop API request. Status code: {}, URL: {}, Response: {}".format(r.status_code, url, r.text))) - raise ValueError("Non-200 status code from ExtraHop API request. Status code: {}, URL: {}, Response: {}".format(r.status_code, url, r.text)) + logging.error(f"Non-200 status code from ExtraHop API request. Status code: {r.status_code}, URL: {url}, Response: {r.text}") + raise ValueError(f"Non-200 status code from ExtraHop API request. 
Status code: {r.status_code}, URL: {url}, Response: {r.text}") return + # generate stix files from a flat file or URL to a flat file -def generate_stix_file(input_file: str, list_type: str, - delimiter: str, list_name: str, - tc_name: str, tmp_dir: str, - validate: bool, verbose: bool) -> None: +def generate_stix_file(input_file: str, list_type: str, delimiter: str, list_name: str, tc_name: str, tmp_dir: str, validate: bool, verbose: bool) -> None: # observable limit per generated stix file OBSERVABLES_PER_STIX_FILE = 3000 @@ -155,12 +148,12 @@ def generate_stix_file(input_file: str, list_type: str, else: # exit if input file doesn't exist if not os.path.isfile(input_file): - logging.error("Supplied input file '{}' doesn't exist".format(input_file)) - sys.exit("Error: Supplied input file '{}' doesn't exist".format(input_file)) + logging.error(f"Supplied input file '{input_file}' doesn't exist") + sys.exit(f"Error: Supplied input file '{input_file}' doesn't exist") else: with open(input_file, 'r') as f: items = f.read().split(delimiter) - logging.info("Successfully parsed input file at {}".format(input_file)) + logging.info(f"Successfully parsed input file at {input_file}") # slice input into batches for batch_num, index in enumerate(range(0, len(items), OBSERVABLES_PER_STIX_FILE), 1): @@ -194,7 +187,7 @@ def generate_stix_file(input_file: str, list_type: str, elif parsed_ip.version == 6: indicator_obj.category = Address.CAT_IPV6 else: - logging.warning("Unknown IP Address version type: {} - skipping".format(parsed_ip.version)) + logging.warning(f"Unknown IP Address version type: {parsed_ip.version} - skipping") continue except ValueError: # if ip address parsing fails then attempt to parse as an ip network @@ -203,7 +196,7 @@ def generate_stix_file(input_file: str, list_type: str, indicator_obj.category = Address.CAT_CIDR indicator_obj.address_value = str(parsed_network) except ValueError: - logging.warning("IP Address {} is neither an IPv4, IPv6, nor CIDR - 
skipping".format(item)) + logging.warning(f"IP Address {item} is neither an IPv4, IPv6, nor CIDR - skipping") continue indicator_obj.condition = "Equals" indicator_type = "IP Watchlist" @@ -213,7 +206,7 @@ def generate_stix_file(input_file: str, list_type: str, elif list_type == 'domain': # validate domain if validate and not validators.domain(item): - logging.warning("Invalid domain: {} - skipping".format(item)) + logging.warning(f"Invalid domain: {item} - skipping") continue indicator_obj = DomainName() indicator_obj.value = item @@ -224,11 +217,11 @@ def generate_stix_file(input_file: str, list_type: str, elif list_type == 'url': # validate url if validate and not validators.url(item): - logging.warning("Invalid url: {} - skipping".format(item)) + logging.warning(f"Invalid url: {item} - skipping") continue indicator_obj = URI() indicator_obj.value = item - indicator_obj.type_ = URI.TYPE_URL + indicator_obj.type_ = URI.TYPE_URL indicator_obj.condition = "Equals" indicator_type = "URL Watchlist" # customizable components below @@ -252,10 +245,10 @@ def generate_stix_file(input_file: str, list_type: str, package.add_indicator(indicator) # save each batch in a separate stix file with the filename ending ..._part_N.stix - collection_filename = "{}_part_{}.stix".format(strip_non_alphanum(tc_name), batch_num) + collection_filename = f"{strip_non_alphanum(tc_name)}_part_{batch_num}.stix" with open(os.path.join(tmp_dir, collection_filename), 'wb') as f: f.write(package.to_xml()) - logging.info("Successfully created stix file {}".format(collection_filename)) + logging.info(f"Successfully created stix file {collection_filename}") # clear cybox cache to prevent an Out of Memory error # https://cybox.readthedocs.io/en/stable/api/cybox/core/object.html#cybox.core.object.Object @@ -263,11 +256,9 @@ def generate_stix_file(input_file: str, list_type: str, return + # poll a taxii server for stix files -def poll_taxii_server(taxii_server: list[str], basic_user: str, - basic_pw: 
str, taxii_collections: str, - days_to_poll: int, tmp_dir: str, - verbose: bool) -> None: +def poll_taxii_server(taxii_server: list[str], basic_user: str, basic_pw: str, taxii_collections: str, days_to_poll: int, tmp_dir: str, verbose: bool) -> None: # if no taxii server details are specified then default to the OTX AlienVault/AT&T Cybersecurity threat intel feed if not taxii_server: taxii_server = ["otx.alienvault.com", "/taxii/discovery", "True"] @@ -296,7 +287,7 @@ def poll_taxii_server(taxii_server: list[str], basic_user: str, services = taxii_client.discover_services() collections = taxii_client.get_collections() except Exception as e: - logging.error("Issue encountered while setting up or querying with the TAXII client. Details: {}".format(e)) + logging.error(f"Issue encountered while setting up or querying with the TAXII client. Details: {e}") raise # verbose taxii server info @@ -305,13 +296,13 @@ def poll_taxii_server(taxii_server: list[str], basic_user: str, logging.info("== SERVICES ==") logging.info("==============") for service in services: - logging.info("Service type={s.type}, address={s.address}, available={s.available}, message={s.message}, version={s.version}, protocol={s.protocol}".format(s=service)) + logging.info(f"Service type={service.type}, address={service.address}, available={service.available}, message={service.message}, version={service.version}, protocol={service.protocol}") logging.info("=================") logging.info("== COLLECTIONS ==") logging.info("=================") for collection in collections: - logging.info("Collection name={c.name}, description={c.description}, available={c.available}".format(c=collection)) + logging.info(f"Collection name={collection.name}, description={collection.description}, available={collection.available}") logging.info("=============") logging.info("== POLLING ==") @@ -329,28 +320,29 @@ def poll_taxii_server(taxii_server: list[str], basic_user: str, i = 0 # save each returned content block in a 
separate file with the filename ending ..._part_N.stix for i, block in enumerate(content_blocks, 1): - collection_filename = "{}_part_{}.stix".format(collection.name, i) + collection_filename = f"{collection.name}_part_{i}.stix" with open(os.path.join(tmp_dir, collection_filename), 'wb') as f: f.write(block.content) if verbose: if i != 0: - logging.info("Successfully downloaded collection {} into {} file(s)".format(collection.name, i)) + logging.info(f"Successfully downloaded collection {collection.name} into {i} file(s)") else: - logging.warning("Successfully polled collection {}, but there was nothing to download for the specified timeframe".format(collection.name)) + logging.warning(f"Successfully polled collection {collection.name}, but there was nothing to download for the specified timeframe") except Exception as e: if verbose: - logging.error("Could not download collection: {}. Details: {}".format(collection.name, e)) + logging.error(f"Could not download collection: {collection.name}. 
Details: {e}") continue return + def main() -> None: # retrive command line arguments args = parse_command_line_args() # ensure supplied directory exists if not os.path.isdir(args.output_dir): - sys.exit("Error: Supplied output directory '{}' either doesn't exist or is not a directory".format(args.output_dir)) + sys.exit(f"Error: Supplied output directory '{args.output_dir}' either doesn't exist or is not a directory") # disable insecure request warnings to stdout, will still log warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -361,7 +353,7 @@ def main() -> None: logging.info("ExtraHop Threat Intelligence Toolkit started running") # make temporary directory - tmp_dir_name = "{}_{}".format(strip_non_alphanum(args.threat_collection_name), datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) + tmp_dir_name = f"{strip_non_alphanum(args.threat_collection_name)}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" tmp_dir = os.path.join(args.output_dir, tmp_dir_name) os.makedirs(tmp_dir) @@ -375,11 +367,11 @@ def main() -> None: # only proceed with packaging and uploading if there are files present if os.listdir(tmp_dir): # create the gzipped tar file of the temporary directory - tgz_name = "{}.tgz".format(tmp_dir_name) + tgz_name = f"{tmp_dir_name}.tgz" tgz_path = os.path.join(args.output_dir, tgz_name) with tarfile.open(tgz_path, "w:gz") as tar: tar.add(tmp_dir, arcname=os.path.basename(tmp_dir)) - logging.info("Successfully created tgz file named {} in {}".format(tgz_name, args.output_dir)) + logging.info(f"Successfully created tgz file named {tgz_name} in {args.output_dir}") # upload the threat collection to one ECA and one or more EDAs if args.eca: @@ -393,7 +385,7 @@ def main() -> None: # if only EDAs are provided elif args.edas: for eda in args.edas: - threatcollection_api_request(eda[0], eda[1], str_to_bool(eda[2]), args.threat_collection_name, tgz_name, tgz_path, args.verbose) + threatcollection_api_request(eda[0], eda[1], 
str_to_bool(eda[2]), args.threat_collection_name, tgz_name, tgz_path, args.verbose) else: logging.warning("Did not upload threat collection to an ExtraHop appliance since neither an ECA/EDAs nor EDAs were provided") @@ -401,9 +393,9 @@ def main() -> None: if (args.eca or args.edas) and args.clean_up: try: os.remove(tgz_path) - logging.info("Successfully cleaned up and removed the local threat collection tgz file: {}".format(tgz_name)) + logging.info(f"Successfully cleaned up and removed the local threat collection tgz file: {tgz_name}") except OSError as e: - logging.error("Could not delete the local threat collection .tgz file: {}. Details: {}.".format(tgz_path, e.strerror)) + logging.error(f"Could not delete the local threat collection .tgz file: {tgz_path}. Details: {e.strerror}.") else: logging.warning('There were no threat intel results to process. Note: If polling a TAXII server ensure that the collection(s) contain results') @@ -411,9 +403,10 @@ def main() -> None: try: shutil.rmtree(tmp_dir) except OSError as e: - logging.error("Could not delete the temporary directory: {}. Details: {}.".format(tmp_dir_name, e.strerror)) + logging.error(f"Could not delete the temporary directory: {tmp_dir_name}. 
Details: {e.strerror}.") logging.info('ExtraHop Threat Intelligence Toolkit finished running') + if __name__ == '__main__': main() From d9543efc8e9d9a5ae3e9edfbe8cf8ea8868da2d8 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 16:46:58 -0400 Subject: [PATCH 07/15] Added basic pytest tests (coverage is low, still need tests for mock data, this is just a start), flake8 linting checks, and mypy strict type checks in a github workflow --- .github/workflows/tests.yml | 33 ++++++++++++ .mypy.ini => mypy.ini | 0 pytest.ini | 11 ++++ tests/__init__.py | 0 tests/test_threat_intelligence_toolkit.py | 62 +++++++++++++++++++++++ 5 files changed, 106 insertions(+) create mode 100644 .github/workflows/tests.yml rename .mypy.ini => mypy.ini (100%) create mode 100644 pytest.ini create mode 100644 tests/__init__.py create mode 100644 tests/test_threat_intelligence_toolkit.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..abe758c --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,33 @@ +name: build + +on: ["push", "pull_request"] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.11 + uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements.txt + python -m pip install flake8==7.1.1 mypy==1.11.2 pytest==8.3.3 pytest-cov==5.0.0 + python -m mypy --install-types + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. + flake8 . --count --exit-zero --ignore=W191,E501 --max-complexity=25 --statistics + - name: Static type checking with mypy + run: | + python -m mypy --strict . 
+ - name: Pytest + run: | + pytest diff --git a/.mypy.ini b/mypy.ini similarity index 100% rename from .mypy.ini rename to mypy.ini diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..1bfefed --- /dev/null +++ b/pytest.ini @@ -0,0 +1,11 @@ +# pytest.ini +[pytest] +pythonpath = . +addopts = --cov=./ --cov-report=term-missing --cov-config=pytest.ini +filterwarnings = + ignore::DeprecationWarning:libtaxii.* + +[coverage:run] +omit = + */tests/* + */migrations/* diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_threat_intelligence_toolkit.py b/tests/test_threat_intelligence_toolkit.py new file mode 100644 index 0000000..01b8356 --- /dev/null +++ b/tests/test_threat_intelligence_toolkit.py @@ -0,0 +1,62 @@ +"""/tests/test_threat_intelligence_toolkit.py""" + +import pytest +import sys +from threat_intelligence_toolkit import ( + strip_non_alphanum, + str_to_bool, + parse_command_line_args +) + +def test_strip_non_alphanum() -> None: + assert strip_non_alphanum("abc123") == "abc123" + assert strip_non_alphanum("abc!@#123") == "abc123" + assert strip_non_alphanum("!@#") == "" + assert strip_non_alphanum("abc def") == "abcdef" + assert strip_non_alphanum("a$b%c^") == "abc" + +def test_str_to_bool() -> None: + assert str_to_bool("True") is True + assert str_to_bool("false") is False + assert str_to_bool("Yes") is True + assert str_to_bool("No") is False + assert str_to_bool("1") is True + assert str_to_bool("0") is False + assert str_to_bool("unexpected") is True # Default behavior + +def test_parse_command_line_args_basic() -> None: + test_args = [ + 'threat_intelligence_toolkit.py', + '-o', 'output_dir', + '-tc', 'TestCollection' + ] + sys.argv = test_args + args = parse_command_line_args() + assert args.output_dir == 'output_dir' + assert args.threat_collection_name == 'TestCollection' + +def test_parse_command_line_args_generate_stix() -> None: + test_args = [ + 
'threat_intelligence_toolkit.py', + '-o', 'output_dir', + '-tc', 'TestCollection', + '--generate-stix', + '--input-file', 'input.txt', + '--list-type', 'ip' + ] + sys.argv = test_args + args = parse_command_line_args() + assert args.generate_stix is True + assert args.input_file == 'input.txt' + assert args.list_type == 'ip' + +def test_parse_command_line_args_missing_generate_stix_args() -> None: + test_args = [ + 'threat_intelligence_toolkit.py', + '-o', 'output_dir', + '-tc', 'TestCollection', + '--generate-stix' + ] + sys.argv = test_args + with pytest.raises(SystemExit): + parse_command_line_args() From 74b96cf76a68538712933253ce87dd4f91d357ca Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 16:53:56 -0400 Subject: [PATCH 08/15] Updated test workflow --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index abe758c..fc88b0e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -17,7 +17,7 @@ jobs: run: | python -m pip install --upgrade pip python -m pip install -r requirements.txt - python -m pip install flake8==7.1.1 mypy==1.11.2 pytest==8.3.3 pytest-cov==5.0.0 + python -m pip install flake8==7.1.1 mypy==1.11.2 mypy-extensions types-pytz types-requests test==8.3.3 pytest-cov==5.0.0 python -m mypy --install-types - name: Lint with flake8 run: | From d5619f7a7ea9e7d9d75f0d22b0de6c30369975c6 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 16:55:13 -0400 Subject: [PATCH 09/15] Fixed typo in workflow --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fc88b0e..0af7eb7 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -17,7 +17,7 @@ jobs: run: | python -m pip
install --upgrade pip python -m pip install -r requirements.txt - python -m pip install flake8==7.1.1 mypy==1.11.2 mypy-extensions types-pytz types-requests test==8.3.3 pytest-cov==5.0.0 + python -m pip install flake8==7.1.1 mypy==1.11.2 mypy-extensions types-pytz types-requests pytest==8.3.3 pytest-cov==5.0.0 python -m mypy --install-types - name: Lint with flake8 run: | From 4ee2ea14fd76ef01a06b520d399925e601d68820 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 16:56:45 -0400 Subject: [PATCH 10/15] Forgot to remove unneeded mypy command --- .github/workflows/tests.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0af7eb7..fb7f402 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,7 +18,6 @@ jobs: python -m pip install --upgrade pip python -m pip install -r requirements.txt python -m pip install flake8==7.1.1 mypy==1.11.2 mypy-extensions types-pytz types-requests pytest==8.3.3 pytest-cov==5.0.0 - python -m mypy --install-types - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names From 7314ae2716eaccf4cab6ab6e1152c3ab415e5c1f Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 19:51:48 -0400 Subject: [PATCH 11/15] Corrected flake8 format error in test file --- tests/test_threat_intelligence_toolkit.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_threat_intelligence_toolkit.py b/tests/test_threat_intelligence_toolkit.py index 01b8356..1d367df 100644 --- a/tests/test_threat_intelligence_toolkit.py +++ b/tests/test_threat_intelligence_toolkit.py @@ -8,6 +8,7 @@ parse_command_line_args ) + def test_strip_non_alphanum() -> None: assert strip_non_alphanum("abc123") == "abc123" assert strip_non_alphanum("abc!@#123") == "abc123" @@ -15,6 +16,7 @@ def
test_strip_non_alphanum() -> None: assert strip_non_alphanum("abc def") == "abcdef" assert strip_non_alphanum("a$b%c^") == "abc" + def test_str_to_bool() -> None: assert str_to_bool("True") is True assert str_to_bool("false") is False @@ -24,6 +26,7 @@ def test_str_to_bool() -> None: assert str_to_bool("0") is False assert str_to_bool("unexpected") is True # Default behavior + def test_parse_command_line_args_basic() -> None: test_args = [ 'threat_intelligence_toolkit.py', @@ -35,6 +38,7 @@ def test_parse_command_line_args_basic() -> None: assert args.output_dir == 'output_dir' assert args.threat_collection_name == 'TestCollection' + def test_parse_command_line_args_generate_stix() -> None: test_args = [ 'threat_intelligence_toolkit.py', @@ -50,6 +54,7 @@ def test_parse_command_line_args_generate_stix() -> None: assert args.input_file == 'input.txt' assert args.list_type == 'ip' + def test_parse_command_line_args_missing_generate_stix_args() -> None: test_args = [ 'threat_intelligence_toolkit.py', From f93ee3e31c4108fb8d175b4fcfc973d5b6bfebb9 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 20:41:38 -0400 Subject: [PATCH 12/15] Added more tests, fixed issue where it was possible to attempt to access the Address() object before it was initialized --- tests/test_threat_intelligence_toolkit.py | 287 +++++++++++++++++++++- threat_intelligence_toolkit.py | 1 + 2 files changed, 287 insertions(+), 1 deletion(-) diff --git a/tests/test_threat_intelligence_toolkit.py b/tests/test_threat_intelligence_toolkit.py index 1d367df..afc3f8f 100644 --- a/tests/test_threat_intelligence_toolkit.py +++ b/tests/test_threat_intelligence_toolkit.py @@ -2,10 +2,14 @@ import pytest import sys +from pathlib import Path +from unittest.mock import patch, MagicMock from threat_intelligence_toolkit import ( strip_non_alphanum, str_to_bool, - parse_command_line_args + parse_command_line_args, + threatcollection_api_request, 
+ generate_stix_file ) @@ -65,3 +69,284 @@ def test_parse_command_line_args_missing_generate_stix_args() -> None: sys.argv = test_args with pytest.raises(SystemExit): parse_command_line_args() + + +def test_threatcollection_api_request_success(tmp_path: Path) -> None: + # Prepare test data + eh_host = 'otx.alienvault.com' + eh_apikey = 'test_api_key' + eh_verify_cert = True + threatcollection_name = 'example_collection' + file_name = 'test_file.tgz' + file_content = b'Test content' + verbose = False + + # Create a temporary file + file_path = tmp_path / file_name + with open(file_path, 'wb') as f: + f.write(file_content) + + # Mock requests.put to return a successful response + with patch('threat_intelligence_toolkit.requests.put') as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = 'Success' + mock_put.return_value = mock_response + + # Call the function + threatcollection_api_request( + eh_host, + eh_apikey, + eh_verify_cert, + threatcollection_name, + file_name, + str(file_path), + verbose + ) + + # Check that requests.put was called with the correct parameters + mock_put.assert_called_once() + args, kwargs = mock_put.call_args + + expected_url = f'https://{eh_host}/api/v1/threatcollections/~{strip_non_alphanum(threatcollection_name)}' + assert args[0] == expected_url + assert kwargs['headers']['Authorization'] == f'ExtraHop apikey={eh_apikey}' + assert kwargs['verify'] == eh_verify_cert + assert 'files' in kwargs + assert 'data' in kwargs + # Ensure the file was read correctly + uploaded_file = kwargs['files']['file'] + assert uploaded_file[0] == file_name # Filename + # The file content is not accessible since file is opened in binary mode in context manager + + +def test_threatcollection_api_request_failure(tmp_path: Path) -> None: + # Prepare test data + eh_host = 'otx.alienvault.com' + eh_apikey = 'test_api_key' + eh_verify_cert = True + threatcollection_name = 'example_collection' + file_name = 
'test_file.tgz' + file_content = b'Test content' + verbose = False + + # Create a temporary file + file_path = tmp_path / file_name + with open(file_path, 'wb') as f: + f.write(file_content) + + # Mock requests.put to return a failure response + with patch('threat_intelligence_toolkit.requests.put') as mock_put: + mock_response = MagicMock() + mock_response.status_code = 404 + mock_response.text = '{"detail": "endpoint not found"}' + mock_put.return_value = mock_response + + # Call the function and expect it to raise a ValueError + with pytest.raises(ValueError) as excinfo: + threatcollection_api_request( + eh_host, + eh_apikey, + eh_verify_cert, + threatcollection_name, + file_name, + str(file_path), + verbose + ) + + assert 'Non-200 status code from ExtraHop API request' in str(excinfo.value) + + +def test_threatcollection_api_request_exception(tmp_path: Path) -> None: + # Prepare test data + eh_host = 'otx.alienvault.com' + eh_apikey = 'test_api_key' + eh_verify_cert = True + threatcollection_name = 'example_collection' + file_name = 'test_file.tgz' + file_content = b'Test content' + verbose = False + + # Create a temporary file + file_path = tmp_path / file_name + with open(file_path, 'wb') as f: + f.write(file_content) + + # Mock requests.put to raise an exception + with patch('threat_intelligence_toolkit.requests.put') as mock_put: + mock_put.side_effect = Exception('Connection error') + + # Call the function and expect it to raise the same exception + with pytest.raises(Exception) as excinfo: + threatcollection_api_request( + eh_host, + eh_apikey, + eh_verify_cert, + threatcollection_name, + file_name, + str(file_path), + verbose + ) + + assert 'Connection error' in str(excinfo.value) + + +def test_generate_stix_file_with_local_file(tmp_path: Path) -> None: + # Prepare test data + input_file = tmp_path / 'input.txt' + input_content = '192.168.1.1\n10.0.0.0/24\ninvalid_ip\n#comment\n\n' + input_file.write_text(input_content) + + list_type = 'ip' + delimiter = 
'\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = False + verbose = False + + # Call the function + generate_stix_file( + str(input_file), + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Optionally, read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'192.168.1.1' in stix_content + assert b'10.0.0.0/24' in stix_content + assert b'invalid_ip' not in stix_content # Should be skipped + assert b'#comment' not in stix_content # Should be skipped + + +def test_generate_stix_file_with_url_input(tmp_path: Path) -> None: + # Prepare test data + input_file = 'http://example.com/input.txt' + input_content = 'example.com\ninvalid_domain\n#comment\n\n' + + list_type = 'domain' + delimiter = '\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = False + verbose = False + + # Mock requests.get to return the input_content + with patch('threat_intelligence_toolkit.requests.get') as mock_get: + mock_response = MagicMock() + mock_response.text = input_content + mock_get.return_value = mock_response + + # Call the function + generate_stix_file( + input_file, + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Optionally, read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'example.com' in stix_content + assert b'invalid_domain' in stix_content or not validate # Included if not validating + assert b'#comment' not in stix_content # Should be skipped + + +def 
test_generate_stix_file_with_validation(tmp_path: Path) -> None: + # Prepare test data + input_file = tmp_path / 'input.txt' + input_content = 'https://valid.url\ninvalid_url\n#comment\n\n' + input_file.write_text(input_content) + + list_type = 'url' + delimiter = '\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = True + verbose = False + + # Call the function + generate_stix_file( + str(input_file), + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Optionally, read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'https://valid.url' in stix_content + assert b'invalid_url' not in stix_content # Should be skipped due to validation + assert b'#comment' not in stix_content # Should be skipped + + +def test_generate_stix_file_with_ipv6(tmp_path: Path) -> None: + # Prepare test data + input_file = tmp_path / 'input.txt' + input_content = '2001:0db8:85a3:0000:0000:8a2e:0370:7334\ninvalid_ipv6\n\n' + input_file.write_text(input_content) + + list_type = 'ip' + delimiter = '\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = False + verbose = False + + # Call the function + generate_stix_file( + str(input_file), + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'2001:0db8:85a3:0000:0000:8a2e:0370:7334' in stix_content + assert b'invalid_ipv6' not in stix_content # Should be skipped diff --git 
a/threat_intelligence_toolkit.py b/threat_intelligence_toolkit.py index 5a76be5..418f5e0 100644 --- a/threat_intelligence_toolkit.py +++ b/threat_intelligence_toolkit.py @@ -189,6 +189,7 @@ def generate_stix_file(input_file: str, list_type: str, delimiter: str, list_nam else: logging.warning(f"Unknown IP Address version type: {parsed_ip.version} - skipping") continue + indicator_obj.address_value = str(parsed_ip) except ValueError: # if ip address parsing fails then attempt to parse as an ip network try: From 5713ef12a5a8cb48294a1ad6d2df75677de68a55 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 20:51:52 -0400 Subject: [PATCH 13/15] Added windows build to test workflow --- .github/workflows/tests.yml | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fb7f402..2b000a1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,32 +1,52 @@ name: build -on: ["push", "pull_request"] +on: [push, pull_request] jobs: build: - - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest] + python-version: ["3.9", "3.12"] steps: - uses: actions/checkout@v4 - - name: Set up Python 3.11 + + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: - python-version: "3.11" + python-version: ${{ matrix.python-version }} + + - name: Cache pip + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + - name: Install dependencies run: | python -m pip install --upgrade pip python -m pip install -r requirements.txt python -m pip install flake8==7.1.1 mypy==1.11.2 mypy-extensions types-pytz types-requests pytest==8.3.3 pytest-cov==5.0.0 + - name: Lint with flake8 run: | 
- # stop the build if there are Python syntax errors or undefined names flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings. flake8 . --count --exit-zero --ignore=W191,E501 --max-complexity=25 --statistics + - name: Static type checking with mypy run: | python -m mypy --strict . - - name: Pytest + + - name: Run Pytest run: | pytest + + - name: Run tool with --help + run: | + python threat_intelligence_toolkit.py --help From 3657fdef2b36a36d9e1dbf4bd301f2385b2bb8a3 Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 21:01:28 -0400 Subject: [PATCH 14/15] Reverted to test python versions 3.9 and 3.11 only, as 3.12 is not compatible. Also added testing for macos build --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2b000a1..d9d5e6d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,8 +9,8 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, windows-latest] - python-version: ["3.9", "3.12"] + os: [ubuntu-latest, windows-latest, macos-latest] + python-version: ["3.9", "3.11"] steps: - uses: actions/checkout@v4 From a5891ad6784ddefdbd9a5b651c8f00562cc0b80e Mon Sep 17 00:00:00 2001 From: plasticuproject <19690649+plasticuproject@users.noreply.github.com> Date: Sun, 29 Sep 2024 23:53:18 -0400 Subject: [PATCH 15/15] Decided testing against python3.11 only was fine, removed 3.9. 
Fixed paths for caching pip across OS matrixes --- .github/workflows/tests.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d9d5e6d..e24b30f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,7 +10,14 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python-version: ["3.9", "3.11"] + python-version: ["3.11"] + include: + - os: ubuntu-latest + path: ~/.cache/pip + - os: macos-latest + path: ~/Library/Caches/pip + - os: windows-latest + path: ~\AppData\Local\pip\Cache steps: - uses: actions/checkout@v4 @@ -23,7 +30,7 @@ jobs: - name: Cache pip uses: actions/cache@v4 with: - path: ~/.cache/pip + path: ${{ matrix.path }} key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} restore-keys: | ${{ runner.os }}-pip-