Skip to content

Commit f226d0c

Browse files
authored
Add DataLoader to make sure cluster has minimal data (FoundationDB#1762)
* Add DataLoader to make sure cluster has minimal data
1 parent b111ed8 commit f226d0c

File tree

12 files changed

+492
-46
lines changed

12 files changed

+492
-46
lines changed

e2e/fixtures/factory.go

+6
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,12 @@ func (factory *Factory) GetOperatorImage() string {
824824
return prependRegistry(factory.options.registry, factory.options.operatorImage)
825825
}
826826

827+
// GetDataLoaderImage returns the dataloader image provided via command line. If a registry was defined the registry will be
828+
// prepended.
829+
func (factory *Factory) GetDataLoaderImage() string {
830+
return prependRegistry(factory.options.registry, factory.options.dataLoaderImage)
831+
}
832+
827833
// GetSidecarImage returns the sidecar image provided via command line. If a registry was defined the registry will be
828834
// prepended.
829835
func (factory *Factory) GetSidecarImage() string {

e2e/fixtures/fdb_cluster.go

+31
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"fmt"
2626
"golang.org/x/sync/errgroup"
2727
"log"
28+
"math"
2829
"strconv"
2930
"strings"
3031
"sync"
@@ -1320,3 +1321,33 @@ func (fdbCluster *FdbCluster) UpgradeAndVerify(version string) {
13201321
gomega.Expect(fdbCluster.UpgradeCluster(version, true)).NotTo(gomega.HaveOccurred())
13211322
fdbCluster.VerifyVersion(version)
13221323
}
1324+
1325+
// EnsureTeamTrackersAreHealthy will check if the machine-readable status suggest that the team trackers are healthy
1326+
// and all data is present.
1327+
func (fdbCluster *FdbCluster) EnsureTeamTrackersAreHealthy() {
1328+
gomega.Eventually(func() bool {
1329+
for _, tracker := range fdbCluster.GetStatus().Cluster.Data.TeamTrackers {
1330+
if !tracker.State.Healthy {
1331+
return false
1332+
}
1333+
}
1334+
1335+
return true
1336+
}).WithTimeout(1 * time.Minute).WithPolling(1 * time.Second).MustPassRepeatedly(5).Should(gomega.BeTrue())
1337+
}
1338+
1339+
// EnsureTeamTrackersHaveMinReplicas will check if the machine-readable status suggest that the team trackers min_replicas
1340+
// match the expected replicas.
1341+
func (fdbCluster *FdbCluster) EnsureTeamTrackersHaveMinReplicas() {
1342+
desiredFaultTolerance := fdbCluster.GetCachedCluster().DesiredFaultTolerance()
1343+
gomega.Eventually(func() int {
1344+
minReplicas := math.MaxInt
1345+
for _, tracker := range fdbCluster.GetStatus().Cluster.Data.TeamTrackers {
1346+
if minReplicas > tracker.State.MinReplicasRemaining {
1347+
minReplicas = tracker.State.MinReplicasRemaining
1348+
}
1349+
}
1350+
1351+
return minReplicas
1352+
}).WithTimeout(1 * time.Minute).WithPolling(1 * time.Second).MustPassRepeatedly(5).Should(gomega.BeNumerically(">=", desiredFaultTolerance))
1353+
}

e2e/fixtures/fdb_data_loader.go

+273
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/*
2+
* fdb_data_loader.go
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2023 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package fixtures
22+
23+
import (
24+
"bytes"
25+
"context"
26+
"errors"
27+
"github.com/onsi/gomega"
28+
"io"
29+
batchv1 "k8s.io/api/batch/v1"
30+
corev1 "k8s.io/api/core/v1"
31+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
32+
"k8s.io/apimachinery/pkg/runtime"
33+
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
34+
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
35+
"sigs.k8s.io/controller-runtime/pkg/client"
36+
"text/template"
37+
"time"
38+
)
39+
40+
const (
41+
// The name of the data loader Job.
42+
dataLoaderName = "fdb-data-loader"
43+
44+
// For now we only load 2GB into the cluster, we can increase this later if we want.
45+
dataLoaderJob = `apiVersion: batch/v1
46+
kind: Job
47+
metadata:
48+
name: {{ .Name }}
49+
namespace: {{ .Namespace }}
50+
labels:
51+
app: {{ .Name }}
52+
spec:
53+
backoffLimit: 2
54+
completions: 2
55+
parallelism: 2
56+
template:
57+
spec:
58+
containers:
59+
- image: {{ .Image }}
60+
imagePullPolicy: Always
61+
name: {{ .Name }}
62+
# This configuration will load ~1GB per data loader.
63+
args:
64+
- --keys=1000000
65+
- --batch-size=50
66+
- --value-size=1000
67+
env:
68+
- name: FDB_CLUSTER_FILE
69+
value: /var/dynamic/fdb/fdb.cluster
70+
- name: FDB_TLS_CERTIFICATE_FILE
71+
value: /tmp/fdb-certs/tls.crt
72+
- name: FDB_TLS_CA_FILE
73+
value: /tmp/fdb-certs/ca.pem
74+
- name: FDB_TLS_KEY_FILE
75+
value: /tmp/fdb-certs/tls.key
76+
# FDB 7.3 adds a check for loading external client library, which doesn't work with 6.3.
77+
# Consider remove this option once 6.3 is no longer being used.
78+
- name: FDB_NETWORK_OPTION_IGNORE_EXTERNAL_CLIENT_FAILURES
79+
value: ""
80+
- name: LD_LIBRARY_PATH
81+
value: /var/dynamic/fdb/primary/lib
82+
- name: FDB_NETWORK_OPTION_TRACE_LOG_GROUP
83+
value: {{ .Name }}
84+
- name: FDB_NETWORK_OPTION_EXTERNAL_CLIENT_DIRECTORY
85+
value: /var/dynamic/fdb/libs
86+
- name: PYTHONUNBUFFERED
87+
value: "on"
88+
volumeMounts:
89+
- name: config-map
90+
mountPath: /var/dynamic-conf
91+
- name: fdb-libs
92+
mountPath: /var/dynamic/fdb
93+
- name: fdb-certs
94+
mountPath: /tmp/fdb-certs
95+
readOnly: true
96+
resources:
97+
requests:
98+
cpu: "1"
99+
memory: 4Gi
100+
initContainers:
101+
{{ range $index, $version := .SidecarVersions }}
102+
- name: foundationdb-kubernetes-init-{{ $index }}
103+
image: {{ .BaseImage }}:{{ .SidecarTag}}
104+
imagePullPolicy: Always
105+
command:
106+
- /bin/bash
107+
# This is a workaround for a change of the version schema that was never tested/supported
108+
args:
109+
- -c
110+
- echo "{{ .FDBVersion.String }}" > /var/fdb/version && runuser -u fdb -g fdb -- /entrypoint.bash --copy-library {{ .FDBVersion.Compact }} --output-dir /var/output-files/{{ .FDBVersion.Compact }} --init-mode
111+
volumeMounts:
112+
- name: fdb-libs
113+
mountPath: /var/output-files
114+
securityContext:
115+
runAsUser: 0
116+
runAsGroup: 0
117+
# Install this library in a special location to force the operator to use it as the primary library.
118+
{{ if eq .FDBVersion.Compact "7.1" }}
119+
- name: foundationdb-kubernetes-init-7-1-primary
120+
image: {{ .BaseImage }}:{{ .SidecarTag}}
121+
imagePullPolicy: {{ .ImagePullPolicy }}
122+
args:
123+
# Note that we are only copying a library, rather than copying any binaries.
124+
- "--copy-library"
125+
- "{{ .FDBVersion.Compact }}"
126+
- "--output-dir"
127+
- "/var/output-files/primary" # Note that we use primary as the subdirectory rather than specifying the FoundationDB version like we did in the other examples.
128+
- "--init-mode"
129+
volumeMounts:
130+
- name: fdb-libs
131+
mountPath: /var/output-files
132+
{{ end }}
133+
{{ end }}
134+
- image: {{ .Image }}
135+
imagePullPolicy: Always
136+
name: fdb-lib-copy
137+
command:
138+
- /bin/bash
139+
args:
140+
- -c
141+
- mkdir -p /var/dynamic/fdb/libs && {{ range $index, $version := .SidecarVersions -}} cp /var/dynamic/fdb/{{ .FDBVersion.Compact }}/lib/libfdb_c.so /var/dynamic/fdb/libs/libfdb_{{ .FDBVersion.Compact }}_c.so && {{ end }} cp /var/dynamic-conf/fdb.cluster /var/dynamic/fdb/fdb.cluster
142+
volumeMounts:
143+
- name: config-map
144+
mountPath: /var/dynamic-conf
145+
- name: fdb-libs
146+
mountPath: /var/dynamic/fdb
147+
- name: fdb-certs
148+
mountPath: /tmp/fdb-certs
149+
readOnly: true
150+
restartPolicy: Never
151+
volumes:
152+
- name: config-map
153+
configMap:
154+
name: {{ .ClusterName }}-config
155+
items:
156+
- key: cluster-file
157+
path: fdb.cluster
158+
- name: fdb-libs
159+
emptyDir: {}
160+
- name: fdb-certs
161+
secret:
162+
secretName: {{ .SecretName }}`
163+
)
164+
165+
// dataLoaderConfig represents the configuration of the Dataloader Job.
166+
type dataLoaderConfig struct {
167+
// Name of the data loader Job.
168+
Name string
169+
// Image represents the data loader image that should be used in the Job.
170+
Image string
171+
// SidecarVersions represents the sidecar configurations for different FoundationDB versions.
172+
SidecarVersions []SidecarConfig
173+
// Namespace represents the namespace for the Deployment and all associated resources
174+
Namespace string
175+
// ClusterName the name of the cluster to load data into.
176+
ClusterName string
177+
// SecretName represents the Kubernetes secret that contains the certificates for communicating with the FoundationDB
178+
// cluster.
179+
SecretName string
180+
}
181+
182+
func (factory *Factory) getDataLoaderConfig(cluster *FdbCluster) *dataLoaderConfig {
183+
return &dataLoaderConfig{
184+
Name: dataLoaderName,
185+
Image: factory.GetDataLoaderImage(),
186+
Namespace: cluster.Namespace(),
187+
SidecarVersions: factory.GetSidecarConfigs(),
188+
ClusterName: cluster.Name(),
189+
SecretName: factory.GetSecretName(),
190+
}
191+
}
192+
193+
// CreateDataLoaderIfAbsent will create the data loader for the provided cluster and load some random data into the cluster.
194+
func (factory *Factory) CreateDataLoaderIfAbsent(cluster *FdbCluster) {
195+
if !factory.options.enableDataLoading {
196+
return
197+
}
198+
199+
t, err := template.New("dataLoaderJob").Parse(dataLoaderJob)
200+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
201+
buf := bytes.Buffer{}
202+
gomega.Expect(t.Execute(&buf, factory.getDataLoaderConfig(cluster))).NotTo(gomega.HaveOccurred())
203+
decoder := yamlutil.NewYAMLOrJSONDecoder(&buf, 100000)
204+
for {
205+
var rawObj runtime.RawExtension
206+
err := decoder.Decode(&rawObj)
207+
if err != nil {
208+
if errors.Is(err, io.EOF) {
209+
break
210+
}
211+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
212+
}
213+
214+
obj, _, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).
215+
Decode(rawObj.Raw, nil, nil)
216+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
217+
unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
218+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
219+
unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap}
220+
221+
gomega.Expect(
222+
factory.CreateIfAbsent(unstructuredObj),
223+
).NotTo(gomega.HaveOccurred())
224+
}
225+
226+
factory.WaitUntilDataLoaderIsDone(cluster)
227+
}
228+
229+
// WaitUntilDataLoaderIsDone will wait until the data loader Job has finished.
230+
func (factory *Factory) WaitUntilDataLoaderIsDone(cluster *FdbCluster) {
231+
gomega.Eventually(func() int {
232+
pods := &corev1.PodList{}
233+
gomega.Expect(
234+
factory.controllerRuntimeClient.List(
235+
context.Background(),
236+
pods,
237+
client.InNamespace(cluster.Namespace()),
238+
client.MatchingLabels(map[string]string{"job-name": dataLoaderName}),
239+
),
240+
).NotTo(gomega.HaveOccurred())
241+
242+
var runningPods int
243+
for _, pod := range pods.Items {
244+
if pod.Status.Phase == corev1.PodRunning {
245+
runningPods++
246+
}
247+
}
248+
249+
return runningPods
250+
}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeNumerically(">", 0))
251+
252+
// Wait for at most 15 minutes to let the data load complete.
253+
gomega.Eventually(func() corev1.ConditionStatus {
254+
job := &batchv1.Job{}
255+
gomega.Expect(
256+
factory.controllerRuntimeClient.Get(
257+
context.Background(),
258+
client.ObjectKey{
259+
Namespace: cluster.Namespace(),
260+
Name: dataLoaderName,
261+
},
262+
job),
263+
).NotTo(gomega.HaveOccurred())
264+
265+
for _, condition := range job.Status.Conditions {
266+
if condition.Type == batchv1.JobComplete {
267+
return condition.Status
268+
}
269+
}
270+
271+
return corev1.ConditionUnknown
272+
}).WithTimeout(15 * time.Minute).WithPolling(5 * time.Second).Should(gomega.Equal(corev1.ConditionTrue))
273+
}

e2e/fixtures/fixtures.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,8 @@ func CheckInvariant(
7070
threshold time.Duration,
7171
f func() error,
7272
) error {
73-
err := f()
74-
if err != nil {
75-
return fmt.Errorf("invariant %s not true at beginning of test: %w", invariantName, err)
76-
}
7773
var waitGroup sync.WaitGroup
78-
ticker := time.NewTicker(100 * time.Millisecond)
74+
ticker := time.NewTicker(250 * time.Millisecond)
7975
var last error
8076
testFailed := false
8177
quit := make(chan struct{})

e2e/fixtures/options.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@ type FactoryOptions struct {
3838
fdbImage string // TODO (johscheuer): Make this optional if we use the default
3939
sidecarImage string // TODO (johscheuer): Make this optional if we use the default
4040
operatorImage string
41+
dataLoaderImage string
4142
registry string
4243
fdbVersion string
4344
username string
4445
storageClass string
4546
upgradeString string
4647
cloudProvider string
4748
enableChaosTests bool
49+
enableDataLoading bool
4850
cleanup bool
4951
featureOperatorDNS bool
5052
featureOperatorLocalities bool
@@ -133,17 +135,29 @@ func (options *FactoryOptions) BindFlags(fs *flag.FlagSet) {
133135
"",
134136
"defines the cloud provider used for the Kubernetes cluster, for defining cloud provider specific behaviour. Currently only Kind is support.",
135137
)
138+
fs.StringVar(
139+
&options.dataLoaderImage,
140+
"data-loader-image",
141+
"foundationdb/fdb-data-loader:latest",
142+
"defines the data loader image that should be used for testing",
143+
)
136144
fs.BoolVar(
137145
&options.enableChaosTests,
138146
"enable-chaos-tests",
139147
true,
140-
"defines if the chaos tests should be enabled in operator related tests cases",
148+
"defines if the chaos tests should be enabled in operator related tests cases.",
149+
)
150+
fs.BoolVar(
151+
&options.enableDataLoading,
152+
"enable-data-load",
153+
true,
154+
"defines if the created FDB cluster should be loaded with some random data.",
141155
)
142156
fs.BoolVar(
143157
&options.cleanup,
144158
"cleanup",
145159
true,
146-
"defines if the test namespace should be removed after the test run",
160+
"defines if the test namespace should be removed after the test run.",
147161
)
148162
fs.BoolVar(
149163
&options.featureOperatorUnifiedImage,

0 commit comments

Comments
 (0)