Skip to content

Commit 87ae25f

Browse files
Replication/failover simulation operations: startWorkflow, failover and validate (#6655)
1 parent 5d8add7 commit 87ae25f

20 files changed

+744
-344
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ test_eventsV2_xdc
2626
test_eventsV2_xdc.log
2727
matching-simulator-output/
2828
replication-simulator-output/
29+
worker*.log
2930

3031
# Executables produced by cadence repo
3132
/cadence

codecov.yml

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ ignore:
6767
- "service/matching/service.go"
6868
- "service/matching/tasklist/testing.go"
6969
- "service/worker/service.go"
70+
- "simulation/**"
7071
- "testflags/**"
7172
- "tools/common/schema/test/**"
7273
- "tools/linter/**"

common/cache/domainCache.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -675,16 +675,17 @@ func (c *DefaultDomainCache) triggerDomainChangePrepareCallbackLocked() {
675675
}
676676
}
677677

678-
func (c *DefaultDomainCache) triggerDomainChangeCallbackLocked(
679-
nextDomains []*DomainCacheEntry,
680-
) {
681-
678+
func (c *DefaultDomainCache) triggerDomainChangeCallbackLocked(nextDomains []*DomainCacheEntry) {
682679
sw := c.scope.StartTimer(metrics.DomainCacheCallbacksLatency)
683680
defer sw.Stop()
684681

685-
for _, callback := range c.callbacks {
682+
c.logger.Info("Domain change callbacks are going to be triggered", tag.Number(int64(len(nextDomains))))
683+
for i, callback := range c.callbacks {
684+
c.logger.Info("Domain cache change callback started", tag.Number(int64(i)))
686685
callback(nextDomains)
686+
c.logger.Info("Domain cache change callback completed", tag.Number(int64(i)))
687687
}
688+
c.logger.Info("Domain change callbacks are completed", tag.Number(int64(len(nextDomains))))
688689
}
689690

690691
func (c *DefaultDomainCache) buildEntryFromRecord(

common/domain/handler.go

+1
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ func (d *handlerImpl) UpdateDomain(
504504
now := d.timeSource.Now()
505505
// Check the failover cool down time
506506
if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) {
507+
d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(info.Name), now)
507508
return nil, errDomainUpdateTooFrequent
508509
}
509510

common/domain/replication_queue.go

+1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ func (q *replicationQueueImpl) GetReplicationMessages(
165165
replicationTasks = append(replicationTasks, thrift.ToReplicationTask(&replicationTask))
166166
}
167167

168+
q.logger.Debugf("Returning %d domain replication tasks. lastMessageID: %d", len(replicationTasks), lastMessageID)
168169
return replicationTasks, lastMessageID, nil
169170
}
170171

common/domain/replication_queue_test.go

+15-14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"go.uber.org/mock/gomock"
3535

3636
"github.com/uber/cadence/common"
37+
"github.com/uber/cadence/common/log/testlogger"
3738
"github.com/uber/cadence/common/persistence"
3839
"github.com/uber/cadence/common/types"
3940
)
@@ -67,7 +68,7 @@ func TestReplicationQueueImpl_Start(t *testing.T) {
6768
t.Run(tt.name, func(t *testing.T) {
6869
ctrl := gomock.NewController(t)
6970
mockQueue := persistence.NewMockQueueManager(ctrl)
70-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl)
71+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t)).(*replicationQueueImpl)
7172
atomic.StoreInt32(&rq.status, tt.initialStatus)
7273

7374
rq.Start()
@@ -111,7 +112,7 @@ func TestReplicationQueueImpl_Stop(t *testing.T) {
111112
t.Run(tt.name, func(t *testing.T) {
112113
ctrl := gomock.NewController(t)
113114
mockQueue := persistence.NewMockQueueManager(ctrl)
114-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl)
115+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t)).(*replicationQueueImpl)
115116
atomic.StoreInt32(&rq.status, tt.initialStatus)
116117

117118
rq.Stop()
@@ -158,7 +159,7 @@ func TestReplicationQueueImpl_Publish(t *testing.T) {
158159
t.Run(tt.name, func(t *testing.T) {
159160
ctrl := gomock.NewController(t)
160161
mockQueue := persistence.NewMockQueueManager(ctrl)
161-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
162+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
162163
tt.setupMock(mockQueue)
163164
err := rq.Publish(context.Background(), tt.task)
164165
if tt.wantErr {
@@ -199,7 +200,7 @@ func TestReplicationQueueImpl_PublishToDLQ(t *testing.T) {
199200
t.Run(tt.name, func(t *testing.T) {
200201
ctrl := gomock.NewController(t)
201202
mockQueue := persistence.NewMockQueueManager(ctrl)
202-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
203+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
203204
tt.setupMock(mockQueue)
204205
err := rq.PublishToDLQ(context.Background(), tt.task)
205206
if tt.wantErr {
@@ -275,7 +276,7 @@ func TestGetReplicationMessages(t *testing.T) {
275276
ctrl := gomock.NewController(t)
276277
mockQueueManager := persistence.NewMockQueueManager(ctrl)
277278
tc.setupMocks(mockQueueManager)
278-
replicationQueue := NewReplicationQueue(mockQueueManager, "testCluster", nil, nil)
279+
replicationQueue := NewReplicationQueue(mockQueueManager, "testCluster", nil, testlogger.New(t))
279280
tasks, lastID, err := replicationQueue.GetReplicationMessages(context.Background(), 0, 10)
280281

281282
if tc.expectError {
@@ -322,7 +323,7 @@ func TestUpdateAckLevel(t *testing.T) {
322323
ctrl := gomock.NewController(t)
323324
mockQueue := persistence.NewMockQueueManager(ctrl)
324325

325-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
326+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
326327
tt.setupMock(mockQueue)
327328
err := rq.UpdateAckLevel(context.Background(), tt.lastID, tt.cluster)
328329
if tt.wantErr {
@@ -362,7 +363,7 @@ func TestReplicationQueueImpl_GetAckLevels(t *testing.T) {
362363
t.Run(tt.name, func(t *testing.T) {
363364
ctrl := gomock.NewController(t)
364365
mockQueue := persistence.NewMockQueueManager(ctrl)
365-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
366+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
366367
tt.setupMock(mockQueue)
367368
got, err := rq.GetAckLevels(context.Background())
368369
if tt.wantErr {
@@ -415,7 +416,7 @@ func TestGetMessagesFromDLQ(t *testing.T) {
415416
t.Run(tt.name, func(t *testing.T) {
416417
ctrl := gomock.NewController(t)
417418
mockQueue := persistence.NewMockQueueManager(ctrl)
418-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
419+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
419420

420421
if !tt.wantErr {
421422
encodedData, _ := mockEncodeReplicationTask(tt.taskID)
@@ -468,7 +469,7 @@ func TestUpdateDLQAckLevel(t *testing.T) {
468469
t.Run(tt.name, func(t *testing.T) {
469470
ctrl := gomock.NewController(t)
470471
mockQueue := persistence.NewMockQueueManager(ctrl)
471-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
472+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
472473
tt.setupMock(mockQueue)
473474
err := rq.UpdateDLQAckLevel(context.Background(), tt.lastID)
474475
if tt.wantErr {
@@ -508,7 +509,7 @@ func TestGetDLQAckLevel(t *testing.T) {
508509
t.Run(tt.name, func(t *testing.T) {
509510
ctrl := gomock.NewController(t)
510511
mockQueue := persistence.NewMockQueueManager(ctrl)
511-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
512+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
512513
tt.setupMock(mockQueue)
513514
got, err := rq.GetDLQAckLevel(context.Background())
514515
if tt.wantErr {
@@ -553,7 +554,7 @@ func TestRangeDeleteMessagesFromDLQ(t *testing.T) {
553554
t.Run(tt.name, func(t *testing.T) {
554555
ctrl := gomock.NewController(t)
555556
mockQueue := persistence.NewMockQueueManager(ctrl)
556-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
557+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
557558
tt.setupMock(mockQueue)
558559
err := rq.RangeDeleteMessagesFromDLQ(context.Background(), tt.firstID, tt.lastID)
559560
if tt.wantErr {
@@ -594,7 +595,7 @@ func TestDeleteMessageFromDLQ(t *testing.T) {
594595
t.Run(tt.name, func(t *testing.T) {
595596
ctrl := gomock.NewController(t)
596597
mockQueue := persistence.NewMockQueueManager(ctrl)
597-
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
598+
rq := NewReplicationQueue(mockQueue, "testCluster", nil, testlogger.New(t))
598599
tt.setupMock(mockQueue)
599600
err := rq.DeleteMessageFromDLQ(context.Background(), tt.messageID)
600601
if tt.wantErr {
@@ -643,7 +644,7 @@ func TestGetDLQSize(t *testing.T) {
643644
ctrl := gomock.NewController(t)
644645
mockQueueManager := persistence.NewMockQueueManager(ctrl)
645646
tt.setupMock(mockQueueManager)
646-
q := &replicationQueueImpl{queue: mockQueueManager}
647+
q := &replicationQueueImpl{queue: mockQueueManager, logger: testlogger.New(t)}
647648
size, err := q.GetDLQSize(context.Background())
648649
if tt.wantErr {
649650
assert.Error(t, err)
@@ -698,7 +699,7 @@ func TestPurgeAckedMessages(t *testing.T) {
698699
ctrl := gomock.NewController(t)
699700
mockQueueManager := persistence.NewMockQueueManager(ctrl)
700701
tt.setupMock(mockQueueManager)
701-
q := &replicationQueueImpl{queue: mockQueueManager}
702+
q := &replicationQueueImpl{queue: mockQueueManager, logger: testlogger.New(t)}
702703
err := q.purgeAckedMessages()
703704

704705
if tt.wantErr {
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# This file is used as dynamicconfig override for "default" replication simulation scenario configured via host/testdata/replication_simulation_default.yaml
2-
2+
system.writeVisibilityStoreName:
3+
- value: "db"
4+
system.readVisibilityStoreName:
5+
- value: "db"
36
history.replicatorTaskBatchSize:
4-
- value: 25
5-
constraints: {}
7+
- value: 25
8+
constraints: {}
9+
frontend.failoverCoolDown:
10+
- value: 5s
11+
history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
12+
- value: 10ms

docker/buildkite/docker-compose-local-replication-simulation.yml

+80-8
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,17 @@ services:
6666
- "7002:7002" # history prometheus
6767
- "7003:7003" # worker prometheus
6868
environment:
69+
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/replication_simulation_${SCENARIO}.yml
70+
- "CLUSTER_REDIRECT_POLICY=selected-apis-forwarding"
6971
- "BIND_ON_IP=0.0.0.0"
7072
- "PRIMARY_FRONTEND_SERVICE=cadence-cluster0"
7173
- "SECONDARY_FRONTEND_SERVICE=cadence-cluster1"
7274
- "CASSANDRA_SEEDS=cassandra"
73-
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/replication_simulation_${SCENARIO}.yml"
7475
- "ENABLE_GLOBAL_DOMAIN=true"
7576
- "KEYSPACE=cadence_primary"
7677
- "VISIBILITY_KEYSPACE=cadence_visibility_primary"
78+
- "LOG_LEVEL=debug"
79+
- "MATCHING_LOG_EVENTS=true"
7780
- "PROMETHEUS_ENDPOINT_0=0.0.0.0:7000" # frontend scrape endpoint
7881
- "PROMETHEUS_ENDPOINT_1=0.0.0.0:7001" # matching scrape endpoint
7982
- "PROMETHEUS_ENDPOINT_2=0.0.0.0:7002" # history scrape endpoint
@@ -109,15 +112,18 @@ services:
109112
- "8002:8002" # history prometheus
110113
- "8003:8003" # worker prometheus
111114
environment:
115+
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/replication_simulation_${SCENARIO}.yml
116+
- "CLUSTER_REDIRECT_POLICY=selected-apis-forwarding"
112117
- "BIND_ON_IP=0.0.0.0"
113118
- "PRIMARY_FRONTEND_SERVICE=cadence-cluster0"
114119
- "SECONDARY_FRONTEND_SERVICE=cadence-cluster1"
115120
- "CASSANDRA_SEEDS=cassandra"
116-
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/replication_simulation_${SCENARIO}.yml"
117121
- "IS_NOT_PRIMARY=true"
118122
- "ENABLE_GLOBAL_DOMAIN=true"
119123
- "KEYSPACE=cadence_secondary"
120124
- "VISIBILITY_KEYSPACE=cadence_visibility_secondary"
125+
- "LOG_LEVEL=debug"
126+
- "MATCHING_LOG_EVENTS=true"
121127
- "PROMETHEUS_ENDPOINT_0=0.0.0.0:8000" # frontend scrape endpoint
122128
- "PROMETHEUS_ENDPOINT_1=0.0.0.0:8001" # matching scrape endpoint
123129
- "PROMETHEUS_ENDPOINT_2=0.0.0.0:8002" # history scrape endpoint
@@ -132,7 +138,7 @@ services:
132138
aliases:
133139
- cadence-cluster1
134140

135-
cadence-web-cluster0:
141+
cadence-web0:
136142
image: ubercadence/web:latest
137143
environment:
138144
- "CADENCE_TCHANNEL_PEERS=cadence-cluster0:7933"
@@ -143,9 +149,9 @@ services:
143149
networks:
144150
services-network:
145151
aliases:
146-
- cadence-web-cluster0
152+
- cadence-web0
147153

148-
cadence-web-cluster1:
154+
cadence-web1:
149155
image: ubercadence/web:latest
150156
environment:
151157
- "CADENCE_TCHANNEL_PEERS=cadence-cluster1:7933"
@@ -156,7 +162,69 @@ services:
156162
networks:
157163
services-network:
158164
aliases:
159-
- cadence-web-cluster0
165+
- cadence-web1
166+
167+
cadence-worker0:
168+
build:
169+
context: ../../
170+
dockerfile: ./docker/buildkite/Dockerfile
171+
working_dir: /cadence/simulation/replication/worker/cmd
172+
command:
173+
- /bin/sh
174+
- -e
175+
- -c
176+
- >
177+
go run *.go --cluster cluster0 | tee worker0.log
178+
depends_on:
179+
cadence-cluster0:
180+
condition: service_started
181+
cadence-cluster1:
182+
condition: service_started
183+
healthcheck:
184+
test: ["CMD", "curl", "-f", "http://localhost:6060/health"]
185+
interval: 10s
186+
timeout: 2s
187+
retries: 20
188+
volumes:
189+
- ../../:/cadence
190+
- /cadence/.build/ # ensure we don't mount the build directory
191+
- /cadence/.bin/ # ensure we don't mount the bin directory
192+
- ../../simulation/replication/testdata/:/cadence/simulation/replication/worker/cmd/testdata/
193+
networks:
194+
services-network:
195+
aliases:
196+
- cadence-worker0
197+
198+
cadence-worker1:
199+
build:
200+
context: ../../
201+
dockerfile: ./docker/buildkite/Dockerfile
202+
working_dir: /cadence/simulation/replication/worker/cmd
203+
command:
204+
- /bin/sh
205+
- -e
206+
- -c
207+
- >
208+
go run *.go --cluster cluster1 | tee worker1.log
209+
depends_on:
210+
cadence-cluster0:
211+
condition: service_started
212+
cadence-cluster1:
213+
condition: service_started
214+
healthcheck:
215+
test: ["CMD", "curl", "-f", "http://localhost:6060/health"]
216+
interval: 10s
217+
timeout: 2s
218+
retries: 20
219+
volumes:
220+
- ../../:/cadence
221+
- /cadence/.build/ # ensure we don't mount the build directory
222+
- /cadence/.bin/ # ensure we don't mount the bin directory
223+
- ../../simulation/replication/testdata/:/cadence/simulation/replication/worker/cmd/testdata/
224+
networks:
225+
services-network:
226+
aliases:
227+
- cadence-worker1
160228

161229
replication-simulator:
162230
build:
@@ -176,11 +244,15 @@ services:
176244
depends_on:
177245
cadence-cluster0:
178246
condition: service_started
247+
cadence-worker0:
248+
condition: service_started
249+
cadence-web0:
250+
condition: service_started
179251
cadence-cluster1:
180252
condition: service_started
181-
cadence-web-cluster0:
253+
cadence-worker1:
182254
condition: service_started
183-
cadence-web-cluster1:
255+
cadence-web1:
184256
condition: service_started
185257
grafana:
186258
condition: service_started

docker/config_template.yaml

+5-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,11 @@ kafka:
302302
dlq-topic: {{ default .Env.VISIBILITY_NAME "cadence-visibility-dev" }}-dlq
303303

304304
publicClient:
305-
hostPort: {{ default .Env.FRONTEND_SERVICE "cadence" }}:{{ default .Env.GRPC_FRONTEND_PORT "7833" }}
305+
{{- if .Env.IS_NOT_PRIMARY }}
306+
hostPort: {{ default .Env.SECONDARY_FRONTEND_SERVICE "cadence" }}:{{ default .Env.FRONTEND_PORT "7833" }}
307+
{{- else }}
308+
hostPort: {{ default .Env.PRIMARY_FRONTEND_SERVICE "cadence" }}:{{ default .Env.FRONTEND_PORT "7833" }}
309+
{{- end }}
306310

307311
dynamicconfig:
308312
client: filebased

docker/docker-compose-multiclusters-cass-mysql-es.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ services:
8282
- "ES_SEEDS=elasticsearch"
8383
- "KAFKA_SEEDS=kafka"
8484
- "VISIBILITY_NAME=cadence-visibility-primary"
85-
- "FRONTEND_SERVICE=cadence"
85+
- "PRIMARY_FRONTEND_SERVICE=cadence"
8686
depends_on:
8787
cassandra:
8888
condition: service_healthy
@@ -125,7 +125,7 @@ services:
125125
- "ES_SEEDS=elasticsearch"
126126
- "KAFKA_SEEDS=kafka"
127127
- "VISIBILITY_NAME=cadence-visibility-secondary"
128-
- "FRONTEND_SERVICE=cadence-secondary"
128+
- "SECONDARY_FRONTEND_SERVICE=cadence-secondary"
129129
depends_on:
130130
- mysql
131131
- prometheus

0 commit comments

Comments
 (0)