Skip to content

Commit

Permalink
addressing review
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Jan 27, 2025
1 parent 136127f commit 4ed938c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 19 deletions.
49 changes: 34 additions & 15 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,20 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
// This should not occur. The NodeClaim is required to track details about the termination stage and termination grace
// period and will not be finalized until after the Node has been terminated by Karpenter. If there are duplicates or
// the nodeclaim does not exist, this indicates a customer induced error (e.g. removing finalizers or manually
// creating nodeclaims with matching provider IDs).
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, %w", err))
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}
return reconcile.Result{}, err
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}
return reconcile.Result{}, fmt.Errorf("deleting nodeclaim, %w", err)
}
Expand All @@ -129,7 +133,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, err
}
if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
Expand All @@ -142,9 +146,12 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}
return reconcile.Result{}, err
}
}
Expand All @@ -154,9 +161,12 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
stored := nodeClaim.DeepCopy()
_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}
return reconcile.Result{}, err
}
NodesDrainedTotal.Inc(map[string]string{
Expand All @@ -170,31 +180,37 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
volumesDetached, err := c.ensureVolumesDetached(ctx, node)
pendingVolumeAttachments, err := c.pendingVolumeAttachments(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if volumesDetached {
if len(pendingVolumeAttachments) == 0 {
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}
return reconcile.Result{}, err
}
// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
// operations on the NodeClaim.
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
} else if !c.hasTerminationGracePeriodElapsed(nodeTerminationTime) {
c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node, pendingVolumeAttachments...))
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}
return reconcile.Result{}, err
}
}
Expand All @@ -203,9 +219,12 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
if errors.IsNotFound(err) {
return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
}
return reconcile.Result{}, err
}
// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
Expand Down Expand Up @@ -238,17 +257,17 @@ func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time.
return !c.clock.Now().Before(*nodeTerminationTime)
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
func (c *Controller) pendingVolumeAttachments(ctx context.Context, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
if err != nil {
return false, err
return nil, err
}
// Filter out VolumeAttachments associated with not drain-able Pods
filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
if err != nil {
return false, err
return nil, err
}
return len(filteredVolumeAttachments) == 0, nil
return filteredVolumeAttachments, nil
}

// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
Expand Down
15 changes: 12 additions & 3 deletions pkg/controllers/node/termination/terminator/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
"fmt"
"time"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

storagev1 "k8s.io/api/storage/v1"
)

func EvictPod(pod *corev1.Pod, message string) events.Event {
Expand Down Expand Up @@ -56,13 +60,18 @@ func NodeFailedToDrain(node *corev1.Node, err error) events.Event {
}
}

func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node) events.Event {
func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node, volumeAttachments ...*storagev1.VolumeAttachment) events.Event {
return events.Event{
InvolvedObject: node,
Type: corev1.EventTypeNormal,
Reason: "AwaitingVolumeDetachment",
Message: "Awaiting deletion VolumeAttachments bound to node",
DedupeValues: []string{node.Name},
Message: fmt.Sprintf(
"Awaiting deletion of bound VolumeAttachments (%s)",
pretty.Slice(lo.Map(volumeAttachments, func(va *storagev1.VolumeAttachment, _ int) string {
return va.Name
}), 5),
),
DedupeValues: []string{node.Name},
}
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/utils/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func IgnoreNodeClaimNotFoundError(err error) error {
// DuplicateNodeClaimError is an error returned when multiple v1.NodeClaims are found matching the passed providerID
type DuplicateNodeClaimError struct {
// ProviderID is the provider ID that the conflicting NodeClaims share.
ProviderID string
// NodeClaims holds the names of the NodeClaims that matched the provider ID
// (populated so callers can report which objects conflict).
NodeClaims []string
}

func (e *DuplicateNodeClaimError) Error() string {
Expand Down Expand Up @@ -119,7 +120,12 @@ func NodeClaimForNode(ctx context.Context, c client.Client, node *corev1.Node) (
return nil, err
}
if len(nodeClaims) > 1 {
return nil, &DuplicateNodeClaimError{ProviderID: node.Spec.ProviderID}
return nil, &DuplicateNodeClaimError{
ProviderID: node.Spec.ProviderID,
NodeClaims: lo.Map(nodeClaims, func(nc *v1.NodeClaim, _ int) string {
return nc.Name
}),
}
}
if len(nodeClaims) == 0 {
return nil, &NodeClaimNotFoundError{ProviderID: node.Spec.ProviderID}
Expand Down

0 comments on commit 4ed938c

Please sign in to comment.