Skip to content

Commit 2ef71cd

Browse files
committed
Save WIP
1 parent 34f88e6 commit 2ef71cd

File tree

2 files changed

+129
-12
lines changed

2 files changed

+129
-12
lines changed

internal/controller/etcdcluster_controller.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,40 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
113113
}
114114
state.endpointsFound = clusterClient != nil && singleClients != nil
115115

116+
if clusterClient != nil {
117+
state.endpoints = clusterClient.Endpoints()
118+
}
119+
120+
// fetch PVCs
121+
state.pvcs, err = factory.PVCs(ctx, instance, r.Client)
122+
if err != nil {
123+
return ctrl.Result{}, err
124+
}
125+
116126
if !state.endpointsFound {
117127
if !state.stsExists {
118-
// TODO: happy path for new cluster creation
119-
log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
128+
return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing
129+
}
130+
// else try reconciling the sts
131+
existingSts := state.statefulSet.DeepCopy()
132+
desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing
133+
existingSts.Spec.Template.Spec = desiredSts.Spec.Template.Spec
134+
err := r.patchOrCreateObject(ctx, existingSts)
135+
if err != nil {
136+
return ctrl.Result{}, err
137+
}
138+
state.statefulSet = *existingSts
139+
if existingSts.Status.ReadyReplicas != *existingSts.Spec.Replicas { // TODO: this check might not be the best to check for a ready sts
140+
return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
141+
}
142+
if *existingSts.Spec.Replicas > 0 {
143+
return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)")
144+
}
145+
if *instance.Spec.Replicas == 0 {
146+
// cluster successfully scaled down to zero
147+
return ctrl.Result{}, nil
120148
}
149+
return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing
121150
}
122151

123152
// get status of every endpoint and member list from every endpoint

internal/controller/observables.go

Lines changed: 98 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ package controller
22

33
import (
44
"context"
5-
// "strconv"
6-
// "strings"
5+
"strconv"
6+
"strings"
77
"sync"
88

99
"github.com/aenix-io/etcd-operator/api/v1alpha1"
10-
// "github.com/aenix-io/etcd-operator/pkg/set"
10+
"github.com/aenix-io/etcd-operator/pkg/set"
1111
clientv3 "go.etcd.io/etcd/client/v3"
1212
appsv1 "k8s.io/api/apps/v1"
1313
corev1 "k8s.io/api/core/v1"
@@ -49,15 +49,43 @@ func (o *observables) setClusterID() {
4949

5050
// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
5151
// If more than one unique ID is reported, cluster is in splitbrain.
52+
// Also if members have different opinions on the list of members, this is
53+
// also a splitbrain.
5254
func (o *observables) inSplitbrain() bool {
55+
return o.clusterIDsAllEqual() && o.memberListsAllEqual()
56+
}
57+
58+
func (o *observables) clusterIDsAllEqual() bool {
59+
ids := set.New[uint64]()
5360
for i := range o.etcdStatuses {
5461
if o.etcdStatuses[i].endpointStatus != nil {
55-
if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
56-
return true
62+
ids.Add(o.etcdStatuses[i].endpointStatus.Header.ClusterId)
63+
}
64+
}
65+
return len(ids) <= 1
66+
}
67+
68+
func (o *observables) memberListsAllEqual() bool {
69+
type m struct {
70+
Name string
71+
ID uint64
72+
}
73+
memberLists := make([]set.Set[m], 0, len(o.etcdStatuses))
74+
for i := range o.etcdStatuses {
75+
if o.etcdStatuses[i].memberList != nil {
76+
memberSet := set.New[m]()
77+
for _, member := range o.etcdStatuses[i].memberList.Members {
78+
memberSet.Add(m{member.Name, member.ID})
5779
}
80+
memberLists = append(memberLists, memberSet)
81+
}
82+
}
83+
for i := range memberLists {
84+
if !memberLists[0].Equals(memberLists[i]) {
85+
return false
5886
}
5987
}
60-
return false
88+
return true
6189
}
6290

6391
// fill takes a single-endpoint client and populates the fields of etcdStatus
@@ -73,15 +101,75 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
73101
wg.Wait()
74102
}
75103

76-
// TODO: make a real function
104+
func (o *observables) pvcMaxIndex() (max int) {
105+
max = -1
106+
for i := range o.pvcs {
107+
tokens := strings.Split(o.pvcs[i].Name, "-")
108+
index, err := strconv.Atoi(tokens[len(tokens)-1])
109+
if err != nil {
110+
continue
111+
}
112+
if index > max {
113+
max = index
114+
}
115+
}
116+
return max
117+
}
118+
119+
func (o *observables) endpointMaxIndex() (max int) {
120+
for i := range o.endpoints {
121+
tokens := strings.Split(o.endpoints[i], ":")
122+
if len(tokens) < 2 {
123+
continue
124+
}
125+
tokens = strings.Split(tokens[len(tokens)-2], "-")
126+
index, err := strconv.Atoi(tokens[len(tokens)-1])
127+
if err != nil {
128+
continue
129+
}
130+
if index > max {
131+
max = index
132+
}
133+
}
134+
return max
135+
}
136+
137+
// TODO: make a real function to determine the right number of replicas.
138+
// Hint: if ClientURL in the member list is absent, the member has not yet
139+
// started, but if the name field is populated, this is a member of the
140+
// initial cluster. If the name field is empty, this member has just been
141+
// added with etcdctl member add (or equivalent API call).
77142
// nolint:unused
78-
func (o *observables) desiredReplicas() int {
143+
func (o *observables) desiredReplicas() (max int) {
144+
max = -1
79145
if o.etcdStatuses != nil {
80146
for i := range o.etcdStatuses {
81147
if o.etcdStatuses[i].memberList != nil {
82-
return len(o.etcdStatuses[i].memberList.Members)
148+
for j := range o.etcdStatuses[i].memberList.Members {
149+
tokens := strings.Split(o.etcdStatuses[i].memberList.Members[j].Name, "-")
150+
index, err := strconv.Atoi(tokens[len(tokens)-1])
151+
if err != nil {
152+
continue
153+
}
154+
if index > max {
155+
max = index
156+
}
157+
}
83158
}
84159
}
85160
}
86-
return 0
161+
if max > -1 {
162+
return max + 1
163+
}
164+
165+
if epMax := o.endpointMaxIndex(); epMax > max {
166+
max = epMax
167+
}
168+
if pvcMax := o.pvcMaxIndex(); pvcMax > max {
169+
max = pvcMax
170+
}
171+
if max == -1 {
172+
return int(*o.instance.Spec.Replicas)
173+
}
174+
return max + 1
87175
}

0 commit comments

Comments
 (0)