unify retry logic and add RetryCount and MaxPodRetries
Signed-off-by: Tim Ramlot <[email protected]>
inteon committed Jul 9, 2024
1 parent 6cb31b6 commit 8f364af
Showing 3 changed files with 62 additions and 80 deletions.
@@ -8152,6 +8152,9 @@ spec:
description: PrevReportStates stores the previous reported prowjob
state per reporter So crier won't make duplicated report attempt
type: object
retryCount:
description: Amount of times the Pod was retried.
type: integer
startTime:
description: StartTime is equal to the creation time of the ProwJob
format: date-time
2 changes: 2 additions & 0 deletions pkg/apis/prowjobs/v1/types.go
@@ -1057,6 +1057,8 @@ type ProwJobStatus struct {
PendingTime *metav1.Time `json:"pendingTime,omitempty"`
// CompletionTime is the timestamp for when the job goes to a final state
CompletionTime *metav1.Time `json:"completionTime,omitempty"`
// Amount of times the Pod was retried.
RetryCount int `json:"retryCount,omitempty"`
// +kubebuilder:validation:Enum=scheduling;triggered;pending;success;failure;aborted;error
// +kubebuilder:validation:Required
State ProwJobState `json:"state,omitempty"`
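
A quick illustration of how the new field serializes (a minimal, self-contained sketch, not part of the commit: the status type below is a hypothetical stand-in for the relevant part of ProwJobStatus). Because of the json:"retryCount,omitempty" tag, the field only appears once a pod has actually been retried:

package main

import (
	"encoding/json"
	"fmt"
)

// status is a hypothetical stand-in for the slice of ProwJobStatus touched by this
// commit; the real type lives in pkg/apis/prowjobs/v1/types.go.
type status struct {
	// Amount of times the Pod was retried.
	RetryCount int `json:"retryCount,omitempty"`
}

func main() {
	out, _ := json.Marshal(status{RetryCount: 2})
	fmt.Println(string(out)) // {"retryCount":2}

	out, _ = json.Marshal(status{})
	fmt.Println(string(out)) // {} (a zero count is omitted, so existing ProwJob objects are unaffected)
}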
137 changes: 57 additions & 80 deletions pkg/plank/reconciler.go
@@ -60,6 +60,8 @@ import (

const ControllerName = "plank"

const MaxPodRetries = 3

// PodStatus constants
const (
Evicted = "Evicted"
@@ -462,104 +464,57 @@ func (r *reconciler) syncPendingJob(ctx context.Context, pj *prowv1.ProwJob) (*r
pj.Status.PodName = pn
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pod is missing, starting a new pod")
}
} else if pod.Status.Reason == Evicted {
// Pod was evicted.
if pj.Spec.ErrorOnEviction {
// ErrorOnEviction is enabled, complete the PJ and mark it as
// errored.
} else if transientFailure := getTransientFailure(pod); transientFailure != PodTransientFailureNone {
switch {
case transientFailure == PodTransientFailureEvicted && pj.Spec.ErrorOnEviction:
// ErrorOnEviction is enabled, complete the PJ and mark it as errored.
r.log.WithField("error-on-eviction", true).WithFields(pjutil.ProwJobFields(pj)).Info("Pods Node got evicted, fail job.")
pj.SetComplete()
pj.Status.State = prowv1.ErrorState
pj.Status.Description = "Job pod was evicted by the cluster."
} else {
// ErrorOnEviction is disabled. Delete the pod now and recreate it in
// the next resync.
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pods Node got evicted, deleting & next sync loop will restart pod")
client, ok := r.buildClients[pj.ClusterAlias()]
if !ok {
return nil, TerminalError(fmt.Errorf("evicted pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias()))
}
if finalizers := sets.New[string](pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
// We want the end user to not see this, so we have to remove the finalizer, otherwise the pod hangs
oldPod := pod.DeepCopy()
pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
return nil, fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
}
}
r.log.WithField("name", pj.ObjectMeta.Name).Debug("Delete Pod.")
return nil, ctrlruntimeclient.IgnoreNotFound(client.Delete(ctx, pod))
}
} else if isPodTerminated(pod) {
// Pod was terminated.
if pj.Spec.ErrorOnTermination {
// ErrorOnTermination is enabled, complete the PJ and mark it as
// errored.
case transientFailure == PodTransientFailureTerminated && pj.Spec.ErrorOnTermination:
// ErrorOnTermination is enabled, complete the PJ and mark it as errored.
r.log.WithField("error-on-termination", true).WithFields(pjutil.ProwJobFields(pj)).Info("Pods Node got terminated, fail job.")
pj.SetComplete()
pj.Status.State = prowv1.ErrorState
pj.Status.Description = "Job pod's node was terminated."
} else {
// ErrorOnTermination is disabled. Delete the pod now and recreate it in
// the next resync.
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pods Node got terminated, deleting & next sync loop will restart pod")
case pj.Status.RetryCount >= MaxPodRetries:
// MaxPodRetries is reached, complete the PJ and mark it as errored.
r.log.WithField("transient-failure", transientFailure).WithFields(pjutil.ProwJobFields(pj)).Info("Pod Node reached max retries, fail job.")
pj.SetComplete()
pj.Status.State = prowv1.ErrorState
pj.Status.Description = fmt.Sprintf("Job pod reached max retries (%d) for transient failure %s", MaxPodRetries, transientFailure)
default:
// Update the retry count and delete the pod so it gets recreated in the next resync.
pj.Status.RetryCount++
r.log.
WithField("transientFailure", transientFailure).
WithFields(pjutil.ProwJobFields(pj)).
Info("Pod has transient failure, deleting & next sync loop will restart pod")

client, ok := r.buildClients[pj.ClusterAlias()]
if !ok {
return nil, TerminalError(fmt.Errorf("terminated pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias()))
return nil, TerminalError(fmt.Errorf("pod %s with transient failure %s: unknown cluster alias %q", pod.Name, transientFailure, pj.ClusterAlias()))
}
if finalizers := sets.New[string](pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
if finalizers := sets.New(pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
// We want the end user to not see this, so we have to remove the finalizer, otherwise the pod hangs
oldPod := pod.DeepCopy()
pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
return nil, fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
}
}
r.log.WithField("name", pj.ObjectMeta.Name).Debug("Delete Pod.")
return nil, ctrlruntimeclient.IgnoreNotFound(client.Delete(ctx, pod))
}
} else if pod.DeletionTimestamp != nil && pod.Status.Reason == NodeUnreachablePodReason {
// This can happen in any phase and means the node got evicted after it became unresponsive. Delete the finalizer so the pod
// vanishes and we will silently re-create it in the next iteration.
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pods Node got lost, deleting & next sync loop will restart pod")
client, ok := r.buildClients[pj.ClusterAlias()]
if !ok {
return nil, TerminalError(fmt.Errorf("unknown pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias()))
}

if finalizers := sets.New[string](pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
// We want the end user to not see this, so we have to remove the finalizer, otherwise the pod hangs
oldPod := pod.DeepCopy()
pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
return nil, fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
}
}

return nil, nil
} else {
switch pod.Status.Phase {
case corev1.PodUnknown:
// Pod is in Unknown state. This can happen if there is a problem with
// the node. Delete the old pod, this will fire an event that triggers
// a new reconciliation in which we will re-create the pod.
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pod is in unknown state, deleting & restarting pod")
client, ok := r.buildClients[pj.ClusterAlias()]
if !ok {
return nil, TerminalError(fmt.Errorf("unknown pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias()))
// Pod is already deleted, so we don't need to delete it again.
if pod.DeletionTimestamp != nil {
return nil, nil
}

if finalizers := sets.New[string](pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
// We want the end user to not see this, so we have to remove the finalizer, otherwise the pod hangs
oldPod := pod.DeepCopy()
pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
return nil, fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
}
}
r.log.WithField("name", pj.ObjectMeta.Name).Debug("Delete Pod.")
return nil, ctrlruntimeclient.IgnoreNotFound(client.Delete(ctx, pod))

}
} else {
switch pod.Status.Phase {
case corev1.PodSucceeded:
pj.SetComplete()
// There were bugs around this in the past so be paranoid and verify each container
@@ -701,31 +656,53 @@ func (r *reconciler) syncPendingJob(ctx context.Context, pj *prowv1.ProwJob) (*r
return nil, nil
}

func isPodTerminated(pod *corev1.Pod) bool {
type PodTransientFailure string

const (
PodTransientFailureNone PodTransientFailure = ""
PodTransientFailureUnknown PodTransientFailure = "unknown"
PodTransientFailureEvicted PodTransientFailure = "evicted"
PodTransientFailureTerminated PodTransientFailure = "terminated"
PodTransientFailureUnreachable PodTransientFailure = "unreachable"
)

func getTransientFailure(pod *corev1.Pod) PodTransientFailure {
if pod.Status.Reason == Evicted {
return PodTransientFailureEvicted
}

// If there was a Graceful node shutdown, the Pod's status will have a
// reason set to "Terminated":
// https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown
if pod.Status.Reason == Terminated {
return true
return PodTransientFailureTerminated
}

for _, condition := range pod.Status.Conditions {
// If the node does no longer exist and the pod gets garbage collected,
// this condition will be set:
// https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions
if condition.Reason == "DeletionByPodGC" {
return true
return PodTransientFailureTerminated
}

// On GCP, before a new spot instance is started, the old pods are garbage
// collected (if they have not been already by the Kubernetes PodGC):
// https://github.com/kubernetes/cloud-provider-gcp/blob/25e5dcc715781316bc5e39f8b17c0d5b313453f7/cmd/gcp-controller-manager/node_csr_approver.go#L1035-L1058
if condition.Reason == "DeletionByGCPControllerManager" {
return true
return PodTransientFailureTerminated
}
}

return false
if pod.Status.Reason == NodeUnreachablePodReason && pod.DeletionTimestamp != nil {
return PodTransientFailureUnreachable
}

if pod.Status.Phase == corev1.PodUnknown {
return PodTransientFailureUnknown
}

return PodTransientFailureNone
}

// syncTriggeredJob syncs jobs that do not yet have an associated test workload running
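
To summarize the unified retry flow for readers skimming the diff, here is a minimal, self-contained sketch of the decision order in the new switch in syncPendingJob. The names below (decide, failureEvicted, maxPodRetries and so on) are simplified stand-ins, not the real identifiers from pkg/plank/reconciler.go, and the returned strings only describe what the reconciler would do:

package main

import "fmt"

// Simplified stand-ins for PodTransientFailure and MaxPodRetries from the diff above.
type transientFailure string

const (
	failureEvicted     transientFailure = "evicted"
	failureTerminated  transientFailure = "terminated"
	failureUnreachable transientFailure = "unreachable"
	failureUnknown     transientFailure = "unknown"

	maxPodRetries = 3
)

// decide mirrors the decision order of the new switch: explicit error-on-eviction or
// error-on-termination settings win, then the retry budget, otherwise the pod is
// deleted and recreated on the next sync.
func decide(failure transientFailure, errorOnEviction, errorOnTermination bool, retryCount int) string {
	switch {
	case failure == failureEvicted && errorOnEviction:
		return "complete job as errored (ErrorOnEviction)"
	case failure == failureTerminated && errorOnTermination:
		return "complete job as errored (ErrorOnTermination)"
	case retryCount >= maxPodRetries:
		return "complete job as errored (max retries reached)"
	default:
		return "increment retryCount, delete pod, recreate on next sync"
	}
}

func main() {
	fmt.Println(decide(failureEvicted, false, false, 0))    // increment retryCount, delete pod, recreate on next sync
	fmt.Println(decide(failureEvicted, true, false, 0))     // complete job as errored (ErrorOnEviction)
	fmt.Println(decide(failureTerminated, false, false, 3)) // complete job as errored (max retries reached)
}

The order matters: an explicit ErrorOnEviction or ErrorOnTermination setting short-circuits retries entirely, while every other transient failure (unknown, unreachable, or an eviction/termination without the corresponding error flag) consumes one retry out of MaxPodRetries = 3 before the job is failed.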
