-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathvalidate.py
138 lines (123 loc) · 6.01 KB
/
validate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import pathlib
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.helper.splunk_app import SplunkApp
from contentctl.helper.utils import Utils
from contentctl.input.director import Director, DirectorOutputDto, ValidationFailedError
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.config import validate
from contentctl.objects.data_source import DataSource
from contentctl.objects.lookup import FileBackedLookup
class Validate:
    def execute(self, input_dto: validate) -> DirectorOutputDto:
        """Build all content via the Director and run post-construction validation.

        Args:
            input_dto (validate): the validate config, including the repo path and
                whether data source TA validation should be performed.

        Returns:
            DirectorOutputDto: the director output with all constructed content.

        Raises:
            SystemExit: with code 1 if content construction/validation fails
                (the Director has already printed formatted errors).
        """
        try:
            director_output_dto = DirectorOutputDto(
                AtomicEnrichment.getAtomicEnrichment(input_dto),
                AttackEnrichment.getAttackEnrichment(input_dto),
                CveEnrichment.getCveEnrichment(input_dto),
                [],
                [],
                [],
                [],
                [],
                [],
                [],
                [],
                [],
                [],
            )

            director = Director(director_output_dto)
            director.execute(input_dto)
            self.ensure_no_orphaned_files_in_lookups(
                input_dto.path, director_output_dto
            )
            if input_dto.data_source_TA_validation:
                self.validate_latest_TA_information(director_output_dto.data_sources)

            return director_output_dto

        except ValidationFailedError:
            # Exit without additional output since the Director already
            # formatted everything; 'from None' suppresses the chained traceback.
            raise SystemExit(1) from None

    def ensure_no_orphaned_files_in_lookups(
        self, repo_path: pathlib.Path, director_output_dto: DirectorOutputDto
    ):
        """
        This function ensures that only files which are relevant to lookups are included in the lookups folder.
        This means that a file must be either:
        1. A lookup YML (.yml)
        2. A lookup CSV (.csv) which is referenced by a YML
        3. A lookup MLMODEL (.mlmodel) which is referenced by a YML.

        All other files, including CSV and MLMODEL files which are NOT
        referenced by a YML, will generate an exception from this function.

        Args:
            repo_path (pathlib.Path): path to the root of the app
            director_output_dto (DirectorOutputDto): director object with all constructed content

        Raises:
            Exception: An Exception will be raised if there are any non .yml, .csv, or .mlmodel
                files in this directory. Additionally, an exception will be raised if there
                exists one or more .csv or .mlmodel files that are not referenced by at least 1
                detection .yml file in this directory.
                This avoids having additional, unused files in this directory that may be copied into
                the app when it is built (which can cause appinspect errors or larger app size.)
        """
        lookupsDirectory = repo_path / "lookups"

        # Get all of the files referenced by Lookups. A set gives O(1)
        # membership checks below.
        usedLookupFiles: set[pathlib.Path] = {
            lookup.filename
            for lookup in director_output_dto.lookups
            if isinstance(lookup, FileBackedLookup)
        } | {
            lookup.file_path
            for lookup in director_output_dto.lookups
            if lookup.file_path is not None
        }

        # Get all of the mlmodel and csv files in the lookups directory
        csvAndMlmodelFiles = Utils.get_security_content_files_from_directory(
            lookupsDirectory,
            allowedFileExtensions=[".yml", ".csv", ".mlmodel"],
            fileExtensionsToReturn=[".csv", ".mlmodel"],
        )

        # Generate an exception if any csv or mlmodel files exist but are not used
        unusedLookupFiles: list[pathlib.Path] = [
            testFile
            for testFile in csvAndMlmodelFiles
            if testFile not in usedLookupFiles
        ]
        if len(unusedLookupFiles) > 0:
            raise Exception(
                f"The following .csv or .mlmodel files exist in '{lookupsDirectory}', but are not referenced by a lookup file: {[str(path) for path in unusedLookupFiles]}"
            )
        return

    def validate_latest_TA_information(self, data_sources: list[DataSource]) -> None:
        """Verify each data source's supported TA version against Splunkbase.

        For every unique (name, version) TA that has a Splunkbase URL, look up
        the latest published version and collect a message for any mismatch or
        lookup failure.

        Args:
            data_sources (list[DataSource]): data sources whose supported_TA
                entries should be checked.

        Raises:
            Exception: if one or more TA versions are out of date or could not
                be checked; the message contains all collected errors.
        """
        # Track already-checked TAs so each unique (name, version) pair is
        # only queried once.
        validated_TAs: set[tuple[str, str]] = set()
        errors: list[str] = []
        print("----------------------")
        print("Validating latest TA:")
        print("----------------------")
        for data_source in data_sources:
            for supported_TA in data_source.supported_TA:
                ta_identifier = (supported_TA.name, supported_TA.version)
                if ta_identifier in validated_TAs or supported_TA.url is None:
                    continue
                validated_TAs.add(ta_identifier)
                try:
                    # The Splunkbase app UID is the last path segment of the
                    # URL. Parse it inside the try block so a malformed URL is
                    # reported as an error rather than crashing validation.
                    uid = int(str(supported_TA.url).rstrip("/").split("/")[-1])
                    splunk_app = SplunkApp(app_uid=uid)
                    if splunk_app.latest_version != supported_TA.version:
                        errors.append(
                            f"Version mismatch in '{data_source.file_path}' supported TA '{supported_TA.name}'"
                            f"\n Latest version on Splunkbase : {splunk_app.latest_version}"
                            f"\n Version specified in data source: {supported_TA.version}"
                        )
                except Exception as e:
                    errors.append(
                        f"Error checking version of TA {supported_TA.name}: {str(e)}"
                    )

        if len(errors) > 0:
            errorString = "\n\n".join(errors)
            raise Exception(
                f"[{len(errors)}] or more TA versions are out of date or have other errors. "
                f"Please update the following data sources with the latest versions of "
                f"their supported tas:\n\n{errorString}"
            )
        print("All TA versions are up to date.")