review
jmdeal committed Feb 11, 2025
1 parent 7901a70 commit 1ddb121
Showing 4 changed files with 76 additions and 29 deletions.
71 changes: 51 additions & 20 deletions pkg/controllers/node/termination/controller.go
@@ -96,24 +96,27 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
// This should not occur. The NodeClaim is required to track details about the termination stage and termination grace
// period and will not be finalized until after the Node has been terminated by Karpenter. If there are duplicates or
// the nodeclaim does not exist, this indicates a customer induced error (e.g. removing finalizers or manually
// creating nodeclaims with matching provider IDs).
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
return reconcile.Result{}, c.associatedNodeClaimError(err)
}
return reconcile.Result{}, err
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
return reconcile.Result{}, c.associatedNodeClaimError(err)
}
return reconcile.Result{}, fmt.Errorf("deleting nodeclaim, %w", err)
}
}

// If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully drain nodes that are
// no longer alive. We do a check on the Ready condition of the node since, even though the CloudProvider says the
// If the underlying instance no longer exists, we want to delete to avoid trying to gracefully drain the
// associated node. We do a check on the Ready condition of the node since, even though the CloudProvider says the
// instance is not around, we know that the kubelet process is still running if the Node Ready condition is true.
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
@@ -130,7 +133,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, err
}
if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
@@ -141,11 +144,14 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
}
c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
if modified := nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, c.associatedNodeClaimError(err)
}
return reconcile.Result{}, err
}
}
@@ -155,9 +161,12 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
stored := nodeClaim.DeepCopy()
_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, c.associatedNodeClaimError(err)
}
return reconcile.Result{}, err
}
NodesDrainedTotal.Inc(map[string]string{
@@ -171,42 +180,60 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
volumesDetached, err := c.ensureVolumesDetached(ctx, node)
pendingVolumeAttachments, err := c.pendingVolumeAttachments(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if volumesDetached {
if len(pendingVolumeAttachments) == 0 {
// There are no remaining volume attachments blocking instance termination. If we've already updated the status
// condition, fall through. Otherwise, update the status condition and requeue.
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, c.associatedNodeClaimError(err)
}
return reconcile.Result{}, err
}
// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
// operations on the NodeClaim.
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
} else if !c.hasTerminationGracePeriodElapsed(nodeTerminationTime) {
c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
// There are remaining volume attachments blocking instance termination. We should set the status condition to
// unknown (if not already) and requeue. This case should never fall through; to continue to instance termination,
// one of two conditions must be met: all blocking volume attachment objects must be deleted or the nodeclaim's TGP
// must have expired.
c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node, pendingVolumeAttachments...))
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
if modified := nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, c.associatedNodeClaimError(err)
}
return reconcile.Result{}, err
}
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
} else {
// There are remaining volume attachments blocking instance termination, but the nodeclaim's TGP has expired. In this
// case we should set the status condition to false (requeueing if it wasn't already) and then fall through to instance
// termination.
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, c.associatedNodeClaimError(err)
}
return reconcile.Result{}, err
}
// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
@@ -232,24 +259,28 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, nil
}

func (*Controller) associatedNodeClaimError(err error) error {
return reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}

func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time.Time) bool {
if nodeTerminationTime == nil {
return false
}
return !c.clock.Now().Before(*nodeTerminationTime)
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
func (c *Controller) pendingVolumeAttachments(ctx context.Context, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
if err != nil {
return false, err
return nil, err
}
// Filter out VolumeAttachments associated with not drain-able Pods
filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
if err != nil {
return false, err
return nil, err
}
return len(filteredVolumeAttachments) == 0, nil
return filteredVolumeAttachments, nil
}

// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
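A note on the new error path: associatedNodeClaimError wraps the duplicate / not-found failure in controller-runtime's reconcile.TerminalError, so the reconcile is reported and counted as an error but the request is not requeued with backoff; retrying cannot fix a customer-induced duplicate or missing NodeClaim. A minimal, hypothetical sketch of that pattern (not part of this commit):

package sketch

import (
	"context"
	"fmt"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// demoReconciler illustrates terminal errors: the returned error is surfaced in
// logs and metrics, but controller-runtime does not put the request back on the
// workqueue the way a plain error would.
type demoReconciler struct{}

func (r *demoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	err := fmt.Errorf("expected a single associated nodeclaim for node %q", req.Name)
	return ctrl.Result{}, reconcile.TerminalError(err)
}

This preserves the intent of the removed branch (the condition is treated as unrecoverable) while still recording the failure instead of returning nil.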
11 changes: 6 additions & 5 deletions pkg/controllers/node/termination/suite_test.go
@@ -549,7 +549,7 @@ var _ = Describe("Termination", func() {
ExpectSingletonReconciled(ctx, queue)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).Reason).To(Equal("Draining"))

// Expect the pods to be evicted
@@ -560,7 +560,7 @@ var _ = Describe("Termination", func() {
ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainValidation
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).Reason).To(Equal("Draining"))

ExpectDeleted(ctx, env.Client, pods[1])
@@ -795,7 +795,7 @@ var _ = Describe("Termination", func() {
ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachmentInitiation
ExpectExists(ctx, env.Client, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment"))

ExpectDeleted(ctx, env.Client, va)
@@ -837,7 +837,7 @@ var _ = Describe("Termination", func() {
ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachment
ExpectExists(ctx, env.Client, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment"))

ExpectDeleted(ctx, env.Client, vaDrainable)
@@ -865,7 +865,8 @@ var _ = Describe("Termination", func() {
ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachment
ExpectExists(ctx, env.Client, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsFalse()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsUnknown()).To(BeTrue())
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).Reason).To(Equal("AwaitingVolumeDetachment"))

fakeClock.Step(5 * time.Minute)
ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // VolumeDetachment
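The updated assertions follow the condition semantics introduced in the controller change: Unknown marks in-progress work, True marks completion, and False is reserved for terminal outcomes. A rough sketch of that lifecycle using the same status-condition helpers the tests exercise (the function and its nodeClaim argument are illustrative, not part of the commit):

package sketch

import (
	"fmt"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// conditionFlow sketches the progression the tests now assert on a NodeClaim.
func conditionFlow(nodeClaim *v1.NodeClaim) {
	// While pods are still evicting, Drained is Unknown with reason "Draining"
	// (prior to this commit it was set to False).
	_ = nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeDrained, "Draining", "Draining")
	fmt.Println(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()) // true

	// Once eviction finishes, the condition flips to True.
	_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)

	// False now signals a genuine failure, e.g. giving up on volume detachment
	// because the nodeclaim's termination grace period has elapsed.
	_ = nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed")
}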
15 changes: 12 additions & 3 deletions pkg/controllers/node/termination/terminator/events/events.go
@@ -20,10 +20,14 @@ import (
"fmt"
"time"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

storagev1 "k8s.io/api/storage/v1"
)

func EvictPod(pod *corev1.Pod, message string) events.Event {
@@ -56,13 +60,18 @@ func NodeFailedToDrain(node *corev1.Node, err error) events.Event {
}
}

func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node) events.Event {
func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node, volumeAttachments ...*storagev1.VolumeAttachment) events.Event {
return events.Event{
InvolvedObject: node,
Type: corev1.EventTypeNormal,
Reason: "AwaitingVolumeDetachment",
Message: "Awaiting deletion VolumeAttachments bound to node",
DedupeValues: []string{node.Name},
Message: fmt.Sprintf(
"Awaiting deletion of bound VolumeAttachments (%s)",
pretty.Slice(lo.Map(volumeAttachments, func(va *storagev1.VolumeAttachment, _ int) string {
return va.Name
}), 5),
),
DedupeValues: []string{node.Name},
}
}

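The event now names the VolumeAttachments still blocking termination, truncated to five entries via karpenter's pretty.Slice helper. A hedged approximation of the message construction using only the standard library (the exact truncation format of pretty.Slice may differ):

package sketch

import (
	"fmt"
	"strings"

	storagev1 "k8s.io/api/storage/v1"
)

// awaitingDetachmentMessage approximates the new event message: list the names of
// the blocking VolumeAttachments and truncate after maxItems to keep the event short.
func awaitingDetachmentMessage(vas []*storagev1.VolumeAttachment, maxItems int) string {
	names := make([]string, 0, len(vas))
	for _, va := range vas {
		names = append(names, va.Name)
	}
	if len(names) > maxItems {
		names = append(names[:maxItems], fmt.Sprintf("and %d others", len(names)-maxItems))
	}
	return fmt.Sprintf("Awaiting deletion of bound VolumeAttachments (%s)", strings.Join(names, ", "))
}

Surfacing the blocking attachments directly in the event makes it possible to identify the offending CSI volumes without separately listing VolumeAttachment objects for the node.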
8 changes: 7 additions & 1 deletion pkg/utils/node/node.go
@@ -64,6 +64,7 @@ func IgnoreNodeClaimNotFoundError(err error) error {
// DuplicateNodeClaimError is an error returned when multiple v1.NodeClaims are found matching the passed providerID
type DuplicateNodeClaimError struct {
ProviderID string
NodeClaims []string
}

func (e *DuplicateNodeClaimError) Error() string {
@@ -119,7 +120,12 @@ func NodeClaimForNode(ctx context.Context, c client.Client, node *corev1.Node) (
return nil, err
}
if len(nodeClaims) > 1 {
return nil, &DuplicateNodeClaimError{ProviderID: node.Spec.ProviderID}
return nil, &DuplicateNodeClaimError{
ProviderID: node.Spec.ProviderID,
NodeClaims: lo.Map(nodeClaims, func(nc *v1.NodeClaim, _ int) string {
return nc.Name
}),
}
}
if len(nodeClaims) == 0 {
return nil, &NodeClaimNotFoundError{ProviderID: node.Spec.ProviderID}
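With the added NodeClaims field, callers can report exactly which NodeClaims share a provider ID instead of re-listing them. A hypothetical caller-side sketch (only the struct field and the NodeClaimForNode signature come from this diff; the helper below is illustrative):

package sketch

import (
	"context"
	"errors"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
)

// reportDuplicates surfaces the names of the NodeClaims that match a node's
// provider ID when more than one is found, e.g. after a NodeClaim was manually
// created with a duplicate provider ID.
func reportDuplicates(ctx context.Context, c client.Client, node *corev1.Node) error {
	_, err := nodeutils.NodeClaimForNode(ctx, c, node)
	duplicateErr := &nodeutils.DuplicateNodeClaimError{}
	if errors.As(err, &duplicateErr) {
		return fmt.Errorf("node %q has %d nodeclaims matching provider ID %q: %v",
			node.Name, len(duplicateErr.NodeClaims), duplicateErr.ProviderID, duplicateErr.NodeClaims)
	}
	return err
}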
