Skip to content

Commit a88e4fb

Browse files
authored
refactor: [Scale from Zero] Introduce PodLocator (#1950)
* contracts: add PodLocator for candidate resolution This defines the contract for resolving candidate pods based on request metadata, decoupling the resolution logic from the storage layer. * requestcontrol: implement CachedPodLocator Introduces DatastorePodLocator and a caching decorator. This reduces contention on the Datastore RWMutex during high-throughput dispatch cycles by caching subset resolution results for a short TTL. * director: delegate candidate resolution Refactors the Director to use the injected PodLocator interface instead of the private getCandidatePodsForScheduling method. This prepares the Director for lazy resolution without changing current behavior.
1 parent d5974b1 commit a88e4fb

File tree

7 files changed

+644
-142
lines changed

7 files changed

+644
-142
lines changed

cmd/epp/runner/runner.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"strconv"
3131
"strings"
3232
"sync/atomic"
33+
"time"
3334

3435
"github.com/go-logr/logr"
3536
"github.com/prometheus/client_golang/prometheus"
@@ -352,10 +353,13 @@ func (r *Runner) Run(ctx context.Context) error {
352353
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
353354
}
354355

356+
locator := requestcontrol.NewDatastorePodLocator(ds)
357+
cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
355358
director := requestcontrol.NewDirectorWithConfig(
356359
ds,
357360
scheduler,
358361
admissionController,
362+
cachedLocator,
359363
r.requestControlConfig)
360364

361365
// --- Setup ExtProc Server Runner ---

pkg/epp/flowcontrol/contracts/saturationdetector.go renamed to pkg/epp/flowcontrol/contracts/dependencies.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ import (
2222
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2323
)
2424

25+
// PodLocator defines the contract for a component that resolves the set of candidate pods for a request based on its
26+
// metadata (e.g., subsetting).
27+
//
28+
// This interface allows the Flow Controller to fetch a fresh list of pods dynamically during the dispatch cycle,
29+
// enabling support for "Scale-from-Zero" scenarios where pods may not exist when the request is first enqueued.
30+
type PodLocator interface {
31+
// Locate returns a list of pod metrics that match the criteria defined in the request metadata.
32+
Locate(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics
33+
}
34+
2535
// SaturationDetector defines the contract for a component that provides real-time load signals to the
2636
// `controller.FlowController`.
2737
//

pkg/epp/requestcontrol/director.go

Lines changed: 5 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ import (
3232
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3333
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
3434
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
35+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
3536
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
36-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
3737
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3838
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3939
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
@@ -65,12 +65,14 @@ func NewDirectorWithConfig(
6565
datastore Datastore,
6666
scheduler Scheduler,
6767
admissionController AdmissionController,
68+
podLocator contracts.PodLocator,
6869
config *Config,
6970
) *Director {
7071
return &Director{
7172
datastore: datastore,
7273
scheduler: scheduler,
7374
admissionController: admissionController,
75+
podLocator: podLocator,
7476
requestControlPlugins: *config,
7577
defaultPriority: 0, // define default priority explicitly
7678
}
@@ -89,6 +91,7 @@ type Director struct {
8991
datastore Datastore
9092
scheduler Scheduler
9193
admissionController AdmissionController
94+
podLocator contracts.PodLocator
9295
requestControlPlugins Config
9396
// we just need a pointer to an int variable since priority is a pointer in InferenceObjective
9497
// no need to set this in the constructor, since the value we want is the default int val
@@ -157,7 +160,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
157160
logger.V(logutil.DEBUG).Info("LLM request assembled")
158161

159162
// Get candidate pods for scheduling
160-
candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
163+
candidatePods := d.podLocator.Locate(ctx, reqCtx.Request.Metadata)
161164
if len(candidatePods) == 0 {
162165
return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
163166
}
@@ -232,52 +235,6 @@ func (d *Director) selectWeightedModel(models []v1alpha2.TargetModel) string {
232235
return models[len(models)-1].ModelRewrite
233236
}
234237

235-
// getCandidatePodsForScheduling gets the list of relevant endpoints for the scheduling cycle from the datastore.
236-
// according to EPP protocol, if "x-gateway-destination-endpoint-subset" is set on the request metadata and specifies
237-
// a subset of endpoints, only these endpoints will be considered as candidates for the scheduler.
238-
// Snapshot pod metrics from the datastore to:
239-
// 1. Reduce concurrent access to the datastore.
240-
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
241-
func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
242-
loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
243-
244-
subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
245-
if !found {
246-
return d.datastore.PodList(datastore.AllPodsPredicate)
247-
}
248-
249-
// Check if endpoint key is present in the subset map and ensure there is at least one value
250-
endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
251-
if !found {
252-
return d.datastore.PodList(datastore.AllPodsPredicate)
253-
} else if len(endpointSubsetList) == 0 {
254-
loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
255-
return []backendmetrics.PodMetrics{}
256-
}
257-
258-
// Create a map of endpoint addresses for easy lookup
259-
endpoints := make(map[string]bool)
260-
for _, endpoint := range endpointSubsetList {
261-
// Extract address from endpoint
262-
// The endpoint is formatted as "<address>:<port>" (ex. "10.0.1.0:8080")
263-
epStr := strings.Split(endpoint.(string), ":")[0]
264-
endpoints[epStr] = true
265-
}
266-
267-
podTotalCount := 0
268-
podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
269-
podTotalCount++
270-
if _, found := endpoints[pm.GetPod().GetIPAddress()]; found {
271-
return true
272-
}
273-
return false
274-
})
275-
276-
loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFilteredList))
277-
278-
return podFilteredList
279-
}
280-
281238
// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
282239
// for allowing plugging customized logic based on the scheduling result.
283240
func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {

pkg/epp/requestcontrol/director_test.go

Lines changed: 23 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"time"
2727

2828
"github.com/google/go-cmp/cmp"
29-
"github.com/google/go-cmp/cmp/cmpopts"
3029
"github.com/stretchr/testify/assert"
3130
corev1 "k8s.io/api/core/v1"
3231
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -42,7 +41,6 @@ import (
4241
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
4342
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4443
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
45-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
4644
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
4745
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
4846
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
@@ -659,9 +657,16 @@ func TestDirector_HandleRequest(t *testing.T) {
659657
config = config.WithPrepareDataPlugins(test.prepareDataPlugin)
660658
}
661659
config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError))
662-
director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config)
660+
661+
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
662+
director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, locator, config)
663663
if test.name == "successful request with model rewrite" {
664-
director.datastore = &mockDatastore{pods: ds.PodList(datastore.AllPodsPredicate), rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}}
664+
mockDs := &mockDatastore{
665+
pods: ds.PodList(datastore.AllPodsPredicate),
666+
rewrites: []*v1alpha2.InferenceModelRewrite{rewrite},
667+
}
668+
director.datastore = mockDs
669+
director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute)
665670
}
666671

667672
reqCtx := &handlers.RequestContext{
@@ -708,91 +713,6 @@ func TestDirector_HandleRequest(t *testing.T) {
708713
}
709714
}
710715

711-
// TestGetCandidatePodsForScheduling is testing getCandidatePodsForScheduling and more specifically the functionality of SubsetFilter.
712-
func TestGetCandidatePodsForScheduling(t *testing.T) {
713-
var makeFilterMetadata = func(data []any) map[string]any {
714-
return map[string]any{
715-
metadata.SubsetFilterNamespace: map[string]any{
716-
metadata.SubsetFilterKey: data,
717-
},
718-
}
719-
}
720-
721-
pod1 := &backend.Pod{
722-
NamespacedName: types.NamespacedName{Name: "pod1"},
723-
Address: "10.0.0.1",
724-
Labels: map[string]string{},
725-
}
726-
727-
pod2 := &backend.Pod{
728-
NamespacedName: types.NamespacedName{Name: "pod2"},
729-
Address: "10.0.0.2",
730-
Labels: map[string]string{},
731-
}
732-
733-
testInput := []backendmetrics.PodMetrics{
734-
&backendmetrics.FakePodMetrics{Pod: pod1},
735-
&backendmetrics.FakePodMetrics{Pod: pod2},
736-
}
737-
738-
tests := []struct {
739-
name string
740-
metadata map[string]any
741-
output []backendmetrics.PodMetrics
742-
}{
743-
{
744-
name: "SubsetFilter, filter not present — return all pods",
745-
metadata: map[string]any{},
746-
output: testInput,
747-
},
748-
{
749-
name: "SubsetFilter, namespace present filter not present — return all pods",
750-
metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}},
751-
output: testInput,
752-
},
753-
{
754-
name: "SubsetFilter, filter present with empty list — return error",
755-
metadata: makeFilterMetadata([]any{}),
756-
output: []backendmetrics.PodMetrics{},
757-
},
758-
{
759-
name: "SubsetFilter, subset with one matching pod",
760-
metadata: makeFilterMetadata([]any{"10.0.0.1"}),
761-
output: []backendmetrics.PodMetrics{
762-
&backendmetrics.FakePodMetrics{
763-
Pod: pod1,
764-
},
765-
},
766-
},
767-
{
768-
name: "SubsetFilter, subset with multiple matching pods",
769-
metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
770-
output: testInput,
771-
},
772-
{
773-
name: "SubsetFilter, subset with no matching pods",
774-
metadata: makeFilterMetadata([]any{"10.0.0.3"}),
775-
output: []backendmetrics.PodMetrics{},
776-
},
777-
}
778-
779-
ds := &mockDatastore{pods: testInput}
780-
for _, test := range tests {
781-
t.Run(test.name, func(t *testing.T) {
782-
director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockAdmissionController{}, NewConfig())
783-
784-
got := director.getCandidatePodsForScheduling(context.Background(), test.metadata)
785-
786-
diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool {
787-
return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String()
788-
}))
789-
if diff != "" {
790-
t.Errorf("Unexpected output (-want +got): %v", diff)
791-
}
792-
})
793-
}
794-
}
795-
796716
func TestGetRandomPod(t *testing.T) {
797717
tests := []struct {
798718
name string
@@ -1028,7 +948,8 @@ func TestDirector_ApplyWeightedModelRewrite(t *testing.T) {
1028948
for _, test := range tests {
1029949
t.Run(test.name, func(t *testing.T) {
1030950
mockDs := &mockDatastore{rewrites: test.rewrites}
1031-
director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, NewConfig())
951+
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute)
952+
director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, locator, NewConfig())
1032953

1033954
reqCtx := &handlers.RequestContext{
1034955
IncomingModelName: test.incomingModel,
@@ -1128,7 +1049,14 @@ func TestDirector_HandleResponseReceived(t *testing.T) {
11281049
ctx := logutil.NewTestLoggerIntoContext(context.Background())
11291050
ds := datastore.NewDatastore(t.Context(), nil, 0)
11301051
mockSched := &mockScheduler{}
1131-
director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithResponseReceivedPlugins(pr1))
1052+
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
1053+
director := NewDirectorWithConfig(
1054+
ds,
1055+
mockSched,
1056+
&mockAdmissionController{},
1057+
locator,
1058+
NewConfig().WithResponseReceivedPlugins(pr1),
1059+
)
11321060

11331061
reqCtx := &handlers.RequestContext{
11341062
Request: &handlers.Request{
@@ -1165,7 +1093,8 @@ func TestDirector_HandleResponseStreaming(t *testing.T) {
11651093
ctx := logutil.NewTestLoggerIntoContext(context.Background())
11661094
ds := datastore.NewDatastore(t.Context(), nil, 0)
11671095
mockSched := &mockScheduler{}
1168-
director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1))
1096+
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
1097+
director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseStreamingPlugins(ps1))
11691098

11701099
reqCtx := &handlers.RequestContext{
11711100
Request: &handlers.Request{
@@ -1201,7 +1130,8 @@ func TestDirector_HandleResponseComplete(t *testing.T) {
12011130
ctx := logutil.NewTestLoggerIntoContext(context.Background())
12021131
ds := datastore.NewDatastore(t.Context(), nil, 0)
12031132
mockSched := &mockScheduler{}
1204-
director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1))
1133+
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
1134+
director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseCompletePlugins(pc1))
12051135

12061136
reqCtx := &handlers.RequestContext{
12071137
Request: &handlers.Request{

0 commit comments

Comments
 (0)