Skip to content

Enable Attack Data Download before Test #392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import time
import urllib.parse
import uuid
from shutil import copyfile
from ssl import SSLEOFError, SSLZeroReturnError
from sys import stdout
from tempfile import TemporaryDirectory, mktemp
Expand Down Expand Up @@ -1402,7 +1401,6 @@ def replay_attack_data_file(
f"The only valid indexes on the server are {self.all_indexes_on_server}"
)

tempfile = mktemp(dir=tmp_dir)
if not (
str(attack_data_file.data).startswith("http://")
or str(attack_data_file.data).startswith("https://")
Expand All @@ -1415,13 +1413,7 @@ def replay_attack_data_file(
test_group_start_time,
)

try:
copyfile(str(attack_data_file.data), tempfile)
except Exception as e:
raise Exception(
f"Error copying local Attack Data File for [{test_group.name}] - [{attack_data_file.data}]: "
f"{str(e)}"
)
tempfile = str(attack_data_file.data)
else:
raise Exception(
f"Attack Data File for [{test_group.name}] is local [{attack_data_file.data}], but does not exist."
Expand All @@ -1432,6 +1424,7 @@ def replay_attack_data_file(
# We need to overwrite the file - mkstemp will create an empty file with the
# given name
try:
tempfile = mktemp(dir=tmp_dir)
# In case the path is a local file, try to get it

self.format_pbar_string(
Expand Down
1 change: 1 addition & 0 deletions contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def init_func(config: test):


def validate_func(config: validate) -> DirectorOutputDto:
    """Check optional attack-data caches, then run full content validation.

    Args:
        config: the parsed ``validate`` configuration object.

    Returns:
        The DirectorOutputDto produced by Validate().execute().
    """
    config.check_test_data_caches()
    # Named 'validator' rather than 'validate' so the local does not shadow
    # the imported 'validate' config type used in this signature.
    validator = Validate()
    return validator.execute(config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ def search_rba_fields_exist_validate(self):
return self

@field_validator("tests", mode="before")
def ensure_yml_test_is_unittest(cls, v: list[dict]):
def ensure_yml_test_is_unittest(cls, v: list[dict], info: ValidationInfo):
"""The typing for the tests field allows it to be one of
a number of different types of tests. However, ONLY
UnitTest should be allowed to be defined in the YML
Expand Down Expand Up @@ -941,7 +941,7 @@ def ensure_yml_test_is_unittest(cls, v: list[dict]):
for unitTest in v:
# This raises a ValueError on a failed UnitTest.
try:
UnitTest.model_validate(unitTest)
UnitTest.model_validate(unitTest, context=info.context)
except ValueError as e:
valueErrors.append(e)
if len(valueErrors):
Expand Down
173 changes: 173 additions & 0 deletions contentctl/objects/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
field_validator,
model_validator,
)
from requests import RequestException, head

from contentctl.helper.splunk_app import SplunkApp
from contentctl.helper.utils import Utils
Expand Down Expand Up @@ -261,6 +262,37 @@ class init(Config_Base):
)


# There can be a number of attack data file warning mapping exceptions, or errors,
# that can occur when using attack data caches. In order to avoid very complex
# output, we will only emit the verbose versions of these message once per file.
# This is a non-intuitive place to put this, but it is good enough for now.
ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS: set[str] = set()


class AttackDataCache(BaseModel):
    """A local, on-disk mirror of attack data files, matched by URL prefix.

    URLs starting with ``base_url`` are remapped to files under
    ``base_directory_name`` (which must live in ``external_repos/``).
    """

    # NOTE(review): the original passed these explanatory strings as the first
    # POSITIONAL argument to Field(), which pydantic treats as the default
    # VALUE, not the description. That silently gave both fields nonsense
    # defaults (base_directory_name's default did not even match its own
    # pattern). Both values are meaningless unless explicitly provided, so
    # they are declared required here, with the text moved to description=.
    base_url: str = Field(
        description="This is the beginning of a URL that the data must begin with to map to this cache object."
    )
    base_directory_name: str = Field(
        description="This is the root folder name where the attack data should be downloaded to. Note that this path MUST be in the external_repos/ folder",
        pattern=r"^external_repos/.+",
    )
    # suggested checkout information for our attack_data repo
    # curl https://attack-range-attack-data.s3.us-west-2.amazonaws.com/attack_data.tar.zstd | zstd --decompress | tar -x -C attack_data/
    # suggested YML values for this:
    helptext: str | None = Field(
        default="This repo is set up to use test_data_caches. This can be extremely helpful in validating correct links for test attack_data and speeding up testing.\n"
        "Include the following in your contentctl.yml file to use this cache:\n\n"
        "test_data_caches:\n"
        "- base_url: https://media.githubusercontent.com/media/splunk/attack_data/master/\n"
        "  base_directory_name: external_repos/attack_data\n\n"
        "In order to check out STRT Attack Data, you can use the following command:\n"
        "mkdir -p external_repos; curl https://attack-range-attack-data.s3.us-west-2.amazonaws.com/attack_data.tar.zstd | zstd --decompress | tar -x -C external_repos/\n"
        "or\n"
        """echo "First ensure git-lfs is enabled"; git clone https://github.com/splunk/attack_data external_repos/attack_data"""
    )


class validate(Config_Base):
model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True)
enforce_deprecation_mapping_requirement: bool = Field(
Expand Down Expand Up @@ -291,10 +323,151 @@ class validate(Config_Base):
default=False, description="Validate latest TA information from Splunkbase"
)

# Optional local mirrors of attack data, consulted before falling back to
# the HTTPS download link of each test data file. Empty by default.
test_data_caches: list[AttackDataCache] = Field(
    default=[],
    description="A list of attack data that can "
    "be used in lieu of the HTTPS download links "
    "of each test data file. This cache can significantly "
    "increase overall test speed, ensure the correctness of "
    "links at 'contentctl validate' time, and reduce errors "
    "associated with failed responses from file servers.",
)

@property
def external_repos_path(self) -> pathlib.Path:
    """Path to the external_repos/ folder under the project path."""
    return self.path / "external_repos"

# We can't make this a validator because the constructor
# is called many times - we don't want to print this out many times.
def check_test_data_caches(self) -> Self:
    """
    Check that the test data caches actually exist at the specified paths.

    If a cache directory exists, print its build metadata (build date and
    git hash, when the marker files written at cache-build time are
    present). If it does not exist, emit the helptext, but do not raise an
    exception. Caches are not required, but can significantly speed up and
    reduce the flakiness of tests by reducing failed HTTP requests.

    Does nothing unless verbose mode is enabled. Returns self so the call
    can be chained.
    """

    def read_marker(marker_file: pathlib.Path, fallback: str) -> str:
        # Marker files are only written when the cache was built by
        # contentctl; absence just means the cache came from elsewhere.
        if marker_file.is_file():
            with open(marker_file, "r") as f:
                return f.read().strip()
        return fallback

    if not self.verbose:
        # Ignore the check and error output if we are not in verbose mode
        return self
    for cache in self.test_data_caches:
        cache_path = self.path / cache.base_directory_name
        if not cache_path.is_dir():
            print(cache.helptext)
        else:
            build_date = read_marker(
                cache_path / "cache_build_date.txt", "<UNKNOWN_DATE>"
            )
            git_hash = read_marker(cache_path / "git_hash.txt", "<UNKNOWN_HASH>")
            print(
                f"Found attack data cache at [{cache_path}]\n**Cache Build Date: {build_date}\n**Repo Git Hash : {git_hash}\n"
            )

    return self

def map_to_attack_data_cache(
    self, filename: HttpUrl | FilePath, verbose: bool = False
) -> HttpUrl | FilePath:
    """
    Try to map an attack data URL to a file in a local cache.

    Local file paths are returned unchanged. URLs are matched against the
    configured test_data_caches by prefix; when a checked-out cache
    contains the file, the local path is returned instead. On a cache
    miss the original URL is returned, after (in verbose mode) a HEAD
    request to report whether the file at least exists on the server.

    Raises:
        ValueError: when the file is missing from the cache(s) AND the
            server download also fails (verbose mode only).
    """
    if str(filename) in ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS:
        # This is already something that we have emitted a warning or
        # Exception for. We don't want to emit it again as it will
        # pollute the output.
        return filename

    # If this is simply a link to a file directly, then no mapping
    # needs to take place. Return the link to the file.
    if isinstance(filename, pathlib.Path):
        return filename

    if len(self.test_data_caches) == 0:
        return filename

    # Otherwise, this is a URL. See if its prefix matches one of the
    # prefixes in the list of caches
    for cache in self.test_data_caches:
        root_folder_path = self.path / cache.base_directory_name
        # See if this data file was in that path

        if str(filename).startswith(cache.base_url):
            new_file_name = str(filename).replace(cache.base_url, "")
            new_file_path = root_folder_path / new_file_name

            if not root_folder_path.is_dir():
                # This has not been checked out. Even though we want to use this cache
                # whenever possible, we don't want to force it.
                return filename

            if new_file_path.is_file():
                # We found the file in the cache. Return the new path
                return new_file_path

            # Anything below here is non standard behavior that will produce either a warning message,
            # an error, or both. We only want to do this once for each file, even if it is used
            # across multiple different detections.
            ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS.add(str(filename))

            # The cache exists, but we didn't find the file. We will emit an informational warning
            # for this, but this is not an exception. Instead, we will just fall back to using
            # the original URL.
            if verbose:
                # Give some extra context about missing attack data files/bad mapping.
                # The original messages were placeholder-free f-strings that
                # printed the literal text "(unknown)"; interpolate the
                # actual filename so the message is actionable.
                try:
                    # timeout so a dead file server cannot hang validation forever
                    h = head(str(filename), timeout=30)
                    h.raise_for_status()

                except RequestException:
                    raise ValueError(
                        f"Error resolving the attack_data file {filename}. "
                        f"It was missing from the cache {cache.base_directory_name} and a download from the server failed."
                    )
                print(
                    f"\nFilename {filename} not found in cache {cache.base_directory_name}, but exists on the server. "
                    f"Your cache {cache.base_directory_name} may be out of date."
                )
            return filename
    if verbose:
        # Anything below here is non standard behavior that will produce either a warning message,
        # an error, or both. We only want to do this once for each file, even if it is used
        # across multiple different detections.
        ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS.add(str(filename))

        # Give some extra context about missing attack data files/bad mapping
        url = f"Attack Data : {filename}"
        prefixes = "".join(
            [
                f"\n  Valid Prefix: {cache.base_url}"
                for cache in self.test_data_caches
            ]
        )
        # Give some extra context about missing attack data files/bad mapping
        try:
            h = head(str(filename), timeout=30)
            h.raise_for_status()
        except RequestException:
            raise ValueError(
                f"Error resolving the attack_data file {filename}. It was missing from all caches and a download from the server failed.\n"
                f"{url}{prefixes}\n"
            )

        print(
            f"\nAttack Data Missing from all caches, but present at URL:\n{url}{prefixes}"
        )

    return filename

@property
def mitre_cti_repo_path(self) -> pathlib.Path:
    """Path to the MITRE CTI repo checkout inside external_repos/."""
    return self.external_repos_path / "cti"
Expand Down
37 changes: 36 additions & 1 deletion contentctl/objects/test_attack_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
from __future__ import annotations
from pydantic import BaseModel, HttpUrl, FilePath, Field, ConfigDict

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from contentctl.objects.config import validate

from pydantic import (
BaseModel,
ConfigDict,
Field,
FilePath,
HttpUrl,
ValidationInfo,
field_validator,
)


class TestAttackData(BaseModel):
Expand All @@ -11,3 +25,24 @@ class TestAttackData(BaseModel):
sourcetype: str = Field(...)
custom_index: str | None = None
host: str | None = None

@field_validator("data", mode="after")
@classmethod
def check_for_existence_of_attack_data_repo(
    cls, value: HttpUrl | FilePath, info: ValidationInfo
) -> HttpUrl | FilePath:
    """Remap the attack data URL to a local cache file when one exists.

    Delegates to validate.map_to_attack_data_cache on the config carried
    in the validation context. Raises ValueError if a context is present
    but carries no config.
    """
    # this appears to be called more than once, the first time
    # info.context is always None. In this case, just return what
    # was passed.
    if not info.context:
        return value

    # When the config is passed, use it to determine if we can map
    # the test data to a file on disk. Fetch it once rather than
    # calling .get() twice.
    config: validate | None = info.context.get("config")
    if config is None:
        raise ValueError(
            "config not passed to TestAttackData constructor in context"
        )
    return config.map_to_attack_data_cache(value, verbose=config.verbose)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "contentctl"

version = "5.4.1"
version = "5.5.0"

description = "Splunk Content Control Tool"
authors = ["STRT <[email protected]>"]
Expand Down
Loading