diff --git a/src/semantic-router/cmd/main.go b/src/semantic-router/cmd/main.go index 016672cc2..c6b5b070d 100644 --- a/src/semantic-router/cmd/main.go +++ b/src/semantic-router/cmd/main.go @@ -18,6 +18,7 @@ import ( "github.com/vllm-project/semantic-router/src/semantic-router/pkg/extproc" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/k8s" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/tracing" ) @@ -87,6 +88,16 @@ func main() { }() } + // Initialize windowed metrics if enabled + if cfg.Observability.Metrics.WindowedMetrics.Enabled { + logging.Infof("Initializing windowed metrics for load balancing...") + if initErr := metrics.InitializeWindowedMetrics(cfg.Observability.Metrics.WindowedMetrics); initErr != nil { + logging.Warnf("Failed to initialize windowed metrics: %v", initErr) + } else { + logging.Infof("Windowed metrics initialized successfully") + } + } + // Set up signal handling for graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/src/semantic-router/pkg/config/config.go b/src/semantic-router/pkg/config/config.go index 01980f384..7197eedbb 100644 --- a/src/semantic-router/pkg/config/config.go +++ b/src/semantic-router/pkg/config/config.go @@ -262,15 +262,43 @@ type APIConfig struct { type ObservabilityConfig struct { // Tracing configuration for distributed tracing Tracing TracingConfig `yaml:"tracing"` - // Metrics configuration for Prometheus metrics endpoint + + // Metrics configuration for enhanced metrics collection Metrics MetricsConfig `yaml:"metrics"` } -// MetricsConfig represents configuration for metrics endpoint +// MetricsConfig represents configuration for metrics collection type MetricsConfig struct { // Enabled controls whether the Prometheus metrics endpoint is served // When omitted, defaults to true Enabled *bool `yaml:"enabled,omitempty"` + + // Enable windowed metrics collection for load balancing + WindowedMetrics WindowedMetricsConfig `yaml:"windowed_metrics"` +} + +// WindowedMetricsConfig represents configuration for time-windowed metrics +type WindowedMetricsConfig struct { + // Enable windowed metrics collection + Enabled bool `yaml:"enabled"` + + // Time windows to track (in duration format, e.g., "1m", "5m", "15m", "1h", "24h") + // Default: ["1m", "5m", "15m", "1h", "24h"] + TimeWindows []string `yaml:"time_windows,omitempty"` + + // Update interval for windowed metrics computation (e.g., "10s", "30s") + // Default: "10s" + UpdateInterval string `yaml:"update_interval,omitempty"` + + // Enable model-level metrics tracking + ModelMetrics bool `yaml:"model_metrics"` + + // Enable queue depth estimation + QueueDepthEstimation bool `yaml:"queue_depth_estimation"` + + // Maximum number of models to track (to prevent cardinality explosion) + // Default: 100 + MaxModels int `yaml:"max_models,omitempty"` } // TracingConfig represents configuration for distributed tracing diff --git a/src/semantic-router/pkg/extproc/processor_req_body.go b/src/semantic-router/pkg/extproc/processor_req_body.go index 14c1160a4..986424219 100644 --- a/src/semantic-router/pkg/extproc/processor_req_body.go +++ b/src/semantic-router/pkg/extproc/processor_req_body.go @@ -229,6 +229,12 @@ func (r *OpenAIRouter) selectEndpointForModel(ctx *RequestContext, model string) backendSpan.End() 
ctx.TraceContext = backendCtx + // Store the selected endpoint in context (for routing/logging purposes) + ctx.SelectedEndpoint = endpointAddress + + // Increment active request count for queue depth estimation (model-level) + metrics.IncrementModelActiveRequests(model) + return endpointAddress } diff --git a/src/semantic-router/pkg/extproc/processor_req_header.go b/src/semantic-router/pkg/extproc/processor_req_header.go index e0fce4ac7..be2208764 100644 --- a/src/semantic-router/pkg/extproc/processor_req_header.go +++ b/src/semantic-router/pkg/extproc/processor_req_header.go @@ -42,6 +42,9 @@ type RequestContext struct { VSRInjectedSystemPrompt bool // Whether a system prompt was injected into the request VSRSelectedDecision *config.Decision // The decision object selected by DecisionEngine (for plugins) + // Endpoint tracking for windowed metrics + SelectedEndpoint string // The endpoint address selected for this request + // Tracing context TraceContext context.Context // OpenTelemetry trace context for span propagation } diff --git a/src/semantic-router/pkg/extproc/processor_res_body.go b/src/semantic-router/pkg/extproc/processor_res_body.go index a5d6c25e7..5e15c0aa4 100644 --- a/src/semantic-router/pkg/extproc/processor_res_body.go +++ b/src/semantic-router/pkg/extproc/processor_res_body.go @@ -15,6 +15,9 @@ import ( func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) { completionLatency := time.Since(ctx.StartTime) + // Decrement active request count for queue depth estimation + defer metrics.DecrementModelActiveRequests(ctx.RequestModel) + // Process the response for caching responseBody := v.ResponseBody.Body @@ -68,6 +71,16 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response metrics.RecordModelTPOT(ctx.RequestModel, timePerToken) } + // Record windowed model metrics for load balancing + metrics.RecordModelWindowedRequest( + ctx.RequestModel, + completionLatency.Seconds(), + int64(promptTokens), + int64(completionTokens), + false, // isError + false, // isTimeout + ) + // Compute and record cost if pricing is configured if r.Config != nil { promptRatePer1M, completionRatePer1M, currency, ok := r.Config.GetModelPricing(ctx.RequestModel) diff --git a/src/semantic-router/pkg/observability/metrics/windowed_metrics.go b/src/semantic-router/pkg/observability/metrics/windowed_metrics.go new file mode 100644 index 000000000..de88910af --- /dev/null +++ b/src/semantic-router/pkg/observability/metrics/windowed_metrics.go @@ -0,0 +1,585 @@ +package metrics + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/consts" +) + +// Default time windows for windowed metrics +var DefaultTimeWindows = []string{"1m", "5m", "15m", "1h", "24h"} + +// Default update interval for computing windowed metrics +const DefaultUpdateInterval = 10 * time.Second + +// Default maximum models to track +const DefaultMaxModels = 100 + +// WindowedMetricsManager manages time-windowed metrics for load balancing +type WindowedMetricsManager struct { + config config.WindowedMetricsConfig + timeWindows []time.Duration + windowLabels []string + updateInterval time.Duration + maxModels int + + // Ring buffers for storing request data per model + requestBuffers 
map[string]*RequestRingBuffer + bufferMutex sync.RWMutex + + // Active request tracking for queue depth estimation + activeRequests map[string]int64 + activeMutex sync.RWMutex + + // Stop channel for background goroutine + stopChan chan struct{} + running bool +} + +// RequestData represents a single request's data for windowed tracking +type RequestData struct { + Timestamp time.Time + Model string + LatencySeconds float64 + PromptTokens int64 + CompletionTokens int64 + IsError bool + IsTimeout bool +} + +// RequestRingBuffer is a time-based ring buffer for storing request data +type RequestRingBuffer struct { + data []RequestData + head int + size int + capacity int + mutex sync.RWMutex +} + +// NewRequestRingBuffer creates a new ring buffer with the given capacity +func NewRequestRingBuffer(capacity int) *RequestRingBuffer { + return &RequestRingBuffer{ + data: make([]RequestData, capacity), + capacity: capacity, + } +} + +// Add adds a request to the ring buffer +func (rb *RequestRingBuffer) Add(req RequestData) { + rb.mutex.Lock() + defer rb.mutex.Unlock() + + rb.data[rb.head] = req + rb.head = (rb.head + 1) % rb.capacity + if rb.size < rb.capacity { + rb.size++ + } +} + +// GetDataSince returns all request data since the given time +func (rb *RequestRingBuffer) GetDataSince(since time.Time) []RequestData { + rb.mutex.RLock() + defer rb.mutex.RUnlock() + + result := make([]RequestData, 0, rb.size) + for i := 0; i < rb.size; i++ { + idx := (rb.head - rb.size + i + rb.capacity) % rb.capacity + if !rb.data[idx].Timestamp.Before(since) { + result = append(result, rb.data[idx]) + } + } + return result +} + +// Prometheus metrics for windowed model tracking +var ( + // ModelLatencyWindowed tracks latency by model and time window + ModelLatencyWindowed *prometheus.GaugeVec + + // ModelRequestsWindowed tracks request counts by model and time window + ModelRequestsWindowed *prometheus.GaugeVec + + // ModelTokensWindowed tracks token throughput by model, token type, and time window + ModelTokensWindowed *prometheus.GaugeVec + + // ModelUtilization tracks utilization percentage by model and time window + ModelUtilization *prometheus.GaugeVec + + // ModelQueueDepth tracks estimated queue depth by model + ModelQueueDepth *prometheus.GaugeVec + + // ModelErrorRate tracks error rate by model and time window + ModelErrorRate *prometheus.GaugeVec + + // ModelLatencyP50 tracks P50 latency by model and time window + ModelLatencyP50 *prometheus.GaugeVec + + // ModelLatencyP95 tracks P95 latency by model and time window + ModelLatencyP95 *prometheus.GaugeVec + + // ModelLatencyP99 tracks P99 latency by model and time window + ModelLatencyP99 *prometheus.GaugeVec + + windowedMetricsInitOnce sync.Once +) + +// Global instance of WindowedMetricsManager +var ( + globalWindowedManager *WindowedMetricsManager + globalWindowedManagerMutex sync.RWMutex +) + +// InitializeWindowedMetrics initializes the windowed metrics system +func InitializeWindowedMetrics(cfg config.WindowedMetricsConfig) error { + windowedMetricsInitOnce.Do(func() { + // Initialize Prometheus metrics + ModelLatencyWindowed = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_latency_windowed_seconds", + Help: "Average latency by model and time window", + }, + []string{"model", "time_window"}, + ) + + ModelRequestsWindowed = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_requests_windowed_total", + Help: "Total requests by model and time window", + }, + []string{"model", "time_window"}, + ) + + 
ModelTokensWindowed = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_tokens_windowed_total", + Help: "Total tokens by model, token type, and time window", + }, + []string{"model", "token_type", "time_window"}, + ) + + ModelUtilization = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_utilization_percentage", + Help: "Estimated utilization percentage by model and time window", + }, + []string{"model", "time_window"}, + ) + + ModelQueueDepth = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_queue_depth_estimated", + Help: "Estimated queue depth by model", + }, + []string{"model"}, + ) + + ModelErrorRate = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_error_rate_windowed", + Help: "Error rate by model and time window", + }, + []string{"model", "time_window"}, + ) + + ModelLatencyP50 = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_latency_p50_windowed_seconds", + Help: "P50 latency by model and time window", + }, + []string{"model", "time_window"}, + ) + + ModelLatencyP95 = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_latency_p95_windowed_seconds", + Help: "P95 latency by model and time window", + }, + []string{"model", "time_window"}, + ) + + ModelLatencyP99 = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_model_latency_p99_windowed_seconds", + Help: "P99 latency by model and time window", + }, + []string{"model", "time_window"}, + ) + }) + + // Create and start the manager + manager, err := NewWindowedMetricsManager(cfg) + if err != nil { + return err + } + + globalWindowedManagerMutex.Lock() + globalWindowedManager = manager + globalWindowedManagerMutex.Unlock() + + manager.Start() + return nil +} + +// NewWindowedMetricsManager creates a new WindowedMetricsManager +func NewWindowedMetricsManager(cfg config.WindowedMetricsConfig) (*WindowedMetricsManager, error) { + // Parse time windows + windowStrings := cfg.TimeWindows + if len(windowStrings) == 0 { + windowStrings = DefaultTimeWindows + } + + timeWindows := make([]time.Duration, 0, len(windowStrings)) + windowLabels := make([]string, 0, len(windowStrings)) + for _, ws := range windowStrings { + d, parseErr := time.ParseDuration(ws) + if parseErr != nil { + // Skip invalid durations + continue + } + timeWindows = append(timeWindows, d) + windowLabels = append(windowLabels, ws) + } + + // Parse update interval + updateInterval := DefaultUpdateInterval + if cfg.UpdateInterval != "" { + if d, parseErr := time.ParseDuration(cfg.UpdateInterval); parseErr == nil { + updateInterval = d + } + } + + // Set max models + maxModels := cfg.MaxModels + if maxModels <= 0 { + maxModels = DefaultMaxModels + } + + return &WindowedMetricsManager{ + config: cfg, + timeWindows: timeWindows, + windowLabels: windowLabels, + updateInterval: updateInterval, + maxModels: maxModels, + requestBuffers: make(map[string]*RequestRingBuffer), + activeRequests: make(map[string]int64), + stopChan: make(chan struct{}), + }, nil +} + +// Start begins the background metrics computation goroutine +func (m *WindowedMetricsManager) Start() { + if m.running { + return + } + m.running = true + + go func() { + ticker := time.NewTicker(m.updateInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m.computeWindowedMetrics() + case <-m.stopChan: + return + } + } + }() +} + +// Stop stops the background metrics computation +func (m *WindowedMetricsManager) Stop() { + if !m.running { + return + } + close(m.stopChan) + m.running = false +} + +// 
RecordRequest records a request for windowed metrics tracking +func (m *WindowedMetricsManager) RecordRequest(req RequestData) { + if !m.config.Enabled { + return + } + + key := req.Model + + m.bufferMutex.Lock() + buffer, exists := m.requestBuffers[key] + if !exists { + // Check if we've hit max models + if len(m.requestBuffers) >= m.maxModels { + m.bufferMutex.Unlock() + return + } + buffer = NewRequestRingBuffer(10000) // Adjust capacity as needed + m.requestBuffers[key] = buffer + } + m.bufferMutex.Unlock() + + buffer.Add(req) +} + +// IncrementActiveRequests increments the active request count for queue depth tracking +func (m *WindowedMetricsManager) IncrementActiveRequests(model string) { + if !m.config.QueueDepthEstimation { + return + } + + key := model + + m.activeMutex.Lock() + m.activeRequests[key]++ + count := m.activeRequests[key] + m.activeMutex.Unlock() + + // Update the gauge immediately (only if metrics are initialized) + if ModelQueueDepth == nil { + return + } + if model == "" { + model = consts.UnknownLabel + } + ModelQueueDepth.WithLabelValues(model).Set(float64(count)) +} + +// DecrementActiveRequests decrements the active request count +func (m *WindowedMetricsManager) DecrementActiveRequests(model string) { + if !m.config.QueueDepthEstimation { + return + } + + key := model + + m.activeMutex.Lock() + m.activeRequests[key]-- + if m.activeRequests[key] < 0 { + m.activeRequests[key] = 0 + } + count := m.activeRequests[key] + m.activeMutex.Unlock() + + // Update the gauge immediately (only if metrics are initialized) + if ModelQueueDepth == nil { + return + } + if model == "" { + model = consts.UnknownLabel + } + ModelQueueDepth.WithLabelValues(model).Set(float64(count)) +} + +// computeWindowedMetrics computes all windowed metrics +func (m *WindowedMetricsManager) computeWindowedMetrics() { + now := time.Now() + + m.bufferMutex.RLock() + buffers := make(map[string]*RequestRingBuffer, len(m.requestBuffers)) + for k, v := range m.requestBuffers { + buffers[k] = v + } + m.bufferMutex.RUnlock() + + // Compute metrics for each model and each time window + for model, buffer := range buffers { + if model == "" { + model = consts.UnknownLabel + } + + for i, window := range m.timeWindows { + windowLabel := m.windowLabels[i] + since := now.Add(-window) + data := buffer.GetDataSince(since) + + if len(data) == 0 { + // Set zero values for empty windows + ModelRequestsWindowed.WithLabelValues(model, windowLabel).Set(0) + ModelLatencyWindowed.WithLabelValues(model, windowLabel).Set(0) + ModelErrorRate.WithLabelValues(model, windowLabel).Set(0) + continue + } + + // Compute metrics + var totalLatency float64 + var totalPromptTokens, totalCompletionTokens int64 + var errorCount int + latencies := make([]float64, 0, len(data)) + + for _, d := range data { + totalLatency += d.LatencySeconds + totalPromptTokens += d.PromptTokens + totalCompletionTokens += d.CompletionTokens + latencies = append(latencies, d.LatencySeconds) + if d.IsError || d.IsTimeout { + errorCount++ + } + } + + requestCount := float64(len(data)) + avgLatency := totalLatency / requestCount + errorRate := float64(errorCount) / requestCount + + // Update Prometheus metrics + ModelRequestsWindowed.WithLabelValues(model, windowLabel).Set(requestCount) + ModelLatencyWindowed.WithLabelValues(model, windowLabel).Set(avgLatency) + ModelTokensWindowed.WithLabelValues(model, "prompt", windowLabel).Set(float64(totalPromptTokens)) + ModelTokensWindowed.WithLabelValues(model, "completion", 
windowLabel).Set(float64(totalCompletionTokens)) + ModelErrorRate.WithLabelValues(model, windowLabel).Set(errorRate) + + // Compute percentiles + if len(latencies) > 0 { + p50 := computePercentile(latencies, 0.50) + p95 := computePercentile(latencies, 0.95) + p99 := computePercentile(latencies, 0.99) + + ModelLatencyP50.WithLabelValues(model, windowLabel).Set(p50) + ModelLatencyP95.WithLabelValues(model, windowLabel).Set(p95) + ModelLatencyP99.WithLabelValues(model, windowLabel).Set(p99) + } + + // Compute utilization (requests per second / expected capacity) + // This is a simple approximation based on request rate + requestsPerSecond := requestCount / window.Seconds() + // Assume 100 req/s as theoretical max for utilization calculation + // This can be made configurable + utilization := (requestsPerSecond / 100.0) * 100.0 + if utilization > 100.0 { + utilization = 100.0 + } + ModelUtilization.WithLabelValues(model, windowLabel).Set(utilization) + } + } +} + +// computePercentile computes the given percentile from a slice of values +func computePercentile(values []float64, percentile float64) float64 { + if len(values) == 0 { + return 0 + } + + // Sort the values + sorted := make([]float64, len(values)) + copy(sorted, values) + sortFloat64s(sorted) + + // Calculate the index + index := percentile * float64(len(sorted)-1) + lower := int(index) + upper := lower + 1 + + if upper >= len(sorted) { + return sorted[len(sorted)-1] + } + + // Linear interpolation + weight := index - float64(lower) + return sorted[lower]*(1-weight) + sorted[upper]*weight +} + +// sortFloat64s sorts a slice of float64 in ascending order +func sortFloat64s(a []float64) { + // Simple insertion sort for small slices, quick sort for larger + if len(a) < 12 { + for i := 1; i < len(a); i++ { + for j := i; j > 0 && a[j] < a[j-1]; j-- { + a[j], a[j-1] = a[j-1], a[j] + } + } + return + } + + // Quick sort + quickSort(a, 0, len(a)-1) +} + +func quickSort(a []float64, low, high int) { + if low < high { + p := partition(a, low, high) + quickSort(a, low, p-1) + quickSort(a, p+1, high) + } +} + +func partition(a []float64, low, high int) int { + pivot := a[high] + i := low - 1 + for j := low; j < high; j++ { + if a[j] <= pivot { + i++ + a[i], a[j] = a[j], a[i] + } + } + a[i+1], a[high] = a[high], a[i+1] + return i + 1 +} + +// Global helper functions for recording windowed metrics + +// RecordModelWindowedRequest records a request to the global windowed metrics manager +func RecordModelWindowedRequest(model string, latencySeconds float64, promptTokens, completionTokens int64, isError, isTimeout bool) { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + if manager == nil { + return + } + + manager.RecordRequest(RequestData{ + Timestamp: time.Now(), + Model: model, + LatencySeconds: latencySeconds, + PromptTokens: promptTokens, + CompletionTokens: completionTokens, + IsError: isError, + IsTimeout: isTimeout, + }) +} + +// IncrementModelActiveRequests increments the active request count +func IncrementModelActiveRequests(model string) { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + if manager == nil { + return + } + + manager.IncrementActiveRequests(model) +} + +// DecrementModelActiveRequests decrements the active request count +func DecrementModelActiveRequests(model string) { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + if manager 
== nil { + return + } + + manager.DecrementActiveRequests(model) +} + +// GetWindowedMetricsManager returns the global windowed metrics manager +func GetWindowedMetricsManager() *WindowedMetricsManager { + globalWindowedManagerMutex.RLock() + defer globalWindowedManagerMutex.RUnlock() + return globalWindowedManager +} + +// IsWindowedMetricsEnabled returns true if windowed metrics are enabled +func IsWindowedMetricsEnabled() bool { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + return manager != nil && manager.config.Enabled +} diff --git a/src/semantic-router/pkg/observability/metrics/windowed_metrics_test.go b/src/semantic-router/pkg/observability/metrics/windowed_metrics_test.go new file mode 100644 index 000000000..7f7ce05a1 --- /dev/null +++ b/src/semantic-router/pkg/observability/metrics/windowed_metrics_test.go @@ -0,0 +1,460 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" +) + +// TestNewWindowedMetricsManager tests the creation of a new WindowedMetricsManager +func TestNewWindowedMetricsManager(t *testing.T) { + tests := []struct { + name string + config config.WindowedMetricsConfig + wantTimeWindows int + wantInterval time.Duration + wantMaxModels int + }{ + { + name: "Default configuration", + config: config.WindowedMetricsConfig{ + Enabled: true, + }, + wantTimeWindows: 5, // Default: 1m, 5m, 15m, 1h, 24h + wantInterval: DefaultUpdateInterval, + wantMaxModels: DefaultMaxModels, + }, + { + name: "Custom time windows", + config: config.WindowedMetricsConfig{ + Enabled: true, + TimeWindows: []string{"30s", "2m", "10m"}, + }, + wantTimeWindows: 3, + wantInterval: DefaultUpdateInterval, + wantMaxModels: DefaultMaxModels, + }, + { + name: "Custom update interval", + config: config.WindowedMetricsConfig{ + Enabled: true, + UpdateInterval: "5s", + }, + wantTimeWindows: 5, + wantInterval: 5 * time.Second, + wantMaxModels: DefaultMaxModels, + }, + { + name: "Custom max models", + config: config.WindowedMetricsConfig{ + Enabled: true, + MaxModels: 50, + }, + wantTimeWindows: 5, + wantInterval: DefaultUpdateInterval, + wantMaxModels: 50, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + manager, err := NewWindowedMetricsManager(tt.config) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + if len(manager.timeWindows) != tt.wantTimeWindows { + t.Errorf("timeWindows count = %d, want %d", len(manager.timeWindows), tt.wantTimeWindows) + } + + if manager.updateInterval != tt.wantInterval { + t.Errorf("updateInterval = %v, want %v", manager.updateInterval, tt.wantInterval) + } + + if manager.maxModels != tt.wantMaxModels { + t.Errorf("maxModels = %d, want %d", manager.maxModels, tt.wantMaxModels) + } + }) + } +} + +// TestRequestRingBuffer tests the ring buffer functionality +func TestRequestRingBuffer(t *testing.T) { + rb := NewRequestRingBuffer(5) + + // Add 3 items + now := time.Now() + for i := 0; i < 3; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(time.Duration(i) * time.Second), + Model: "model1", + LatencySeconds: float64(i), + }) + } + + // Should have 3 items + data := rb.GetDataSince(now.Add(-time.Hour)) + if len(data) != 3 { + t.Errorf("GetDataSince() count = %d, want 3", len(data)) + } + + // Add 5 more items (should wrap around) + for i := 0; i < 5; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(time.Duration(10+i) * time.Second), + Model: "model2", + LatencySeconds: 
float64(10 + i), + }) + } + + // Should have 5 items (capacity limit) + data = rb.GetDataSince(now.Add(-time.Hour)) + if len(data) != 5 { + t.Errorf("GetDataSince() count after wrap = %d, want 5", len(data)) + } + + // Verify data is from model2 (most recent) + for _, d := range data { + if d.Model != "model2" { + t.Errorf("Expected model2, got %s", d.Model) + } + } +} + +// TestRequestRingBufferTimeBased tests time-based filtering +func TestRequestRingBufferTimeBased(t *testing.T) { + rb := NewRequestRingBuffer(100) + + now := time.Now() + + // Add items across different time ranges + // Old items (2 hours ago) + for i := 0; i < 10; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(-2 * time.Hour), + Model: "model1", + LatencySeconds: 1.0, + }) + } + + // Recent items (5 minutes ago) + for i := 0; i < 5; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(-5 * time.Minute), + Model: "model1", + LatencySeconds: 2.0, + }) + } + + // Very recent items (30 seconds ago) + for i := 0; i < 3; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(-30 * time.Second), + Model: "model1", + LatencySeconds: 3.0, + }) + } + + // Query for last minute + data := rb.GetDataSince(now.Add(-1 * time.Minute)) + if len(data) != 3 { + t.Errorf("GetDataSince(1 minute) count = %d, want 3", len(data)) + } + + // Query for last 15 minutes + data = rb.GetDataSince(now.Add(-15 * time.Minute)) + if len(data) != 8 { // 5 + 3 + t.Errorf("GetDataSince(15 minutes) count = %d, want 8", len(data)) + } + + // Query for last 24 hours + data = rb.GetDataSince(now.Add(-24 * time.Hour)) + if len(data) != 18 { // 10 + 5 + 3 + t.Errorf("GetDataSince(24 hours) count = %d, want 18", len(data)) + } +} + +// TestComputePercentile tests percentile calculation +func TestComputePercentile(t *testing.T) { + tests := []struct { + name string + values []float64 + percentile float64 + want float64 + tolerance float64 + }{ + { + name: "Empty values", + values: []float64{}, + percentile: 0.5, + want: 0, + tolerance: 0.001, + }, + { + name: "Single value", + values: []float64{5.0}, + percentile: 0.5, + want: 5.0, + tolerance: 0.001, + }, + { + name: "P50 of sorted sequence", + values: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + percentile: 0.5, + want: 5.5, // Interpolated median + tolerance: 0.1, + }, + { + name: "P95 of sorted sequence", + values: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + percentile: 0.95, + want: 9.55, + tolerance: 0.1, + }, + { + name: "P99 of sorted sequence", + values: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + percentile: 0.99, + want: 9.91, + tolerance: 0.1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := computePercentile(tt.values, tt.percentile) + diff := got - tt.want + if diff < 0 { + diff = -diff + } + if diff > tt.tolerance { + t.Errorf("computePercentile() = %v, want %v (tolerance %v)", got, tt.want, tt.tolerance) + } + }) + } +} + +// TestSortFloat64s tests the sorting function +func TestSortFloat64s(t *testing.T) { + tests := []struct { + name string + input []float64 + want []float64 + }{ + { + name: "Empty slice", + input: []float64{}, + want: []float64{}, + }, + { + name: "Single element", + input: []float64{5.0}, + want: []float64{5.0}, + }, + { + name: "Already sorted", + input: []float64{1.0, 2.0, 3.0, 4.0, 5.0}, + want: []float64{1.0, 2.0, 3.0, 4.0, 5.0}, + }, + { + name: "Reverse sorted", + input: []float64{5.0, 4.0, 3.0, 2.0, 1.0}, + want: []float64{1.0, 2.0, 3.0, 4.0, 5.0}, + }, + { + name: "Random order", + input: []float64{3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0}, + 
want: []float64{1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 9.0}, + }, + { + name: "Large slice (tests quicksort path)", + input: []float64{15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}, + want: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + input := make([]float64, len(tt.input)) + copy(input, tt.input) + sortFloat64s(input) + + if len(input) != len(tt.want) { + t.Errorf("sortFloat64s() length = %d, want %d", len(input), len(tt.want)) + return + } + + for i := range input { + if input[i] != tt.want[i] { + t.Errorf("sortFloat64s()[%d] = %v, want %v", i, input[i], tt.want[i]) + } + } + }) + } +} + +// TestWindowedMetricsManagerRecordRequest tests request recording +func TestWindowedMetricsManagerRecordRequest(t *testing.T) { + manager, err := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: true, + MaxModels: 3, + }) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + // Record requests for multiple models + now := time.Now() + for i := 0; i < 5; i++ { + manager.RecordRequest(RequestData{ + Timestamp: now, + Model: "model1", + LatencySeconds: 0.1, + PromptTokens: 100, + CompletionTokens: 50, + }) + } + + manager.RecordRequest(RequestData{ + Timestamp: now, + Model: "model2", + LatencySeconds: 0.2, + PromptTokens: 200, + CompletionTokens: 100, + }) + + manager.RecordRequest(RequestData{ + Timestamp: now, + Model: "model3", + LatencySeconds: 0.3, + PromptTokens: 300, + CompletionTokens: 150, + }) + + // This should be ignored (max models reached) + manager.RecordRequest(RequestData{ + Timestamp: now, + Model: "model4", + LatencySeconds: 0.4, + PromptTokens: 400, + CompletionTokens: 200, + }) + + // Check buffer count + manager.bufferMutex.RLock() + bufferCount := len(manager.requestBuffers) + manager.bufferMutex.RUnlock() + + if bufferCount != 3 { + t.Errorf("Buffer count = %d, want 3 (max models)", bufferCount) + } +} + +// TestActiveRequestTracking tests queue depth tracking +func TestActiveRequestTracking(t *testing.T) { + manager, err := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: true, + QueueDepthEstimation: true, + }) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + // Increment active requests + manager.IncrementActiveRequests("model1") + manager.IncrementActiveRequests("model1") + manager.IncrementActiveRequests("model1") + + // Check count + manager.activeMutex.RLock() + count := manager.activeRequests["model1"] + manager.activeMutex.RUnlock() + + if count != 3 { + t.Errorf("Active requests count = %d, want 3", count) + } + + // Decrement + manager.DecrementActiveRequests("model1") + manager.DecrementActiveRequests("model1") + + manager.activeMutex.RLock() + count = manager.activeRequests["model1"] + manager.activeMutex.RUnlock() + + if count != 1 { + t.Errorf("Active requests count after decrement = %d, want 1", count) + } + + // Decrement beyond zero (should not go negative) + manager.DecrementActiveRequests("model1") + manager.DecrementActiveRequests("model1") + + manager.activeMutex.RLock() + count = manager.activeRequests["model1"] + manager.activeMutex.RUnlock() + + if count != 0 { + t.Errorf("Active requests count should not go negative, got %d", count) + } +} + +// TestDisabledManager tests that disabled manager doesn't record +func TestDisabledManager(t *testing.T) { + manager, err := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: false, + }) + if err != nil 
{ + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + // Record request (should be ignored) + manager.RecordRequest(RequestData{ + Timestamp: time.Now(), + Model: "model1", + LatencySeconds: 0.1, + }) + + // Check buffer count (should be 0) + manager.bufferMutex.RLock() + bufferCount := len(manager.requestBuffers) + manager.bufferMutex.RUnlock() + + if bufferCount != 0 { + t.Errorf("Buffer count = %d, want 0 (disabled manager)", bufferCount) + } +} + +// BenchmarkRecordRequest benchmarks request recording +func BenchmarkRecordRequest(b *testing.B) { + manager, _ := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: true, + }) + + req := RequestData{ + Timestamp: time.Now(), + Model: "model1", + LatencySeconds: 0.1, + PromptTokens: 100, + CompletionTokens: 50, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + manager.RecordRequest(req) + } +} + +// BenchmarkComputePercentile benchmarks percentile computation +func BenchmarkComputePercentile(b *testing.B) { + values := make([]float64, 1000) + for i := range values { + values[i] = float64(i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + computePercentile(values, 0.95) + } +} diff --git a/website/docs/tutorials/observability/metrics.md b/website/docs/tutorials/observability/metrics.md index ea19671a5..cc0c6ed5e 100644 --- a/website/docs/tutorials/observability/metrics.md +++ b/website/docs/tutorials/observability/metrics.md @@ -179,7 +179,69 @@ histogram_quantile(0.95, rate(llm_model_completion_latency_seconds_bucket[5m])) --- -## 6. Troubleshooting +## 6. Windowed Model Metrics (Load Balancing) + +Enhanced time-windowed metrics for model performance tracking, useful for load balancing decisions in Kubernetes environments where model selection matters more than endpoint selection. 
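+
+As a quick illustration, a load balancer could rank candidate models by combining two of the gauges documented below. The query is a sketch only: the metric names are the ones exported by this feature, but the single combined score and the 0.5 weight are illustrative assumptions, not something the router computes.
+
+```promql
+# Hypothetical per-model load score (lower is better): estimated queue depth plus
+# a weighted 5-minute P95 latency. The 0.5 weight is arbitrary, for illustration only.
+llm_model_queue_depth_estimated + ignoring(time_window) (0.5 * llm_model_latency_p95_windowed_seconds{time_window="5m"})
+```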
+ +### Configuration + +Enable windowed metrics in `config.yaml`: + +```yaml +observability: + metrics: + windowed_metrics: + enabled: true + time_windows: ["1m", "5m", "15m", "1h", "24h"] + update_interval: "10s" + model_metrics: true + queue_depth_estimation: true + max_models: 100 +``` + +### Model-Level Metrics + +| Metric | Type | Labels | Description | +| ------------------------------------------- | ----- | ---------------------------- | ------------------------------------- | +| `llm_model_latency_windowed_seconds` | gauge | model, time_window | Average latency per time window | +| `llm_model_requests_windowed_total` | gauge | model, time_window | Request count per time window | +| `llm_model_tokens_windowed_total` | gauge | model, token_type, time_window | Token throughput per window | +| `llm_model_utilization_percentage` | gauge | model, time_window | Estimated utilization percentage | +| `llm_model_queue_depth_estimated` | gauge | model | Current estimated queue depth | +| `llm_model_error_rate_windowed` | gauge | model, time_window | Error rate per time window | +| `llm_model_latency_p50_windowed_seconds` | gauge | model, time_window | P50 latency per time window | +| `llm_model_latency_p95_windowed_seconds` | gauge | model, time_window | P95 latency per time window | +| `llm_model_latency_p99_windowed_seconds` | gauge | model, time_window | P99 latency per time window | + +### Example Queries + +```promql +# Average latency for model in last 5 minutes +llm_model_latency_windowed_seconds{model="gpt-4", time_window="5m"} + +# P95 latency comparison across models +llm_model_latency_p95_windowed_seconds{time_window="15m"} + +# Token throughput per model +llm_model_tokens_windowed_total{token_type="completion", time_window="1h"} + +# Current queue depth for load balancing decisions +llm_model_queue_depth_estimated{model="gpt-4"} + +# Error rate monitoring +llm_model_error_rate_windowed{time_window="5m"} > 0.05 +``` + +### Use Cases + +1. **Load Balancing**: Use queue depth and latency metrics to route requests to less loaded models +2. **Performance Monitoring**: Track P95/P99 latency trends across time windows +3. **Capacity Planning**: Monitor utilization percentages to identify when to scale models +4. **Alerting**: Set alerts on error rates or latency spikes within specific time windows + +--- + +## 7. Troubleshooting | Issue | Check | Fix | | --------------- | ------------------- | ----------------------------------------------------- |