diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index 2a16aa37..620d8241 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -1,41 +1,48 @@ import pathlib -from contentctl.input.director import Director, DirectorOutputDto -from contentctl.objects.config import validate from contentctl.enrichments.attack_enrichment import AttackEnrichment from contentctl.enrichments.cve_enrichment import CveEnrichment -from contentctl.objects.atomic import AtomicEnrichment -from contentctl.objects.lookup import FileBackedLookup +from contentctl.helper.splunk_app import SplunkApp from contentctl.helper.utils import Utils +from contentctl.input.director import Director, DirectorOutputDto, ValidationFailedError +from contentctl.objects.atomic import AtomicEnrichment +from contentctl.objects.config import validate from contentctl.objects.data_source import DataSource -from contentctl.helper.splunk_app import SplunkApp +from contentctl.objects.lookup import FileBackedLookup class Validate: def execute(self, input_dto: validate) -> DirectorOutputDto: - director_output_dto = DirectorOutputDto( - AtomicEnrichment.getAtomicEnrichment(input_dto), - AttackEnrichment.getAttackEnrichment(input_dto), - CveEnrichment.getCveEnrichment(input_dto), - [], - [], - [], - [], - [], - [], - [], - [], - [], - [], - ) + try: + director_output_dto = DirectorOutputDto( + AtomicEnrichment.getAtomicEnrichment(input_dto), + AttackEnrichment.getAttackEnrichment(input_dto), + CveEnrichment.getCveEnrichment(input_dto), + [], + [], + [], + [], + [], + [], + [], + [], + [], + [], + ) + + director = Director(director_output_dto) + director.execute(input_dto) + self.ensure_no_orphaned_files_in_lookups( + input_dto.path, director_output_dto + ) + if input_dto.data_source_TA_validation: + self.validate_latest_TA_information(director_output_dto.data_sources) - director = Director(director_output_dto) - director.execute(input_dto) - self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto) - if input_dto.data_source_TA_validation: - self.validate_latest_TA_information(director_output_dto.data_sources) + return director_output_dto - return director_output_dto + except ValidationFailedError: + # Just re-raise without additional output since we already formatted everything + raise SystemExit(1) def ensure_no_orphaned_files_in_lookups( self, repo_path: pathlib.Path, director_output_dto: DirectorOutputDto diff --git a/contentctl/input/director.py b/contentctl/input/director.py index 1b637101..ef362652 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -1,30 +1,29 @@ import os import sys -from pathlib import Path from dataclasses import dataclass, field -from pydantic import ValidationError +from pathlib import Path from uuid import UUID -from contentctl.input.yml_reader import YmlReader -from contentctl.objects.detection import Detection -from contentctl.objects.story import Story +from pydantic import ValidationError -from contentctl.objects.baseline import Baseline -from contentctl.objects.investigation import Investigation -from contentctl.objects.playbook import Playbook -from contentctl.objects.deployment import Deployment -from contentctl.objects.macro import Macro -from contentctl.objects.lookup import LookupAdapter, Lookup -from contentctl.objects.atomic import AtomicEnrichment -from contentctl.objects.security_content_object import SecurityContentObject -from contentctl.objects.data_source import DataSource -from contentctl.objects.dashboard import Dashboard from contentctl.enrichments.attack_enrichment import AttackEnrichment from contentctl.enrichments.cve_enrichment import CveEnrichment - +from contentctl.helper.utils import Utils +from contentctl.input.yml_reader import YmlReader +from contentctl.objects.atomic import AtomicEnrichment +from contentctl.objects.baseline import Baseline from contentctl.objects.config import validate +from contentctl.objects.dashboard import Dashboard +from contentctl.objects.data_source import DataSource +from contentctl.objects.deployment import Deployment +from contentctl.objects.detection import Detection from contentctl.objects.enums import SecurityContentType -from contentctl.helper.utils import Utils +from contentctl.objects.investigation import Investigation +from contentctl.objects.lookup import Lookup, LookupAdapter +from contentctl.objects.macro import Macro +from contentctl.objects.playbook import Playbook +from contentctl.objects.security_content_object import SecurityContentObject +from contentctl.objects.story import Story @dataclass @@ -93,6 +92,39 @@ def addContentToDictMappings(self, content: SecurityContentObject): self.uuid_to_content_map[content.id] = content +class Colors: + HEADER = "\033[95m" + BLUE = "\033[94m" + CYAN = "\033[96m" + GREEN = "\033[92m" + YELLOW = "\033[93m" + RED = "\033[91m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + END = "\033[0m" + MAGENTA = "\033[35m" + BRIGHT_MAGENTA = "\033[95m" + + # Add fallback symbols for Windows + CHECK_MARK = "✓" if sys.platform != "win32" else "*" + WARNING = "⚠️" if sys.platform != "win32" else "!" + ERROR = "❌" if sys.platform != "win32" else "X" + ARROW = "🎯" if sys.platform != "win32" else ">" + TOOLS = "🛠️" if sys.platform != "win32" else "#" + DOCS = "📚" if sys.platform != "win32" else "?" + BULB = "💡" if sys.platform != "win32" else "i" + SEARCH = "🔍" if sys.platform != "win32" else "@" + ZAP = "⚡" if sys.platform != "win32" else "!" + + +class ValidationFailedError(Exception): + """Custom exception for validation failures that already have formatted output.""" + + def __init__(self, message: str): + self.message = message + super().__init__(message) + + class Director: input_dto: validate output_dto: DirectorOutputDto @@ -255,18 +287,92 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None: end="", flush=True, ) - print("Done!") if len(validation_errors) > 0: - errors_string = "\n\n".join( - [ - f"File: {e_tuple[0]}\nError: {str(e_tuple[1])}" - for e_tuple in validation_errors - ] + print("\n") # Clean separation + print(f"{Colors.BOLD}{Colors.BRIGHT_MAGENTA}╔{'═' * 60}╗{Colors.END}") + print( + f"{Colors.BOLD}{Colors.BRIGHT_MAGENTA}║{Colors.BLUE}{f'{Colors.SEARCH} Content Validation Summary':^60}{Colors.BRIGHT_MAGENTA}║{Colors.END}" ) - # print(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED") - # We quit after validation a single type/group of content because it can cause significant cascading errors in subsequent - # types of content (since they may import or otherwise use it) - raise Exception( - f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED" + print(f"{Colors.BOLD}{Colors.BRIGHT_MAGENTA}╚{'═' * 60}╝{Colors.END}\n") + + print( + f"{Colors.BOLD}{Colors.GREEN}✨ Validation Completed{Colors.END} – Issues detected in {Colors.RED}{Colors.BOLD}{len(validation_errors)}{Colors.END} files.\n" ) + + for index, entry in enumerate(validation_errors, 1): + file_path, error = entry + width = max(70, len(str(file_path)) + 15) + + # File header with numbered emoji + number_emoji = f"{index}️⃣" + print(f"{Colors.YELLOW}┏{'━' * width}┓{Colors.END}") + print( + f"{Colors.YELLOW}┃{Colors.BOLD} {number_emoji} File: {Colors.CYAN}{file_path}{Colors.END}{' ' * (width - len(str(file_path)) - 12)}{Colors.YELLOW}┃{Colors.END}" + ) + print(f"{Colors.YELLOW}┗{'━' * width}┛{Colors.END}") + + print( + f" {Colors.RED}{Colors.BOLD}{Colors.ZAP} Validation Issues:{Colors.END}" + ) + + if isinstance(error, ValidationError): + for err in error.errors(): + error_msg = err.get("msg", "") + if "https://errors.pydantic.dev" in error_msg: + continue + + # Clean error categorization + if "Field required" in error_msg: + print( + f" {Colors.YELLOW}{Colors.WARNING} Field Required: {err.get('loc', [''])[0]}{Colors.END}" + ) + elif "Input should be" in error_msg: + print( + f" {Colors.MAGENTA}{Colors.ARROW} Invalid Value for {err.get('loc', [''])[0]}{Colors.END}" + ) + if "permitted values:" in error_msg: + options = error_msg.split("permitted values:")[ + -1 + ].strip() + print(f" Valid options: {options}") + elif "Extra inputs" in error_msg: + print( + f" {Colors.BLUE}❌ Unexpected Field: {err.get('loc', [''])[0]}{Colors.END}" + ) + elif "Failed to find" in error_msg: + print( + f" {Colors.RED}🔍 Missing Reference: {error_msg}{Colors.END}" + ) + else: + print(f" {Colors.RED}❌ {error_msg}{Colors.END}") + else: + print(f" {Colors.RED}❌ {str(error)}{Colors.END}") + print("") + + # Clean footer with next steps + max_width = max(60, max(len(str(e[0])) + 15 for e in validation_errors)) + print(f"{Colors.BOLD}{Colors.CYAN}╔{'═' * max_width}╗{Colors.END}") + print( + f"{Colors.BOLD}{Colors.CYAN}║{Colors.BLUE}{'🎯 Next Steps':^{max_width}}{Colors.CYAN}║{Colors.END}" + ) + print(f"{Colors.BOLD}{Colors.CYAN}╚{'═' * max_width}╝{Colors.END}\n") + + print( + f"{Colors.GREEN}{Colors.TOOLS} Fix the validation issues in the listed files{Colors.END}" + ) + print( + f"{Colors.YELLOW}{Colors.DOCS} Check the documentation: {Colors.UNDERLINE}https://github.com/splunk/contentctl{Colors.END}" + ) + print( + f"{Colors.BLUE}{Colors.BULB} Use --verbose for detailed error information{Colors.END}\n" + ) + + raise ValidationFailedError( + f"Validation failed with {len(validation_errors)} error(s)" + ) + + # Success case + print( + f"\r{f'{contentType.name.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]... {Colors.GREEN}{Colors.CHECK_MARK} Done!{Colors.END}" + )