diff --git a/.gitignore b/.gitignore index 53b95349..9e7f355c 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ go.work # macOS .DS_Store + +# CI Artifacts +artifacts diff --git a/api/v1alpha1/etcdcluster_types.go b/api/v1alpha1/etcdcluster_types.go index 3b195335..6d02f004 100644 --- a/api/v1alpha1/etcdcluster_types.go +++ b/api/v1alpha1/etcdcluster_types.go @@ -130,6 +130,82 @@ type ProviderCertManagerConfig struct { type EtcdClusterStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file + + // ObservedGeneration is the most recent generation observed for this EtcdCluster by the controller. + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + // CurrentReplicas is the number of etcd pods managed by the StatefulSet for this cluster. + // This reflects the .spec.replicas of the underlying StatefulSet. + // +optional + CurrentReplicas int32 `json:"currentReplicas,omitempty"` + + // ReadyReplicas is the number of etcd pods managed by the StatefulSet that are currently ready. + // This reflects the .status.readyReplicas of the underlying StatefulSet. + // +optional + ReadyReplicas int32 `json:"readyReplicas,omitempty"` + + // MemberCount is the number of members currently registered in the etcd cluster, + // as reported by the etcd 'member list' API. This may differ from CurrentReplicas + // during scaling operations or if members are added/removed outside the operator's direct control. + // +optional + MemberCount int32 `json:"memberCount,omitempty"` + + // CurrentVersion is the observed etcd version of the cluster. + // This is typically derived from the version of the healthy leader or a consensus among healthy members. + // +optional + CurrentVersion string `json:"currentVersion,omitempty"` + + // LeaderId is the hex-encoded ID of the current etcd cluster leader, if one exists and is known. + // +optional + LeaderId string `json:"leaderId,omitempty"` + + // TODO: expose LastDefragTime once the controller owns automated defragmentation. + + // Members provides the status of each individual etcd member. + // +optional + // +listType=map + // +listMapKey=id + // Alternative listMapKey could be 'name' if 'id' is not always immediately available or stable during init. + // However, 'id' is more canonical once a member is part of the cluster. + Members []MemberStatus `json:"members,omitempty"` + + // Conditions represent the latest available observations of the EtcdCluster's state. + // +optional + // +patchMergeKey=type + // +patchStrategy=merge + // +listType=map + // +listMapKey=type + Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` +} + +// MemberStatus defines the observed state of a single etcd member. +type MemberStatus struct { + // Name of the etcd member, typically the pod name (e.g., "etcd-cluster-example-0"). + // This can also be the name reported by etcd itself if set. + // +optional + Name string `json:"name,omitempty"` + + // ID is the hex-encoded member ID as reported by etcd. + // This is the canonical identifier for an etcd member. + ID string `json:"id"` // Made non-optional as it's key for identification + + // Version of etcd running on this member. + // +optional + Version string `json:"version,omitempty"` + + // IsHealthy indicates if the member is considered healthy. 
+ // A member is healthy if its etcd /health endpoint is reachable and reports OK, + // and its Status endpoint does not report any 'Errors'. + IsHealthy bool `json:"isHealthy"` // No omitempty, always show health + + // IsLearner indicates if the member is currently a learner in the etcd cluster. + // +optional + IsLearner bool `json:"isLearner,omitempty"` + + // IsLeader indicates if this member is currently the cluster leader. + // +optional + IsLeader bool `json:"isLeader,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index e1088bd2..7150dc9b 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -21,6 +21,7 @@ limitations under the License. package v1alpha1 import ( + "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" netx "net" ) @@ -83,7 +84,7 @@ func (in *EtcdCluster) DeepCopyInto(out *EtcdCluster) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EtcdCluster. @@ -174,6 +175,18 @@ func (in *EtcdClusterSpec) DeepCopy() *EtcdClusterSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EtcdClusterStatus) DeepCopyInto(out *EtcdClusterStatus) { *out = *in + if in.Members != nil { + in, out := &in.Members, &out.Members + *out = make([]MemberStatus, len(*in)) + copy(*out, *in) + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EtcdClusterStatus. @@ -186,6 +199,21 @@ func (in *EtcdClusterStatus) DeepCopy() *EtcdClusterStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MemberStatus) DeepCopyInto(out *MemberStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MemberStatus. +func (in *MemberStatus) DeepCopy() *MemberStatus { + if in == nil { + return nil + } + out := new(MemberStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodMetadata) DeepCopyInto(out *PodMetadata) { *out = *in diff --git a/config/crd/bases/operator.etcd.io_etcdclusters.yaml b/config/crd/bases/operator.etcd.io_etcdclusters.yaml index aa67be64..1402ece5 100644 --- a/config/crd/bases/operator.etcd.io_etcdclusters.yaml +++ b/config/crd/bases/operator.etcd.io_etcdclusters.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.19.0 name: etcdclusters.operator.etcd.io spec: group: operator.etcd.io @@ -221,6 +221,145 @@ spec: type: object status: description: EtcdClusterStatus defines the observed state of EtcdCluster. + properties: + conditions: + description: Conditions represent the latest available observations + of the EtcdCluster's state. 
+ items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + currentReplicas: + description: |- + CurrentReplicas is the number of etcd pods managed by the StatefulSet for this cluster. + This reflects the .spec.replicas of the underlying StatefulSet. + format: int32 + type: integer + currentVersion: + description: |- + CurrentVersion is the observed etcd version of the cluster. + This is typically derived from the version of the healthy leader or a consensus among healthy members. + type: string + leaderId: + description: LeaderId is the hex-encoded ID of the current etcd cluster + leader, if one exists and is known. + type: string + memberCount: + description: |- + MemberCount is the number of members currently registered in the etcd cluster, + as reported by the etcd 'member list' API. This may differ from CurrentReplicas + during scaling operations or if members are added/removed outside the operator's direct control. + format: int32 + type: integer + members: + description: |- + Members provides the status of each individual etcd member. + Alternative listMapKey could be 'name' if 'id' is not always immediately available or stable during init. + However, 'id' is more canonical once a member is part of the cluster. + items: + description: MemberStatus defines the observed state of a single + etcd member. + properties: + id: + description: |- + ID is the hex-encoded member ID as reported by etcd. + This is the canonical identifier for an etcd member. + type: string + isHealthy: + description: |- + IsHealthy indicates if the member is considered healthy. 
+ A member is healthy if its etcd /health endpoint is reachable and reports OK, + and its Status endpoint does not report any 'Errors'. + type: boolean + isLeader: + description: IsLeader indicates if this member is currently + the cluster leader. + type: boolean + isLearner: + description: IsLearner indicates if the member is currently + a learner in the etcd cluster. + type: boolean + name: + description: |- + Name of the etcd member, typically the pod name (e.g., "etcd-cluster-example-0"). + This can also be the name reported by etcd itself if set. + type: string + version: + description: Version of etcd running on this member. + type: string + required: + - id + - isHealthy + type: object + type: array + x-kubernetes-list-map-keys: + - id + x-kubernetes-list-type: map + observedGeneration: + description: ObservedGeneration is the most recent generation observed + for this EtcdCluster by the controller. + format: int64 + type: integer + readyReplicas: + description: |- + ReadyReplicas is the number of etcd pods managed by the StatefulSet that are currently ready. + This reflects the .status.readyReplicas of the underlying StatefulSet. + format: int32 + type: integer type: object type: object served: true diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index ae6f0cfa..96863b41 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -25,6 +25,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" @@ -33,6 +34,7 @@ import ( ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1" "go.etcd.io/etcd-operator/internal/etcdutils" + "go.etcd.io/etcd-operator/pkg/status" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -52,10 +54,12 @@ type EtcdClusterReconciler struct { // Every phase of Reconcile stores intermediate information here so that // subsequent phases can operate without additional lookups. type reconcileState struct { - cluster *ecv1alpha1.EtcdCluster // cluster custom resource currently being reconciled - sts *appsv1.StatefulSet // associated StatefulSet for the cluster - memberListResp *clientv3.MemberListResponse // member list fetched from the etcd cluster - memberHealth []etcdutils.EpHealth // health information for each etcd member + cluster *ecv1alpha1.EtcdCluster // cluster custom resource currently being reconciled + sts *appsv1.StatefulSet // associated StatefulSet for the cluster + memberListResp *clientv3.MemberListResponse // member list fetched from the etcd cluster + memberHealth []etcdutils.EpHealth // health information for each etcd member + calculatedStatus *ecv1alpha1.EtcdClusterStatus // calculated status for this reconcile cycle + cm *status.Manager // condition manager for setting status conditions } // +kubebuilder:rbac:groups=operator.etcd.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete @@ -75,16 +79,24 @@ type reconcileState struct { // health of the etcd cluster and then adjusts its state to match the desired // specification. Each phase is handled by a dedicated helper method. // +// Status updates are tracked in state.calculatedStatus and atomically patched +// at the end via a deferred call to ensure status reflects the reconcile outcome. 
+// // For more details on the controller-runtime Reconcile contract see: // https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile -func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { state, res, err := r.fetchAndValidateState(ctx, req) if state == nil || err != nil { return res, err } - if bootstrapRes, err := r.bootstrapStatefulSet(ctx, state); err != nil || !bootstrapRes.IsZero() { - return bootstrapRes, err + r.initializeStatusTracking(state) + defer r.registerStatusPatch(ctx, state, &res, &err) + + r.setBaselineConditions(state) + + if res, err = r.bootstrapStatefulSet(ctx, state); err != nil || !res.IsZero() { + return res, err } if err = r.performHealthChecks(ctx, state); err != nil { @@ -150,6 +162,72 @@ func (r *EtcdClusterReconciler) fetchAndValidateState(ctx context.Context, req c return &reconcileState{cluster: ec, sts: sts}, ctrl.Result{}, nil } +// initializeStatusTracking makes a deep copy of the CR status so we can compute +// changes without mutating the object fetched from the API server. It also +// prepares a condition manager bound to the current generation. +func (r *EtcdClusterReconciler) initializeStatusTracking(state *reconcileState) { + if state == nil || state.cluster == nil { + return + } + + statusCopy := state.cluster.Status.DeepCopy() + state.calculatedStatus = statusCopy + state.cm = status.NewManager(&state.calculatedStatus.Conditions, state.cluster.Generation) +} + +// setBaselineConditions ensures we report progress without clobbering established state. +func (r *EtcdClusterReconciler) setBaselineConditions(state *reconcileState) { + if state == nil || state.cm == nil || state.calculatedStatus == nil { + return + } + + updater := state.cm.Update() + + if meta.FindStatusCondition(state.calculatedStatus.Conditions, status.ConditionAvailable) == nil { + updater = updater.Available(false, status.ReasonReconciling, "Reconciliation started") + } + + updater = updater.Progressing(true, status.ReasonReconciling, "Reconciliation started") + + if meta.FindStatusCondition(state.calculatedStatus.Conditions, status.ConditionDegraded) == nil { + updater = updater.Degraded(false, status.ReasonReconciling, "Reconciliation started") + } + + updater.Apply() +} + +// registerStatusPatch defers patching the status subresource using the +// calculated snapshot. Any patch error is surfaced through the main reconcile +// error/result so callers do not have to handle status updates manually. +func (r *EtcdClusterReconciler) registerStatusPatch( + ctx context.Context, + state *reconcileState, + res *ctrl.Result, + reconcileErr *error, +) { + if state == nil || state.cluster == nil || state.calculatedStatus == nil { + return + } + + logger := log.FromContext(ctx) + + state.calculatedStatus.ObservedGeneration = state.cluster.Generation + + patchErr := status.PatchStatusMutate(ctx, r.Client, state.cluster, func(latest *ecv1alpha1.EtcdCluster) error { + latest.Status = *state.calculatedStatus + return nil + }) + if patchErr != nil { + logger.Error(patchErr, "Failed to patch EtcdCluster status") + if reconcileErr != nil && *reconcileErr == nil { + *reconcileErr = patchErr + if res != nil { + *res = ctrl.Result{} + } + } + } +} + // bootstrapStatefulSet ensures that the foundational Kubernetes objects for // a cluster exist and are correctly initialized. 
It creates the StatefulSet (initially // with 0 replicas) and the headless Service if necessary. When either resource @@ -164,22 +242,59 @@ func (r *EtcdClusterReconciler) bootstrapStatefulSet(ctx context.Context, s *rec switch { case s.sts == nil: logger.Info("Creating StatefulSet with 0 replica", "expectedSize", s.cluster.Spec.Size) + s.cm.Update(). + Progressing(true, status.ReasonCreatingResources, "Creating StatefulSet"). + Available(false, status.ReasonCreatingResources, "StatefulSet does not exist yet"). + Apply() + s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, 0, r.Scheme) if err != nil { + errMsg := status.FormatError("Failed to create StatefulSet", err) + s.cm.Update(). + Available(false, status.ReasonResourceCreateFail, errMsg). + Progressing(false, status.ReasonResourceCreateFail, errMsg). + Degraded(true, status.ReasonResourceCreateFail, errMsg). + Apply() return ctrl.Result{}, err } + r.updateStatusFromStatefulSet(s.calculatedStatus, s.sts) + s.cm.Update(). + Progressing(true, status.ReasonInitializingCluster, "StatefulSet created with 0 replicas, requires scaling to 1"). + Apply() requeue = true case s.sts.Spec.Replicas != nil && *s.sts.Spec.Replicas == 0: logger.Info("StatefulSet has 0 replicas. Trying to create a new cluster with 1 member") + s.cm.Update(). + Progressing(true, status.ReasonInitializingCluster, "Scaling StatefulSet from 0 to 1 replica"). + Apply() + s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, 1, r.Scheme) if err != nil { + errMsg := status.FormatError("Failed to scale StatefulSet to 1", err) + s.cm.Update(). + Available(false, status.ReasonResourceUpdateFail, errMsg). + Progressing(false, status.ReasonResourceUpdateFail, errMsg). + Degraded(true, status.ReasonResourceUpdateFail, errMsg). + Apply() return ctrl.Result{}, err } + r.updateStatusFromStatefulSet(s.calculatedStatus, s.sts) requeue = true } + // Update status from STS if it exists + if s.sts != nil { + r.updateStatusFromStatefulSet(s.calculatedStatus, s.sts) + } + if err = createHeadlessServiceIfNotExist(ctx, logger, r.Client, s.cluster, r.Scheme); err != nil { + errMsg := status.FormatError("Failed to ensure Headless Service", err) + s.cm.Update(). + Available(false, status.ReasonResourceCreateFail, errMsg). + Progressing(false, status.ReasonResourceCreateFail, errMsg). + Degraded(true, status.ReasonResourceCreateFail, errMsg). + Apply() return ctrl.Result{}, err } @@ -191,15 +306,60 @@ func (r *EtcdClusterReconciler) bootstrapStatefulSet(ctx context.Context, s *rec // performHealthChecks obtains the member list and health status from the etcd // cluster specified in the StatefulSet. Results are stored on the reconcileState -// for later reconciliation steps. +// for later reconciliation steps. Also populates status with member information. func (r *EtcdClusterReconciler) performHealthChecks(ctx context.Context, s *reconcileState) error { logger := log.FromContext(ctx) logger.Info("Now checking health of the cluster members") var err error s.memberListResp, s.memberHealth, err = healthCheck(s.sts, logger) if err != nil { + logger.Error(err, "Health check failed") + errMsg := status.FormatError("Health check failed", err) + s.cm.Update(). + Available(false, status.ReasonHealthCheckError, errMsg). + Degraded(true, status.ReasonHealthCheckError, errMsg). + Progressing(true, status.ReasonHealthCheckError, "Retrying after health check failure"). 
+ Apply() + + // Clear stale data on error + if s.calculatedStatus != nil { + s.calculatedStatus.Members = nil + s.calculatedStatus.MemberCount = 0 + s.calculatedStatus.LeaderId = "" + s.calculatedStatus.CurrentVersion = "" + } return fmt.Errorf("health check failed: %w", err) } + + // Update member count + memberCnt := 0 + if s.memberListResp != nil { + memberCnt = len(s.memberListResp.Members) + } + + if s.calculatedStatus != nil { + s.calculatedStatus.MemberCount = int32(memberCnt) + + // Find leader and populate leader info + var leaderIDHex string + s.calculatedStatus.LeaderId = "" + s.calculatedStatus.CurrentVersion = "" + + if memberCnt > 0 { + _, leaderStatus := etcdutils.FindLeaderStatus(s.memberHealth, logger) + if leaderStatus != nil && leaderStatus.Header != nil { + leaderIDHex = fmt.Sprintf("%x", leaderStatus.Header.MemberId) + s.calculatedStatus.LeaderId = leaderIDHex + if leaderStatus.Version != "" { + s.calculatedStatus.CurrentVersion = leaderStatus.Version + } + } + } + + // Populate detailed member statuses + s.calculatedStatus.Members = r.populateMemberStatuses(s.memberListResp, s.memberHealth, leaderIDHex) + } + return nil } @@ -208,130 +368,352 @@ func (r *EtcdClusterReconciler) performHealthChecks(ctx context.Context, s *reco // and handles learner promotion when needed. A ctrl.Result with a requeue // instructs the controller to retry after adjustments. func (r *EtcdClusterReconciler) reconcileClusterState(ctx context.Context, s *reconcileState) (ctrl.Result, error) { - logger := log.FromContext(ctx) memberCnt := 0 if s.memberListResp != nil { memberCnt = len(s.memberListResp.Members) } targetReplica := *s.sts.Spec.Replicas - var err error - // The number of replicas in the StatefulSet doesn't match the number of etcd members in the cluster. - if int(targetReplica) != memberCnt { - logger.Info("The expected number of replicas doesn't match the number of etcd members in the cluster", "targetReplica", targetReplica, "memberCnt", memberCnt) - if int(targetReplica) < memberCnt { - logger.Info("An etcd member was added into the cluster, but the StatefulSet hasn't scaled out yet") - newReplicaCount := targetReplica + 1 - logger.Info("Increasing StatefulSet replicas to match the etcd cluster member count", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount) - if _, err := reconcileStatefulSet(ctx, logger, s.cluster, r.Client, newReplicaCount, r.Scheme); err != nil { - return ctrl.Result{}, err - } - } else { - logger.Info("An etcd member was removed from the cluster, but the StatefulSet hasn't scaled in yet") - newReplicaCount := targetReplica - 1 - logger.Info("Decreasing StatefulSet replicas to remove the unneeded Pod.", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount) - if _, err := reconcileStatefulSet(ctx, logger, s.cluster, r.Client, newReplicaCount, r.Scheme); err != nil { - return ctrl.Result{}, err - } + if res, handled, err := r.handleReplicaMismatch(ctx, s, memberCnt, targetReplica); handled { + return res, err + } + + if res, handled, err := r.handlePendingLearner(ctx, s, memberCnt); handled { + return res, err + } + + if targetReplica == int32(s.cluster.Spec.Size) { + return r.handleStableCluster(ctx, s) + } + + return r.adjustReplicaTowardsSpec(ctx, s, memberCnt, targetReplica) +} + +// handleReplicaMismatch ensures the StatefulSet replica count eventually +// matches the number of etcd members observed in the cluster. +// - If members > replicas: scale STS out by 1 and requeue. 
+// - If members < replicas: scale STS in by 1 and requeue. +// Conditions: +// - Progressing=True, Available=False with ReasonMembersMismatch while adjusting +func (r *EtcdClusterReconciler) handleReplicaMismatch( + ctx context.Context, + s *reconcileState, + memberCnt int, + targetReplica int32, +) (ctrl.Result, bool, error) { + if int(targetReplica) == memberCnt { + return ctrl.Result{}, false, nil + } + + logger := log.FromContext(ctx) + logger.Info("The expected number of replicas doesn't match the number of etcd members in the cluster", "targetReplica", targetReplica, "memberCnt", memberCnt) + progressMsg := fmt.Sprintf("StatefulSet replicas (%d) differ from etcd members (%d), adjusting...", targetReplica, memberCnt) + s.cm.Update(). + Progressing(true, status.ReasonMembersMismatch, progressMsg). + Available(false, status.ReasonMembersMismatch, "Adjusting StatefulSet replicas to match etcd members"). + Apply() + + var err error + var newSts *appsv1.StatefulSet + if int(targetReplica) < memberCnt { + logger.Info("An etcd member was added into the cluster, but the StatefulSet hasn't scaled out yet") + newReplicaCount := targetReplica + 1 + logger.Info("Increasing StatefulSet replicas to match the etcd cluster member count", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount) + if newSts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, newReplicaCount, r.Scheme); err != nil { + errMsg := status.FormatError("Failed to adjust StatefulSet replicas", err) + s.cm.Update(). + Available(false, status.ReasonResourceUpdateFail, errMsg). + Progressing(false, status.ReasonResourceUpdateFail, errMsg). + Degraded(true, status.ReasonResourceUpdateFail, errMsg). + Apply() + return ctrl.Result{}, true, err } - return ctrl.Result{RequeueAfter: requeueDuration}, nil + } else { + logger.Info("An etcd member was removed from the cluster, but the StatefulSet hasn't scaled in yet") + newReplicaCount := targetReplica - 1 + logger.Info("Decreasing StatefulSet replicas to remove the unneeded Pod.", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount) + if newSts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, newReplicaCount, r.Scheme); err != nil { + errMsg := status.FormatError("Failed to adjust StatefulSet replicas", err) + s.cm.Update(). + Available(false, status.ReasonResourceUpdateFail, errMsg). + Progressing(false, status.ReasonResourceUpdateFail, errMsg). + Degraded(true, status.ReasonResourceUpdateFail, errMsg). + Apply() + return ctrl.Result{}, true, err + } + } + s.sts = newSts + + if s.calculatedStatus != nil { + r.updateStatusFromStatefulSet(s.calculatedStatus, s.sts) + } + return ctrl.Result{RequeueAfter: requeueDuration}, true, nil +} + +// handlePendingLearner promotes a pending learner to a voting member when +// it is sufficiently caught up. If no leader is present, we return and let +// a subsequent loop re-check after election. When a learner is not ready +// yet, we requeue to wait for it to catch up. +// Conditions: +// - Promoting: Progressing=True, Available=False +// - Leader missing: Available=False, Progressing=True, Degraded=True +func (r *EtcdClusterReconciler) handlePendingLearner( + ctx context.Context, + s *reconcileState, + memberCnt int, +) (ctrl.Result, bool, error) { + if memberCnt == 0 { + return ctrl.Result{}, false, nil + } + + logger := log.FromContext(ctx) + _, leaderStatus := etcdutils.FindLeaderStatus(s.memberHealth, logger) + if leaderStatus == nil { + s.cm.Update(). 
+ Available(false, status.ReasonLeaderNotFound, "Cluster has no elected leader"). + Progressing(true, status.ReasonLeaderNotFound, "Waiting for leader election"). + Degraded(true, status.ReasonLeaderNotFound, "Cluster has no elected leader"). + Apply() + return ctrl.Result{}, true, fmt.Errorf("couldn't find leader, memberCnt: %d", memberCnt) } - var ( - learnerStatus *clientv3.StatusResponse - learner uint64 - leaderStatus *clientv3.StatusResponse - ) + learnerID, learnerStatus := etcdutils.FindLearnerStatus(s.memberHealth, logger) + if learnerID == 0 { + return ctrl.Result{}, false, nil + } - if memberCnt > 0 { - // Find the leader status - _, leaderStatus = etcdutils.FindLeaderStatus(s.memberHealth, logger) - if leaderStatus == nil { - // If the leader is not available, wait for the leader to be elected - return ctrl.Result{}, fmt.Errorf("couldn't find leader, memberCnt: %d", memberCnt) + logger.Info("Learner found", "learnedID", learnerID, "learnerStatus", learnerStatus) + if etcdutils.IsLearnerReady(leaderStatus, learnerStatus) { + logger.Info("Learner is ready to be promoted to voting member", "learnerID", learnerID) + logger.Info("Promoting the learner member", "learnerID", learnerID) + s.cm.Update(). + Progressing(true, status.ReasonPromotingLearner, fmt.Sprintf("Promoting learner %d to voting member", learnerID)). + Available(false, status.ReasonPromotingLearner, "Learner promotion in progress"). + Apply() + eps := clientEndpointsFromStatefulsets(s.sts) + eps = eps[:len(eps)-1] + if err := etcdutils.PromoteLearner(eps, learnerID); err != nil { + errMsg := status.FormatError("Failed to promote learner", err) + s.cm.Update(). + Available(false, status.ReasonPromotingLearner, errMsg). + Progressing(false, status.ReasonPromotingLearner, errMsg). + Degraded(true, status.ReasonPromotingLearner, errMsg). + Apply() + return ctrl.Result{}, true, err } + return ctrl.Result{RequeueAfter: requeueDuration}, true, nil + } - learner, learnerStatus = etcdutils.FindLearnerStatus(s.memberHealth, logger) - if learner > 0 { - // There is at least one learner. Try to promote it if it's ready; otherwise requeue and wait. - logger.Info("Learner found", "learnedID", learner, "learnerStatus", learnerStatus) - if etcdutils.IsLearnerReady(leaderStatus, learnerStatus) { - logger.Info("Learner is ready to be promoted to voting member", "learnerID", learner) - logger.Info("Promoting the learner member", "learnerID", learner) - eps := clientEndpointsFromStatefulsets(s.sts) - eps = eps[:(len(eps) - 1)] - if err := etcdutils.PromoteLearner(eps, learner); err != nil { - // The member is not promoted yet, so we error out and requeue via the caller. - return ctrl.Result{}, err - } - } else { - // Learner is not yet ready. We can't add another learner or proceed further until this one is promoted. - logger.Info("The learner member isn't ready to be promoted yet", "learnerID", learner) - return ctrl.Result{RequeueAfter: requeueDuration}, nil + logger.Info("The learner member isn't ready to be promoted yet", "learnerID", learnerID) + s.cm.Update(). + Progressing(true, status.ReasonPromotingLearner, fmt.Sprintf("Waiting for learner %d to catch up", learnerID)). + Available(false, status.ReasonPromotingLearner, "Learner not ready for promotion"). + Apply() + return ctrl.Result{RequeueAfter: requeueDuration}, true, nil +} + +// handleStableCluster runs once the expected replica count matches the spec. +// Ensure every etcd member reports healthy before declaring success. 
+// Conditions: +// - All healthy → Available=True, Progressing=False, Degraded=False +// - Some unhealthy → Available=False, (optionally) Progressing=False, Degraded=True +func (r *EtcdClusterReconciler) handleStableCluster(ctx context.Context, s *reconcileState) (ctrl.Result, error) { + logger := log.FromContext(ctx) + logger.Info("EtcdCluster is already up-to-date") + // Check if all members are healthy + allHealthy := true + if s.memberHealth != nil { + for _, health := range s.memberHealth { + if health.Error != "" || health.Status == nil || !health.Health { + allHealthy = false + break } } } - if targetReplica == int32(s.cluster.Spec.Size) { - logger.Info("EtcdCluster is already up-to-date") - return ctrl.Result{}, nil + if allHealthy { + s.cm.Update(). + Available(true, status.ReasonClusterReady, "Cluster is fully available and healthy"). + Progressing(false, status.ReasonReconcileSuccess, "All reconciliation operations complete"). + Degraded(false, status.ReasonClusterHealthy, "All members are healthy"). + Apply() + } else { + s.cm.Update(). + Available(false, status.ReasonMembersUnhealthy, "Some cluster members are unhealthy"). + Progressing(false, status.ReasonMembersUnhealthy, "Waiting for members to become healthy"). + Degraded(true, status.ReasonMembersUnhealthy, "Some cluster members are unhealthy"). + Apply() } + return ctrl.Result{}, nil +} +// adjustReplicaTowardsSpec moves the cluster towards the desired size in spec. +// Scale-out path: +// - Add a learner via etcd API first, then increment desired replica locally +// and reconcile the StatefulSet to include the new pod. We requeue to allow +// the learner to be promoted in later loops. +// +// Scale-in path: +// - Remove the last member via etcd API, then decrease STS by 1 and requeue. +// +// Conditions: +// - ScalingUp/ScalingDown set Progressing=True, Available=False while in-flight +func (r *EtcdClusterReconciler) adjustReplicaTowardsSpec( + ctx context.Context, + s *reconcileState, + memberCnt int, + targetReplica int32, +) (ctrl.Result, error) { + logger := log.FromContext(ctx) eps := clientEndpointsFromStatefulsets(s.sts) - // If there are no learners left, we can proceed to scale the cluster towards the desired size. // When there are no members to add, the controller will requeue above and this block won't execute. if targetReplica < int32(s.cluster.Spec.Size) { // scale out _, peerURL := peerEndpointForOrdinalIndex(s.cluster, int(targetReplica)) - targetReplica++ logger.Info("[Scale out] adding a new learner member to etcd cluster", "peerURLs", peerURL) + s.cm.Update(). + Progressing(true, status.ReasonScalingUp, fmt.Sprintf("Adding learner member to scale to %d", s.cluster.Spec.Size)). + Available(false, status.ReasonScalingUp, "Cluster scaling operation in progress"). + Apply() if _, err := etcdutils.AddMember(eps, []string{peerURL}, true); err != nil { + errMsg := status.FormatError("Failed to add learner member", err) + s.cm.Update(). + Available(false, status.ReasonScalingUp, errMsg). + Progressing(false, status.ReasonScalingUp, errMsg). + Degraded(true, status.ReasonScalingUp, errMsg). + Apply() return ctrl.Result{}, err } logger.Info("Learner member added successfully", "peerURLs", peerURL) - if s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, targetReplica, r.Scheme); err != nil { + // Increment the local desired replica only after AddMember succeeds. + // We then pass this value to reconcileStatefulSet to scale the STS. 
+ targetReplica++ + if sts, err := reconcileStatefulSet(ctx, logger, s.cluster, r.Client, targetReplica, r.Scheme); err != nil { + errMsg := status.FormatError("Failed to update StatefulSet during scale-out", err) + s.cm.Update(). + Available(false, status.ReasonResourceUpdateFail, errMsg). + Progressing(false, status.ReasonResourceUpdateFail, errMsg). + Degraded(true, status.ReasonResourceUpdateFail, errMsg). + Apply() return ctrl.Result{}, err + } else { + s.sts = sts } + if s.calculatedStatus != nil { + r.updateStatusFromStatefulSet(s.calculatedStatus, s.sts) + } return ctrl.Result{RequeueAfter: requeueDuration}, nil } - if targetReplica > int32(s.cluster.Spec.Size) { - // scale in - targetReplica-- - logger = logger.WithValues("targetReplica", targetReplica, "expectedSize", s.cluster.Spec.Size) - - memberID := s.memberHealth[memberCnt-1].Status.Header.MemberId + // scale in: decrement desired size, remove + // the etcd member, then shrink the StatefulSet. + targetReplica-- + logger = logger.WithValues("targetReplica", targetReplica, "expectedSize", s.cluster.Spec.Size) + memberID := s.memberHealth[memberCnt-1].Status.Header.MemberId + + logger.Info("[Scale in] removing one member", "memberID", memberID) + s.cm.Update(). + Progressing(true, status.ReasonScalingDown, fmt.Sprintf("Removing member to scale down to %d", s.cluster.Spec.Size)). + Available(false, status.ReasonScalingDown, "Cluster scaling operation in progress"). + Apply() + eps = eps[:int(targetReplica)] + if err := etcdutils.RemoveMember(eps, memberID); err != nil { + errMsg := status.FormatError("Failed to remove member", err) + s.cm.Update(). + Available(false, status.ReasonScalingDown, errMsg). + Progressing(false, status.ReasonScalingDown, errMsg). + Degraded(true, status.ReasonScalingDown, errMsg). + Apply() + return ctrl.Result{}, err + } - logger.Info("[Scale in] removing one member", "memberID", memberID) - eps = eps[:targetReplica] - if err := etcdutils.RemoveMember(eps, memberID); err != nil { - return ctrl.Result{}, err - } + if sts, err := reconcileStatefulSet(ctx, logger, s.cluster, r.Client, targetReplica, r.Scheme); err != nil { + errMsg := status.FormatError("Failed to update StatefulSet during scale-in", err) + s.cm.Update(). + Available(false, status.ReasonResourceUpdateFail, errMsg). + Progressing(false, status.ReasonResourceUpdateFail, errMsg). + Degraded(true, status.ReasonResourceUpdateFail, errMsg). + Apply() + return ctrl.Result{}, err + } else { + s.sts = sts + } - if s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, targetReplica, r.Scheme); err != nil { - return ctrl.Result{}, err - } + if s.calculatedStatus != nil { + r.updateStatusFromStatefulSet(s.calculatedStatus, s.sts) + } + return ctrl.Result{RequeueAfter: requeueDuration}, nil +} - return ctrl.Result{RequeueAfter: requeueDuration}, nil +// updateStatusFromStatefulSet updates the status replica counts from the StatefulSet. +func (r *EtcdClusterReconciler) updateStatusFromStatefulSet( + etcdClusterStatus *ecv1alpha1.EtcdClusterStatus, + sts *appsv1.StatefulSet, +) { + if etcdClusterStatus == nil { + return + } + if sts == nil { + etcdClusterStatus.CurrentReplicas = 0 + etcdClusterStatus.ReadyReplicas = 0 + return + } + if sts.Spec.Replicas != nil { + etcdClusterStatus.CurrentReplicas = *sts.Spec.Replicas } + etcdClusterStatus.ReadyReplicas = sts.Status.ReadyReplicas +} - // Ensure every etcd member reports itself healthy before declaring success. 
- allMembersHealthy, err := areAllMembersHealthy(s.sts, logger) - if err != nil { - return ctrl.Result{}, err +// populateMemberStatuses builds the Members slice for status from etcd member list and health info. +func (r *EtcdClusterReconciler) populateMemberStatuses( + memberListResp *clientv3.MemberListResponse, + memberHealth []etcdutils.EpHealth, + leaderIDHex string, +) []ecv1alpha1.MemberStatus { + if memberListResp == nil || len(memberListResp.Members) == 0 { + return nil } - if !allMembersHealthy { - // Requeue until the StatefulSet settles and all members are healthy. - return ctrl.Result{RequeueAfter: requeueDuration}, nil + // Index health responses by member ID so we can safely match even if etcd + // returns members and health entries in different orders. + healthByMemberID := make(map[uint64]etcdutils.EpHealth, len(memberHealth)) + for _, mh := range memberHealth { + if mh.Status != nil && mh.Status.Header != nil { + healthByMemberID[mh.Status.Header.MemberId] = mh + } } - logger.Info("EtcdCluster reconciled successfully") - return ctrl.Result{}, nil + members := make([]ecv1alpha1.MemberStatus, 0, len(memberListResp.Members)) + for _, m := range memberListResp.Members { + memberIDHex := fmt.Sprintf("%x", m.ID) + memberStatus := ecv1alpha1.MemberStatus{ + Name: m.Name, + ID: memberIDHex, + IsLearner: m.IsLearner, + } + + // Set leader flag + if leaderIDHex != "" && memberIDHex == leaderIDHex { + memberStatus.IsLeader = true + } + + // Set details from health check + if mh, ok := healthByMemberID[m.ID]; ok { + if mh.Health { + memberStatus.IsHealthy = true + } + if mh.Status != nil && mh.Status.Version != "" { + memberStatus.Version = mh.Status.Version + } + } + + members = append(members, memberStatus) + } + return members } // SetupWithManager sets up the controller with the Manager. diff --git a/internal/etcdutils/etcdutils.go b/internal/etcdutils/etcdutils.go index 7c894ca7..e411c342 100644 --- a/internal/etcdutils/etcdutils.go +++ b/internal/etcdutils/etcdutils.go @@ -92,6 +92,9 @@ func FindLeaderStatus(healthInfos []EpHealth, logger logr.Logger) (uint64, *clie // Find the leader status for i := range healthInfos { status := healthInfos[i].Status + if status == nil || status.Header == nil { + continue + } if status.Leader == status.Header.MemberId { leader = status.Header.MemberId leaderStatus = status @@ -110,9 +113,13 @@ func FindLearnerStatus(healthInfos []EpHealth, logger logr.Logger) (uint64, *cli var learnerStatus *clientv3.StatusResponse logger.Info("Now checking if there is any pending learner member that needs to be promoted") for i := range healthInfos { - if healthInfos[i].Status.IsLearner { - learner = healthInfos[i].Status.Header.MemberId - learnerStatus = healthInfos[i].Status + status := healthInfos[i].Status + if status == nil || status.Header == nil { + continue + } + if status.IsLearner { + learner = status.Header.MemberId + learnerStatus = status logger.Info("Learner member found", "memberID", learner) break } diff --git a/pkg/status/manager.go b/pkg/status/manager.go new file mode 100644 index 00000000..d939c122 --- /dev/null +++ b/pkg/status/manager.go @@ -0,0 +1,149 @@ +package status + +import ( + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Manager helps manage conditions in a Kubernetes resource status. 
+type Manager struct { + conditions *[]metav1.Condition + generation int64 // Store the generation +} + +// NewManager creates a new condition manager for the given slice of conditions and generation. +// The slice passed must be a pointer, as the manager will modify it directly. +func NewManager(conditions *[]metav1.Condition, generation int64) *Manager { + if conditions == nil { + // Initialize if nil to prevent panics. + emptyConditions := make([]metav1.Condition, 0) + conditions = &emptyConditions + } + return &Manager{conditions: conditions, generation: generation} +} + +// set is a private helper to set or update a condition using meta.SetStatusCondition. +// meta.SetStatusCondition handles updating LastTransitionTime only when the status changes. +func (m *Manager) set(conditionType string, status metav1.ConditionStatus, reason, message string) { + newCondition := metav1.Condition{ + Type: conditionType, + Status: status, + ObservedGeneration: m.generation, // Use stored generation + LastTransitionTime: metav1.NewTime(time.Now()), + Reason: reason, + Message: message, + } + meta.SetStatusCondition(m.conditions, newCondition) +} + +// --- Semantic helpers --- +// These helpers make setting common conditions more readable in the reconciler. + +// SetAvailable sets the Available condition status. +func (m *Manager) SetAvailable(status bool, reason, message string) { + conditionStatus := metav1.ConditionFalse + if status { + conditionStatus = metav1.ConditionTrue + } + m.set(ConditionAvailable, conditionStatus, reason, message) +} + +// SetProgressing sets the Progressing condition status. +func (m *Manager) SetProgressing(status bool, reason, message string) { + conditionStatus := metav1.ConditionFalse + if status { + conditionStatus = metav1.ConditionTrue + } + m.set(ConditionProgressing, conditionStatus, reason, message) +} + +// SetDegraded sets the Degraded condition status. +func (m *Manager) SetDegraded(status bool, reason, message string) { + conditionStatus := metav1.ConditionFalse + if status { + conditionStatus = metav1.ConditionTrue + } + m.set(ConditionDegraded, conditionStatus, reason, message) +} + +// Update returns a ConditionUpdater that allows chaining condition changes before applying them. +func (m *Manager) Update() ConditionUpdater { + return ConditionUpdater{manager: m} +} + +// ConditionUpdater batches updates to conditions so callers can configure multiple +// condition states without repeating boilerplate. Call Apply() to persist the changes. +type ConditionUpdater struct { + manager *Manager + updates []func() +} + +// Available queues an update for the Available condition. +func (u ConditionUpdater) Available(status bool, reason, message string) ConditionUpdater { + if u.manager == nil { + return u + } + + statusCopy := status + reasonCopy := reason + messageCopy := message + + u.updates = append(u.updates, func() { + u.manager.SetAvailable(statusCopy, reasonCopy, messageCopy) + }) + return u +} + +// Progressing queues an update for the Progressing condition. +func (u ConditionUpdater) Progressing(status bool, reason, message string) ConditionUpdater { + if u.manager == nil { + return u + } + + statusCopy := status + reasonCopy := reason + messageCopy := message + + u.updates = append(u.updates, func() { + u.manager.SetProgressing(statusCopy, reasonCopy, messageCopy) + }) + return u +} + +// Degraded queues an update for the Degraded condition. 
+func (u ConditionUpdater) Degraded(status bool, reason, message string) ConditionUpdater { + if u.manager == nil { + return u + } + + statusCopy := status + reasonCopy := reason + messageCopy := message + + u.updates = append(u.updates, func() { + u.manager.SetDegraded(statusCopy, reasonCopy, messageCopy) + }) + return u +} + +// Apply executes the queued condition updates. +func (u ConditionUpdater) Apply() { + if u.manager == nil { + return + } + + for _, update := range u.updates { + update() + } +} + +// Helper to format error messages concisely for Conditions +func FormatError(prefix string, err error) string { + if err == nil { + return prefix + } + return fmt.Sprintf("%s: %v", prefix, err) +} diff --git a/pkg/status/patch.go b/pkg/status/patch.go new file mode 100644 index 00000000..3bf8c67f --- /dev/null +++ b/pkg/status/patch.go @@ -0,0 +1,118 @@ +// Package status provides utilities for managing Kubernetes resource status, +// particularly focusing on Conditions and status patching. +package status + +import ( + "context" + "fmt" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" // For GroupResource in conflict error + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// DefaultRetry is the default backoff for retrying patch operations. +// Customize this if needed, e.g., by providing a different +// retry.Backoff instance to retry.RetryOnConflict. +var DefaultRetry = retry.DefaultRetry + +// PatchStatusMutate applies status changes using a mutate function. +// It fetches the latest version of the object, applies mutations via the mutate func, +// and then patches the status subresource if changes are detected. +// It uses optimistic locking and retries on conflict errors (like "the object has been modified"). +// +// Parameters: +// - ctx: The context for the operation. +// - c: The controller-runtime client. +// - originalObj: The original object instance fetched at the start of the reconcile loop. +// This is used to get the ObjectKey and the Generation at the start of the reconcile. +// It should not be modified by the reconciliation logic directly; status changes +// should be calculated and applied via the mutateFn on a fresh copy. +// - mutateFn: A function that takes the latest fetched object (as type T) and applies +// the calculated status changes *directly to that object's status field*. +// This function should return an error if the mutation itself fails for some reason, +// which will abort the patch attempt. +func PatchStatusMutate[T client.Object]( + ctx context.Context, + c client.Client, + originalObj T, // Original object from reconcile start + mutateFn func(latestFetchedObj T) error, // Mutate function now takes the latest object +) error { + logger := log.FromContext(ctx).WithValues( + "objectName", originalObj.GetName(), + "objectNamespace", originalObj.GetNamespace(), + "gvk", originalObj.GetObjectKind().GroupVersionKind().String(), + ) + key := client.ObjectKeyFromObject(originalObj) + startGeneration := originalObj.GetGeneration() // Generation at the start of reconcile + + return retry.RetryOnConflict(DefaultRetry, func() error { + // Fetch the latest version of the object in each retry iteration. 
+ latestFetchedObj := originalObj.DeepCopyObject().(T) // Create a new instance of type T + getErr := c.Get(ctx, key, latestFetchedObj) + if getErr != nil { + if apierrors.IsNotFound(getErr) { + logger.Info("Object not found while attempting to patch status, skipping patch.") + return nil // Object deleted, no status to patch. + } + logger.Error(getErr, "Failed to get latest object for status patch.") + // Returning the error from Get will stop retries by default retry settings + // if it's not a conflict error. + return fmt.Errorf("failed to get latest object %v for status patch: %w", key, getErr) + } + + // Check if the generation changed during our reconcile. This might mean the spec changed + // and our calculated status is stale. + if startGeneration != latestFetchedObj.GetGeneration() { + logger.Info("Object generation changed during reconcile, status calculated based on old spec might be stale. "+ + "Aborting this patch attempt.", + "key", key, "startGeneration", startGeneration, "currentGeneration", latestFetchedObj.GetGeneration()) + // Returning nil here means we acknowledge the generation change and decide *not* to patch stale status. + // The main Reconcile loop will requeue soon with the new generation. + // This avoids spamming patch retries for stale data. + return nil + } + + // Create a copy *before* mutation to compare against. This is the object state + // from the API server *before* we apply our calculated changes for this attempt. + beforeMutateStatusCopy := latestFetchedObj.DeepCopyObject().(T) + + // Apply the calculated status changes by calling the mutate function. + // The mutate function should modify the 'latestFetchedObj's status field directly. + if err := mutateFn(latestFetchedObj); err != nil { + // If the mutation logic itself fails, we likely want to stop retrying the patch. + logger.Error(err, "Mutation function failed during status patch attempt.") + // Stop retries by returning non-conflict error + return fmt.Errorf("mutate function failed during status patch: %w", err) + } + + // Compare the object's status before and after mutation. + // We use Semantic.DeepEqual on the whole objects because `Status()` subresource patch + // still sends a patch based on the whole object typically. More accurately, + // we should compare just the status fields if we could extract them generically. + // However, comparing the whole object after mutation (which only touched status) + // against its state before mutation (but after GET) is correct. + if apiequality.Semantic.DeepEqual(beforeMutateStatusCopy, latestFetchedObj) { + logger.V(1).Info("No status change detected after applying mutation, skipping patch.") + return nil // No actual changes to status, no need to patch + } + + // Patch the status subresource using the mutated 'latestFetchedObj' object. + // client.MergeFrom(beforeMutateStatusCopy) generates a JSON merge patch from the diff between the + // mutated object and its pre-mutation copy. We intentionally avoid StrategicMerge + // because CRDs (like EtcdCluster) don't support it. + logger.Info("Status change detected, attempting to patch status subresource.") + patchErr := c.Status().Patch(ctx, latestFetchedObj, client.MergeFrom(beforeMutateStatusCopy)) + if patchErr != nil { + // Log the patch error. RetryOnConflict will decide whether to retry based on the error type. + // Conflict errors will be retried. Other errors might not. 
+ logger.Info("Failed to patch status, will retry if conflict error.", "error", patchErr.Error()) + return patchErr // Return the error to retry.RetryOnConflict + } + + logger.Info("Successfully patched status subresource.") + return nil // Patch successful + }) +} diff --git a/pkg/status/type.go b/pkg/status/type.go new file mode 100644 index 00000000..595411b4 --- /dev/null +++ b/pkg/status/type.go @@ -0,0 +1,53 @@ +// Package status provides utilities for managing Kubernetes resource status, +// particularly focusing on Conditions according to standard practices. +package status + +// Condition types used for EtcdCluster status. +// Adhering to Kubernetes API conventions as much as possible. +// See: +// github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md + +const ( + // ConditionAvailable indicates that the etcd cluster has reached its desired state, + // has quorum, and is ready to serve requests. All members are healthy. + ConditionAvailable string = "Available" + + // ConditionProgressing indicates that the operator is actively working + // to bring the etcd cluster towards the desired state (e.g., creating resources, + // scaling, promoting learners). It's True when reconciliation is in progress + // and False when the desired state is reached or a terminal state occurs. + ConditionProgressing string = "Progressing" + + // ConditionDegraded indicates that the etcd cluster is functional but + // operating with potential issues that might impact performance or fault tolerance + // (e.g., some members unhealthy but quorum maintained, leader missing temporarily). + // It requires attention but is not necessarily completely unavailable. + ConditionDegraded string = "Degraded" +) + +// Common reasons for EtcdCluster status conditions. Reasons should be CamelCase and concise. +const ( + // General Reasons + ReasonReconciling string = "Reconciling" + ReasonReconcileSuccess string = "ReconcileSuccess" + ReasonClusterHealthy string = "ClusterHealthy" + + // Available Reasons + ReasonClusterReady string = "ClusterReady" + ReasonLeaderNotFound string = "LeaderNotFound" + ReasonSizeIsZero string = "SizeIsZero" + + // Progressing Reasons + ReasonInitializingCluster string = "InitializingCluster" + ReasonCreatingResources string = "CreatingResources" // STS, Service etc. 
+ ReasonScalingUp string = "ScalingUp" + ReasonScalingDown string = "ScalingDown" + ReasonPromotingLearner string = "PromotingLearner" + ReasonMembersMismatch string = "MembersMismatch" + + // Degraded Reasons + ReasonMembersUnhealthy string = "MembersUnhealthy" + ReasonHealthCheckError string = "HealthCheckError" + ReasonResourceCreateFail string = "ResourceCreateFail" // Non-fatal resource creation failure + ReasonResourceUpdateFail string = "ResourceUpdateFail" // Non-fatal resource update failure +) diff --git a/test/e2e/datapersistence_test.go b/test/e2e/datapersistence_test.go index 2937c8dd..abaea9ec 100644 --- a/test/e2e/datapersistence_test.go +++ b/test/e2e/datapersistence_test.go @@ -76,6 +76,8 @@ func TestDataPersistence(t *testing.T) { feature.Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { client := cfg.Client() + ctx = enableStatusRecording(ctx, t, cfg, t.Name(), etcdClusterName) + // create etcd cluster if err := client.Resources().Create(ctx, etcdCluster); err != nil { t.Fatalf("unable to create etcd cluster: %s", err) @@ -114,6 +116,8 @@ func TestDataPersistence(t *testing.T) { t.Fatalf("unable to create sts with size 1: %s", err) } + waitForClusterHealthyStatus(t, c, etcdClusterName, size) + return ctx }, ) @@ -198,6 +202,12 @@ func TestDataPersistence(t *testing.T) { }, ) + feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + stopStatusRecording(ctx, t) + cleanupEtcdCluster(ctx, t, c, etcdClusterName) + return ctx + }) + _ = testEnv.Test(t, feature.Feature()) } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 8178a53f..edff3a56 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -143,8 +143,10 @@ func TestScaling(t *testing.T) { etcdClusterName := fmt.Sprintf("etcd-%s", strings.ToLower(tc.name)) feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + ctx = enableStatusRecording(ctx, t, c, t.Name(), etcdClusterName) createEtcdClusterWithPVC(ctx, t, c, etcdClusterName, tc.initialSize) waitForSTSReadiness(t, c, etcdClusterName, tc.initialSize) + waitForClusterHealthyStatus(t, c, etcdClusterName, tc.initialSize) return ctx }) @@ -154,6 +156,7 @@ func TestScaling(t *testing.T) { scaleEtcdCluster(ctx, t, c, etcdClusterName, tc.scaleTo) // Wait until StatefulSet spec/status reflect the scaled size and are ready waitForSTSReadiness(t, c, etcdClusterName, tc.scaleTo) + waitForClusterHealthyStatus(t, c, etcdClusterName, tc.expectedMembers) return ctx }, ) @@ -177,6 +180,7 @@ func TestScaling(t *testing.T) { }) feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + stopStatusRecording(ctx, t) cleanupEtcdCluster(ctx, t, c, etcdClusterName) return ctx }) @@ -191,8 +195,10 @@ func TestPodRecovery(t *testing.T) { etcdClusterName := "etcd-recovery-test" feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + ctx = enableStatusRecording(ctx, t, c, t.Name(), etcdClusterName) createEtcdClusterWithPVC(ctx, t, c, etcdClusterName, 3) waitForSTSReadiness(t, c, etcdClusterName, 3) + waitForClusterHealthyStatus(t, c, etcdClusterName, 3) return ctx }) @@ -238,6 +244,7 @@ func TestPodRecovery(t *testing.T) { // Wait for full StatefulSet readiness waitForSTSReadiness(t, c, etcdClusterName, int(initialReplicas)) + waitForClusterHealthyStatus(t, c, etcdClusterName, int(initialReplicas)) // Verify PVC usage after recovery verifyPodUsesPVC(t, c, targetPodName, 
"etcd-data-"+etcdClusterName) @@ -278,6 +285,7 @@ func TestPodRecovery(t *testing.T) { }) feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + stopStatusRecording(ctx, t) cleanupEtcdCluster(ctx, t, c, etcdClusterName) return ctx }) @@ -294,6 +302,7 @@ func TestEtcdClusterFunctionality(t *testing.T) { feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { createEtcdClusterWithPVC(ctx, t, c, etcdClusterName, 3) waitForSTSReadiness(t, c, etcdClusterName, 3) + waitForClusterHealthyStatus(t, c, etcdClusterName, 3) return ctx }) @@ -303,6 +312,7 @@ func TestEtcdClusterFunctionality(t *testing.T) { // Wait until all members are promoted to voting members (no learners), // otherwise endpoint health will fail on learners. waitForNoLearners(t, c, podName, 3) + waitForClusterHealthyStatus(t, c, etcdClusterName, 3) // Check health for the whole cluster rather than a single member command := []string{"etcdctl", "endpoint", "health", "--cluster"} diff --git a/test/e2e/helpers_test.go b/test/e2e/helpers_test.go index b6c7c5ba..c60aa820 100644 --- a/test/e2e/helpers_test.go +++ b/test/e2e/helpers_test.go @@ -19,22 +19,31 @@ package e2e import ( "bytes" "context" + "crypto/sha256" "encoding/json" "fmt" + "io" "os" + "path/filepath" "strings" + "sync" "testing" "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/e2e-framework/klient/k8s/resources" + "sigs.k8s.io/e2e-framework/klient/k8s/watcher" "sigs.k8s.io/e2e-framework/klient/wait" "sigs.k8s.io/e2e-framework/pkg/envconf" ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1" + "go.etcd.io/etcd-operator/pkg/status" etcdserverpb "go.etcd.io/etcd/api/v3/etcdserverpb" ) @@ -206,6 +215,260 @@ func getEtcdMemberListPB(t *testing.T, c *envconf.Config, podName string) *etcds return &memberList } +// StatusRecorder observes EtcdCluster status transitions and appends JSON lines to writer. +type StatusRecorder struct { + cancel context.CancelFunc + watcher *watcher.EventHandlerFuncs + done chan struct{} + writer io.Writer + mu sync.Mutex + lastHash string +} + +type statusRecorderHandle struct { + recorder *StatusRecorder + writer io.WriteCloser + once sync.Once +} + +func (h *statusRecorderHandle) stop(t *testing.T) { + if h == nil { + return + } + h.once.Do(func() { + if h.recorder != nil { + h.recorder.Stop() + } + if h.writer != nil { + if err := h.writer.Close(); err != nil { + t.Logf("failed to close status recorder writer: %v", err) + } + } + }) +} + +// StartStatusRecorder begins watching the EtcdCluster identified by nn. +// Whenever the status (including conditions) changes, a snapshot is written as JSON. 
+func StartStatusRecorder( + t *testing.T, + cfg *envconf.Config, + nn types.NamespacedName, + writer io.Writer, +) (*StatusRecorder, error) { + t.Helper() + + ctx, cancel := context.WithCancel(t.Context()) + res := cfg.Client().Resources().WithNamespace(nn.Namespace) + + w := res.Watch( + &ecv1alpha1.EtcdClusterList{}, + resources.WithFieldSelector(fmt.Sprintf("metadata.name=%s", nn.Name)), + ) + + recorder := &StatusRecorder{ + cancel: cancel, + watcher: w, + done: make(chan struct{}), + writer: writer, + } + + handle := func(event string, obj interface{}) { + cluster, ok := obj.(*ecv1alpha1.EtcdCluster) + if !ok || cluster == nil { + return + } + recorder.recordSnapshot(t, event, cluster) + } + + w.WithAddFunc(func(obj interface{}) { handle("ADDED", obj) }). + WithUpdateFunc(func(obj interface{}) { handle("MODIFIED", obj) }). + WithDeleteFunc(func(obj interface{}) { handle("DELETED", obj) }) + + if err := w.Start(ctx); err != nil { + cancel() + return nil, fmt.Errorf("failed to start status recorder watch: %w", err) + } + + go func() { + <-ctx.Done() + close(recorder.done) + }() + + return recorder, nil +} + +// Stop stops the recorder and waits for the watcher to terminate. +func (r *StatusRecorder) Stop() { + if r == nil { + return + } + + r.cancel() + if r.watcher != nil { + r.watcher.Stop() + } + <-r.done +} + +func (r *StatusRecorder) recordSnapshot(t *testing.T, event string, cluster *ecv1alpha1.EtcdCluster) { + statusCopy := ecv1alpha1.EtcdClusterStatus{} + cluster.Status.DeepCopyInto(&statusCopy) + + statusBytes, err := json.Marshal(statusCopy) + if err != nil { + t.Logf("failed to marshal EtcdCluster status for recorder: %v", err) + return + } + hashBytes := sha256.Sum256(statusBytes) + hash := fmt.Sprintf("%d|%d|%x", cluster.Generation, statusCopy.ObservedGeneration, hashBytes) + + payload := struct { + Timestamp string `json:"timestamp"` + Event string `json:"event"` + Generation int64 `json:"generation"` + ObservedGeneration int64 `json:"observedGeneration"` + ResourceVersion string `json:"resourceVersion"` + Status ecv1alpha1.EtcdClusterStatus `json:"status"` + }{ + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), + Event: event, + Generation: cluster.Generation, + ObservedGeneration: statusCopy.ObservedGeneration, + ResourceVersion: cluster.ResourceVersion, + Status: statusCopy, + } + + data, err := json.Marshal(payload) + if err != nil { + t.Logf("failed to marshal recorder payload: %v", err) + return + } + + r.mu.Lock() + defer r.mu.Unlock() + if hash == r.lastHash { + return + } + if _, err := r.writer.Write(data); err != nil { + t.Logf("failed to write status recorder payload: %v", err) + return + } + if _, err := r.writer.Write([]byte("\n")); err != nil { + t.Logf("failed to finalize status recorder payload: %v", err) + return + } + r.lastHash = hash +} + +func createStatusHistoryWriter(t *testing.T, testName, clusterName string) io.WriteCloser { + t.Helper() + + base := resolveStatusArtifactsBase() + + dir := filepath.Join(base, sanitizeFileComponent(testName)) + if err := os.MkdirAll(dir, 0o755); err != nil { + t.Fatalf("failed to create status artifact directory %s: %v", dir, err) + } + + filePath := filepath.Join(dir, fmt.Sprintf("%s-status.jsonl", sanitizeFileComponent(clusterName))) + file, err := os.Create(filePath) + if err != nil { + t.Fatalf("failed to create status artifact file %s: %v", filePath, err) + } + + t.Logf("recording EtcdCluster status to %s", filePath) + return file +} + +func sanitizeFileComponent(s string) string { + s = 
strings.TrimSpace(s) + if s == "" { + return "unnamed" + } + var b strings.Builder + for _, r := range s { + if (r >= 'a' && r <= 'z') || + (r >= 'A' && r <= 'Z') || + (r >= '0' && r <= '9') || + r == '-' || r == '_' { + b.WriteRune(r) + } else { + b.WriteRune('-') + } + } + return b.String() +} + +func resolveStatusArtifactsBase() string { + if explicit := os.Getenv("ETCD_OPERATOR_E2E_ARTIFACTS_DIR"); explicit != "" { + return explicit + } + + if prowArtifacts := os.Getenv("ARTIFACTS"); prowArtifacts != "" { + return filepath.Join(prowArtifacts, "etcdcluster-status") + } + + return filepath.Join(".", "artifacts", "status") +} + +type statusRecorderContextKey struct{} + +var statusRecorderKey statusRecorderContextKey + +func enableStatusRecording( + ctx context.Context, + t *testing.T, + c *envconf.Config, + testName string, + clusterName string, +) context.Context { + t.Helper() + + if ctx == nil { + ctx = context.Background() + } + + if existing, ok := ctx.Value(statusRecorderKey).(*statusRecorderHandle); ok && existing != nil { + t.Logf("status recording already enabled for test %s", testName) + return ctx + } + + writer := createStatusHistoryWriter(t, testName, clusterName) + recorder, err := StartStatusRecorder( + t, + c, + types.NamespacedName{ + Name: clusterName, + Namespace: namespace, + }, + writer, + ) + if err != nil { + if closeErr := writer.Close(); closeErr != nil { + t.Logf("failed to close status recorder writer after start error: %v", closeErr) + } + t.Fatalf("failed to start status recorder: %v", err) + } + + handle := &statusRecorderHandle{ + recorder: recorder, + writer: writer, + } + return context.WithValue(ctx, statusRecorderKey, handle) +} + +func stopStatusRecording(ctx context.Context, t *testing.T) { + t.Helper() + + if ctx == nil { + return + } + + if handle, ok := ctx.Value(statusRecorderKey).(*statusRecorderHandle); ok && handle != nil { + handle.stop(t) + } +} + // waitForNoLearners waits until the member list has the expected number of members // and all members are voting (i.e., no learners remain). 
func waitForNoLearners(t *testing.T, c *envconf.Config, podName string, expectedMembers int) { @@ -296,3 +559,85 @@ func verifyDataOperations(t *testing.T, c *envconf.Config, etcdClusterName strin t.Errorf("Expected key-value pair [%s=%s], but got output: %s", testKey, testValue, stdout) } } + +func waitForClusterHealthyStatus( + t *testing.T, + c *envconf.Config, + name string, + expectedMembers int, +) { + t.Helper() + var cluster ecv1alpha1.EtcdCluster + err := wait.For(func(ctx context.Context) (bool, error) { + if err := c.Client().Resources().Get(ctx, name, namespace, &cluster); err != nil { + return false, err + } + + if cluster.Status.ObservedGeneration < cluster.Generation { + return false, nil + } + + if cluster.Status.CurrentReplicas != int32(expectedMembers) || + cluster.Status.ReadyReplicas != int32(expectedMembers) || + cluster.Status.MemberCount != int32(expectedMembers) || + len(cluster.Status.Members) != expectedMembers { + return false, nil + } + + leaderCount := 0 + for _, member := range cluster.Status.Members { + if !member.IsHealthy || member.IsLearner { + return false, nil + } + if member.IsLeader { + leaderCount++ + } + } + if leaderCount != 1 { + return false, nil + } + + if !conditionEquals( + cluster.Status.Conditions, + status.ConditionAvailable, + metav1.ConditionTrue, + status.ReasonClusterReady, + ) { + return false, nil + } + if !conditionEquals( + cluster.Status.Conditions, + status.ConditionProgressing, + metav1.ConditionFalse, + status.ReasonReconcileSuccess, + ) { + return false, nil + } + if !conditionEquals( + cluster.Status.Conditions, + status.ConditionDegraded, + metav1.ConditionFalse, + status.ReasonClusterHealthy, + ) { + return false, nil + } + + return true, nil + }, wait.WithTimeout(5*time.Minute), wait.WithInterval(5*time.Second)) + if err != nil { + t.Fatalf("EtcdCluster %s did not reach healthy status for %d members: %v", name, expectedMembers, err) + } +} + +func conditionEquals( + conditions []metav1.Condition, + condType string, + condStatus metav1.ConditionStatus, + condReason string, +) bool { + cond := meta.FindStatusCondition(conditions, condType) + if cond == nil { + return false + } + return cond.Status == condStatus && cond.Reason == condReason +}
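Note: the condition types and reasons introduced in pkg/status/type.go are meant to be consumed with the standard apimachinery condition helpers, which is also what conditionEquals above relies on via meta.FindStatusCondition. As a minimal, hypothetical sketch (the markAvailable helper and its package are illustrative only, not part of this patch), a reconciler could publish the Available condition like this:

package controller // illustrative package name, not part of this patch

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1"
	"go.etcd.io/etcd-operator/pkg/status"
)

// markAvailable is a hypothetical helper showing how the constants from
// pkg/status pair with meta.SetStatusCondition; it is not part of this patch.
func markAvailable(cluster *ecv1alpha1.EtcdCluster) {
	meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
		Type:               status.ConditionAvailable,
		Status:             metav1.ConditionTrue,
		Reason:             status.ReasonClusterReady,
		Message:            "etcd cluster has quorum and all members are healthy",
		ObservedGeneration: cluster.Generation,
	})
}

meta.SetStatusCondition preserves LastTransitionTime unless the condition's Status actually flips, which keeps the condition history meaningful across repeated reconciles that re-assert the same state.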
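The status recorder writes one JSON object per line, so the artifacts are straightforward to post-process. A small, illustrative reader is sketched below; the snapshot struct mirrors the payload emitted by recordSnapshot, while the file path is only an example (the real location depends on ETCD_OPERATOR_E2E_ARTIFACTS_DIR or ARTIFACTS and the sanitized test and cluster names):

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"

	ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1"
)

// snapshot mirrors the JSON payload written by the status recorder.
type snapshot struct {
	Timestamp          string                       `json:"timestamp"`
	Event              string                       `json:"event"`
	Generation         int64                        `json:"generation"`
	ObservedGeneration int64                        `json:"observedGeneration"`
	Status             ecv1alpha1.EtcdClusterStatus `json:"status"`
}

func main() {
	// Example path only; substitute the artifact produced by your run.
	f, err := os.Open("artifacts/status/TestScaling/etcd-scale-up-status.jsonl")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		var s snapshot
		if err := json.Unmarshal(sc.Bytes(), &s); err != nil {
			log.Fatalf("malformed snapshot line: %v", err)
		}
		fmt.Printf("%s %-8s gen=%d obsGen=%d members=%d ready=%d\n",
			s.Timestamp, s.Event, s.Generation, s.ObservedGeneration,
			s.Status.MemberCount, s.Status.ReadyReplicas)
	}
	if err := sc.Err(); err != nil {
		log.Fatal(err)
	}
}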