diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..e24b30f --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,59 @@ +name: build + +on: [push, pull_request] + +jobs: + build: + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + python-version: ["3.11"] + include: + - os: ubuntu-latest + path: ~/.cache/pip + - os: macos-latest + path: ~/Library/Caches/pip + - os: windows-latest + path: ~\AppData\Local\pip\Cache + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Cache pip + uses: actions/cache@v4 + with: + path: ${{ matrix.path }} + key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements.txt + python -m pip install flake8==7.1.1 mypy==1.11.2 mypy-extensions types-pytz types-requests pytest==8.3.3 pytest-cov==5.0.0 + + - name: Lint with flake8 + run: | + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --exit-zero --ignore=W191,E501 --max-complexity=25 --statistics + + - name: Static type checking with mypy + run: | + python -m mypy --strict . + + - name: Run Pytest + run: | + pytest + + - name: Run tool with --help + run: | + python threat_intelligence_toolkit.py --help diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..772951a --- /dev/null +++ b/.gitignore @@ -0,0 +1,142 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +.idea/ + +docs/_build diff --git a/README.md b/README.md index 38dc508..6aaf924 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Automate generating or pulling threat intelligence Structured Threat Information Expression (STIX) files from a flat file or from a TAXII server and uploading a threat collection to an ECA and multiple EDAs via the REST API. By uploading STIX files, you can add a threat collection to your ExtraHop Discover and Command appliances. Threat collections enable you to identify suspicious hosts, IP addresses, and URIs on your network. ## Usage -Specify an output directory, threat collection name, ECA/EDA details, and other optional config via the command line then the script either generates a STIX file or polls the TAXII server (defaults to [EclecticIQ OpenTAXII](https://open.taxiistand.com) intel feed), saves the stix files in a gzipped tar file (tgz), and uploads the threat collection to the specified Reveal(x) ECA/EDAs. +Specify an output directory, threat collection name, ECA/EDA details, and other optional config via the command line then the script either generates a STIX file or polls the TAXII server (defaults to OTX AlienVault/AT&T Cybersecurity](https://otx.alienvault.com) intel feed), saves the stix files in a gzipped tar file (tgz), and uploads the threat collection to the specified Reveal(x) ECA/EDAs. **This script solely serves as example code and is made available without any support or warranty.** @@ -21,16 +21,16 @@ You will need to update the example output paths, collection names, IP addresses - View usage and all possible command line arguments - `python3 threat_intelligence_toolkit.py -h` -- Download, tgz, and upload all collections from the default EclecticIQ OpenTAXII server to an EDA +- Download, tgz, and upload all collections from the default OTX AlienVault/AT&T Cybersecurity server to an EDA - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection --eda 172.16.1.2 3Hb7EpHRqb2EpnS7iweHgR5F3sf False` -- Download, tgz, and upload all collections from the default EclecticIQ OpenTAXII server to an ECA and multiple EDAs +- Download, tgz, and upload all collections from the default OTX AlienVault/AT&T Cybersecurity server to an ECA and multiple EDAs - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection --eca 172.16.1.1 3Hb7EpHRqb2EpnS7iweHgR5F3sg True --eda 172.16.1.2 3Hb7EpHRqb2EpnS7iweHgR5F3sf False --eda 172.16.1.3 3Hb7EpHRqb2EpnS7iweHgR5F3sf False` -- Download and tgz all collections from the default EclecticIQ OpenTAXII server to be uploaded manually +- Download and tgz all collections from the default OTX AlienVault/AT&T Cybersecurity server to be uploaded manually - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection` -- Download and tgz a specific list of collections from the default EclecticIQ OpenTAXII server to be uploaded manually +- Download and tgz a specific list of collections from the default OTX AlienVault/AT&T Cybersecurity server to be uploaded manually - `python3 threat_intelligence_toolkit.py -o ~/output_folder -tc example_collection --taxii-collections vxvault hailataxii.guest.dataForLast_7daysOnly` - Download, tgz, and upload all collections from a specific TAXII server to an EDA diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..4071a80 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,6 @@ +# Global options: + +[mypy] +warn_return_any = True +warn_unused_configs = True +ignore_missing_imports = True diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..1bfefed --- /dev/null +++ b/pytest.ini @@ -0,0 +1,11 @@ +# pytest.ini +[pytest] +pythonpath = . +addopts = --cov=./ --cov-report=term-missing --cov-config=pytest.ini +filterwarnings = + ignore::DeprecationWarning:libtaxii.* + +[coverage:run] +omit = + */tests/* + */migrations/* diff --git a/requirements.txt b/requirements.txt index 75d6ee5..53f5be2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,21 +1,22 @@ cabby==0.1.23 -certifi==2024.7.4 -charset-normalizer==2.1.1 -colorlog==6.7.0 +certifi==2024.8.30 +charset-normalizer==3.3.2 +colorlog==6.8.2 cybox==2.1.0.21 decorator==5.1.1 furl==2.1.3 -idna==3.7 +idna==3.10 libtaxii==1.1.119 -lxml==4.9.2 +lxml==5.3.0 mixbox==1.0.5 ordered-set==4.1.0 orderedmultidict==1.0.1 -python-dateutil==2.8.2 -pytz==2022.6 -requests==2.32.0 +python-dateutil==2.9.0.post0 +pytz==2024.2 +requests==2.32.3 six==1.16.0 stix==1.2.0.11 -urllib3==1.26.19 -validators==0.20.0 +urllib3==2.2.3 +validators==0.34.0 weakrefmethod==1.0.3 + diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_threat_intelligence_toolkit.py b/tests/test_threat_intelligence_toolkit.py new file mode 100644 index 0000000..afc3f8f --- /dev/null +++ b/tests/test_threat_intelligence_toolkit.py @@ -0,0 +1,352 @@ +"""/tests/test_threat_intelligence_toolkit.py""" + +import pytest +import sys +from pathlib import Path +from unittest.mock import patch, MagicMock +from threat_intelligence_toolkit import ( + strip_non_alphanum, + str_to_bool, + parse_command_line_args, + threatcollection_api_request, + generate_stix_file +) + + +def test_strip_non_alphanum() -> None: + assert strip_non_alphanum("abc123") == "abc123" + assert strip_non_alphanum("abc!@#123") == "abc123" + assert strip_non_alphanum("!@#") == "" + assert strip_non_alphanum("abc def") == "abcdef" + assert strip_non_alphanum("a$b%c^") == "abc" + + +def test_str_to_bool() -> None: + assert str_to_bool("True") is True + assert str_to_bool("false") is False + assert str_to_bool("Yes") is True + assert str_to_bool("No") is False + assert str_to_bool("1") is True + assert str_to_bool("0") is False + assert str_to_bool("unexpected") is True # Default behavior + + +def test_parse_command_line_args_basic() -> None: + test_args = [ + 'threat_intelligence_toolkit.py', + '-o', 'output_dir', + '-tc', 'TestCollection' + ] + sys.argv = test_args + args = parse_command_line_args() + assert args.output_dir == 'output_dir' + assert args.threat_collection_name == 'TestCollection' + + +def test_parse_command_line_args_generate_stix() -> None: + test_args = [ + 'threat_intelligence_toolkit.py', + '-o', 'output_dir', + '-tc', 'TestCollection', + '--generate-stix', + '--input-file', 'input.txt', + '--list-type', 'ip' + ] + sys.argv = test_args + args = parse_command_line_args() + assert args.generate_stix is True + assert args.input_file == 'input.txt' + assert args.list_type == 'ip' + + +def test_parse_command_line_args_missing_generate_stix_args() -> None: + test_args = [ + 'threat_intelligence_toolkit.py', + '-o', 'output_dir', + '-tc', 'TestCollection', + '--generate-stix' + ] + sys.argv = test_args + with pytest.raises(SystemExit): + parse_command_line_args() + + +def test_threatcollection_api_request_success(tmp_path: Path) -> None: + # Prepare test data + eh_host = 'otx.alienvault.com' + eh_apikey = 'test_api_key' + eh_verify_cert = True + threatcollection_name = 'example_collection' + file_name = 'test_file.tgz' + file_content = b'Test content' + verbose = False + + # Create a temporary file + file_path = tmp_path / file_name + with open(file_path, 'wb') as f: + f.write(file_content) + + # Mock requests.put to return a successful response + with patch('threat_intelligence_toolkit.requests.put') as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = 'Success' + mock_put.return_value = mock_response + + # Call the function + threatcollection_api_request( + eh_host, + eh_apikey, + eh_verify_cert, + threatcollection_name, + file_name, + str(file_path), + verbose + ) + + # Check that requests.put was called with the correct parameters + mock_put.assert_called_once() + args, kwargs = mock_put.call_args + + expected_url = f'https://{eh_host}/api/v1/threatcollections/~{strip_non_alphanum(threatcollection_name)}' + assert args[0] == expected_url + assert kwargs['headers']['Authorization'] == f'ExtraHop apikey={eh_apikey}' + assert kwargs['verify'] == eh_verify_cert + assert 'files' in kwargs + assert 'data' in kwargs + # Ensure the file was read correctly + uploaded_file = kwargs['files']['file'] + assert uploaded_file[0] == file_name # Filename + # The file content is not accessible since file is opened in binary mode in context manager + + +def test_threatcollection_api_request_failure(tmp_path: Path) -> None: + # Prepare test data + eh_host = 'otx.alienvault.com' + eh_apikey = 'test_api_key' + eh_verify_cert = True + threatcollection_name = 'example_collection' + file_name = 'test_file.tgz' + file_content = b'Test content' + verbose = False + + # Create a temporary file + file_path = tmp_path / file_name + with open(file_path, 'wb') as f: + f.write(file_content) + + # Mock requests.put to return a failure response + with patch('threat_intelligence_toolkit.requests.put') as mock_put: + mock_response = MagicMock() + mock_response.status_code = 404 + mock_response.text = '{"detail": "endpoint not found"}' + mock_put.return_value = mock_response + + # Call the function and expect it to raise a ValueError + with pytest.raises(ValueError) as excinfo: + threatcollection_api_request( + eh_host, + eh_apikey, + eh_verify_cert, + threatcollection_name, + file_name, + str(file_path), + verbose + ) + + assert 'Non-200 status code from ExtraHop API request' in str(excinfo.value) + + +def test_threatcollection_api_request_exception(tmp_path: Path) -> None: + # Prepare test data + eh_host = 'otx.alienvault.com' + eh_apikey = 'test_api_key' + eh_verify_cert = True + threatcollection_name = 'example_collection' + file_name = 'test_file.tgz' + file_content = b'Test content' + verbose = False + + # Create a temporary file + file_path = tmp_path / file_name + with open(file_path, 'wb') as f: + f.write(file_content) + + # Mock requests.put to raise an exception + with patch('threat_intelligence_toolkit.requests.put') as mock_put: + mock_put.side_effect = Exception('Connection error') + + # Call the function and expect it to raise the same exception + with pytest.raises(Exception) as excinfo: + threatcollection_api_request( + eh_host, + eh_apikey, + eh_verify_cert, + threatcollection_name, + file_name, + str(file_path), + verbose + ) + + assert 'Connection error' in str(excinfo.value) + + +def test_generate_stix_file_with_local_file(tmp_path: Path) -> None: + # Prepare test data + input_file = tmp_path / 'input.txt' + input_content = '192.168.1.1\n10.0.0.0/24\ninvalid_ip\n#comment\n\n' + input_file.write_text(input_content) + + list_type = 'ip' + delimiter = '\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = False + verbose = False + + # Call the function + generate_stix_file( + str(input_file), + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Optionally, read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'192.168.1.1' in stix_content + assert b'10.0.0.0/24' in stix_content + assert b'invalid_ip' not in stix_content # Should be skipped + assert b'#comment' not in stix_content # Should be skipped + + +def test_generate_stix_file_with_url_input(tmp_path: Path) -> None: + # Prepare test data + input_file = 'http://example.com/input.txt' + input_content = 'example.com\ninvalid_domain\n#comment\n\n' + + list_type = 'domain' + delimiter = '\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = False + verbose = False + + # Mock requests.get to return the input_content + with patch('threat_intelligence_toolkit.requests.get') as mock_get: + mock_response = MagicMock() + mock_response.text = input_content + mock_get.return_value = mock_response + + # Call the function + generate_stix_file( + input_file, + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Optionally, read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'example.com' in stix_content + assert b'invalid_domain' in stix_content or not validate # Included if not validating + assert b'#comment' not in stix_content # Should be skipped + + +def test_generate_stix_file_with_validation(tmp_path: Path) -> None: + # Prepare test data + input_file = tmp_path / 'input.txt' + input_content = 'https://valid.url\ninvalid_url\n#comment\n\n' + input_file.write_text(input_content) + + list_type = 'url' + delimiter = '\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = True + verbose = False + + # Call the function + generate_stix_file( + str(input_file), + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Optionally, read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'https://valid.url' in stix_content + assert b'invalid_url' not in stix_content # Should be skipped due to validation + assert b'#comment' not in stix_content # Should be skipped + + +def test_generate_stix_file_with_ipv6(tmp_path: Path) -> None: + # Prepare test data + input_file = tmp_path / 'input.txt' + input_content = '2001:0db8:85a3:0000:0000:8a2e:0370:7334\ninvalid_ipv6\n\n' + input_file.write_text(input_content) + + list_type = 'ip' + delimiter = '\n' + list_name = 'TestList' + tc_name = 'TestCollection' + tmp_dir = tmp_path / 'output' + tmp_dir.mkdir() + validate = False + verbose = False + + # Call the function + generate_stix_file( + str(input_file), + list_type, + delimiter, + list_name, + tc_name, + str(tmp_dir), + validate, + verbose + ) + + # Check that the STIX file was created + stix_files = list(tmp_dir.glob('*.stix')) + assert len(stix_files) == 1 + + # Read and inspect the content of the STIX file + stix_file = stix_files[0] + stix_content = stix_file.read_bytes() + assert b'2001:0db8:85a3:0000:0000:8a2e:0370:7334' in stix_content + assert b'invalid_ipv6' not in stix_content # Should be skipped diff --git a/threat_intelligence_toolkit.py b/threat_intelligence_toolkit.py index 3744121..418f5e0 100644 --- a/threat_intelligence_toolkit.py +++ b/threat_intelligence_toolkit.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 # # COPYRIGHT 2022 BY EXTRAHOP NETWORKS, INC. -# +# # This file is subject to the terms and conditions defined in # file 'LICENSE', which is part of this source code package. # -# Description: Threat Intelligence Toolkit - Automate generating or pulling threat intelligence Structured Threat -# Information Expression (STIX) files from a flat file or from a TAXII server and uploading a threat collection to -# an ECA and multiple EDAs via the REST API. By uploading STIX files, you can add a threat collection to your ExtraHop -# Discover and Command appliances. Threat collections enable you to identify suspicious hosts, IP addresses, and URIs +# Description: Threat Intelligence Toolkit - Automate generating or pulling threat intelligence Structured Threat +# Information Expression (STIX) files from a flat file or from a TAXII server and uploading a threat collection to +# an ECA and multiple EDAs via the REST API. By uploading STIX files, you can add a threat collection to your ExtraHop +# Discover and Command appliances. Threat collections enable you to identify suspicious hosts, IP addresses, and URIs # on your network # # Usage: Specify an output directory, threat collection name, ECA/EDA details, and other optional config via @@ -20,19 +20,21 @@ # # Version 1.3.6 -import cabby -import requests -import urllib3 -import validators -import ipaddress -import datetime -import pytz -import argparse +from __future__ import annotations +from typing import BinaryIO import os import sys import tarfile import shutil import logging +import argparse +import datetime +import ipaddress +import cabby +import requests +import urllib3 +import validators +import pytz from cybox.objects.uri_object import URI from cybox.objects.domain_name_object import DomainName from cybox.objects.address_object import Address @@ -40,9 +42,10 @@ from stix.core import STIXPackage, STIXHeader from stix.indicator.indicator import Indicator + # parse command line arguments -def parse_command_line_args(): - +def parse_command_line_args() -> argparse.Namespace: + argparser = argparse.ArgumentParser() argparser.add_argument('-o', '--output-dir', action='store', dest='output_dir', help='Existing directory to output tgz containing stix files', required=True, metavar='OUTPUT_DIRECTORY') @@ -56,7 +59,7 @@ def parse_command_line_args(): # options below are for generating a STIX file from a flat file argparser.add_argument('--generate-stix', action='store_true', dest='generate_stix', default=False, help='Create a stix file from a flat file. Requires that --input-file and --type are set.') argparser.add_argument('--input-file', action='store', dest='input_file', help='Full path of delimited list file. Also accepts a URL to a file. Ignored if --generate-stix is not set.', metavar='INPUT_FILE') - argparser.add_argument('--list-type', action='store', dest='list_type', choices=['ip','domain', 'url'], help='Type of the input items in the provided list (list must all be the same type), allowed values: ip, domain, url. Ignored if --generate-stix is not set.', metavar='LIST_TYPE') + argparser.add_argument('--list-type', action='store', dest='list_type', choices=['ip', 'domain', 'url'], help='Type of the input items in the provided list (list must all be the same type), allowed values: ip, domain, url. Ignored if --generate-stix is not set.', metavar='LIST_TYPE') argparser.add_argument('--delimiter', action='store', dest='delimiter', help='Delimiter for the input list file. Ignored if --generate-stix is not set.', default='\n', metavar='INPUT_FILE') argparser.add_argument('--list-name', action='store', dest='list_name', help='Name of the list or provider to be used in the created stix file. Ignored if --generate-stix is not set.', default='Threat Intel List', metavar='LIST_NAME') argparser.add_argument('--validate', action='store_true', dest='validate_input', default=False, help='Validate each Domain/URL before adding to generated stix file (beta). Requires that --generate-stix is set.') @@ -75,16 +78,19 @@ def parse_command_line_args(): return args + # strip all non alphanumeric chars from a string -def strip_non_alphanum(input_str): +def strip_non_alphanum(input_str: str) -> str: return ''.join(char for char in input_str if char.isalnum()) + # convert string to boolean and return True when unsure -def str_to_bool(input_str): +def str_to_bool(input_str: str) -> bool: return input_str.lower() not in ['false', 'f', '0', 'n', 'no'] + # send a PUT request to an EDA or ECA threatcollections/{id} endpoint -def threatcollection_api_request(eh_host, eh_apikey, eh_verify_cert, threatcollection_name, file_name, file_path, verbose): +def threatcollection_api_request(eh_host: str, eh_apikey: str, eh_verify_cert: bool, threatcollection_name: str, file_name: str, file_path: str, verbose: bool) -> None: if verbose: logging.info("===============") @@ -93,12 +99,16 @@ def threatcollection_api_request(eh_host, eh_apikey, eh_verify_cert, threatcolle user_key = strip_non_alphanum(threatcollection_name) - headers = {'Accept': 'application/json', 'Authorization': "ExtraHop apikey={}".format(eh_apikey)} + headers = {'Accept': 'application/json', 'Authorization': f"ExtraHop apikey={eh_apikey}"} - url = "https://{}/api/v1/threatcollections/~{}".format(eh_host, user_key) + url = f"https://{eh_host}/api/v1/threatcollections/~{user_key}" # configure tgz for multipart file upload - file_body = {'file': (file_name, open(file_path, 'rb')), 'name': threatcollection_name} + # file_body = {'file': (file_name, open(file_path, 'rb')), 'name': threatcollection_name} + file_handle: BinaryIO + with open(file_path, 'rb') as file_handle: + files = {'file': (file_name, file_handle)} + data = {'name': threatcollection_name} # log InsecureRequestWarning if making an unverified https request if not eh_verify_cert: @@ -106,21 +116,23 @@ def threatcollection_api_request(eh_host, eh_apikey, eh_verify_cert, threatcolle try: # send PUT request to create or update - r = requests.put(url, headers=headers, files=file_body, verify=eh_verify_cert) + # r = requests.put(url, headers=headers, files=file_body, verify=eh_verify_cert) + r = requests.put(url, headers=headers, files=files, data=data, verify=eh_verify_cert) except Exception as e: - logging.error("Issue encountered while sending an API request to {}. Details: {}".format(url, e)) + logging.error(f"Issue encountered while sending an API request to {url}. Details: {e}") raise # handle non 200 response if r.status_code >= 200 and r.status_code < 300: - logging.info("Successfully uploaded {} to {} as threatcollection named {} with user_key {}".format(file_name, eh_host, threatcollection_name, user_key)) + logging.info(f"Successfully uploaded {file_name} to {eh_host} as threatcollection named {threatcollection_name} with user_key {user_key}") else: - logging.error(("Non-200 status code from ExtraHop API request. Status code: {}, URL: {}, Response: {}".format(r.status_code, url, r.text))) - raise ValueError("Non-200 status code from ExtraHop API request. Status code: {}, URL: {}, Response: {}".format(r.status_code, url, r.text)) + logging.error(f"Non-200 status code from ExtraHop API request. Status code: {r.status_code}, URL: {url}, Response: {r.text}") + raise ValueError(f"Non-200 status code from ExtraHop API request. Status code: {r.status_code}, URL: {url}, Response: {r.text}") return + # generate stix files from a flat file or URL to a flat file -def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp_dir, validate, verbose): +def generate_stix_file(input_file: str, list_type: str, delimiter: str, list_name: str, tc_name: str, tmp_dir: str, validate: bool, verbose: bool) -> None: # observable limit per generated stix file OBSERVABLES_PER_STIX_FILE = 3000 @@ -128,7 +140,7 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp logging.info("=====================") logging.info("== GENERATING STIX ==") logging.info("=====================") - + # download or open input file if validators.url(input_file): res = requests.get(input_file) @@ -136,12 +148,12 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp else: # exit if input file doesn't exist if not os.path.isfile(input_file): - logging.error("Supplied input file '{}' doesn't exist".format(input_file)) - sys.exit("Error: Supplied input file '{}' doesn't exist".format(input_file)) + logging.error(f"Supplied input file '{input_file}' doesn't exist") + sys.exit(f"Error: Supplied input file '{input_file}' doesn't exist") else: with open(input_file, 'r') as f: items = f.read().split(delimiter) - logging.info("Successfully parsed input file at {}".format(input_file)) + logging.info(f"Successfully parsed input file at {input_file}") # slice input into batches for batch_num, index in enumerate(range(0, len(items), OBSERVABLES_PER_STIX_FILE), 1): @@ -175,17 +187,18 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp elif parsed_ip.version == 6: indicator_obj.category = Address.CAT_IPV6 else: - logging.warning("Unknown IP Address version type: {} - skipping".format(parsed_ip.version)) + logging.warning(f"Unknown IP Address version type: {parsed_ip.version} - skipping") continue + indicator_obj.address_value = str(parsed_ip) except ValueError: # if ip address parsing fails then attempt to parse as an ip network try: - parsed_ip = ipaddress.ip_network(item, strict=False) + parsed_network = ipaddress.ip_network(item, strict=False) indicator_obj.category = Address.CAT_CIDR + indicator_obj.address_value = str(parsed_network) except ValueError: - logging.warning("IP Address {} is neither an IPv4, IPv6, nor CIDR - skipping".format(item)) + logging.warning(f"IP Address {item} is neither an IPv4, IPv6, nor CIDR - skipping") continue - indicator_obj.address_value = str(parsed_ip) indicator_obj.condition = "Equals" indicator_type = "IP Watchlist" # customizable components below @@ -194,7 +207,7 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp elif list_type == 'domain': # validate domain if validate and not validators.domain(item): - logging.warning("Invalid domain: {} - skipping".format(item)) + logging.warning(f"Invalid domain: {item} - skipping") continue indicator_obj = DomainName() indicator_obj.value = item @@ -205,11 +218,11 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp elif list_type == 'url': # validate url if validate and not validators.url(item): - logging.warning("Invalid url: {} - skipping".format(item)) + logging.warning(f"Invalid url: {item} - skipping") continue indicator_obj = URI() indicator_obj.value = item - indicator_obj.type_ = URI.TYPE_URL + indicator_obj.type_ = URI.TYPE_URL indicator_obj.condition = "Equals" indicator_type = "URL Watchlist" # customizable components below @@ -233,29 +246,30 @@ def generate_stix_file(input_file, list_type, delimiter, list_name, tc_name, tmp package.add_indicator(indicator) # save each batch in a separate stix file with the filename ending ..._part_N.stix - collection_filename = "{}_part_{}.stix".format(strip_non_alphanum(tc_name), batch_num) + collection_filename = f"{strip_non_alphanum(tc_name)}_part_{batch_num}.stix" with open(os.path.join(tmp_dir, collection_filename), 'wb') as f: f.write(package.to_xml()) - logging.info("Successfully created stix file {}".format(collection_filename)) + logging.info(f"Successfully created stix file {collection_filename}") # clear cybox cache to prevent an Out of Memory error # https://cybox.readthedocs.io/en/stable/api/cybox/core/object.html#cybox.core.object.Object cache_clear() - - return + + return + # poll a taxii server for stix files -def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, days_to_poll, tmp_dir, verbose): - # if no taxii server details are specified then default to the EclecticIQ OpenTAXII threat intel feed +def poll_taxii_server(taxii_server: list[str], basic_user: str, basic_pw: str, taxii_collections: str, days_to_poll: int, tmp_dir: str, verbose: bool) -> None: + # if no taxii server details are specified then default to the OTX AlienVault/AT&T Cybersecurity threat intel feed if not taxii_server: - taxii_server = ["open.taxiistand.com", "/services/discovery", "True"] + taxii_server = ["otx.alienvault.com", "/taxii/discovery", "True"] try: # handle taxii server port if supplied - taxii_server_port = None + taxii_server_port: str | None = None if ':' in taxii_server[0]: taxii_server[0], taxii_server_port = taxii_server[0].split(':') - + # setup taxii client taxii_client = cabby.create_client( host=taxii_server[0], @@ -274,7 +288,7 @@ def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, day services = taxii_client.discover_services() collections = taxii_client.get_collections() except Exception as e: - logging.error("Issue encountered while setting up or querying with the TAXII client. Details: {}".format(e)) + logging.error(f"Issue encountered while setting up or querying with the TAXII client. Details: {e}") raise # verbose taxii server info @@ -283,19 +297,19 @@ def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, day logging.info("== SERVICES ==") logging.info("==============") for service in services: - logging.info("Service type={s.type}, address={s.address}, available={s.available}, message={s.message}, version={s.version}, protocol={s.protocol}".format(s=service)) + logging.info(f"Service type={service.type}, address={service.address}, available={service.available}, message={service.message}, version={service.version}, protocol={service.protocol}") logging.info("=================") logging.info("== COLLECTIONS ==") logging.info("=================") for collection in collections: - logging.info("Collection name={c.name}, description={c.description}, available={c.available}".format(c=collection)) + logging.info(f"Collection name={collection.name}, description={collection.description}, available={collection.available}") logging.info("=============") logging.info("== POLLING ==") logging.info("=============") - # if specified, filter only the supplied collection(s) + # if specified, filter only the supplied collection(s) if taxii_collections: collections = filter(lambda collection: collection.name in taxii_collections, collections) @@ -307,28 +321,29 @@ def poll_taxii_server(taxii_server, basic_user, basic_pw, taxii_collections, day i = 0 # save each returned content block in a separate file with the filename ending ..._part_N.stix for i, block in enumerate(content_blocks, 1): - collection_filename = "{}_part_{}.stix".format(collection.name, i) + collection_filename = f"{collection.name}_part_{i}.stix" with open(os.path.join(tmp_dir, collection_filename), 'wb') as f: f.write(block.content) if verbose: if i != 0: - logging.info("Successfully downloaded collection {} into {} file(s)".format(collection.name, i)) + logging.info(f"Successfully downloaded collection {collection.name} into {i} file(s)") else: - logging.warning("Successfully polled collection {}, but there was nothing to download for the specified timeframe".format(collection.name)) + logging.warning(f"Successfully polled collection {collection.name}, but there was nothing to download for the specified timeframe") except Exception as e: if verbose: - logging.error("Could not download collection: {}. Details: {}".format(collection.name, e)) + logging.error(f"Could not download collection: {collection.name}. Details: {e}") continue - + return -def main(): + +def main() -> None: # retrive command line arguments args = parse_command_line_args() # ensure supplied directory exists if not os.path.isdir(args.output_dir): - sys.exit("Error: Supplied output directory '{}' either doesn't exist or is not a directory".format(args.output_dir)) + sys.exit(f"Error: Supplied output directory '{args.output_dir}' either doesn't exist or is not a directory") # disable insecure request warnings to stdout, will still log warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -339,7 +354,7 @@ def main(): logging.info("ExtraHop Threat Intelligence Toolkit started running") # make temporary directory - tmp_dir_name = "{}_{}".format(strip_non_alphanum(args.threat_collection_name), datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) + tmp_dir_name = f"{strip_non_alphanum(args.threat_collection_name)}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" tmp_dir = os.path.join(args.output_dir, tmp_dir_name) os.makedirs(tmp_dir) @@ -353,11 +368,11 @@ def main(): # only proceed with packaging and uploading if there are files present if os.listdir(tmp_dir): # create the gzipped tar file of the temporary directory - tgz_name = "{}.tgz".format(tmp_dir_name) + tgz_name = f"{tmp_dir_name}.tgz" tgz_path = os.path.join(args.output_dir, tgz_name) with tarfile.open(tgz_path, "w:gz") as tar: tar.add(tmp_dir, arcname=os.path.basename(tmp_dir)) - logging.info("Successfully created tgz file named {} in {}".format(tgz_name, args.output_dir)) + logging.info(f"Successfully created tgz file named {tgz_name} in {args.output_dir}") # upload the threat collection to one ECA and one or more EDAs if args.eca: @@ -371,7 +386,7 @@ def main(): # if only EDAs are provided elif args.edas: for eda in args.edas: - threatcollection_api_request(eda[0], eda[1], str_to_bool(eda[2]), args.threat_collection_name, tgz_name, tgz_path, args.verbose) + threatcollection_api_request(eda[0], eda[1], str_to_bool(eda[2]), args.threat_collection_name, tgz_name, tgz_path, args.verbose) else: logging.warning("Did not upload threat collection to an ExtraHop appliance since neither an ECA/EDAs nor EDAs were provided") @@ -379,9 +394,9 @@ def main(): if (args.eca or args.edas) and args.clean_up: try: os.remove(tgz_path) - logging.info("Successfully cleaned up and removed the local threat collection tgz file: {}".format(tgz_name)) + logging.info(f"Successfully cleaned up and removed the local threat collection tgz file: {tgz_name}") except OSError as e: - logging.error("Could not delete the local threat collection .tgz file: {}. Details: {}.".format(tgz_path, e.strerror)) + logging.error(f"Could not delete the local threat collection .tgz file: {tgz_path}. Details: {e.strerror}.") else: logging.warning('There were no threat intel results to process. Note: If polling a TAXII server ensure that the collection(s) contain results') @@ -389,9 +404,10 @@ def main(): try: shutil.rmtree(tmp_dir) except OSError as e: - logging.error("Could not delete the temporary directory: {}. Details: {}.".format(tmp_dir_name, e.strerror)) + logging.error(f"Could not delete the temporary directory: {tmp_dir_name}. Details: {e.strerror}.") logging.info('ExtraHop Threat Intelligence Toolkit finished running') + if __name__ == '__main__': - main() \ No newline at end of file + main()