diff --git a/docs/docs_src/javascripts/performance.js b/docs/docs_src/javascripts/performance.js index 99c7a58..85e93b8 100644 --- a/docs/docs_src/javascripts/performance.js +++ b/docs/docs_src/javascripts/performance.js @@ -411,6 +411,19 @@ }; // Initialize all performance features + /** + * Initializes various performance optimization features when the DOM is ready. + * @example + * init() + * // Logs "📊 Performance optimization features initialized" + * @param {void} None - This function does not take any parameters. + * @returns {void} This function does not return anything. + * @description + * - Sets up event listener to delay initialization until the DOM is fully loaded. + * - Initializes features like Performance, LazyLoading, Prefetching, ProgressiveEnhancement, and ErrorTracking. + * - Adds a loading progress indicator to the body of the document. + * - Ensures progress bar activation during page navigation. + */ function init() { // Wait for DOM to be ready if (document.readyState === 'loading') { diff --git a/scripts/batch_compliance_check.py b/scripts/batch_compliance_check.py index 42635ce..9a2e85b 100644 --- a/scripts/batch_compliance_check.py +++ b/scripts/batch_compliance_check.py @@ -33,6 +33,15 @@ def is_vcf_or_bcf(filename): return ext.endswith(".vcf") or ext.endswith(".vcf.gz") or ext.endswith(".bcf") def generate_markdown_report(results, directory): + """Generate a markdown report based on compliance check results. + Parameters: + - results (List[Tuple[str, str, bool, str]]): A list of tuples containing the file name, tool used, validity, and error message. + - directory (str): The directory that was checked. + Returns: + - str: A markdown-formatted string representing the compliance report. + Processing Logic: + - Formats each result into a markdown table row, replacing pipe characters in error messages to avoid markdown misinterpretation. + - Marks validity with a check or cross emoji based on the boolean value.""" lines = ["# Batch Compliance Check Report\n", f"Checked directory: `{directory}`\n", "| File | Tool | Valid | Error |", @@ -43,6 +52,17 @@ def generate_markdown_report(results, directory): def generate_color_markdown_report(results, directory): # Uses HTML for color, which works in GitHub/HTML renderers + """Generate a markdown report with colored pass/fail annotations based on compliance check results. + Parameters: + - results (list of tuples): A list where each tuple contains details about the compliance check (`filename`, `tool`, `validity`, `error_message`). + - directory (str): The directory that was checked. + Returns: + - str: A markdown-formatted string that represents the batch compliance check report. + Processing Logic: + - Builds a markdown table with headers and formatted data for each file and tool checked. + - Uses HTML `` elements to apply color styling for pass and fail indicators. + - Replaces any pipe characters in the error messages to avoid markdown table misalignment. + - Appends a legend explaining the color codes used for pass and fail statuses.""" lines = ["# Batch Compliance Check Report\n", f"Checked directory: `{directory}`\n", "| File | Tool | Valid | Error |", @@ -58,6 +78,17 @@ def generate_color_markdown_report(results, directory): return "\n".join(lines) def generate_html_report(results, directory): + """Generates an HTML report summarizing the compliance checks performed on files. + Parameters: + - results (list of tuples): A list where each tuple contains file information from compliance checks. Each tuple consists of four items: file name (str), tool name (str), validation status (bool), and error message (str). + - directory (str): The directory path where the compliance checks were performed. + Returns: + - str: An HTML formatted string representing the compliance check report. + Processing Logic: + - Constructs an HTML document with a styled report table. + - Generates current date and time to include in the report. + - Formats validation status with PASS or FAIL indicators. + - Converts error message pipes ('|') into spaces for better readability.""" now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') html = [ "", @@ -97,6 +128,16 @@ def generate_html_report(results, directory): return "\n".join(html) def main(): + """Batch VCF/BCF Compliance Checker main function. + Parameters: + - None + Returns: + - None + Processing Logic: + - Parses command-line arguments for directory, tool, output file, edge-case generation, notifications, and output format. + - Scans the specified directory for VCF/BCF files and validates them against the selected compliance tool. + - Handles edge-case generation if specified and outputs validation results in the chosen format. + - Saves the compliance report to stdout or a specified file, potentially notifying by saving to a specific folder if the notify option is selected.""" parser = argparse.ArgumentParser(description="Batch VCF/BCF Compliance Checker") parser.add_argument("-d", "--directory", default="sample_data", help="Directory to scan for VCF/BCF files") parser.add_argument("-t", "--tool", default=None, help="Compliance tool to use (bcftools, gatk, or as configured)") diff --git a/scripts/cli_enhanced_validation.py b/scripts/cli_enhanced_validation.py index c9ffc7c..104b379 100644 --- a/scripts/cli_enhanced_validation.py +++ b/scripts/cli_enhanced_validation.py @@ -242,6 +242,17 @@ class CacheManager: """ def __init__(self, cache_dir: str = ".validation-cache", ttl_hours: int = 24, max_size_mb: int = 100): + """Initialize a cache manager with specified directory, time-to-live, and size constraints. + Parameters: + - cache_dir (str): Directory where cached data will be stored. Defaults to ".validation-cache". + - ttl_hours (int): Time-to-live for cached items in hours. Defaults to 24 hours. + - max_size_mb (int): Maximum cache size in megabytes. Defaults to 100MB. + Returns: + - None: This method initializes the cache manager; there is no return value. + Processing Logic: + - Converts ttl_hours to seconds for internal use. + - Ensures that the cache directory exists, creating it if necessary. + - Initializes cache hit/miss statistics counters.""" self.cache_dir = Path(cache_dir) self.ttl_seconds = ttl_hours * 3600 self.max_size_bytes = max_size_mb * 1024 * 1024 @@ -568,6 +579,14 @@ class ValidationConfig: """ def __init__(self, config_file: Optional[str] = None): + """Initializes the ValidationConfig class with default settings and optional configuration file. + Parameters: + - config_file (Optional[str]): The path to a configuration file to override default settings. + Returns: + - None: This constructor does not return any value. + Processing Logic: + - Initializes various default settings related to caching, performance, validation rules, and CI/CD. + - Loads additional configuration settings from the provided file, if specified.""" self.logger = logging.getLogger(__name__ + '.ValidationConfig') # Default configuration diff --git a/src/vcf_agent/config.py b/src/vcf_agent/config.py index 17f2edb..f9f12ea 100644 --- a/src/vcf_agent/config.py +++ b/src/vcf_agent/config.py @@ -137,6 +137,20 @@ def __init__( ollama_base_url: Optional[str] = "http://localhost:11434", memory_optimization: Optional[MemoryOptimizationConfig] = None ): + """Initializes an instance of the class with configuration settings for model interaction. + Parameters: + - raw_mode (Optional[bool]): Indicates if the instance should operate in raw mode; default is None. + - model_provider (Literal["ollama", "openai", "cerebras"]): Specifies the provider of the model, defaults to "ollama". + - credentials_file (Optional[str]): Path to the credentials file, if applicable. + - reference_fasta (Optional[str]): Path to the reference FASTA file, if necessary. + - ollama_model_name (Optional[str]): Name of the Ollama model to use; default is "qwen3:4b". + - ollama_base_url (Optional[str]): Base URL for accessing Ollama model services, defaults to "http://localhost:11434". + - memory_optimization (Optional[MemoryOptimizationConfig]): Configuration for memory optimization, defaults to a new MemoryOptimizationConfig instance. + Returns: + - None: This is a constructor, therefore it does not return a value. + Processing Logic: + - Sets the model provider to a default of "ollama" if not specified. + - Initializes a memory optimization configuration if none is provided.""" self.raw_mode = raw_mode self.model_provider = model_provider self.credentials_file = credentials_file diff --git a/src/vcf_agent/enhanced_tracing.py b/src/vcf_agent/enhanced_tracing.py index 5852ffd..4204ce0 100644 --- a/src/vcf_agent/enhanced_tracing.py +++ b/src/vcf_agent/enhanced_tracing.py @@ -346,6 +346,19 @@ def __init__( tracing_config: Optional[TracingConfig] = None, environment: Optional[str] = None ): + """Initializes a tracer for monitoring service performance and memory optimization. + Parameters: + - service_name (str): Name of the service for which tracing is initialized. + - config (Optional[MemoryOptimizationConfig]): Configuration for optimizing memory usage, defaults to a standard configuration if not provided. + - tracing_config (Optional[TracingConfig]): Configuration for tracing, defaults to environment-based configuration if not provided. + - environment (Optional[str]): Environment setting for tracing, overrides default if specified. + Returns: + - None: This is a constructor method, it does not return a value. + Processing Logic: + - Initializes tracing configuration with environment setting if provided. + - Sets up tracer and sampling based on environmental configurations. + - Initializes performance tracking structures for operations. + - Sets logger to debug mode if tracing in debug mode is enabled.""" self.service_name = service_name self.memory_config = config or MemoryOptimizationConfig() self.tracing_config = tracing_config or TracingConfig.from_environment() @@ -484,7 +497,30 @@ def ai_provider_span(self, provider: Union[str, AIProvider], model: str, operati """Decorator for AI provider operations.""" def decorator(func): @functools.wraps(func) + """Wraps a function to add AI provider span context for tracking execution details. + Parameters: + - func (function): The function to be wrapped, which can be asynchronous or synchronous. + Returns: + - function: The wrapped version of the input function with added span context attributes. + Processing Logic: + - Determines if the function is synchronous or asynchronous and uses the appropriate wrapper. + - Establishes an AI provider span context to track execution details such as function name, arguments, and success status. + - Handles exceptions by setting success attribute to false, while successful executions set it to true.""" async def async_wrapper(*args, **kwargs): + """A wrapper for an asynchronous function that integrates tracing and error handling. + Parameters: + - *args: Arguments to be passed to the asynchronous function. + - **kwargs: Keyword arguments to be passed to the asynchronous function. + - provider (str): The AI provider managing the span context. + - model (str): The model being used for the operation. + - operation (str): The operation being traced. + - func (Callable): The asynchronous function to be wrapped. + Returns: + - Any: The result returned by the asynchronous function. + Processing Logic: + - Executes the wrapped asynchronous function within a tracing span context. + - Sets attributes on the span based on function name, argument counts, and execution success. + - Raises any exception from the asynchronous function after logging it in the span.""" with self.ai_provider_span_context(provider, model, operation) as span: # Add function-specific attributes span.set_ai_attributes( @@ -503,6 +539,16 @@ async def async_wrapper(*args, **kwargs): @functools.wraps(func) def sync_wrapper(*args, **kwargs): + """Syncs function calls within an AI provider span context for monitoring and tracking. + Parameters: + - *args: Variable length argument list for the target function. + - **kwargs: Arbitrary keyword arguments for the target function. + Returns: + - Any: The result of the function `func` when called with provided arguments. + Processing Logic: + - The synchronous execution of the function is wrapped in a monitoring span context. + - Function metadata like name and argument count are logged in the span. + - Success or failure of the function execution is recorded as span attributes.""" with self.ai_provider_span_context(provider, model, operation) as span: span.set_ai_attributes( function_name=func.__name__, @@ -531,7 +577,29 @@ def vcf_operation_span(self, operation: str): """Decorator for VCF processing operations.""" def decorator(func): @functools.wraps(func) + """Apply a decorator to a function to perform context logging during its execution. + Parameters: + - func (callable): The function to be wrapped by the decorator. + Returns: + - callable: A wrapped function that logs its execution context including arguments and success status. + Processing Logic: + - Establish a context for the operation before function execution. + - Log function name, argument count, and keyword argument count. + - Execute the function within this context. + - Capture and log the success status or handle exceptions accordingly.""" def wrapper(*args, **kwargs): + """Wrapper function for executing a given function within a VCF operation context, recording attributes about the operation. + Parameters: + - *args: Variable length argument list passed to the wrapped function. + - **kwargs: Arbitrary keyword arguments passed to the wrapped function. + - operation (Operation object): The context of VCF operation during execution. + Returns: + - any: Whatever the wrapped function returns upon successful execution. + Processing Logic: + - The function execution is wrapped within a context manager. + - VCF operation attributes such as function name and argument counts are recorded. + - Success or failure of the function invocation is captured in the context attributes. + - Raises any exception encountered during function execution after setting 'success' to False.""" with self.vcf_operation_context(operation) as span: span.set_vcf_attributes( function_name=func.__name__, @@ -554,7 +622,28 @@ def memory_operation_span(self, operation: str): """Decorator for memory optimization operations.""" def decorator(func): @functools.wraps(func) + """A decorator for function memory context management. + Parameters: + - func (callable): The function to be wrapped and managed within a memory context. + Returns: + - callable: The wrapped function with added memory context and attribute management. + Processing Logic: + - Opens a memory context before executing the function, capturing it in a span. + - Sets memory attributes such as function name and optimization level. + - Upon successful function execution, sets a success attribute in the span. + - On exception, sets a failure attribute and re-raises the exception.""" def wrapper(*args, **kwargs): + """Wrapper function to execute another function while managing memory context and capturing its attributes. + Parameters: + - *args: Positional arguments to pass to the `func`. + - **kwargs: Keyword arguments to pass to the `func`. + Returns: + - The return value of the executed function `func`. + Processing Logic: + - Establishes a memory context using `self.memory_context`. + - Sets attributes related to the function name and optimization level. + - Executes the given function, capturing success or failure state. + - Raises any exceptions encountered during execution.""" with self.memory_context(operation) as span: span.set_memory_attributes( function_name=func.__name__, diff --git a/src/vcf_agent/graph_integration.py b/src/vcf_agent/graph_integration.py index 148bdb5..5020d74 100644 --- a/src/vcf_agent/graph_integration.py +++ b/src/vcf_agent/graph_integration.py @@ -283,6 +283,17 @@ def link_variant_to_sample(conn: kuzu.Connection, sample_id: str, variant_id: st raise def execute_query(conn: kuzu.Connection, cypher_query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: + """Executes a Cypher query using the Kuzu connection and returns the results as a list of dictionaries. + Parameters: + - conn (kuzu.Connection): The database connection object to execute the query. + - cypher_query (str): The Cypher query to execute. + - params (Optional[Dict[str, Any]]): Parameters to include in the query, defaults to None. + Returns: + - List[Dict[str, Any]]: A list of dictionaries representing the query results. + Processing Logic: + - Handles both single and list results from the query execution, focusing on the first query result in case of a list. + - Converts the query result to a Pandas DataFrame and then to a list of dictionaries. + - Contains error handling for unexpected types and ensures cleanup of resources.""" query_result_union: Optional[Union[kuzu.query_result.QueryResult, List[kuzu.query_result.QueryResult]]] = None try: print(f"Executing Cypher query: {cypher_query} with params: {params}") diff --git a/src/vcf_agent/lancedb_integration.py b/src/vcf_agent/lancedb_integration.py index 512e853..a91c57e 100644 --- a/src/vcf_agent/lancedb_integration.py +++ b/src/vcf_agent/lancedb_integration.py @@ -293,6 +293,17 @@ class DimensionReducer: Integrated directly into the embedding service. """ def __init__(self, target_dimensions: int = 768): + """Initialize an object for dimensionality reduction using PCA. + Parameters: + - target_dimensions (int): Desired number of dimensions for the reduced dataset. Defaults to 768. + Returns: + - None: The function does not return any value. It sets up internal state for future processing. + Processing Logic: + - Sets up initial dimensions and checks for availability of scikit-learn. + - Initializes model attributes such as PCA model and scaler as None. + - Flags is_trained to False, indicating model needs training. + - Prepares to store training embeddings and specifies a minimum sample size for reliable PCA application. + - Issues a warning when scikit-learn is unavailable, showing dimension reduction feature will not work.""" self.target_dimensions = target_dimensions self.original_dimensions = 1536 self.pca_model = None diff --git a/src/vcf_agent/metrics.py b/src/vcf_agent/metrics.py index 97527ff..0935f88 100644 --- a/src/vcf_agent/metrics.py +++ b/src/vcf_agent/metrics.py @@ -173,6 +173,18 @@ def safe_register_metric(metric, registry=None): # --- Structlog OTel Processor --- def add_otel_context(_, __, event_dict): + """Add OpenTelemetry (OTel) context information to the event dictionary. + Parameters: + - _ (unused): Placeholder for the first parameter that is not used. + - __ (unused): Placeholder for the second parameter that is not used. + - event_dict (dict): The dictionary to which OTel context information will be added. + Returns: + - dict: The updated event dictionary with OTel context information added, if available. + Processing Logic: + - Extracts the current span using OpenTelemetry tracing utilities. + - Checks if the span is recording and has a valid span context. + - Retrieves and formats the trace ID and span ID from the span context. + - Attempts to get the service name from the span's associated resource attributes, using a common method or resorting to private attributes if necessary.""" current_span = trace.get_current_span() if isinstance(current_span, Span) and current_span.is_recording(): span_context = current_span.get_span_context() @@ -197,6 +209,15 @@ def add_otel_context(_, __, event_dict): def setup_logging(): # Basic logging configuration (can be overridden by app-level config) + """Setup logging configuration using structlog. + Parameters: + - None + Returns: + - BoundLogger: A configured logger object ready for structured logging. + Processing Logic: + - Sets basic logging configuration using environment variable or defaults to 'INFO'. + - Configures structlog with processors for adding logger name and level and formatting logs. + - Customizes log rendering for better readability, optionally with JSON output.""" logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper(), format="%(message)s") structlog.configure( processors=[ @@ -345,6 +366,15 @@ def setup_logging(): _metrics_server_thread = None def start_metrics_http_server(port: Optional[int] = None): + """Starts a Prometheus metrics HTTP server on the specified port. + Parameters: + - port (Optional[int]): Port number for the metrics server. Uses a default port if not provided. + Returns: + - None + Processing Logic: + - Initializes a background thread to start the HTTP server using the specified registry configuration. + - Ensures the server is only started if it is not running already. + - Logs all outcomes - successful start, server already running, or errors encountered during startup.""" global _metrics_server_thread actual_port = port if port is not None else METRICS_HTTP_PORT if _metrics_server_thread is None or not _metrics_server_thread.is_alive(): @@ -430,6 +460,22 @@ def observe_ai_interaction( error_type: Optional[str] = None ): # Increment request and error counters + """Observe and record metrics related to AI interaction. + Parameters: + - model_provider (str): Name of the AI model provider. + - endpoint_task (str): Specific task or endpoint being invoked. + - duration_seconds (float): Duration of the AI interaction in seconds. + - prompt_tokens (Optional[int]): Number of prompt tokens used in the request, if applicable. + - completion_tokens (Optional[int]): Number of completion tokens generated, if applicable. + - total_tokens (Optional[int]): Total number of tokens used when prompt and completion tokens are not specified. + - success (bool): Whether the interaction was successful or not, default is True. + - error_type (Optional[str]): Type of error encountered if the interaction was not successful. + Returns: + - None: The function does not return a value; it updates metrics. + Processing Logic: + - Inc/observe appropriate metrics based on success or failure of interaction. + - Record error type unless specified otherwise. + - Avoid double counting of tokens when prompt/completion tokens are specified along with total tokens.""" status_str = "success" if success else "error" VCF_AGENT_AI_REQUESTS_TOTAL.labels(model_provider=model_provider, endpoint_task=endpoint_task, status=status_str).inc() diff --git a/src/vcf_agent/optimizations.py b/src/vcf_agent/optimizations.py index c4626fc..744d55c 100644 --- a/src/vcf_agent/optimizations.py +++ b/src/vcf_agent/optimizations.py @@ -43,6 +43,17 @@ class EmbeddingCache: """Thread-safe LRU cache for embeddings with optimized persistence.""" def __init__(self, max_size: int = 10000, persist_file: Optional[str] = None, max_cache_file_size_mb: int = 50): + """Initialize a cache object with specific constraints and optional persistence. + Parameters: + - max_size (int): Maximum number of items the cache can hold. Default is 10000. + - persist_file (Optional[str]): File path for persistence option. If provided, cache can be saved to or loaded from this file. + - max_cache_file_size_mb (int): Maximum size of the cache file in megabytes. Default is 50. + Returns: + - None: The constructor initializes attributes and loads cache if persist_file is specified. + Processing Logic: + - Initializes a thread-safe cache and tracks access order for eviction purposes. + - Sets up attributes for managing cache size and persistence options. + - Tries to load cache data from persist_file if the file is provided.""" self.max_size = max_size self.persist_file = persist_file self.max_cache_file_size_mb = max_cache_file_size_mb @@ -197,6 +208,17 @@ class OptimizedEmbeddingService: """ def __init__(self, base_service, config: OptimizationConfig): + """Initialize the OptimizedEmbeddingService with caching capabilities. + Parameters: + - base_service: The base service to be optimized with embedding caching. + - config (OptimizationConfig): Configuration object for the embedding optimization settings. + Returns: + - None + Processing Logic: + - Initializes with MemoryAwareEmbeddingCache if Phase 2 is available and caching is enabled. + - Registers the cache with the Phase 2 recovery system if applicable. + - Falls back to an original EmbeddingCache if Phase 2 is not available but caching is enabled. + - Sets up internal statistics tracking for cache usage and embedding requests.""" self.base_service = base_service self.config = config @@ -516,6 +538,15 @@ class PerformanceOptimizer: """Main optimization coordinator.""" def __init__(self, config: OptimizationConfig = None): + """Initializes an optimization system with configurable components. + Parameters: + - config (OptimizationConfig, optional): Configuration settings for optimization. Defaults to a new OptimizationConfig instance if not provided. + Returns: + - None: This function does not return a value. + Processing Logic: + - Initializes `embedding_cache` to None. + - Sets up `query_batcher`, `async_processor`, and `memory_optimizer` using the provided or default configuration. + - Initializes a statistics dictionary to track applied optimizations, time saved, and memory saved.""" self.config = config or OptimizationConfig() self.embedding_cache = None self.query_batcher = QueryBatcher(self.config) diff --git a/src/vcf_agent/phase5/iggy_processor.py b/src/vcf_agent/phase5/iggy_processor.py index b691ebb..1cc72b6 100644 --- a/src/vcf_agent/phase5/iggy_processor.py +++ b/src/vcf_agent/phase5/iggy_processor.py @@ -111,6 +111,15 @@ class IggyConnectionManager: """ def __init__(self, config: Phase5Config): + """Initializes the necessary configurations and metrics for managing Iggy clients. + Parameters: + - config (Phase5Config): Configuration object that contains settings for Iggy clients. + Returns: + - None: This is an initializer and does not return a value. + Processing Logic: + - Sets up initial connection health check values and maximum failure thresholds. + - Configures stream and topic names from Iggy configuration. + - Initializes metric counters for tracking connections and connection errors.""" self.config = config self.iggy_config = config.iggy self.clients: Dict[str, IggyClient] = {} diff --git a/src/vcf_agent/phase5/kafka_processor.py b/src/vcf_agent/phase5/kafka_processor.py index c7e02fb..25e70c7 100644 --- a/src/vcf_agent/phase5/kafka_processor.py +++ b/src/vcf_agent/phase5/kafka_processor.py @@ -130,6 +130,16 @@ class KafkaConnectionManager: """ def __init__(self, config: Phase5Config): + """Initializes the Kafka connection configuration and tracking metrics. + Parameters: + - config (Phase5Config): Configuration object for initializing Kafka connections. + Returns: + - None: The constructor does not return a value. + Processing Logic: + - Initializes the Kafka producer and consumer dictionaries for storing connection instances. + - Sets initial health status and tracks the last health check time. + - Configures maximum consecutive failure threshold for connection health. + - Sets up metric counters for monitoring the total number of Kafka connections and errors.""" self.config = config self.kafka_config = config.kafka self.producers: Dict[str, KafkaProducer] = {} diff --git a/src/vcf_agent/phase5/monitoring.py b/src/vcf_agent/phase5/monitoring.py index 16e2184..72e4157 100644 --- a/src/vcf_agent/phase5/monitoring.py +++ b/src/vcf_agent/phase5/monitoring.py @@ -70,6 +70,18 @@ class PrometheusMetrics: def __init__(self): # VCF Processing Metrics + """Initializes various metrics for monitoring VCF processing and system performance. + Parameters: + None + Returns: + None + Processing Logic: + - Sets up counters, histograms, and gauges to track VCF processing, platform health, + system resource usage, message metrics, errors, and performance targets. + - Metrics are categorized into VCF processing, platform health, system resources, + message properties, error tracking, and performance monitoring. + - Specific labels are applied to track dimensions such as platform, chromosome, status, + component, error type, severity, compression, and others crucial for detailed monitoring.""" self.variants_processed_total = Counter( 'vcf_variants_processed_total', 'Total number of VCF variants processed', @@ -303,6 +315,15 @@ class IggyMetricsCollector: """ def __init__(self): + """Initializes an object with monitoring attributes for Apache Iggy operations. + Parameters: + - None + Returns: + - None + Processing Logic: + - Initializes a deque to maintain a history of up to 100 metrics records. + - Sets up a CircuitBreaker with specific failure and recovery parameters to handle operation failures. + - Creates and assigns performance metrics for latency, throughput, and health using a meter object.""" self.is_running = False self.metrics_history: deque = deque(maxlen=100) self.circuit_breaker = CircuitBreaker( @@ -389,6 +410,15 @@ class KafkaMetricsCollector: """ def __init__(self): + """Initializes the Kafka monitoring and circuit breaking mechanism. + Parameters: + None + Returns: + None + Processing Logic: + - Sets up a circuit breaker with specified thresholds for monitoring Kafka operations. + - Initializes performance metrics using histograms and gauges for latency, throughput, health, and consumer lag. + - The metrics history is stored in a deque with a maximum length of 100 for efficient tracking.""" self.is_running = False self.metrics_history: deque = deque(maxlen=100) self.circuit_breaker = CircuitBreaker( diff --git a/src/vcf_agent/vcf_ingestion.py b/src/vcf_agent/vcf_ingestion.py index e21f61c..9b337ff 100644 --- a/src/vcf_agent/vcf_ingestion.py +++ b/src/vcf_agent/vcf_ingestion.py @@ -143,6 +143,17 @@ class VCFStreamer: """Memory-efficient VCF file streaming processor.""" def __init__(self, vcf_path: str, batch_size: int = 1000, resume_from: Optional[str] = None): + """Initializes an instance with paths and settings for processing VCF files. + Parameters: + - vcf_path (str): Path to the VCF file to be processed. + - batch_size (int): Number of records to process at a time, defaults to 1000. + - resume_from (Optional[str]): Position to resume processing from, in format 'chrom:pos'. + Returns: + - None + Processing Logic: + - Parses the 'resume_from' parameter to extract the chromosome and position if provided. + - Logs a warning if the 'resume_from' format is invalid. + - Sets the current processing position to None initially.""" self.vcf_path = vcf_path self.batch_size = batch_size self.resume_from = resume_from @@ -278,6 +289,16 @@ class VCFIngestionPipeline: """Main VCF ingestion pipeline orchestrator.""" def __init__(self, config: IngestionConfig): + """Initialize the ingestion process with configuration settings. + Parameters: + - config (IngestionConfig): The configuration settings for the ingestion process which includes file paths, batch size, and embedding dimensions. + Returns: + - None + Processing Logic: + - Initializes the VCFStreamer with the specified file, batch size, and resume point from the configuration. + - Sets up an EmbeddingGenerator with the specified embedding dimensions from the configuration. + - Establishes initial null state for database connections which will be setup later as required. + - Initializes the progress tracking mechanism and prepares results tracking.""" self.config = config self.validator = VCFValidator() self.streamer = VCFStreamer(