From d613f488d95ef6f4dd7969eab08d47e72e2ab238 Mon Sep 17 00:00:00 2001 From: Kuromesi <blackfacepan@163.com> Date: Thu, 9 Jan 2025 11:41:27 +0800 Subject: [PATCH 1/6] configurable filter chains Signed-off-by: Kuromesi <blackfacepan@163.com> --- pkg/ext-proc/backend/datastore.go | 14 + .../backend/filterconfig_reconciler.go | 53 ++++ pkg/ext-proc/main.go | 21 +- pkg/ext-proc/scheduling/filter.go | 35 ++- pkg/ext-proc/scheduling/filter_test.go | 10 +- pkg/ext-proc/scheduling/filtergen.go | 147 ++++++++++ pkg/ext-proc/scheduling/orchestrate.go | 104 +++++++ pkg/ext-proc/scheduling/orchestrate_test.go | 270 ++++++++++++++++++ pkg/ext-proc/scheduling/scheduler.go | 57 ++-- 9 files changed, 670 insertions(+), 41 deletions(-) create mode 100644 pkg/ext-proc/backend/filterconfig_reconciler.go create mode 100644 pkg/ext-proc/scheduling/filtergen.go create mode 100644 pkg/ext-proc/scheduling/orchestrate.go create mode 100644 pkg/ext-proc/scheduling/orchestrate_test.go diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index 70f000b8..cf4ff869 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -29,10 +29,24 @@ type K8sDatastore struct { inferencePool *v1alpha1.InferencePool InferenceModels *sync.Map pods *sync.Map + + filterConfigMap *corev1.ConfigMap } type K8sDatastoreOption func(*K8sDatastore) +func (ds *K8sDatastore) GetFilterConfigMap() *corev1.ConfigMap { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + return ds.filterConfigMap +} + +func WithFilterConfigMap(filterConfigMap *corev1.ConfigMap) K8sDatastoreOption { + return func(store *K8sDatastore) { + store.filterConfigMap = filterConfigMap + } +} + // WithPods can be used in tests to override the pods. func WithPods(pods []*PodMetrics) K8sDatastoreOption { return func(store *K8sDatastore) { diff --git a/pkg/ext-proc/backend/filterconfig_reconciler.go b/pkg/ext-proc/backend/filterconfig_reconciler.go new file mode 100644 index 00000000..99f74d4a --- /dev/null +++ b/pkg/ext-proc/backend/filterconfig_reconciler.go @@ -0,0 +1,53 @@ +package backend + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +type FilterConfigReconciler struct { + client.Client + Datastore *K8sDatastore +} + +func (c *FilterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + cm := &corev1.ConfigMap{} + if err := c.Get(ctx, req.NamespacedName, cm); err != nil { + if client.IgnoreNotFound(err) != nil { + klog.Errorf("unable to get ConfigMap, err: %v", err) + return ctrl.Result{}, err + } + c.Datastore.poolMu.Lock() + defer c.Datastore.poolMu.Unlock() + klog.V(1).Info("filter config deleted, reset filter config") + c.Datastore.filterConfigMap = nil + return ctrl.Result{}, nil + } + + c.Datastore.poolMu.Lock() + defer c.Datastore.poolMu.Unlock() + + if cm.DeletionTimestamp != nil { + klog.V(1).Info("filter config deleting, reset filter config") + c.Datastore.filterConfigMap = nil + return ctrl.Result{}, nil + } + + klog.V(1).Infof("update filter config to: %++v", cm.Data) + c.Datastore.filterConfigMap = cm.DeepCopy() + return ctrl.Result{}, nil +} + +func (c *FilterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.ConfigMap{}). 
+ WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool { + return object.GetName() == "filter-config" && object.GetNamespace() == "default" + })). + Complete(c) +} diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index e42d8e4f..381c9baf 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -59,6 +59,11 @@ var ( "refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics") + enableFilterConfiguration = flag.Bool( + "enableFilterConfiguration", + false, + "Whether to enable configuring filters in `default/filter-config` configmap, ONLY FOR DEV NOW.", + ) scheme = runtime.NewScheme() ) @@ -133,6 +138,15 @@ func main() { klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err) } + if *enableFilterConfiguration { + if err := (&backend.FilterConfigReconciler{ + Datastore: datastore, + Client: mgr.GetClient(), + }).SetupWithManager(mgr); err != nil { + klog.Error(err, "Error setting up FilterConfigReconciler") + } + } + // Start health and ext-proc servers in goroutines healthSvr := startHealthServer(datastore, *grpcHealthPort) extProcSvr := startExternalProcessorServer( @@ -193,6 +207,11 @@ func startExternalProcessorServer( ) *grpc.Server { svr := grpc.NewServer() + var orchestrator *scheduling.FilterOrchestratorImpl + if *enableFilterConfiguration { + orchestrator = scheduling.NewFilterOrchestrator(datastore) + } + go func() { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { @@ -209,7 +228,7 @@ func startExternalProcessorServer( // Register ext_proc handlers extProcPb.RegisterExternalProcessorServer( svr, - handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore), + handlers.NewServer(pp, scheduling.NewScheduler(pp, scheduling.WithOrchestrator(orchestrator)), targetPodHeader, datastore), ) // Blocking and will return when shutdown is complete. diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go index 09779d63..dec75bef 100644 --- a/pkg/ext-proc/scheduling/filter.go +++ b/pkg/ext-proc/scheduling/filter.go @@ -4,43 +4,46 @@ import ( "errors" "math" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + klog "k8s.io/klog/v2" ) -type Filter interface { +type FilterChain interface { Name() string Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) } -// filter applies current filterFunc, and then recursively applies next filters depending success or +// filterChainImpl applies current filterFunc, and then recursively applies next filters depending success or // failure of the current filterFunc. // It can be used to construct a flow chart algorithm. -type filter struct { +type filterChainImpl struct { name string - filter filterFunc + filter filter // nextOnSuccess filter will be applied after successfully applying the current filter. // The filtered results will be passed to the next filter. - nextOnSuccess *filter + nextOnSuccess *filterChainImpl // nextOnFailure filter will be applied if current filter fails. // The original input will be passed to the next filter. - nextOnFailure *filter + nextOnFailure *filterChainImpl // nextOnSuccessOrFailure is a convenience field to configure the next filter regardless of the // success or failure of the current filter. // NOTE: When using nextOnSuccessOrFailure, both nextOnSuccess and nextOnFailure SHOULD be nil. 
// However if that's not the case, nextOnSuccess and nextOnFailure will be used, instead of // nextOnSuccessOrFailure, in the success and failure scenarios, respectively. - nextOnSuccessOrFailure *filter + nextOnSuccessOrFailure *filterChainImpl } -func (f *filter) Name() string { +func (f *filterChainImpl) Name() string { if f == nil { return "nil" } return f.name } -func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { +func (f *filterChainImpl) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods)) filtered, err := f.filter(req, pods) @@ -71,11 +74,11 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend } } -// filterFunc filters a set of input pods to a subset. -type filterFunc func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) +// filter filters a set of input pods to a subset. +type filter func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) -// toFilterFunc is a helper function to convert a per pod filter func to the FilterFunc. -func toFilterFunc(pp podPredicate) filterFunc { +// toFilter is a helper function to convert a per pod filter func to the FilterFunc. +func toFilter(pp podPredicate) filter { return func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { filtered := []*backend.PodMetrics{} for _, pod := range pods { @@ -152,6 +155,12 @@ func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*bac return filtered, nil } +func dropRequestFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + klog.Infof("Dropping request %v", req) + return []*backend.PodMetrics{}, status.Errorf( + codes.ResourceExhausted, "dropping request due to limited backend resources") +} + // podPredicate is a filter function to check whether a pod is desired. type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool diff --git a/pkg/ext-proc/scheduling/filter_test.go b/pkg/ext-proc/scheduling/filter_test.go index d88f437c..f923fcba 100644 --- a/pkg/ext-proc/scheduling/filter_test.go +++ b/pkg/ext-proc/scheduling/filter_test.go @@ -15,11 +15,11 @@ func TestFilter(t *testing.T) { input []*backend.PodMetrics output []*backend.PodMetrics err bool - filter *filter + filter *filterChainImpl }{ { name: "simple filter without successor, failure", - filter: &filter{filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + filter: &filterChainImpl{filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { return nil, errors.New("filter error") }}, err: true, @@ -216,7 +216,7 @@ func TestFilter(t *testing.T) { func TestFilterFunc(t *testing.T) { tests := []struct { name string - f filterFunc + f filter req *LLMRequest input []*backend.PodMetrics output []*backend.PodMetrics @@ -302,7 +302,7 @@ func TestFilterFunc(t *testing.T) { }, { name: "noQueueAndLessThanKVCacheThresholdPredicate", - f: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(0, 0.8)), + f: toFilter(noQueueAndLessThanKVCacheThresholdPredicate(0, 0.8)), input: []*backend.PodMetrics{ { // This pod should be returned. 
@@ -337,7 +337,7 @@ func TestFilterFunc(t *testing.T) { }, { name: "low LoRA cost", - f: toFilterFunc(lowLoRACostPredicate), + f: toFilter(lowLoRACostPredicate), req: &LLMRequest{ Model: "model", ResolvedTargetModel: "model", diff --git a/pkg/ext-proc/scheduling/filtergen.go b/pkg/ext-proc/scheduling/filtergen.go new file mode 100644 index 00000000..9e3b66e1 --- /dev/null +++ b/pkg/ext-proc/scheduling/filtergen.go @@ -0,0 +1,147 @@ +package scheduling + +const ( + FilterCriticalRequestName = "critical_request" + FilterLeastQueuingName = "least_queuing" + FilterLowCostLoraName = "low_cost_lora" + FilterLowLatencyName = "low_latency" + FilterAffinityLoraName = "affinity_lora" + FilterSheddableRequestName = "sheddable_request" + FilterLeastKvCacheName = "least_kv_cache" + FilterDropRequestName = "drop_request" + FilterCanAcceptNewLoraName = "can_accept_new_lora" +) + +const ( + TopKByWaitingQueueSize = "waiting_queue_size" + TopKByKVCacheUsagePercent = "kv_cache_usage_percent" +) + +var filterMap = map[string]FilterGen{ + FilterLowLatencyName: FilterLowLatency, + FilterCriticalRequestName: FilterCriticalRequest, + FilterLeastQueuingName: FilterLeastQueuing, + FilterCanAcceptNewLoraName: FilterCanAcceptNewLora, + FilterSheddableRequestName: FilterSheddableRequest, + FilterDropRequestName: FilterDropRequest, + FilterAffinityLoraName: FilterAffinityLora, + FilterLowCostLoraName: FilterLowCostLora, + FilterLeastKvCacheName: FilterLeastKvCache, +} + +// FilterGen generate a filter from a filter option +type FilterGen interface { + Name() string + Get(*FilterOption) filter + Validate(*FilterOption) error +} + +type FilterOption struct { + KvCacheThreshold *float64 `json:"kvCacheThreshold,omitempty"` + + QueueThresholdCritical *int `json:"queueThresholdCritical,omitempty"` + QueueingThresholdLoRA *int `json:"queueingThresholdLoRA,omitempty"` +} + +type filterGenImpl struct { + name string + getter func(*FilterOption) filter + validator func(*FilterOption) error +} + +var _ FilterGen = &filterGenImpl{} + +func (fg *filterGenImpl) Name() string { + return fg.name +} + +func (fg *filterGenImpl) Get(fo *FilterOption) filter { + return fg.getter(fo) +} + +func (fg *filterGenImpl) Validate(fo *FilterOption) error { + return fg.validator(fo) +} + +var ( + FilterCriticalRequest FilterGen = &filterGenImpl{ + name: FilterCriticalRequestName, + getter: func(fo *FilterOption) filter { + return toFilter(criticalRequestPredicate) + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterLeastQueuing FilterGen = &filterGenImpl{ + name: FilterLeastQueuingName, + getter: func(fo *FilterOption) filter { + return leastQueuingFilterFunc + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterLowCostLora FilterGen = &filterGenImpl{ + name: FilterLowCostLoraName, + getter: func(fo *FilterOption) filter { + return toFilter(lowLoRACostPredicate) + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterLowLatency FilterGen = &filterGenImpl{ + name: FilterLowLatencyName, + getter: func(fo *FilterOption) filter { + return toFilter(lowQueueingPodPredicate) + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterAffinityLora FilterGen = &filterGenImpl{ + name: FilterAffinityLoraName, + getter: func(fo *FilterOption) filter { + return toFilter(loRAAffinityPredicate) + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterSheddableRequest FilterGen = &filterGenImpl{ + name: FilterSheddableRequestName, + getter: func(opt 
*FilterOption) filter { + qtc, kct := queueThresholdCritical, kvCacheThreshold + if opt != nil { + if opt.KvCacheThreshold != nil { + kct = *opt.KvCacheThreshold + } + if opt.QueueThresholdCritical != nil { + qtc = *opt.QueueThresholdCritical + } + } + return toFilter(noQueueAndLessThanKVCacheThresholdPredicate(qtc, kct)) + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterLeastKvCache FilterGen = &filterGenImpl{ + name: FilterLeastKvCacheName, + getter: func(fo *FilterOption) filter { + return leastKVCacheFilterFunc + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterDropRequest FilterGen = &filterGenImpl{ + name: FilterDropRequestName, + getter: func(fo *FilterOption) filter { + return dropRequestFilterFunc + }, + validator: func(fo *FilterOption) error { return nil }, + } + + FilterCanAcceptNewLora FilterGen = &filterGenImpl{ + name: FilterCanAcceptNewLoraName, + getter: func(fo *FilterOption) filter { + return toFilter(canAcceptNewLoraPredicate) + }, + validator: func(fo *FilterOption) error { return nil }, + } +) diff --git a/pkg/ext-proc/scheduling/orchestrate.go b/pkg/ext-proc/scheduling/orchestrate.go new file mode 100644 index 00000000..3b8f5908 --- /dev/null +++ b/pkg/ext-proc/scheduling/orchestrate.go @@ -0,0 +1,104 @@ +package scheduling + +import ( + "encoding/json" + "fmt" + + "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + klog "k8s.io/klog/v2" +) + +type FilterOrchestrator interface { + Orchestrate() FilterChain +} + +func NewFilterOrchestrator(datastore *backend.K8sDatastore) *FilterOrchestratorImpl { + return &FilterOrchestratorImpl{ + datastore: datastore, + } +} + +type FilterOrchestratorImpl struct { + datastore *backend.K8sDatastore + lastUpdated string + storedFilter *filterChainImpl +} + +var _ FilterOrchestrator = &FilterOrchestratorImpl{} + +type FilterOrchestration struct { + Name string `json:"name"` + NextOnSuccess *FilterOrchestration `json:"nextOnSuccess,omitempty"` + NextOnFailure *FilterOrchestration `json:"nextOnFailure,omitempty"` + NextOnSuccessOrFailure *FilterOrchestration `json:"nextOnSuccessOrFailure,omitempty"` + FilterOption *FilterOption `json:"filterOption,omitempty"` +} + +func (o *FilterOrchestratorImpl) Orchestrate() FilterChain { + if o == nil { + return defaultFilter + } + if o.datastore == nil || o.datastore.GetFilterConfigMap() == nil { + return defaultFilter + } + + if o.lastUpdated == string(o.datastore.GetFilterConfigMap().UID)+o.datastore.GetFilterConfigMap().ResourceVersion { + return o.storedFilter + } + + o.lastUpdated = string(o.datastore.GetFilterConfigMap().UID) + o.datastore.GetFilterConfigMap().ResourceVersion + + f := &FilterOrchestration{} + if err := json.Unmarshal([]byte(o.datastore.GetFilterConfigMap().Data["filter"]), f); err != nil { + o.storedFilter = defaultFilter + klog.Errorf("error unmarshalling filter config: %v", err) + return defaultFilter + } + + filter, err := o.orchestrate(f) + if err != nil { + klog.Errorf("error orchestrating filters: %v", err) + filter = defaultFilter + } + + klog.V(1).Infof("filter orchestrated") + o.storedFilter = filter + return filter +} + +func (o *FilterOrchestratorImpl) orchestrate(fo *FilterOrchestration) (*filterChainImpl, error) { + if fo == nil { + return nil, nil + } + + fg, ok := filterMap[fo.Name] + if !ok { + return nil, fmt.Errorf("unknown filter %s", fo.Name) + } + + if err := fg.Validate(fo.FilterOption); err != nil { + return nil, err + } + + filter := &filterChainImpl{ + filter: 
fg.Get(fo.FilterOption), + name: fg.Name(), + } + + nextOnSuccess, err := o.orchestrate(fo.NextOnSuccess) + if err != nil { + return nil, err + } + nextOnFailure, err := o.orchestrate(fo.NextOnFailure) + if err != nil { + return nil, err + } + nextOnSuccessOrFailure, err := o.orchestrate(fo.NextOnSuccessOrFailure) + if err != nil { + return nil, err + } + filter.nextOnFailure = nextOnFailure + filter.nextOnSuccess = nextOnSuccess + filter.nextOnSuccessOrFailure = nextOnSuccessOrFailure + return filter, nil +} diff --git a/pkg/ext-proc/scheduling/orchestrate_test.go b/pkg/ext-proc/scheduling/orchestrate_test.go new file mode 100644 index 00000000..e1db8517 --- /dev/null +++ b/pkg/ext-proc/scheduling/orchestrate_test.go @@ -0,0 +1,270 @@ +package scheduling + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// A copy from filter_test.go +func TestOrchestratedFilterChain(t *testing.T) { + fakeFilterConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("111"), + ResourceVersion: "222", + }, + Data: map[string]string{ + "filter": ` +{ + "name": "critical_request", + "nextOnSuccess": { + "name": "low_latency", + "nextOnSuccess": { + "name": "affinity_lora", + "nextOnSuccess": { + "name": "least_queuing", + "nextOnSuccessOrFailure": { + "name": "least_kv_cache" + } + }, + "nextOnFailure": { + "name": "can_accept_new_lora", + "nextOnSuccessOrFailure": { + "name": "least_queuing", + "nextOnSuccessOrFailure": { + "name": "least_kv_cache" + } + } + } + }, + "nextOnFailure": { + "name": "least_queuing", + "nextOnSuccessOrFailure": { + "name": "low_cost_lora", + "nextOnSuccessOrFailure": { + "name": "least_kv_cache" + } + } + } + }, + "nextOnFailure": { + "name": "sheddable_request", + "nextOnSuccess": { + "name": "least_queuing", + "nextOnSuccessOrFailure": { + "name": "low_cost_lora", + "nextOnSuccessOrFailure": { + "name": "least_kv_cache" + } + } + }, + "nextOnFailure": { + "name": "drop_request" + } + } + } +`, + }, + } + datastore := backend.NewK8sDataStore(backend.WithFilterConfigMap(fakeFilterConfigMap)) + o := NewFilterOrchestrator(datastore) + tests := []struct { + name string + req *LLMRequest + input []*backend.PodMetrics + output []*backend.PodMetrics + err bool + filter *filterChainImpl + }{ + { + name: "orchestrated filter, critical request", + filter: o.Orchestrate().(*filterChainImpl), + req: &LLMRequest{ + Model: "critical", + ResolvedTargetModel: "critical", + Critical: true, + }, + // pod2 will be picked because it has relatively low queue size, with the requested + // model being active, and has low KV cache. 
+ input: []*backend.PodMetrics{ + { + Pod: backend.Pod{Name: "pod1"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.2, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: backend.Pod{Name: "pod2"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 3, + KVCacheUsagePercent: 0.1, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "critical": 1, + }, + }, + }, + { + Pod: backend.Pod{Name: "pod3"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.2, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + }, + output: []*backend.PodMetrics{ + { + Pod: backend.Pod{Name: "pod2"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 3, + KVCacheUsagePercent: 0.1, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "critical": 1, + }, + }, + }, + }, + }, + { + name: "orchestrated filter, sheddable request, accepted", + filter: o.Orchestrate().(*filterChainImpl), + req: &LLMRequest{ + Model: "sheddable", + ResolvedTargetModel: "sheddable", + Critical: false, + }, + // pod1 will be picked because it has capacity for the sheddable request. + input: []*backend.PodMetrics{ + { + Pod: backend.Pod{Name: "pod1"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.2, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: backend.Pod{Name: "pod2"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 3, + KVCacheUsagePercent: 0.1, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "critical": 1, + }, + }, + }, + { + Pod: backend.Pod{Name: "pod3"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.2, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + }, + output: []*backend.PodMetrics{ + { + Pod: backend.Pod{Name: "pod1"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.2, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + }, + }, + { + name: "orchestrated filter, sheddable request, dropped", + filter: o.Orchestrate().(*filterChainImpl), + req: &LLMRequest{ + Model: "sheddable", + ResolvedTargetModel: "sheddable", + Critical: false, + }, + // All pods have higher KV cache thant the threshold, so the sheddable request will be + // dropped. 
+ input: []*backend.PodMetrics{ + { + Pod: backend.Pod{Name: "pod1"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.9, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: backend.Pod{Name: "pod2"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 3, + KVCacheUsagePercent: 0.85, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + "critical": 1, + }, + }, + }, + { + Pod: backend.Pod{Name: "pod3"}, + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.85, + MaxActiveModels: 2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + }, + output: []*backend.PodMetrics{}, + err: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := test.filter.Filter(test.req, test.input) + if test.err != (err != nil) { + t.Errorf("Unexpected error, got %v, want %v", err, test.err) + } + + if diff := cmp.Diff(test.output, got); diff != "" { + t.Errorf("Unexpected output (-want +got): %v", diff) + } + }) + } +} diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index c6a91541..6a3015bd 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -23,21 +23,21 @@ const ( ) var ( - defaultFilter = &filter{ + defaultFilter = &filterChainImpl{ name: "critical request", - filter: toFilterFunc(criticalRequestPredicate), + filter: toFilter(criticalRequestPredicate), nextOnSuccess: lowLatencyFilter, nextOnFailure: sheddableRequestFilter, } // queueLoRAAndKVCacheFilter applied least queue -> low cost lora -> least KV Cache filter - queueLoRAAndKVCacheFilter = &filter{ + queueLoRAAndKVCacheFilter = &filterChainImpl{ name: "least queuing", filter: leastQueuingFilterFunc, - nextOnSuccessOrFailure: &filter{ + nextOnSuccessOrFailure: &filterChainImpl{ name: "low cost LoRA", - filter: toFilterFunc(lowLoRACostPredicate), - nextOnSuccessOrFailure: &filter{ + filter: toFilter(lowLoRACostPredicate), + nextOnSuccessOrFailure: &filterChainImpl{ name: "least KV cache percent", filter: leastKVCacheFilterFunc, }, @@ -45,41 +45,41 @@ var ( } // queueAndKVCacheFilter applies least queue followed by least KV Cache filter - queueAndKVCacheFilter = &filter{ + queueAndKVCacheFilter = &filterChainImpl{ name: "least queuing", filter: leastQueuingFilterFunc, - nextOnSuccessOrFailure: &filter{ + nextOnSuccessOrFailure: &filterChainImpl{ name: "least KV cache percent", filter: leastKVCacheFilterFunc, }, } - lowLatencyFilter = &filter{ + lowLatencyFilter = &filterChainImpl{ name: "low queueing filter", - filter: toFilterFunc((lowQueueingPodPredicate)), - nextOnSuccess: &filter{ + filter: toFilter((lowQueueingPodPredicate)), + nextOnSuccess: &filterChainImpl{ name: "affinity LoRA", - filter: toFilterFunc(loRAAffinityPredicate), + filter: toFilter(loRAAffinityPredicate), nextOnSuccess: queueAndKVCacheFilter, - nextOnFailure: &filter{ + nextOnFailure: &filterChainImpl{ name: "can accept LoRA Adapter", - filter: toFilterFunc(canAcceptNewLoraPredicate), + filter: toFilter(canAcceptNewLoraPredicate), nextOnSuccessOrFailure: queueAndKVCacheFilter, }, }, nextOnFailure: queueLoRAAndKVCacheFilter, } - sheddableRequestFilter = &filter{ + sheddableRequestFilter = &filterChainImpl{ // When there is at least one model server that's not queuing requests, and still has KV // cache below a certain threshold, we consider this model server has capacity to handle // a sheddable request without impacting critical requests. 
name: "has capacity for sheddable requests", - filter: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(queueThresholdCritical, kvCacheThreshold)), + filter: toFilter(noQueueAndLessThanKVCacheThresholdPredicate(queueThresholdCritical, kvCacheThreshold)), nextOnSuccess: queueLoRAAndKVCacheFilter, // If all pods are queuing or running above the KVCache threshold, we drop the sheddable // request to make room for critical requests. - nextOnFailure: &filter{ + nextOnFailure: &filterChainImpl{ name: "drop request", filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { klog.Infof("Dropping request %v", req) @@ -90,17 +90,30 @@ var ( } ) -func NewScheduler(pmp PodMetricsProvider) *Scheduler { - - return &Scheduler{ +func NewScheduler(pmp PodMetricsProvider, opts ...SchedulerOption) *Scheduler { + s := &Scheduler{ podMetricsProvider: pmp, filter: defaultFilter, } + + for _, opt := range opts { + opt(s) + } + return s } +func WithOrchestrator(orchestrator FilterOrchestrator) SchedulerOption { + return func(s *Scheduler) { + s.filterOrchestrator = orchestrator + } +} + +type SchedulerOption func(*Scheduler) + type Scheduler struct { podMetricsProvider PodMetricsProvider - filter Filter + filter FilterChain + filterOrchestrator FilterOrchestrator } // PodMetricsProvider is an interface to provide set of pods in the backend and information such as @@ -112,7 +125,7 @@ type PodMetricsProvider interface { // Schedule finds the target pod based on metrics and the requested lora adapter. func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) { klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics()) - pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics()) + pods, err := s.filterOrchestrator.Orchestrate().Filter(req, s.podMetricsProvider.AllPodMetrics()) if err != nil || len(pods) == 0 { return backend.Pod{}, fmt.Errorf( "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) From 2480f0e9c8ba3a0b64dbd24c6c483b1addbc7801 Mon Sep 17 00:00:00 2001 From: Kuromesi <blackfacepan@163.com> Date: Thu, 9 Jan 2025 16:24:45 +0800 Subject: [PATCH 2/6] update cluster role, add more tests for orchestrator Signed-off-by: Kuromesi <blackfacepan@163.com> --- pkg/ext-proc/scheduling/orchestrate.go | 9 +- pkg/ext-proc/scheduling/orchestrate_test.go | 101 ++++++++++++++++++++ pkg/manifests/ext_proc.yaml | 3 + 3 files changed, 111 insertions(+), 2 deletions(-) diff --git a/pkg/ext-proc/scheduling/orchestrate.go b/pkg/ext-proc/scheduling/orchestrate.go index 3b8f5908..bf1f3434 100644 --- a/pkg/ext-proc/scheduling/orchestrate.go +++ b/pkg/ext-proc/scheduling/orchestrate.go @@ -5,6 +5,7 @@ import ( "fmt" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + corev1 "k8s.io/api/core/v1" klog "k8s.io/klog/v2" ) @@ -42,11 +43,11 @@ func (o *FilterOrchestratorImpl) Orchestrate() FilterChain { return defaultFilter } - if o.lastUpdated == string(o.datastore.GetFilterConfigMap().UID)+o.datastore.GetFilterConfigMap().ResourceVersion { + if o.lastUpdated == lastUpdatedKey(o.datastore.GetFilterConfigMap()) { return o.storedFilter } - o.lastUpdated = string(o.datastore.GetFilterConfigMap().UID) + o.datastore.GetFilterConfigMap().ResourceVersion + o.lastUpdated = lastUpdatedKey(o.datastore.GetFilterConfigMap()) f := &FilterOrchestration{} if err := json.Unmarshal([]byte(o.datastore.GetFilterConfigMap().Data["filter"]), f); err != nil { @@ -102,3 +103,7 @@ func 
(o *FilterOrchestratorImpl) orchestrate(fo *FilterOrchestration) (*filterCh filter.nextOnSuccessOrFailure = nextOnSuccessOrFailure return filter, nil } + +func lastUpdatedKey(cm *corev1.ConfigMap) string { + return string(cm.UID) + cm.ResourceVersion +} diff --git a/pkg/ext-proc/scheduling/orchestrate_test.go b/pkg/ext-proc/scheduling/orchestrate_test.go index e1db8517..5fa9892f 100644 --- a/pkg/ext-proc/scheduling/orchestrate_test.go +++ b/pkg/ext-proc/scheduling/orchestrate_test.go @@ -10,6 +10,107 @@ import ( "k8s.io/apimachinery/pkg/types" ) +func TestOrchestrate(t *testing.T) { + cases := []struct { + name string + cm *corev1.ConfigMap + fo *FilterOrchestratorImpl + expect func(get *filterChainImpl) bool + }{ + { + name: "test orchestrating filter chain", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + UID: "111", + ResourceVersion: "222", + }, + Data: map[string]string{ + "filter": ` + { + "name": "can_accept_new_lora", + "nextOnSuccessOrFailure": { + "name": "least_kv_cache" + } + } + `, + }, + }, + fo: &FilterOrchestratorImpl{}, + expect: func(get *filterChainImpl) bool { + return get.name == FilterCanAcceptNewLoraName && get.nextOnSuccessOrFailure.name == FilterLeastKvCacheName + }, + }, + { + name: "test bad orchestration json format, missing a closing bracket, expect return default filter", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + UID: "111", + ResourceVersion: "222", + }, + Data: map[string]string{ + "filter": ` + { + "name": "can_accept_new_lora", + "nextOnSuccessOrFailure": { + "name": "least_kv_cache" + } + `, + }, + }, + fo: &FilterOrchestratorImpl{}, + expect: func(get *filterChainImpl) bool { + return get == defaultFilter + }, + }, + { + name: "test cached filter chain", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + UID: "111", + ResourceVersion: "222", + }, + Data: map[string]string{}, + }, + fo: &FilterOrchestratorImpl{ + lastUpdated: lastUpdatedKey(&corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "111", ResourceVersion: "222"}}), + storedFilter: &filterChainImpl{ + name: "fake_cached_filter", + }, + }, + expect: func(get *filterChainImpl) bool { + return get.name == "fake_cached_filter" + }, + }, + { + name: "test nil filter orchestrator, should return default filter chain", + cm: nil, + fo: nil, + expect: func(get *filterChainImpl) bool { + return get == defaultFilter + }, + }, + { + name: "test nil configmap, should return default filter", + cm: nil, + fo: &FilterOrchestratorImpl{}, + expect: func(get *filterChainImpl) bool { + return get == defaultFilter + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if tc.fo != nil { + tc.fo.datastore = backend.NewK8sDataStore(backend.WithFilterConfigMap(tc.cm)) + } + filter := tc.fo.Orchestrate() + if !tc.expect(filter.(*filterChainImpl)) { + t.Error("filter chain not match") + } + }) + } +} + // A copy from filter_test.go func TestOrchestratedFilterChain(t *testing.T) { fakeFilterConfigMap := &corev1.ConfigMap{ diff --git a/pkg/manifests/ext_proc.yaml b/pkg/manifests/ext_proc.yaml index a9141071..06886b1a 100644 --- a/pkg/manifests/ext_proc.yaml +++ b/pkg/manifests/ext_proc.yaml @@ -15,6 +15,9 @@ rules: - apiGroups: ["discovery.k8s.io"] resources: ["endpointslices"] verbs: ["get", "watch", "list"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "watch", "list"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 From a49490c4d58c3e5e7c962c11cbc3322276dc13a0 Mon Sep 17 00:00:00 2001 From: Kuromesi 
<blackfacepan@163.com> Date: Thu, 9 Jan 2025 16:34:02 +0800 Subject: [PATCH 3/6] add validator for sheddable_request filter Signed-off-by: Kuromesi <blackfacepan@163.com> --- pkg/ext-proc/scheduling/filtergen.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/ext-proc/scheduling/filtergen.go b/pkg/ext-proc/scheduling/filtergen.go index 9e3b66e1..1dd88c4b 100644 --- a/pkg/ext-proc/scheduling/filtergen.go +++ b/pkg/ext-proc/scheduling/filtergen.go @@ -1,5 +1,7 @@ package scheduling +import "fmt" + const ( FilterCriticalRequestName = "critical_request" FilterLeastQueuingName = "least_queuing" @@ -118,7 +120,18 @@ var ( } return toFilter(noQueueAndLessThanKVCacheThresholdPredicate(qtc, kct)) }, - validator: func(fo *FilterOption) error { return nil }, + validator: func(fo *FilterOption) error { + if fo == nil { + return nil + } + if fo.KvCacheThreshold != nil && *fo.KvCacheThreshold < 0 { + return fmt.Errorf("invalid kvCacheThreshold: %f", *fo.KvCacheThreshold) + } + if fo.QueueThresholdCritical != nil && *fo.QueueThresholdCritical < 0 { + return fmt.Errorf("invalid queueThresholdCritical: %d", *fo.QueueThresholdCritical) + } + return nil + }, } FilterLeastKvCache FilterGen = &filterGenImpl{ From 6c1af0a02a08e7e684ea9a3a7a9e0b7829e5f7fa Mon Sep 17 00:00:00 2001 From: Kuromesi <blackfacepan@163.com> Date: Thu, 9 Jan 2025 17:02:57 +0800 Subject: [PATCH 4/6] add tests for filter gen Signed-off-by: Kuromesi <blackfacepan@163.com> --- pkg/ext-proc/scheduling/filter.go | 10 +-- pkg/ext-proc/scheduling/filtergen.go | 19 ++++-- pkg/ext-proc/scheduling/filtergen_test.go | 79 +++++++++++++++++++++++ pkg/ext-proc/scheduling/scheduler.go | 10 +-- 4 files changed, 105 insertions(+), 13 deletions(-) create mode 100644 pkg/ext-proc/scheduling/filtergen_test.go diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go index dec75bef..b85c537b 100644 --- a/pkg/ext-proc/scheduling/filter.go +++ b/pkg/ext-proc/scheduling/filter.go @@ -123,10 +123,6 @@ func leastQueuingFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*bac return filtered, nil } -func lowQueueingPodPredicate(_ *LLMRequest, pod *backend.PodMetrics) bool { - return pod.WaitingQueueSize < queueingThresholdLoRA -} - // leastKVCacheFilterFunc finds the max and min KV cache of all pods, divides the whole range // (max-min) by the number of pods, and finds the pods that fall into the first range. 
// The intuition is that if there are multiple pods that share similar KV cache in the low range, we @@ -194,3 +190,9 @@ func noQueueAndLessThanKVCacheThresholdPredicate(queueThreshold int, kvCacheThre return pod.WaitingQueueSize <= queueThreshold && pod.KVCacheUsagePercent <= kvCacheThreshold } } + +func lowQueueingPodPredicate(queueingThresholdLoRA int) podPredicate { + return func(_ *LLMRequest, pod *backend.PodMetrics) bool { + return pod.WaitingQueueSize < queueingThresholdLoRA + } +} diff --git a/pkg/ext-proc/scheduling/filtergen.go b/pkg/ext-proc/scheduling/filtergen.go index 1dd88c4b..244d8204 100644 --- a/pkg/ext-proc/scheduling/filtergen.go +++ b/pkg/ext-proc/scheduling/filtergen.go @@ -1,6 +1,8 @@ package scheduling -import "fmt" +import ( + "fmt" +) const ( FilterCriticalRequestName = "critical_request" @@ -93,9 +95,18 @@ var ( FilterLowLatency FilterGen = &filterGenImpl{ name: FilterLowLatencyName, getter: func(fo *FilterOption) filter { - return toFilter(lowQueueingPodPredicate) + qtl := defaultQueueingThresholdLoRA + if fo != nil && fo.QueueingThresholdLoRA != nil { + qtl = *fo.QueueingThresholdLoRA + } + return toFilter(lowQueueingPodPredicate(qtl)) + }, + validator: func(fo *FilterOption) error { + if fo != nil && fo.QueueingThresholdLoRA != nil && *fo.QueueingThresholdLoRA < 0 { + return fmt.Errorf("invalid queueingThresholdLoRA:%d", *fo.QueueingThresholdLoRA) + } + return nil }, - validator: func(fo *FilterOption) error { return nil }, } FilterAffinityLora FilterGen = &filterGenImpl{ @@ -109,7 +120,7 @@ var ( FilterSheddableRequest FilterGen = &filterGenImpl{ name: FilterSheddableRequestName, getter: func(opt *FilterOption) filter { - qtc, kct := queueThresholdCritical, kvCacheThreshold + qtc, kct := defaultQueueThresholdCritical, defaultKvCacheThreshold if opt != nil { if opt.KvCacheThreshold != nil { kct = *opt.KvCacheThreshold diff --git a/pkg/ext-proc/scheduling/filtergen_test.go b/pkg/ext-proc/scheduling/filtergen_test.go new file mode 100644 index 00000000..72bd7367 --- /dev/null +++ b/pkg/ext-proc/scheduling/filtergen_test.go @@ -0,0 +1,79 @@ +package scheduling + +import "testing" + +func TestFilterGenValidation(t *testing.T) { + testCases := []struct { + name string + fo *FilterOption + filterGen FilterGen + wantErr bool + }{ + { + name: "valid sheddable_request filter option", + fo: &FilterOption{ + KvCacheThreshold: toPtr(0.8), + QueueThresholdCritical: toPtr(5), + }, + filterGen: FilterSheddableRequest, + wantErr: false, + }, + { + name: "valid sheddable_request filter option, nil option", + fo: &FilterOption{}, + filterGen: FilterSheddableRequest, + wantErr: false, + }, + { + name: "valid sheddable_request filter option, nil QueueThresholdCritical", + fo: &FilterOption{ + KvCacheThreshold: toPtr(0.8), + }, + filterGen: FilterSheddableRequest, + wantErr: false, + }, + { + name: "invalid sheddable_request filter option", + fo: &FilterOption{ + KvCacheThreshold: toPtr(-1.0), + QueueThresholdCritical: toPtr(5), + }, + filterGen: FilterSheddableRequest, + wantErr: true, + }, + { + name: "valid low_latency filter option", + fo: &FilterOption{ + QueueingThresholdLoRA: toPtr(50), + }, + filterGen: FilterLowLatency, + wantErr: false, + }, + { + name: "valid low_latency filter option, nil option", + fo: &FilterOption{}, + filterGen: FilterLowLatency, + wantErr: false, + }, + { + name: "invalid low_latency filter option", + fo: &FilterOption{ + QueueingThresholdLoRA: toPtr(-1), + }, + filterGen: FilterLowLatency, + wantErr: true, + }, + } + for _, tc := range testCases 
{ + t.Run(tc.name, func(t *testing.T) { + err := tc.filterGen.Validate(tc.fo) + if (err != nil) != tc.wantErr { + t.Errorf("Validate() error = %v, wantErr %v", err, tc.wantErr) + } + }) + } +} + +func toPtr[T any](v T) *T { + return &v +} diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 6a3015bd..d2ac0faa 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -13,13 +13,13 @@ import ( const ( // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable. - kvCacheThreshold = 0.8 + defaultKvCacheThreshold = 0.8 // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable. - queueThresholdCritical = 5 + defaultQueueThresholdCritical = 5 // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable. // the threshold for queued requests to be considered low below which we can prioritize LoRA affinity. // The value of 50 is arrived heuristicically based on experiments. - queueingThresholdLoRA = 50 + defaultQueueingThresholdLoRA = 50 ) var ( @@ -56,7 +56,7 @@ var ( lowLatencyFilter = &filterChainImpl{ name: "low queueing filter", - filter: toFilter((lowQueueingPodPredicate)), + filter: toFilter((lowQueueingPodPredicate(defaultQueueingThresholdLoRA))), nextOnSuccess: &filterChainImpl{ name: "affinity LoRA", filter: toFilter(loRAAffinityPredicate), @@ -75,7 +75,7 @@ var ( // cache below a certain threshold, we consider this model server has capacity to handle // a sheddable request without impacting critical requests. name: "has capacity for sheddable requests", - filter: toFilter(noQueueAndLessThanKVCacheThresholdPredicate(queueThresholdCritical, kvCacheThreshold)), + filter: toFilter(noQueueAndLessThanKVCacheThresholdPredicate(defaultQueueThresholdCritical, defaultKvCacheThreshold)), nextOnSuccess: queueLoRAAndKVCacheFilter, // If all pods are queuing or running above the KVCache threshold, we drop the sheddable // request to make room for critical requests. 
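
Patch 4 above makes the previously hard-coded thresholds tunable through the optional filterOption block of a filter node, with the new validators rejecting negative values. A minimal sketch of a node that overrides them, assuming only the JSON field names declared in FilterOption (kvCacheThreshold, queueThresholdCritical, queueingThresholdLoRA) and the filter names registered in filtergen.go; the chain layout and the concrete numbers are illustrative, not part of the patches:

    {
        "name": "sheddable_request",
        "filterOption": {
            "kvCacheThreshold": 0.7,
            "queueThresholdCritical": 3
        },
        "nextOnSuccess": {
            "name": "low_latency",
            "filterOption": {
                "queueingThresholdLoRA": 30
            }
        },
        "nextOnFailure": {
            "name": "drop_request"
        }
    }
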
From 24e7d3b16788a39d8e4915ab38828f7713fe376a Mon Sep 17 00:00:00 2001 From: Kuromesi <blackfacepan@163.com> Date: Thu, 9 Jan 2025 18:16:53 +0800 Subject: [PATCH 5/6] optimize Signed-off-by: Kuromesi <blackfacepan@163.com> --- pkg/ext-proc/backend/datastore.go | 3 +++ pkg/ext-proc/scheduling/orchestrate.go | 10 ++++++---- pkg/ext-proc/scheduling/scheduler.go | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index cf4ff869..a090d3b9 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -36,6 +36,9 @@ type K8sDatastore struct { type K8sDatastoreOption func(*K8sDatastore) func (ds *K8sDatastore) GetFilterConfigMap() *corev1.ConfigMap { + if ds == nil { + return nil + } ds.poolMu.RLock() defer ds.poolMu.RUnlock() return ds.filterConfigMap diff --git a/pkg/ext-proc/scheduling/orchestrate.go b/pkg/ext-proc/scheduling/orchestrate.go index bf1f3434..a65f0486 100644 --- a/pkg/ext-proc/scheduling/orchestrate.go +++ b/pkg/ext-proc/scheduling/orchestrate.go @@ -39,18 +39,20 @@ func (o *FilterOrchestratorImpl) Orchestrate() FilterChain { if o == nil { return defaultFilter } - if o.datastore == nil || o.datastore.GetFilterConfigMap() == nil { + + cm := o.datastore.GetFilterConfigMap() + if cm == nil { return defaultFilter } - if o.lastUpdated == lastUpdatedKey(o.datastore.GetFilterConfigMap()) { + if o.lastUpdated == lastUpdatedKey(cm) { return o.storedFilter } - o.lastUpdated = lastUpdatedKey(o.datastore.GetFilterConfigMap()) + o.lastUpdated = lastUpdatedKey(cm) f := &FilterOrchestration{} - if err := json.Unmarshal([]byte(o.datastore.GetFilterConfigMap().Data["filter"]), f); err != nil { + if err := json.Unmarshal([]byte(cm.Data["filter"]), f); err != nil { o.storedFilter = defaultFilter klog.Errorf("error unmarshalling filter config: %v", err) return defaultFilter diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index d2ac0faa..32b9c307 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -94,6 +94,7 @@ func NewScheduler(pmp PodMetricsProvider, opts ...SchedulerOption) *Scheduler { s := &Scheduler{ podMetricsProvider: pmp, filter: defaultFilter, + filterOrchestrator: &FilterOrchestratorImpl{}, } for _, opt := range opts { From d7edec84a3d299f8d38fd54221f30de751329b1d Mon Sep 17 00:00:00 2001 From: Kuromesi <blackfacepan@163.com> Date: Tue, 14 Jan 2025 15:36:34 +0800 Subject: [PATCH 6/6] tidy code Signed-off-by: Kuromesi <blackfacepan@163.com> --- pkg/ext-proc/main.go | 2 +- pkg/ext-proc/scheduling/orchestrate.go | 15 +++++++++++++++ pkg/ext-proc/scheduling/scheduler.go | 6 ++++-- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 381c9baf..18ed1453 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -207,7 +207,7 @@ func startExternalProcessorServer( ) *grpc.Server { svr := grpc.NewServer() - var orchestrator *scheduling.FilterOrchestratorImpl + var orchestrator scheduling.FilterOrchestrator if *enableFilterConfiguration { orchestrator = scheduling.NewFilterOrchestrator(datastore) } diff --git a/pkg/ext-proc/scheduling/orchestrate.go b/pkg/ext-proc/scheduling/orchestrate.go index a65f0486..93f72ce8 100644 --- a/pkg/ext-proc/scheduling/orchestrate.go +++ b/pkg/ext-proc/scheduling/orchestrate.go @@ -19,6 +19,21 @@ func NewFilterOrchestrator(datastore *backend.K8sDatastore) *FilterOrchestratorI } } +func 
NewDefaultFilterOrchestrator() *DefaultFilterOrchestrator { + return &DefaultFilterOrchestrator{} +} + +// DefaultFilterOrchestrator is a filter orchestrator that returns the default filter chain +type DefaultFilterOrchestrator struct{} + +var _ FilterOrchestrator = &DefaultFilterOrchestrator{} + +func (DefaultFilterOrchestrator) Orchestrate() FilterChain { + return defaultFilter +} + +// FilterOrchestratorImpl is a filter orchestrator that reads the filter configuration +// from configmap and orchestrate the filter chain type FilterOrchestratorImpl struct { datastore *backend.K8sDatastore lastUpdated string diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 32b9c307..d55dcb2d 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -94,7 +94,7 @@ func NewScheduler(pmp PodMetricsProvider, opts ...SchedulerOption) *Scheduler { s := &Scheduler{ podMetricsProvider: pmp, filter: defaultFilter, - filterOrchestrator: &FilterOrchestratorImpl{}, + filterOrchestrator: NewDefaultFilterOrchestrator(), } for _, opt := range opts { @@ -105,7 +105,9 @@ func NewScheduler(pmp PodMetricsProvider, opts ...SchedulerOption) *Scheduler { func WithOrchestrator(orchestrator FilterOrchestrator) SchedulerOption { return func(s *Scheduler) { - s.filterOrchestrator = orchestrator + if orchestrator != nil { + s.filterOrchestrator = orchestrator + } } }
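
With enableFilterConfiguration set, the FilterConfigReconciler only watches a ConfigMap named filter-config in the default namespace, and the orchestrator reads the chain definition from its filter key. A sketch of such a ConfigMap, reusing the chain format exercised by the tests above (the chain shown is an example, not a shipped default):

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: filter-config
      namespace: default
    data:
      filter: |
        {
          "name": "critical_request",
          "nextOnSuccess": {
            "name": "least_queuing",
            "nextOnSuccessOrFailure": {
              "name": "least_kv_cache"
            }
          },
          "nextOnFailure": {
            "name": "sheddable_request",
            "nextOnFailure": {
              "name": "drop_request"
            }
          }
        }

If the ConfigMap is missing, deleted, or its filter value fails to unmarshal, Orchestrate() falls back to defaultFilter, so a malformed configuration degrades to the built-in chain instead of failing requests.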