From 233f43a9aab18bdc62132bc33bf40af3779a55ad Mon Sep 17 00:00:00 2001 From: Jason Deal Date: Wed, 15 Jan 2025 15:30:16 -0800 Subject: [PATCH] review --- .../karpenter.kwok.sh_kwoknodeclasses.yaml | 2 +- kwok/charts/crds/karpenter.sh_nodeclaims.yaml | 2 +- kwok/charts/crds/karpenter.sh_nodepools.yaml | 2 +- pkg/apis/crds/karpenter.sh_nodeclaims.yaml | 2 +- pkg/apis/crds/karpenter.sh_nodepools.yaml | 2 +- .../node/termination/controller.go | 71 +++++++++++++------ .../node/termination/suite_test.go | 11 +-- .../termination/terminator/events/events.go | 15 +++- .../karpenter.test.sh_testnodeclasses.yaml | 2 +- pkg/utils/node/node.go | 8 ++- 10 files changed, 82 insertions(+), 35 deletions(-) diff --git a/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml b/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml index e9a158a966..c00c911b79 100644 --- a/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml +++ b/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.17.1 + controller-gen.kubebuilder.io/version: v0.17.2 name: kwoknodeclasses.karpenter.kwok.sh spec: group: karpenter.kwok.sh diff --git a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml index 92eb9dbe03..94a9b3604e 100644 --- a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml +++ b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.17.1 + controller-gen.kubebuilder.io/version: v0.17.2 name: nodeclaims.karpenter.sh spec: group: karpenter.sh diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml b/kwok/charts/crds/karpenter.sh_nodepools.yaml index bd608d655f..6601e59dbf 100644 --- a/kwok/charts/crds/karpenter.sh_nodepools.yaml +++ 
b/kwok/charts/crds/karpenter.sh_nodepools.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.17.1 + controller-gen.kubebuilder.io/version: v0.17.2 name: nodepools.karpenter.sh spec: group: karpenter.sh diff --git a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml index 00018dc000..9538307299 100644 --- a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml +++ b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.17.1 + controller-gen.kubebuilder.io/version: v0.17.2 name: nodeclaims.karpenter.sh spec: group: karpenter.sh diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index 36ecac075d..157aaf13c4 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.17.1 + controller-gen.kubebuilder.io/version: v0.17.2 name: nodepools.karpenter.sh spec: group: karpenter.sh diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go index ffdb5f8231..17dbf7454d 100644 --- a/pkg/controllers/node/termination/controller.go +++ b/pkg/controllers/node/termination/controller.go @@ -96,9 +96,12 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node) if err != nil { + // This should not occur. The NodeClaim is required to track details about the termination stage and termination grace + // period and will not be finalized until after the Node has been terminated by Karpenter. 
If there are duplicates or + the nodeclaim does not exist, this indicates a customer induced error (e.g. removing finalizers or manually + creating nodeclaims with matching provider IDs). if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) { - log.FromContext(ctx).Error(err, "failed to terminate node") - return reconcile.Result{}, nil + return reconcile.Result{}, c.associatedNodeClaimError(err) } return reconcile.Result{}, err } @@ -106,14 +109,14 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile if nodeClaim.DeletionTimestamp.IsZero() { if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil { if errors.IsNotFound(err) { - return reconcile.Result{}, c.associatedNodeClaimError(err) } return reconcile.Result{}, fmt.Errorf("deleting nodeclaim, %w", err) } } - // If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully drain nodes that are - // no longer alive. We do a check on the Ready condition of the node since, even though the CloudProvider says the + // If the underlying instance no longer exists, we want to delete to avoid trying to gracefully drain the + // associated node. We do a check on the Ready condition of the node since, even though the CloudProvider says the + // instance is not around, we know that the kubelet process is still running if the Node Ready condition is true.
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144 if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue { @@ -130,7 +133,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile return reconcile.Result{}, err } if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil { - if errors.IsConflict(err) || errors.IsNotFound(err) { + if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err) @@ -141,11 +144,14 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile } c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err)) stored := nodeClaim.DeepCopy() - if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified { + if modified := nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeDrained, "Draining", "Draining"); modified { if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { - if errors.IsConflict(err) || errors.IsNotFound(err) { + if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } + if errors.IsNotFound(err) { + return reconcile.Result{}, c.associatedNodeClaimError(err) + } return reconcile.Result{}, err } } @@ -155,9 +161,12 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile stored := nodeClaim.DeepCopy() _ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained) if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { - if errors.IsConflict(err) || errors.IsNotFound(err) { + 
if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } + if errors.IsNotFound(err) { + return reconcile.Result{}, c.associatedNodeClaimError(err) + } return reconcile.Result{}, err } NodesDrainedTotal.Inc(map[string]string{ @@ -171,17 +180,22 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile // In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait // for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer. // However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting. - volumesDetached, err := c.ensureVolumesDetached(ctx, node) + pendingVolumeAttachments, err := c.pendingVolumeAttachments(ctx, node) if err != nil { return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err) } - if volumesDetached { + if len(pendingVolumeAttachments) == 0 { + // There are no remaining volume attachments blocking instance termination. If we've already updated the status + // condition, fall through. Otherwise, update the status condition and requeue. 
stored := nodeClaim.DeepCopy() if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached); modified { if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { - if errors.IsConflict(err) || errors.IsNotFound(err) { + if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } + if errors.IsNotFound(err) { + return reconcile.Result{}, c.associatedNodeClaimError(err) + } return reconcile.Result{}, err } // We requeue after a patch operation since we want to ensure we read our own writes before any subsequent @@ -189,24 +203,37 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile return reconcile.Result{RequeueAfter: 1 * time.Second}, nil } } else if !c.hasTerminationGracePeriodElapsed(nodeTerminationTime) { - c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node)) + // There are volume attachments blocking instance termination remaining. We should set the status condition to + // unknown (if not already) and requeue. This case should never fall through, to continue to instance termination + // one of two conditions must be met: all blocking volume attachment objects must be deleted or the nodeclaim's TGP + // must have expired. 
+ c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node, pendingVolumeAttachments...)) stored := nodeClaim.DeepCopy() - if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified { + if modified := nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified { if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { - if errors.IsConflict(err) || errors.IsNotFound(err) { + if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } + if errors.IsNotFound(err) { + return reconcile.Result{}, c.associatedNodeClaimError(err) + } return reconcile.Result{}, err } } return reconcile.Result{RequeueAfter: 1 * time.Second}, nil } else { + // There are volume attachments blocking instance termination remaining, but the nodeclaim's TGP has expired. In this + // case we should set the status condition to false (requeuing if it wasn't already) and then fall through to instance + // termination.
stored := nodeClaim.DeepCopy() if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed"); modified { if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { - if errors.IsConflict(err) || errors.IsNotFound(err) { + if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } + if errors.IsNotFound(err) { + return reconcile.Result{}, c.associatedNodeClaimError(err) + } return reconcile.Result{}, err } // We requeue after a patch operation since we want to ensure we read our own writes before any subsequent @@ -232,6 +259,10 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile return reconcile.Result{}, nil } +func (*Controller) associatedNodeClaimError(err error) error { + return reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err)) +} + func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time.Time) bool { if nodeTerminationTime == nil { return false @@ -239,17 +270,17 @@ func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time. 
return !c.clock.Now().Before(*nodeTerminationTime) } -func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) { +func (c *Controller) pendingVolumeAttachments(ctx context.Context, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) { volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node) if err != nil { - return false, err + return nil, err } // Filter out VolumeAttachments associated with not drain-able Pods filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock) if err != nil { - return false, err + return nil, err } - return len(filteredVolumeAttachments) == 0, nil + return filteredVolumeAttachments, nil } // filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go index ec977a740b..16e38a358f 100644 --- a/pkg/controllers/node/termination/suite_test.go +++ b/pkg/controllers/node/termination/suite_test.go @@ -549,7 +549,7 @@ var _ = Describe("Termination", func() { ExpectSingletonReconciled(ctx, queue) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) - Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsFalse()).To(BeTrue()) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue()) Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).Reason).To(Equal("Draining")) // Expect the pods to be evicted @@ -560,7 +560,7 @@ var _ = Describe("Termination", func() { ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainValidation ExpectNodeWithNodeClaimDraining(env.Client, node.Name) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) - Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsFalse()).To(BeTrue()) + 
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue()) Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).Reason).To(Equal("Draining")) ExpectDeleted(ctx, env.Client, pods[1]) @@ -795,7 +795,7 @@ var _ = Describe("Termination", func() { ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachmentInitiation ExpectExists(ctx, env.Client, node) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) - Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue()) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue()) Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment")) ExpectDeleted(ctx, env.Client, va) @@ -837,7 +837,7 @@ var _ = Describe("Termination", func() { ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachment ExpectExists(ctx, env.Client, node) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) - Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue()) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue()) Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment")) ExpectDeleted(ctx, env.Client, vaDrainable) @@ -865,7 +865,8 @@ var _ = Describe("Termination", func() { ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachment ExpectExists(ctx, env.Client, node) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) - Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue()) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue()) + 
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment")) fakeClock.Step(5 * time.Minute) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachment diff --git a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/terminator/events/events.go index 85683787da..a2b66e935b 100644 --- a/pkg/controllers/node/termination/terminator/events/events.go +++ b/pkg/controllers/node/termination/terminator/events/events.go @@ -20,10 +20,14 @@ import ( "fmt" "time" + "github.com/samber/lo" corev1 "k8s.io/api/core/v1" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/events" + "sigs.k8s.io/karpenter/pkg/utils/pretty" + + storagev1 "k8s.io/api/storage/v1" ) func EvictPod(pod *corev1.Pod, message string) events.Event { @@ -56,13 +60,18 @@ func NodeFailedToDrain(node *corev1.Node, err error) events.Event { } } -func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node) events.Event { +func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node, volumeAttachments ...*storagev1.VolumeAttachment) events.Event { return events.Event{ InvolvedObject: node, Type: corev1.EventTypeNormal, Reason: "AwaitingVolumeDetachment", - Message: "Awaiting deletion VolumeAttachments bound to node", - DedupeValues: []string{node.Name}, + Message: fmt.Sprintf( + "Awaiting deletion of bound VolumeAttachments (%s)", + pretty.Slice(lo.Map(volumeAttachments, func(va *storagev1.VolumeAttachment, _ int) string { + return va.Name + }), 5), + ), + DedupeValues: []string{node.Name}, } } diff --git a/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml b/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml index a80a21f279..f16803a2be 100644 --- a/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml +++ b/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: 
CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.17.1 + controller-gen.kubebuilder.io/version: v0.17.2 name: testnodeclasses.karpenter.test.sh spec: group: karpenter.test.sh diff --git a/pkg/utils/node/node.go b/pkg/utils/node/node.go index c308589dce..f743dd3d71 100644 --- a/pkg/utils/node/node.go +++ b/pkg/utils/node/node.go @@ -64,6 +64,7 @@ func IgnoreNodeClaimNotFoundError(err error) error { // DuplicateNodeClaimError is an error returned when multiple v1.NodeClaims are found matching the passed providerID type DuplicateNodeClaimError struct { ProviderID string + NodeClaims []string } func (e *DuplicateNodeClaimError) Error() string { @@ -119,7 +120,12 @@ func NodeClaimForNode(ctx context.Context, c client.Client, node *corev1.Node) ( return nil, err } if len(nodeClaims) > 1 { - return nil, &DuplicateNodeClaimError{ProviderID: node.Spec.ProviderID} + return nil, &DuplicateNodeClaimError{ + ProviderID: node.Spec.ProviderID, + NodeClaims: lo.Map(nodeClaims, func(nc *v1.NodeClaim, _ int) string { + return nc.Name + }), + } } if len(nodeClaims) == 0 { return nil, &NodeClaimNotFoundError{ProviderID: node.Spec.ProviderID}