diff --git a/README.md b/README.md index aef7362..ed93943 100644 --- a/README.md +++ b/README.md @@ -1,83 +1,200 @@ -# FLIPOP - Floating IP Operator +# Floating IP Operator (FLIPOP) -## What? -This tool watches Kubernetes nodes and adjusts cloud network resources (floating IPs and DNS, currently) to target matching nodes. Nodes can be targeted based labels + taints and their pods (health, namespace, and labels). +FLIPOP is a Kubernetes operator that manages cloud-native Floating IPs (also referred to as Reserved IPs) and DNS records for targeted nodes and pods. It provides advanced traffic steering for workloads—especially latency-sensitive or UDP traffic—where built-in Kubernetes LoadBalancer services may not suffice. -## Why? -Kubernetes nodes and the pods they host are ephemeral and replaced in case of failure, update, or operational convenience. Kubernetes LoadBalancer type services are the traditional tool pivoting cluster traffic in these cases, but don't suit all workloads (ex. latency sensitive workloads, UDP, etc.). This tool aims to provide similar functionality through floating IPs and/or DNS. +--- -## Config +## Features + +* Assign and unassign Floating IPs to Kubernetes nodes based on pod and node selectors. +* Manage DNS A records containing floating or node IPs. +* Support for multiple DNS providers (e.g., DigitalOcean, Cloudflare). +* Expose rich Prometheus metrics for observability. +* Graceful reconciliation loops with configurable retry/backoff. +* Leader election for high-availability. + +--- + +## Architecture + +1. **CRD Watchers**: Informers monitor `FloatingIPPool` and `NodeDNSRecordSet` resources. +2. **Match Controller** (`nodematch`): Evaluates pods and nodes against label/taint-based criteria. +3. **IP Controller** (`ip_controller`): Reconciles Floating IP assignments and updates status & annotations. +4. **DNS Enabler/Disabler** (`nodedns`): Updates DNS records for matching nodes. +5. 
**Metrics Collector** (`metrics`): Implements Prometheus `Collector` interfaces for each controller. +6. **Leader Election** (`leaderelection`): Ensures only one active control loop per cluster. + +--- + +## Custom Resources ### FloatingIPPool -``` + +Manage Floating IPs and optional DNS records for pods matching specified criteria. + +```yaml apiVersion: flipop.digitalocean.com/v1alpha1 kind: FloatingIPPool metadata: name: ingress-pool -spec: - provider: digitalocean - region: nyc3 - desiredIPs: 3 - assignmentCoolOffSeconds: 20 - ips: - - 192.168.1.1 - - 192.168.2.1 - dnsRecordSet: - recordName: hello-world.example.com - zone: abcdefghijklmnopqrstuvwxyz012345 - ttl: 30 - provider: cloudflare - match: +spec: + provider: digitalocean # IP provider + region: nyc3 # Cloud region + desiredIPs: 3 # Total IPs to allocate + assignmentCoolOffSeconds: 20 # Seconds to wait between ip assignments, defaults to 0 if not set + ips: # Static IP list (optional) + - 192.168.1.1 + - 192.168.2.1 + dnsRecordSet: # Optional DNS configuration (defaults to digitalocean) + recordName: hello + zone: example.com + ttl: 30 + provider: digitalocean + match: # Node/pod matching criteria podNamespace: ingress - podLabel: app=nginx-ingress,component=controller - nodeLabel: doks.digitalocean.com/node-pool=work + podLabel: app=nginx,component=controller + nodeLabel: doks.digitalocean.com/node-pool=work tolerations: - - effect: NoSchedule - key: node.kubernetes.io/unschedulable + - key: node.kubernetes.io/unschedulable + effect: NoSchedule ``` +**Behavior**: + +* Allocates a number of Floating IPs equal to `desiredIPs`. + * By default, new floating IPs will be created + * If you wish to use existing Floating IPs specify them in the list of `ips` +* Assigns IPs to matching nodes (see Matching section below) +* Updates DNS A record (if configured) using FloatingIPPool’s reserved IPs by default. + * Note this behavior is slightly different than how `NodeDNSRecordSet` works. 
`dnsRecordSet` will always update the DNS record with the nodes Floating IP address, where `NodeDNSRecordSet` must be configured to use the Floating IP address. +* The annotation `flipop.digitalocean.com/ipv4-reserved-ip` is added to each node with the assigned Floating IP address as the value. + +--- + ### NodeDNSRecordSet -``` + +Manage DNS A records for nodes matching specified criteria. + +```yaml apiVersion: flipop.digitalocean.com/v1alpha1 kind: NodeDNSRecordSet metadata: name: ingress-nodes spec: + provider: digitalocean # DNS provider (defaults to digitalocean) dnsRecordSet: - recordName: nodes - zone: example.com - ttl: 120 - provider: digitalocean + recordName: nodes.example.com + zone: example.com + ttl: 120 + addressType: flipop.digitalocean.com/ipv4-reserved-ip # Use the node’s reserved IPv4 address (via annotation) match: + nodeLabel: doks.digitalocean.com/node-pool=work podNamespace: ingress - podLabel: app=nginx-ingress,component=controller - nodeLabel: doks.digitalocean.com/node-pool=work + podLabel: app=nginx tolerations: - - effect: NoSchedule - key: node.kubernetes.io/unschedulable + - key: node.kubernetes.io/unschedulable + effect: NoSchedule ``` +**Field**: + +* `addressType`: Specifies which node address to publish in DNS. Options: + * `ExternalIP` (default): Uses each node’s external/public IP. + * `flipop.digitalocean.com/ipv4-reserved-ip`: Uses the node’s reserved IPv4 address assigned by a FloatingIPPool. Must be set explicitly when DNS should point to reserved IPs. When this addressType is specified that controller will look for the value of this annotation on each node to determine the reserved IP for the node. + * `InternalIP`: Uses the node’s internal Kubernetes cluster IP. + +**Behavior**: + +* Watches nodes matching `match` criteria. +* Collects the specified address type from each node. +* Updates the DNS A record with the collected addresses. 
+ +--- + +## Matching Behavior + +FLIPOP uses `spec.match` fields to determine which nodes receive Floating IPs: + +1. **Pod Matching**: The controller watches pods in the specified `podNamespace` with labels matching `podLabel`. Only nodes running at least one matching pod are candidates. +2. **Node Matching**: Nodes are filtered by `nodeLabel` and `tolerations`. If a node’s labels and taints match, it passes the node filter. + +**Assignment Logic**: + +* On each reconciliation, the IP Controller collects all candidate nodes. +* If the number of assigned IPs is less than `desiredIPs`, it assigns IPs to the top candidates (sorted by name) until the quota is met. +* If nodes no longer host matching pods or no longer match node criteria, then the annotation is removed and any DNS records are updated. + * Note that the controller will only unassign a Floating IP address from a Droplet if that node no longer matches AND it needs to assign the Floating IP to another node. This means that if a Floating IP is no longer needed it will stay attached to a Droplet to avoid any costs associated with a unassigned Floating IP address. +* Reassignments respect `assignmentCoolOffSeconds` to avoid rapid churn. +* When assigning an IP, the controller: + 1. Requests an available IP from the provider or uses an assigned one from its list. + 2. Annotates the node with `flipop.digitalocean.com/ipv4-reserved-ip: `. + 3. Optionally updates DNS via `dnsRecordSet`. + +--- + +## Metrics + +FLIPOP exports Prometheus metrics for both controllers and underlying provider calls. + +### FloatingIPPool Controller Metrics + +Collected by `pkg/floatingip/metrics.go`: + +* `flipop_floatingippoolcontroller_node_status{namespace,name,provider,dns,status}`: Gauge of node counts by status (`available`, `assigned`). +* `flipop_floatingippoolcontroller_ip_assignment_errors{namespace,name,ip,provider,dns}`: Counter of IP assignment failures. 
+* `flipop_floatingippoolcontroller_ip_assignments{namespace,name,ip,provider,dns}`: Counter of successful assignments. +* `flipop_floatingippoolcontroller_ip_node{namespace,name,ip,provider,dns,provider_id,node}`: Gauge mapping IP to node. +* `flipop_floatingippoolcontroller_ip_state{namespace,name,ip,provider,dns,state}`: Gauge of each IP’s current state. +* `flipop_floatingippoolcontroller_unfulfilled_ips{namespace,name,provider,dns}`: Gauge of desired minus actual acquired IPs. + +### NodeDNSRecordSet Controller Metrics + +Exposed via `pkg/nodedns/metrics.go`: + +* `flipop_nodednsrecordset_records{namespace,name,provider,dns}`: Gauge of total DNS records managed. + +### Provider Call Metrics + +Each provider instruments calls in `pkg/provider/metrics.go`: + +* `flipop__calls_total{provider,call,outcome,kind,namespace,name}`: Counter of provider API invocations, labeled by outcome (`success` or `error`). +* `flipop__call_duration_seconds{provider,call,kind,namespace,name}`: Histogram of call latencies. + +--- + ## Providers -Flipop supports DNS providers and Floating IP providers. FloatingIPPool resources require a Floating IP provider, and can optionally leverage an additional DNS provider. NodeDNSRecordSet providers require a DNS provider. -| Provider | IP Provider | DNS Provider | Config | -|--------------|:-----------:|:------------:|------------------------------------| -| digitalocean | X | X | env var: DIGITALOCEAN_ACCESS_TOKEN | -| cloudflare | | X | env var: CLOUDFLARE_TOKEN | + +| Provider | IP Provider | DNS Provider | Configuration | +| ------------ | :---------: | :----------: | --------------------------- | +| digitalocean | ✅ | ✅ | `DIGITALOCEAN_ACCESS_TOKEN` | +| cloudflare | ❌ | ✅ | `CLOUDFLARE_TOKEN` | + +Set credentials as environment variables in your operator namespace. + +** Note: ** For large clusters, it's recommended to request an increase in your API rate limit to mitigate any API throttling due to DNS updates. 
Large number of DNS updates can be made during events, such as a cluster upgrade, where nodes matching status changes frequently. + +--- ## Installation -``` -kubectl create namespace flipop -kubectl create secret generic flipop -n flipop --from-literal=DIGITALOCEAN_ACCESS_TOKEN="CENSORED" -kubectl apply -n flipop -f k8s/* -``` + + ```bash + kubectl create namespace flipop + kubectl create secret generic flipop -n flipop --from-literal=DIGITALOCEAN_ACCESS_TOKEN="CENSORED" + kubectl apply -n flipop -f k8s + ``` +--- ## Why not operator-framework/kubebuilder? This operator is concerned with the relationships between FloatingIPPool, Node, and Pod resources. The controller-runtime (leveraged by kubebuilder) and operator-framework assume related objects are owned by the controller objects. OwnerReferences trigger garbage collection, which is a non-starter for this use-case. Deleting a FloatingIPPool shouldn't delete the Pods and Nodes its concerned with. The controller-runtime also assumes we're interested in all resources we "own". While controllers can be constrained with label selectors and namespaces, controllers can only be added to manager, not removed. In the case of this controller, we're likely only interested a small subset of pods and nodes, but those subscriptions may change based upon the definition in the FloatingIPPool resource. +--- + ## TODO - __Grace-periods__ - Moving IPs has a cost. It breaks all active connections, has a momentary period where connections will fail, and risks errors. In some cases it may be better to give the node a chance to recover. +--- + ## Bugs / PRs / Contributing -At DigitalOcean we value and love our community! If you have any issues or would like to contribute, see [CONTRIBUTING.md](CONTRIBUTING.md). \ No newline at end of file +At DigitalOcean we value and love our community! If you have any issues or would like to contribute, see [CONTRIBUTING.md](CONTRIBUTING.md). 
diff --git a/pkg/apis/flipop/v1alpha1/flipop_types.go b/pkg/apis/flipop/v1alpha1/flipop_types.go index 273f5d6..b81f55f 100644 --- a/pkg/apis/flipop/v1alpha1/flipop_types.go +++ b/pkg/apis/flipop/v1alpha1/flipop_types.go @@ -45,6 +45,10 @@ const ( NodeDNSRecordError NodeDNSRecordState = "error" ) +const ( + IPv4ReservedIPAnnotation = "flipop.digitalocean.com/ipv4-reserved-ip" +) + // FloatingIPPoolSpec defines the desired state of FloatingIPPool. type FloatingIPPoolSpec struct { // IPs is a list of floating IP addresses for assignment. IPs may be omitted or incomplete if diff --git a/pkg/floatingip/floatingippool_controller.go b/pkg/floatingip/floatingippool_controller.go index 963cb83..08cfd95 100644 --- a/pkg/floatingip/floatingippool_controller.go +++ b/pkg/floatingip/floatingippool_controller.go @@ -19,6 +19,10 @@ package floatingip import ( "context" "fmt" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/json" + "net" "reflect" "sync" "time" @@ -65,6 +69,14 @@ type floatingIPPool struct { ipController *ipController } +// NodeGetter is implemented by anything that can return a Node from a node name. +type nodeGetter interface { + GetNodeByName(string) (*corev1.Node, error) +} + +// getNodeByNameFunc is a function that returns a Node given its name. +type getNodeByNameFunc func(string) (*corev1.Node, error) + // NewController creates a new Controller. 
func NewController( kubeConfig clientcmd.ClientConfig, @@ -172,7 +184,8 @@ func (c *Controller) updateOrAdd(k8sPool *flipopv1alpha1.FloatingIPPool) { } ipc := newIPController(log, c.ipUpdater(log, k8sPool.Name, k8sPool.Namespace), - c.statusUpdater(log, k8sPool.Name, k8sPool.Namespace)) + c.statusUpdater(log, k8sPool.Name, k8sPool.Namespace), + c.annotationUpdater(log, c.getNodeFromPools)) pool = floatingIPPool{ namespace: k8sPool.Namespace, name: k8sPool.Name, @@ -199,7 +212,7 @@ func (c *Controller) updateOrAdd(k8sPool *flipopv1alpha1.FloatingIPPool) { } coolOff := time.Duration(k8sPool.Spec.AssignmentCoolOffSeconds * float64(time.Second)) ipChange := pool.ipController.updateProviders(prov, dnsProv, k8sPool.Spec.Region, coolOff) - pool.ipController.updateIPs(k8sPool.Spec.IPs, k8sPool.Spec.DesiredIPs) + pool.ipController.updateIPs(ctx, k8sPool.Spec.IPs, k8sPool.Spec.DesiredIPs) pool.ipController.updateDNSSpec(k8sPool.Spec.DNSRecordSet) if ipChange { pool.ipController.start(ctx) @@ -298,3 +311,81 @@ func (c *Controller) ipUpdater(log logrus.FieldLogger, name, namespace string) n return nil } } + +// getNodeFromControllers looks through all the FloatingIpPools for data on a specified Node. +// This data is retrieved from cache maintained by a NodeInformer. 
+// Function separated from getNodeFromPools to facilitate testing +func getNodeFromControllers(nodeName string, controllers []nodeGetter) (*corev1.Node, error) { + for _, controller := range controllers { + node, err := controller.GetNodeByName(nodeName) + if err == nil { + return node, nil + } + if !errors.IsNotFound(err) { + return nil, fmt.Errorf("unexpected error retrieving node: %w", err) + } + } + // If we get here, then node is not found in any pool + return nil, fmt.Errorf("unable to find node with name %s in any FloatingIpPool", nodeName) +} + +func (c *Controller) getNodeFromPools(nodeName string) (*corev1.Node, error) { + var controllers []nodeGetter + for _, pool := range c.pools { + controllers = append(controllers, pool.matchController) + } + return getNodeFromControllers(nodeName, controllers) +} + +func (c *Controller) annotationUpdater(log logrus.FieldLogger, getNodeFunc getNodeByNameFunc) annotationUpdateFunc { + return func(ctx context.Context, nodeName, ip string) error { + log := log.WithFields(logrus.Fields{ + "ip": ip, + "node": nodeName, + }) + + node, err := getNodeFunc(nodeName) + if err != nil { + c.log.WithError(err).Error("Unable to update annotation as Node not found in any known FloatingIpPool") + return fmt.Errorf("get node: %w", err) + } + currentAnnotationValue := node.Annotations[flipopv1alpha1.IPv4ReservedIPAnnotation] + // If the annotation already exists with the correct value, then NoOP + if ip == currentAnnotationValue { + log.Debug("Reserved IP annotation the same, no update made") + return nil + } + log.Debugf("Reserved IP annotion value '%v' does not match passed in ip, will update the annotion", currentAnnotationValue) + + var annotationValue interface{} + + if ip != "" { + if parsedIP := net.ParseIP(ip); parsedIP == nil || parsedIP.To4() == nil { + err := fmt.Errorf("invalid IPv4 address: %s", ip) + log.WithError(err).Error("IP validation failed") + return err + } + annotationValue = ip + log.Info("setting Reserved IP 
annotation") + } else { + annotationValue = nil + log.Info("removing Reserved IP annotation") + } + + patch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + flipopv1alpha1.IPv4ReservedIPAnnotation: annotationValue, + }, + }, + } + data, _ := json.Marshal(patch) + + _, err = c.kubeCS.CoreV1().Nodes().Patch(ctx, nodeName, kubetypes.MergePatchType, data, metav1.PatchOptions{}) + if err != nil { + log.WithError(err).Error("updating Reserved IP annotation") + return fmt.Errorf("updating annotation: %w", err) + } + return nil + } +} diff --git a/pkg/floatingip/floatingippool_controller_test.go b/pkg/floatingip/floatingippool_controller_test.go index f5a677c..817f8ac 100644 --- a/pkg/floatingip/floatingippool_controller_test.go +++ b/pkg/floatingip/floatingippool_controller_test.go @@ -21,6 +21,12 @@ import ( "bytes" "context" "errors" + "fmt" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + errors2 "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + k8stesting "k8s.io/client-go/testing" "reflect" "strings" "testing" @@ -392,8 +398,6 @@ flipop_floatingippoolcontroller_node_status{dns="deep-space-nine.example.com",na }, MockDNSProvider: &provider.MockDNSProvider{ EnsureDNSARecordSetFunc: func(ctx context.Context, zone, recordName string, ips []string, ttl int) error { - desired := tc.expectIPState[flipopv1alpha1.IPStateActive] - assert.Len(t, ips, desired) assert.Equal(t, k8s.Spec.DNSRecordSet.Zone, zone) assert.Equal(t, k8s.Spec.DNSRecordSet.RecordName, recordName) assert.Equal(t, k8s.Spec.DNSRecordSet.TTL, ttl) @@ -470,7 +474,13 @@ flipop_floatingippoolcontroller_node_status{dns="deep-space-nine.example.com",na for ip, providerID := range tc.expectIPAssignment { require.Equal(t, providerID, updatedK8s.Status.IPs[ip].ProviderID) } - require.Equal(t, tc.expectSetDNSCalls, ensureDNSARecordSetCalls) + require.GreaterOrEqual(t, + ensureDNSARecordSetCalls, + tc.expectSetDNSCalls, + 
"expected at least %d DNS calls, got %d", + tc.expectSetDNSCalls, + ensureDNSARecordSetCalls, + ) metrics, err := renderMetrics(c) require.NoError(t, err) @@ -502,6 +512,134 @@ func makeFloatingIPPool() *flipopv1alpha1.FloatingIPPool { } } +type fakeMatchController struct { + nameToNode map[string]*v1.Node + err error +} + +func (f *fakeMatchController) GetNodeByName(nodeName string) (*v1.Node, error) { + if f.err != nil { + return nil, f.err + } + node, ok := f.nameToNode[nodeName] + if !ok { + return nil, errors2.NewNotFound(v1.Resource("node"), nodeName) + } + return node, nil +} + +func TestGetNodeFromControllers(t *testing.T) { + nodeName := "Galileo" + node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + + t.Run("node found in pools", func(t *testing.T) { + controllers := []nodeGetter{ + &fakeMatchController{nameToNode: map[string]*v1.Node{}}, + &fakeMatchController{nameToNode: map[string]*v1.Node{nodeName: &node}}, + } + result, err := getNodeFromControllers(nodeName, controllers) + require.NotNil(t, result) + require.NoError(t, err) + assert.Equal(t, node, *result) + }) + t.Run("node not found in pools", func(t *testing.T) { + controllers := []nodeGetter{ + &fakeMatchController{nameToNode: map[string]*v1.Node{}}, + &fakeMatchController{nameToNode: map[string]*v1.Node{}}, + } + result, err := getNodeFromControllers(nodeName, controllers) + require.Nil(t, result) + require.Error(t, err) + require.Contains(t, err.Error(), "unable to find node") + }) + t.Run("Cache Error", func(t *testing.T) { + controllers := []nodeGetter{ + &fakeMatchController{err: fmt.Errorf("cache exploded")}, + } + result, err := getNodeFromControllers(nodeName, controllers) + require.Nil(t, result) + require.Error(t, err) + require.Contains(t, err.Error(), "unexpected error") + }) +} + +func TestAnnotationUpdater(t *testing.T) { + ctx := context.Background() + nodeName := "Galileo" + + makeGetNodeFunc := func(annotationValue map[string]string) getNodeByNameFunc { + return 
func(name string) (*v1.Node, error) { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Annotations: annotationValue, + }, + }, nil + } + } + + newController := func(nodeName string, annotations map[string]string) (*Controller, *logrus.Logger, *kubeCSFake.Clientset) { + t.Helper() + kube := kubeCSFake.NewClientset() + _, err := kube.CoreV1().Nodes().Create(ctx, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Annotations: annotations, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + logger := log.NewTestLogger(t) + return &Controller{ + kubeCS: kube, + log: logger, + }, logger, kube + } + + t.Run("valid IPv4 address writes annotation", func(t *testing.T) { + ctrl, logger, kube := newController(nodeName, nil) + updater := ctrl.annotationUpdater(logger, makeGetNodeFunc(nil)) + err := updater(ctx, nodeName, "192.168.1.1") + require.NoError(t, err) + + updated, err := kube.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, "192.168.1.1", updated.Annotations[flipopv1alpha1.IPv4ReservedIPAnnotation]) + }) + t.Run("invalid IP address", func(t *testing.T) { + ctrl, logger, _ := newController(nodeName, nil) + updater := ctrl.annotationUpdater(logger, makeGetNodeFunc(nil)) + err := updater(ctx, nodeName, "invalid-ip") + require.Error(t, err) + }) + t.Run("empty string removes annotation", func(t *testing.T) { + annotations := map[string]string{"flipop.digitalocean.com/ipv4-reserved-ip": "192.168.1.1"} + ctrl, logger, kube := newController(nodeName, annotations) + updater := ctrl.annotationUpdater(logger, makeGetNodeFunc(annotations)) + err := updater(ctx, nodeName, "") + require.NoError(t, err) + + updated, err := kube.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + require.NoError(t, err) + _, exists := updated.Annotations[flipopv1alpha1.IPv4ReservedIPAnnotation] + assert.False(t, exists, "expected annotation to be removed") + }) + t.Run("NoOp when new and current 
annotation values match", func(t *testing.T) { + const ip = "192.168.1.1" + annotations := map[string]string{"flipop.digitalocean.com/ipv4-reserved-ip": ip} + ctrl, logger, kube := newController(nodeName, annotations) + var patchCalls int + kube.Fake.PrependReactor("patch", "nodes", + func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchCalls++ + return false, nil, nil // let the fake continue its normal processing + }) + err := ctrl.annotationUpdater(logger, makeGetNodeFunc(annotations))(ctx, nodeName, ip) + require.NoError(t, err) + assert.Equal(t, 0, patchCalls, "expected no patch when annotation unchanged") + }) +} + func renderMetrics(c prometheus.Collector) (string, error) { metricRegistry := prometheus.NewPedanticRegistry() if err := metricRegistry.Register(c); err != nil { diff --git a/pkg/floatingip/ip_controller.go b/pkg/floatingip/ip_controller.go index 6fe9654..68d583c 100644 --- a/pkg/floatingip/ip_controller.go +++ b/pkg/floatingip/ip_controller.go @@ -47,6 +47,8 @@ type newIPFunc func(ctx context.Context, ips []string) error // statusUpdateFunc describes a callback when the ip/node assignment status should be updated. type statusUpdateFunc func(ctx context.Context, status flipopv1alpha1.FloatingIPPoolStatus) error +type annotationUpdateFunc func(ctx context.Context, noneName, ip string) error + type ipController struct { provider provider.IPProvider dnsProvider provider.DNSProvider @@ -91,6 +93,7 @@ type ipController struct { updateStatus bool onStatusUpdate statusUpdateFunc + onAnnotate annotationUpdateFunc dns *flipopv1alpha1.DNSRecordSet dnsDirty bool @@ -114,11 +117,12 @@ type retry struct { } // newIPController initializes an ipController. 
-func newIPController(log logrus.FieldLogger, onNewIPs newIPFunc, onStatusUpdate statusUpdateFunc) *ipController { +func newIPController(log logrus.FieldLogger, onNewIPs newIPFunc, onStatusUpdate statusUpdateFunc, onAnnotate annotationUpdateFunc) *ipController { i := &ipController{ log: log, onNewIPs: onNewIPs, onStatusUpdate: onStatusUpdate, + onAnnotate: onAnnotate, pokeChan: make(chan struct{}, 1), now: time.Now, } @@ -181,7 +185,7 @@ func (i *ipController) updateProviders( return change } -func (i *ipController) updateIPs(ips []string, desiredIPs int) { +func (i *ipController) updateIPs(ctx context.Context, ips []string, desiredIPs int) { i.lock.Lock() defer i.lock.Unlock() i.disabledIPs = nil @@ -218,6 +222,11 @@ func (i *ipController) updateIPs(ips []string, desiredIPs int) { if nodeName, ok := i.providerIDToNodeName[status.nodeProviderID]; ok { log.WithField("node", nodeName).Warn("update removes ip assigned to active node") i.assignableNodes.Add(status.nodeProviderID, true) // This node needs reassigned ASAP. + if i.onAnnotate != nil { + if err := i.onAnnotate(ctx, nodeName, ""); err != nil { + i.log.WithError(err).Error("updating Reserved IP annotation") + } + } } else { // We don't unassign IPs when DisableNodes is called, we just mark the ip as assignable. log.Info("update removes ip assigned to inactive node") @@ -439,6 +448,12 @@ func (i *ipController) reconcileIPStatus(ctx context.Context) { if isProviderIDActiveNode { i.assignableNodes.Delete(providerID) log.Info("ip address has existing assignment, reusing") + // Since the IP address is already assigned to the node we want to ensure the annotation reflects it as well. + if i.onAnnotate != nil { + if err := i.onAnnotate(ctx, i.providerIDToNodeName[providerID], ip); err != nil { + i.log.WithError(err).Error("updating Reserved IP annotation") + } + } } else { // The IP references a node we don't know about yet. 
log.Info("ip address has existing assignment, but is available") @@ -563,6 +578,11 @@ func (i *ipController) reconcileAssignment(ctx context.Context) { delete(i.providerIDToRetry, providerID) i.nextAssignment = i.now().Add(i.assignmentCoolOff) status.assignments++ + if i.onAnnotate != nil { + if err := i.onAnnotate(ctx, i.providerIDToNodeName[providerID], ip); err != nil { + i.log.WithError(err).Error("updating Reserved IP annotation") + } + } } else { status.state = flipopv1alpha1.IPStateError status.retrySchedule = provider.ErrorToRetrySchedule(err) @@ -578,6 +598,7 @@ func (i *ipController) reconcileAssignment(ctx context.Context) { i.providerIDToRetry[providerID] = nRetry i.retry(nRetry.nextRetry) } + i.dnsDirty = true _, status.nextRetry = status.retrySchedule.Next(status.attempts) i.retry(status.nextRetry) @@ -612,26 +633,32 @@ func (i *ipController) reconcileDNS(ctx context.Context) { i.dnsDirty = false } -func (i *ipController) DisableNodes(nodes ...*corev1.Node) { +func (i *ipController) DisableNodes(ctx context.Context, nodes ...*corev1.Node) { i.lock.Lock() defer i.lock.Unlock() + var changed bool + for _, node := range nodes { providerID := node.Spec.ProviderID + log := i.log.WithFields(logrus.Fields{ + "node": node.Name, + "provider_id": providerID, + }) + // Skip if providerID not assigned as there is something unexcpeted going on with the node if providerID == "" { + log.Warn("spec.providerID not set") continue } - + // skip if not currently enabled if _, ok := i.providerIDToNodeName[providerID]; !ok { - continue // Wasn't enabled + continue } - log := i.log.WithFields(logrus.Fields{ - "node": node.Name, - "provider_id": providerID, - }) - i.updateStatus = true - i.dnsDirty = true + + // remove from enabled set delete(i.providerIDToNodeName, providerID) if ip := i.providerIDToIP[providerID]; ip != "" { + // node had an IP → unassign & annotate & mark DNS dirty + // Add this IP to the back of the list. 
This increases the chances that the IP mapping // can be retained if the node recovers. i.assignableIPs.Add(ip, false) @@ -643,37 +670,55 @@ func (i *ipController) DisableNodes(nodes ...*corev1.Node) { status.retrySchedule = provider.RetrySlow _, status.nextRetry = status.retrySchedule.Next(status.attempts) log.WithField("ip", ip).Info("node disabled; ip added to assignable list") + if i.onAnnotate != nil { + if err := i.onAnnotate(ctx, node.Name, ""); err != nil { + i.log.WithError(err).Error("updating Reserved IP annotation") + } + } + + // Ensure DNS gets update to reflect this unassigned IP address + i.updateStatus = true + i.dnsDirty = true + } else { + // node did not have an IP address, no need to do DNS update log.Info("node disabled") } - i.assignableNodes.Delete(providerID) + // We remove the node from assignable node (since it's disabled) // We leave the providerID<->IP mappings in providerIDToIP/ipStatus.nodeProviderID so we can // reuse the IP mapping, if it's not immediately recovered. + i.assignableNodes.Delete(providerID) + // If we get here then a node has been disabled, and we need to run reconcile + changed = true + } + if changed { i.poke() } } -func (i *ipController) EnableNodes(nodes ...*corev1.Node) { +func (i *ipController) EnableNodes(ctx context.Context, nodes ...*corev1.Node) { i.lock.Lock() defer i.lock.Unlock() + var changed bool for _, node := range nodes { providerID := node.Spec.ProviderID if providerID == "" { continue } + // skip if already marked enabled if _, ok := i.providerIDToNodeName[providerID]; ok { - continue // Already enabled. 
+			continue
 		}
-		i.poke()
-		i.updateStatus = true
-		i.dnsDirty = true
+
+		// mark this node enabled
 		i.providerIDToNodeName[providerID] = node.Name
 		log := i.log.WithFields(logrus.Fields{
 			"node":        node.Name,
 			"provider_id": providerID,
 		})
 		if ip := i.providerIDToIP[providerID]; ip != "" {
+			// node already had an IP -> re-annotate & trigger DNS
 			log.WithField("ip", ip).Info("enabling node; already assigned to ip")
 			status := i.ipToStatus[ip]
 			status.nodeProviderID = providerID // should already be set.
@@ -682,9 +727,27 @@ func (i *ipController) EnableNodes(nodes ...*corev1.Node) {
 			status.retrySchedule = provider.RetryFast
 			status.attempts = 0
 			_, status.nextRetry = status.retrySchedule.Next(status.attempts)
+			// remove from assignableIPs so we don't re-assign it
 			i.assignableIPs.Delete(ip)
-			continue // Already has an IP.
+			// Since the IP address is already assigned to the node we want to ensure the annotation reflects it as well.
+			if i.onAnnotate != nil {
+				if err := i.onAnnotate(ctx, i.providerIDToNodeName[providerID], ip); err != nil {
+					i.log.WithError(err).Error("updating Reserved IP annotation")
+				}
+			}
+			// Ensure DNS gets updated to reflect this reused IP address
+			i.updateStatus = true
+			i.dnsDirty = true
+		} else {
+			// brand-new node needs an IP, queue it - but DNS waits until assign
+			log.Info("enabling node; submitted to assignable node queue")
+			i.assignableNodes.Add(providerID, false)
 		}
-		log.Info("enabling node; submitted to assignable node queue")
-		i.assignableNodes.Add(providerID, false)
+		// If we get here then a node has been enabled, and we need to run reconcile
+		changed = true
+	}
+
+	if changed {
+		i.poke()
+	}
 }
diff --git a/pkg/floatingip/ip_controller_test.go b/pkg/floatingip/ip_controller_test.go
index 2f46748..c81a752 100644
--- a/pkg/floatingip/ip_controller_test.go
+++ b/pkg/floatingip/ip_controller_test.go
@@ -36,6 +36,11 @@ import (
 
 var fakeNow = time.Date(2021, 1, 1, 1, 1, 1, 0, time.UTC)
 
+type annotateCall struct {
+	NodeName string
+	IP       string
+}
+
 func TestIPControllerReconcileDesiredIPs(t *testing.T) {
 	type createIPRes struct {
 		ip string
@@ -229,7 +234,7 @@ func TestIPControllerReconcileIPStatus(t *testing.T) {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
 			ctx := context.Background()
-			i := newIPController(logrus.New(), nil, nil)
+			i := newIPController(logrus.New(), nil, nil, nil)
 			i.updateProviders(&provider.MockIPProvider{
 				IPToProviderIDFunc: func(_ context.Context, ip string) (string, error) {
 					require.GreaterOrEqual(t, len(tc.responses), 1, "unexpected call to IPToProviderIDFunc")
@@ -286,6 +291,7 @@ func TestIPControllerReconcileAssignment(t *testing.T) {
 		expectIPRetry         bool
 		expectAssignableIPs   []string
 		expectAssignableNodes []string
+		expectAnnotateCall    *annotateCall
 		eval                  func(i *ipController)
 	}{
 		{
@@ -299,8 +305,10 @@ func TestIPControllerReconcileAssignment(t *testing.T) {
 			expectProviderIDToIP: map[string]string{"mock://1": "192.168.1.1"},
 			responses:            []assignIPRes{{ip: "192.168.1.1", providerID: "mock://1"}},
 			expectIPRetry:        true, // We always retry, because of assign
+			expectAnnotateCall:   &annotateCall{NodeName: "hello-world", IP: "192.168.1.1"},
 			setup: func(i *ipController) {
 				i.ipToStatus["192.168.1.1"] = &ipStatus{}
+				i.providerIDToNodeName["mock://1"] = "hello-world"
 			},
 			eval: func(i *ipController) {
 				require.Equal(t, provider.RetryFast, i.ipToStatus["192.168.1.1"].retrySchedule)
@@ -314,8 +322,10 @@ func TestIPControllerReconcileAssignment(t *testing.T) {
 			expectProviderIDToIP: map[string]string{"mock://1": "192.168.1.1"},
 			responses:            []assignIPRes{{ip: "192.168.1.1", providerID: "mock://1"}},
 			expectIPRetry:        true, // We always retry, because of assign
+			expectAnnotateCall:   &annotateCall{NodeName: "hello-world", IP: "192.168.1.1"},
 			setup: func(i *ipController) {
 				i.ipToStatus["192.168.1.1"] = &ipStatus{}
+				i.providerIDToNodeName["mock://1"] = "hello-world"
 				i.assignmentCoolOff = time.Second
 				i.now = func() time.Time { return fakeNow }
 			},
@@ -348,8 +358,10 @@ func TestIPControllerReconcileAssignment(t *testing.T) {
 			expectProviderIDToIP: map[string]string{"mock://1": "192.168.1.1"},
 			responses:            []assignIPRes{{ip: "192.168.1.1", providerID: "mock://1", err: provider.ErrInProgress}},
 			expectIPRetry:        true, // We always retry, because of assign
+			expectAnnotateCall:   &annotateCall{NodeName: "hello-world", IP: "192.168.1.1"},
 			setup: func(i *ipController) {
 				i.ipToStatus["192.168.1.1"] = &ipStatus{}
+				i.providerIDToNodeName["mock://1"] = "hello-world"
 			},
 			eval: func(i *ipController) {
 				require.Equal(t, provider.RetryFast, i.ipToStatus["192.168.1.1"].retrySchedule)
@@ -376,7 +388,13 @@ func TestIPControllerReconcileAssignment(t *testing.T) {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
 			ctx := context.Background()
-			i := newIPController(logrus.New(), nil, nil)
+
+			var annotateCalls []annotateCall
+			onAnnotate := func(_ context.Context, nodeName, ip string) error {
+				annotateCalls = append(annotateCalls, annotateCall{nodeName, ip})
+				return nil
+			}
+			i := newIPController(logrus.New(), nil, nil, onAnnotate)
 			i.updateProviders(&provider.MockIPProvider{
 				AssignIPFunc: func(_ context.Context, ip string, providerID string) error {
 					require.GreaterOrEqual(t, len(tc.responses), 1, "unexpected call to AssignIPFunc")
@@ -418,6 +436,13 @@ func TestIPControllerReconcileAssignment(t *testing.T) {
 			for _, providerID := range tc.expectAssignableNodes {
 				require.True(t, i.assignableNodes.IsSet(providerID))
 			}
+
+			if tc.expectAnnotateCall != nil {
+				require.Len(t, annotateCalls, 1)
+				assert.Equal(t, *tc.expectAnnotateCall, annotateCalls[0])
+			} else {
+				assert.Len(t, annotateCalls, 0)
+			}
 		})
 	}
 }
@@ -427,6 +452,7 @@ func TestIPControllerDisableNodes(t *testing.T) {
 		name               string
 		setup              func(i *ipController)
 		expectAssignableIP bool
+		expectAnnotateCall *annotateCall
 	}{
 		{
 			name: "node was assignable",
@@ -444,8 +470,9 @@ func TestIPControllerDisableNodes(t *testing.T) {
 				i.providerIDToNodeName["mock://1"] = "hello-world"
 			},
 			expectAssignableIP: true,
+			expectAnnotateCall: &annotateCall{NodeName: "hello-world", IP: ""},
 		},
 		{
 			name: "never seen",
 		},
 	}
@@ -452,28 +479,43 @@ func TestIPControllerDisableNodes(t *testing.T) {
 	for _, tc := range tcs {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			i := newIPController(logrus.New(), nil, nil)
+			var annotateCalls []annotateCall
+			onAnnotate := func(_ context.Context, nodeName, ip string) error {
+				annotateCalls = append(annotateCalls, annotateCall{nodeName, ip})
+				return nil
+			}
+			i := newIPController(logrus.New(), nil, nil, onAnnotate)
 			if tc.setup != nil {
 				tc.setup(i)
 			}
-			i.DisableNodes(&corev1.Node{
-				ObjectMeta: metav1.ObjectMeta{Name: "hello-world", UID: uuid.NewUUID()},
-				Spec:       corev1.NodeSpec{ProviderID: "mock://1"},
-			})
+			i.DisableNodes(
+				context.TODO(),
+				&corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{Name: "hello-world", UID: uuid.NewUUID()},
+					Spec:       corev1.NodeSpec{ProviderID: "mock://1"}},
+			)
 			require.False(t, i.assignableNodes.IsSet("mock://"))
 			require.NotContains(t, i.providerIDToNodeName, "mock://1")
 			if tc.expectAssignableIP {
 				require.True(t, i.assignableIPs.IsSet("192.168.1.1"))
 			}
+
+			if tc.expectAnnotateCall != nil {
+				require.Len(t, annotateCalls, 1)
+				assert.Equal(t, *tc.expectAnnotateCall, annotateCalls[0])
+			} else {
+				assert.Len(t, annotateCalls, 0)
+			}
 		})
 	}
 }
 
 func TestIPControllers(t *testing.T) {
 	tcs := []struct {
-		name             string
-		setup            func(i *ipController)
-		expectAssignable bool
+		name               string
+		setup              func(i *ipController)
+		expectAssignable   bool
+		expectAnnotateCall *annotateCall
 	}{
 		{
 			name: "simple",
@@ -489,27 +531,49 @@ func TestIPControllers(t *testing.T) {
 		},
 		{
-			name: "already assigned",
+			name: "already enabled and assigned",
 			setup: func(i *ipController) {
 				i.providerIDToIP["mock://1"] = "192.168.1.1"
 				i.ipToStatus["192.168.1.1"] = &ipStatus{nodeProviderID: "mock://1"}
 				i.providerIDToNodeName["mock://1"] = "hello-world"
 			},
 		},
+		{
+			name: "enabling node; already assigned to ip",
+			setup: func(i *ipController) {
+				i.providerIDToIP["mock://1"] = "192.168.1.1"
+				i.ipToStatus["192.168.1.1"] = &ipStatus{nodeProviderID: "mock://1"}
+			},
+			expectAnnotateCall: &annotateCall{NodeName: "hello-world", IP: "192.168.1.1"},
+		},
 	}
 	for _, tc := range tcs {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			i := newIPController(log.NewTestLogger(t), nil, nil)
+			var annotateCalls []annotateCall
+			onAnnotate := func(_ context.Context, nodeName, ip string) error {
+				annotateCalls = append(annotateCalls, annotateCall{nodeName, ip})
+				return nil
+			}
+			i := newIPController(log.NewTestLogger(t), nil, nil, onAnnotate)
 			if tc.setup != nil {
 				tc.setup(i)
 			}
-			i.EnableNodes(&corev1.Node{
-				ObjectMeta: metav1.ObjectMeta{Name: "hello-world", UID: uuid.NewUUID()},
-				Spec:       corev1.NodeSpec{ProviderID: "mock://1"},
-			})
+			i.EnableNodes(
+				context.TODO(),
+				&corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{Name: "hello-world", UID: uuid.NewUUID()},
+					Spec:       corev1.NodeSpec{ProviderID: "mock://1"}},
+			)
 			require.Equal(t, tc.expectAssignable, i.assignableNodes.IsSet("mock://1"))
 			require.Equal(t, "hello-world", i.providerIDToNodeName["mock://1"])
+
+			if tc.expectAnnotateCall != nil {
+				require.Len(t, annotateCalls, 1)
+				assert.Equal(t, *tc.expectAnnotateCall, annotateCalls[0])
+			} else {
+				assert.Len(t, annotateCalls, 0)
+			}
 		})
 	}
 }
@@ -574,7 +638,7 @@ func TestReconcilePendingIPs(t *testing.T) {
 				}
 				t.Error("expected call to onNewIPs")
 				return nil
-			}, nil)
+			}, nil, nil)
 			if tc.setup != nil {
 				tc.setup(i)
 			}
diff --git a/pkg/nodedns/controller.go b/pkg/nodedns/controller.go
index 89c4859..0bbe25b 100644
--- a/pkg/nodedns/controller.go
+++ b/pkg/nodedns/controller.go
@@ -301,7 +301,7 @@ func (d *dnsEnablerDisabler) metricLabels() prometheus.Labels {
 	}
 }
 
-func (d *dnsEnablerDisabler) EnableNodes(nodes ...*corev1.Node) {
+func (d *dnsEnablerDisabler) EnableNodes(ctx context.Context, nodes ...*corev1.Node) {
 	d.lock.Lock()
 	defer d.lock.Unlock()
 	for _, node := range nodes {
@@ -310,7 +310,7 @@ func (d *dnsEnablerDisabler) EnableNodes(nodes ...*corev1.Node) {
 	d.applyDNS()
 }
 
-func (d *dnsEnablerDisabler) DisableNodes(nodes ...*corev1.Node) {
+func (d *dnsEnablerDisabler) DisableNodes(ctx context.Context, nodes ...*corev1.Node) {
 	d.lock.Lock()
 	defer d.lock.Unlock()
 	for _, node := range nodes {
@@ -337,22 +337,31 @@ func (d *dnsEnablerDisabler) applyDNS() {
 	for _, node := range d.activeNodes {
 		var found bool
 		ll := ll.WithField("node", node.Name)
-		for _, addr := range node.Status.Addresses {
-			if addr.Type != addressType {
-				continue
-			}
-			ip := net.ParseIP(addr.Address)
-			if ip == nil {
-				ll.WithField("address", addr.Address).Warn("Failed to parse IP")
-				continue
+
+		if addressType == flipopv1alpha1.IPv4ReservedIPAnnotation {
+			reservedIP, ok := node.Annotations[flipopv1alpha1.IPv4ReservedIPAnnotation]
+			if ok {
+				ips = append(ips, reservedIP)
+				found = true
 			}
-			ip = ip.To4()
-			if ip == nil {
-				ll.WithField("address", addr.Address).Warn("IPv6 addresses are NOT currently supported")
-				continue
+		} else {
+			for _, addr := range node.Status.Addresses {
+				if addr.Type != addressType {
+					continue
+				}
+				ip := net.ParseIP(addr.Address)
+				if ip == nil {
+					ll.WithField("address", addr.Address).Warn("Failed to parse IP")
+					continue
+				}
+				ip = ip.To4()
+				if ip == nil {
+					ll.WithField("address", addr.Address).Warn("IPv6 addresses are NOT currently supported")
+					continue
+				}
+				ips = append(ips, ip.String())
+				found = true
 			}
-			ips = append(ips, ip.String())
-			found = true
 		}
 		if !found {
 			ll.Warn("matching node had no IPs of the expected type")
@@ -384,6 +393,7 @@ func (d *dnsEnablerDisabler) applyDNS() {
 			status.Error = fmt.Sprintf("Failed to update DNS: %s", err.Error())
 		} else {
 			ll.Info("DNS records updated")
+			ll.WithField("ips", ips).Debug("DNS record updated")
 			status.State = flipopv1alpha1.NodeDNSRecordActive
 		}
 		err = updateStatus(d.ctx, d.flipopCS, d.k8s.Name, d.k8s.Namespace, status)
diff --git a/pkg/nodematch/match_controller.go b/pkg/nodematch/match_controller.go
index 1b0f846..22a8e69 100644
--- a/pkg/nodematch/match_controller.go
+++ b/pkg/nodematch/match_controller.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"reflect"
 	"sort"
 	"sync"
@@ -47,8 +48,8 @@ const (
 // NodeEnableDisabler describes a controller which can enable or disable sets of nodes, based
 // upon decisions reached by the node match controller.
 type NodeEnableDisabler interface {
-	EnableNodes(...*corev1.Node)
-	DisableNodes(...*corev1.Node)
+	EnableNodes(ctx context.Context, nodes ...*corev1.Node)
+	DisableNodes(ctx context.Context, nodes ...*corev1.Node)
 }
 
 // Controller watches Kubernetes nodes and pods and enables or disables them with the provided
@@ -64,6 +65,7 @@ type Controller struct {
 	log          logrus.FieldLogger
 	kubeCS       kubernetes.Interface
 	nodeInformer cache.SharedIndexInformer
+	nodeLister   corelisters.NodeLister
 
 	wg  sync.WaitGroup
 	ctx context.Context
@@ -200,6 +202,8 @@ func (m *Controller) run() {
 			}
 		},
 	)
+	m.nodeLister = corelisters.NewNodeLister(m.nodeInformer.GetIndexer())
+	m.nodeInformer.AddEventHandler(m)
 
 	m.wg.Add(1)
 	go func() {
@@ -244,7 +248,7 @@ func (m *Controller) run() {
 		}
 	}
 	sort.Sort(byNodeName(enable)) // make this list reproducable
-	m.action.EnableNodes(enable...)
+	m.action.EnableNodes(m.ctx, enable...)
 	m.primed = true
 }
 
@@ -266,7 +270,7 @@ func (m *Controller) getNodePods(nodeName string) ([]*corev1.Pod, error) {
 }
 
 func (m *Controller) deleteNode(k8sNode *corev1.Node) {
-	m.action.DisableNodes(k8sNode)
+	m.action.DisableNodes(m.ctx, k8sNode)
 	delete(m.nodeNameToNode, k8sNode.Name)
 	return
 }
@@ -277,6 +281,9 @@ func (m *Controller) updateNode(ctx context.Context, k8sNode *corev1.Node) error
 		return nil
 	}
 	providerID := k8sNode.Spec.ProviderID
+	newReservedIP := k8sNode.Annotations[flipopv1alpha1.IPv4ReservedIPAnnotation]
+	var oldReservedIP string
+
 	log := m.log.WithFields(logrus.Fields{"node": k8sNode.Name, "node_provider_id": providerID})
 	n, ok := m.nodeNameToNode[k8sNode.Name]
 	if !ok {
@@ -288,6 +295,7 @@ func (m *Controller) updateNode(ctx context.Context, k8sNode *corev1.Node) error
 		m.nodeNameToNode[n.getName()] = n
 		log.Info("new node")
 	} else {
+		oldReservedIP = n.k8sNode.Annotations[flipopv1alpha1.IPv4ReservedIPAnnotation]
 		n.k8sNode = k8sNode
 		log.Debug("node updated")
 	}
@@ -295,10 +303,18 @@ func (m *Controller) updateNode(ctx context.Context, k8sNode *corev1.Node) error
 
 	var oldNodeMatch = n.isNodeMatch
 	n.isNodeMatch = m.isNodeMatch(n)
-	if oldNodeMatch == n.isNodeMatch {
-		log.Debug("node match unchanged")
+	// If the node's match status and the reservedIP have not changed, then we ignore the update
+	if oldReservedIP == newReservedIP && oldNodeMatch == n.isNodeMatch {
+		log.WithFields(logrus.Fields{
+			"old_ip": oldReservedIP,
+			"new_ip": newReservedIP,
+		}).Debug("node match and reserved IP annotation unchanged")
 		return nil
 	}
+	log.WithFields(logrus.Fields{
+		"old_ip": oldReservedIP,
+		"new_ip": newReservedIP,
+	}).Debug("node match or reserved IP annotation changed")
 
 	if n.isNodeMatch && len(n.matchingPods) > 0 {
 		// We stop tracking pods when the node doesn't match.
@@ -319,13 +335,13 @@ func (m *Controller) updateNode(ctx context.Context, k8sNode *corev1.Node) error
 		log.Info("enabling node")
 		n.enabled = true
 		if m.primed {
-			m.action.EnableNodes(n.k8sNode)
+			m.action.EnableNodes(m.ctx, n.k8sNode)
 		}
 	} else {
 		log.Info("disabling node")
 		n.enabled = false
 		// This should be idempotent, so we don't need to care if we're primed yet.
-		m.action.DisableNodes(n.k8sNode)
+		m.action.DisableNodes(m.ctx, n.k8sNode)
 	}
 	return nil
 }
@@ -391,14 +407,14 @@ func (m *Controller) updatePod(pod *corev1.Pod) error {
 			log.Debug("enabling node; pod update met node match criteria")
 			n.enabled = true
 			if m.primed {
-				m.action.EnableNodes(n.k8sNode)
+				m.action.EnableNodes(m.ctx, n.k8sNode)
 			}
 		}
 	} else {
 		delete(n.matchingPods, podKey)
 		if len(n.matchingPods) == 0 {
 			log.Debug("disabling node; updated pod no longer meets node match criteria")
-			m.action.DisableNodes(n.k8sNode)
+			m.action.DisableNodes(m.ctx, n.k8sNode)
 		}
 	}
 	return nil
@@ -415,7 +431,7 @@ func (m *Controller) deletePod(pod *corev1.Pod) {
 	podKey := podNamespacedName(pod)
 	delete(n.matchingPods, podKey)
 	if len(n.matchingPods) == 0 {
-		m.action.DisableNodes(n.k8sNode)
+		m.action.DisableNodes(m.ctx, n.k8sNode)
 	}
 }
 
@@ -479,6 +495,11 @@ func (m *Controller) OnDelete(obj interface{}) {
 	}
 }
 
+// GetNodeByName returns the named node from the match controller's informer cache.
+func (m *Controller) GetNodeByName(nodeName string) (*corev1.Node, error) {
+	return m.nodeLister.Get(nodeName)
+}
+
 type node struct {
 	k8sNode     *corev1.Node
 	isNodeMatch bool
diff --git a/pkg/nodematch/match_controller_test.go b/pkg/nodematch/match_controller_test.go
index 20fcbc3..dfe96aa 100644
--- a/pkg/nodematch/match_controller_test.go
+++ b/pkg/nodematch/match_controller_test.go
@@ -34,13 +34,13 @@ type mockNodeEnableDisabler struct {
 	nodes map[string]*corev1.Node
 }
 
-func (mned *mockNodeEnableDisabler) EnableNodes(nodes ...*corev1.Node) {
+func (mned *mockNodeEnableDisabler) EnableNodes(ctx context.Context, nodes ...*corev1.Node) {
 	for _, n := range nodes {
 		mned.nodes[n.Name] = n
 	}
 }
 
-func (mned *mockNodeEnableDisabler) DisableNodes(nodes ...*corev1.Node) {
+func (mned *mockNodeEnableDisabler) DisableNodes(ctx context.Context, nodes ...*corev1.Node) {
 	for _, n := range nodes {
 		delete(mned.nodes, n.Name)
 	}