Commit 00d3fed

feat: add time-windowed model metrics for load balancing
Signed-off-by: Jintao Zhang <[email protected]>
1 parent 30801fa commit 00d3fed

File tree: 8 files changed (+1171 / -3 lines)


src/semantic-router/cmd/main.go

Lines changed: 11 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
 	"github.com/vllm-project/semantic-router/src/semantic-router/pkg/extproc"
 	"github.com/vllm-project/semantic-router/src/semantic-router/pkg/k8s"
 	"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging"
+	"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics"
 	"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/tracing"
 )

@@ -87,6 +88,16 @@ func main() {
 		}()
 	}

+	// Initialize windowed metrics if enabled
+	if cfg.Observability.Metrics.WindowedMetrics.Enabled {
+		logging.Infof("Initializing windowed metrics for load balancing...")
+		if err := metrics.InitializeWindowedMetrics(cfg.Observability.Metrics.WindowedMetrics); err != nil {
+			logging.Warnf("Failed to initialize windowed metrics: %v", err)
+		} else {
+			logging.Infof("Windowed metrics initialized successfully")
+		}
+	}
+
 	// Set up signal handling for graceful shutdown
 	sigChan := make(chan os.Signal, 1)
 	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
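The windowed-metrics package wired in here (most of the commit's +1171 lines) is not part of this excerpt, and main() deliberately treats a failed initialization as non-fatal (Warnf rather than a fatal exit). As a minimal sketch of what an initializer driven by WindowedMetricsConfig could look like — applying the documented defaults and starting a recomputation ticker — consider the following; the helper names and structure are illustrative assumptions, not the actual implementation:

```go
// Sketch only: the real code lives in pkg/observability/metrics and may
// differ. It illustrates how the WindowedMetricsConfig fields could drive
// a background aggregation loop.
package metrics

import "time"

// windowedConfig is a hypothetical local mirror of the fields from
// config.WindowedMetricsConfig that matter for initialization.
type windowedConfig struct {
	TimeWindows    []string
	UpdateInterval string
}

func initializeWindowedMetricsSketch(cfg windowedConfig) error {
	// Apply the defaults documented in the config comments.
	if len(cfg.TimeWindows) == 0 {
		cfg.TimeWindows = []string{"1m", "5m", "15m", "1h", "24h"}
	}
	if cfg.UpdateInterval == "" {
		cfg.UpdateInterval = "10s"
	}

	// Validate the configured windows up front; a bad duration surfaces
	// as the Warnf branch in main().
	windows := make([]time.Duration, 0, len(cfg.TimeWindows))
	for _, w := range cfg.TimeWindows {
		d, err := time.ParseDuration(w)
		if err != nil {
			return err
		}
		windows = append(windows, d)
	}

	interval, err := time.ParseDuration(cfg.UpdateInterval)
	if err != nil {
		return err
	}

	// Periodically roll raw samples up into per-window aggregates.
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			recomputeWindowsSketch(windows) // hypothetical rollup step
		}
	}()
	return nil
}

func recomputeWindowsSketch(windows []time.Duration) {
	// Placeholder: the real package would aggregate the samples recorded
	// by RecordModelWindowedRequest for each configured window here.
	_ = windows
}
```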

src/semantic-router/pkg/config/config.go

Lines changed: 30 additions & 2 deletions
@@ -262,15 +262,43 @@ type APIConfig struct {
 type ObservabilityConfig struct {
 	// Tracing configuration for distributed tracing
 	Tracing TracingConfig `yaml:"tracing"`
-	// Metrics configuration for Prometheus metrics endpoint
+
+	// Metrics configuration for enhanced metrics collection
 	Metrics MetricsConfig `yaml:"metrics"`
 }

-// MetricsConfig represents configuration for metrics endpoint
+// MetricsConfig represents configuration for metrics collection
 type MetricsConfig struct {
 	// Enabled controls whether the Prometheus metrics endpoint is served
 	// When omitted, defaults to true
 	Enabled *bool `yaml:"enabled,omitempty"`
+
+	// Enable windowed metrics collection for load balancing
+	WindowedMetrics WindowedMetricsConfig `yaml:"windowed_metrics"`
+}
+
+// WindowedMetricsConfig represents configuration for time-windowed metrics
+type WindowedMetricsConfig struct {
+	// Enable windowed metrics collection
+	Enabled bool `yaml:"enabled"`
+
+	// Time windows to track (in duration format, e.g., "1m", "5m", "15m", "1h", "24h")
+	// Default: ["1m", "5m", "15m", "1h", "24h"]
+	TimeWindows []string `yaml:"time_windows,omitempty"`
+
+	// Update interval for windowed metrics computation (e.g., "10s", "30s")
+	// Default: "10s"
+	UpdateInterval string `yaml:"update_interval,omitempty"`
+
+	// Enable model-level metrics tracking
+	ModelMetrics bool `yaml:"model_metrics"`
+
+	// Enable queue depth estimation
+	QueueDepthEstimation bool `yaml:"queue_depth_estimation"`
+
+	// Maximum number of models to track (to prevent cardinality explosion)
+	// Default: 100
+	MaxModels int `yaml:"max_models,omitempty"`
 }

 // TracingConfig represents configuration for distributed tracing
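Taken together, these struct tags imply a configuration block along the following lines. This is a sketch assuming ObservabilityConfig is mounted under a top-level observability: key; the exact placement in the router's config file is not shown in this diff, and the values mirror the documented defaults:

```yaml
observability:
  metrics:
    enabled: true                  # Prometheus endpoint (defaults to true when omitted)
    windowed_metrics:
      enabled: true
      time_windows: ["1m", "5m", "15m", "1h", "24h"]  # default when omitted
      update_interval: "10s"                          # default when omitted
      model_metrics: true
      queue_depth_estimation: true
      max_models: 100                                 # default when omitted
```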

src/semantic-router/pkg/extproc/processor_req_body.go

Lines changed: 6 additions & 0 deletions
@@ -229,6 +229,12 @@ func (r *OpenAIRouter) selectEndpointForModel(ctx *RequestContext, model string)
 	backendSpan.End()
 	ctx.TraceContext = backendCtx

+	// Store the selected endpoint in context (for routing/logging purposes)
+	ctx.SelectedEndpoint = endpointAddress
+
+	// Increment active request count for queue depth estimation (model-level)
+	metrics.IncrementModelActiveRequests(model)
+
 	return endpointAddress
 }
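IncrementModelActiveRequests (called here) and DecrementModelActiveRequests (called when the response body is handled, further down) are introduced elsewhere in this commit and are not shown in this excerpt. A minimal sketch of the kind of per-model in-flight gauge that queue depth estimation implies, assuming a plain atomic counter keyed by model name:

```go
// Sketch only: a per-model in-flight request gauge; not the actual
// implementation from pkg/observability/metrics.
package metrics

import (
	"sync"
	"sync/atomic"
)

var activeRequests sync.Map // model name -> *int64

func incrementModelActiveRequestsSketch(model string) {
	counter, _ := activeRequests.LoadOrStore(model, new(int64))
	atomic.AddInt64(counter.(*int64), 1)
}

func decrementModelActiveRequestsSketch(model string) {
	if counter, ok := activeRequests.Load(model); ok {
		atomic.AddInt64(counter.(*int64), -1)
	}
}

// currentQueueDepthSketch reads the gauge for a model, e.g. to expose a
// queue-depth estimate to the load balancer.
func currentQueueDepthSketch(model string) int64 {
	if counter, ok := activeRequests.Load(model); ok {
		return atomic.LoadInt64(counter.(*int64))
	}
	return 0
}
```

The deferred decrement in processor_res_body.go (below) is the matching release for the increment performed here.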
src/semantic-router/pkg/extproc/processor_req_header.go

Lines changed: 3 additions & 0 deletions
@@ -42,6 +42,9 @@ type RequestContext struct {
 	VSRInjectedSystemPrompt bool // Whether a system prompt was injected into the request
 	VSRSelectedDecision *config.Decision // The decision object selected by DecisionEngine (for plugins)

+	// Endpoint tracking for windowed metrics
+	SelectedEndpoint string // The endpoint address selected for this request
+
 	// Tracing context
 	TraceContext context.Context // OpenTelemetry trace context for span propagation
 }

src/semantic-router/pkg/extproc/processor_res_body.go

Lines changed: 13 additions & 0 deletions
@@ -15,6 +15,9 @@ import (
 func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
 	completionLatency := time.Since(ctx.StartTime)

+	// Decrement active request count for queue depth estimation
+	defer metrics.DecrementModelActiveRequests(ctx.RequestModel)
+
 	// Process the response for caching
 	responseBody := v.ResponseBody.Body

@@ -68,6 +71,16 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response
 		metrics.RecordModelTPOT(ctx.RequestModel, timePerToken)
 	}

+	// Record windowed model metrics for load balancing
+	metrics.RecordModelWindowedRequest(
+		ctx.RequestModel,
+		completionLatency.Seconds(),
+		int64(promptTokens),
+		int64(completionTokens),
+		false, // isError
+		false, // isTimeout
+	)
+
 	// Compute and record cost if pricing is configured
 	if r.Config != nil {
 		promptRatePer1M, completionRatePer1M, currency, ok := r.Config.GetModelPricing(ctx.RequestModel)
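RecordModelWindowedRequest feeds the time-windowed aggregates, but its implementation is also outside this excerpt. A common way to keep per-model stats over several windows ("1m" through "24h") without unbounded memory is a map of fixed-width buckets that is summed per window on each update tick; the sketch below illustrates that approach under the assumption the package works roughly this way (bucket width, field names, and rollup are illustrative):

```go
// Sketch only: a bucketed sliding window for per-model request stats,
// illustrating one way RecordModelWindowedRequest could aggregate samples.
package metrics

import (
	"sync"
	"time"
)

type bucket struct {
	requests   int64
	errors     int64
	timeouts   int64
	latencySum float64 // seconds
	tokens     int64   // prompt + completion
}

type modelWindow struct {
	mu      sync.Mutex
	width   time.Duration // e.g. 10s per bucket
	buckets map[int64]*bucket
}

func (w *modelWindow) record(latency float64, promptTokens, completionTokens int64, isError, isTimeout bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	key := time.Now().UnixNano() / int64(w.width) // index of the current bucket
	b, ok := w.buckets[key]
	if !ok {
		b = &bucket{}
		w.buckets[key] = b
	}
	b.requests++
	b.latencySum += latency
	b.tokens += promptTokens + completionTokens
	if isError {
		b.errors++
	}
	if isTimeout {
		b.timeouts++
	}
}

// sum aggregates the buckets inside the given window (e.g. 1m, 5m) and
// garbage-collects buckets older than the largest tracked window.
func (w *modelWindow) sum(window, maxWindow time.Duration) (requests int64, avgLatency float64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	now := time.Now().UnixNano()
	cutoff := (now - int64(window)) / int64(w.width)
	oldest := (now - int64(maxWindow)) / int64(w.width)
	var latency float64
	for key, b := range w.buckets {
		if key < oldest {
			delete(w.buckets, key) // expired for every tracked window
			continue
		}
		if key >= cutoff {
			requests += b.requests
			latency += b.latencySum
		}
	}
	if requests > 0 {
		avgLatency = latency / float64(requests)
	}
	return requests, avgLatency
}
```

Note that the call site above currently passes false for both isError and isTimeout, so error and timeout classification presumably happens on other paths not shown in this diff.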
