Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
219 commits
Select commit Hold shift + click to select a range
2de12d8
feat(shard distributor): add shard key helpers and metrics state
AndreasHolt Oct 19, 2025
5d95067
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
6e57536
fix(shard distributor): update LastMoveTime in the case where a shard…
AndreasHolt Oct 19, 2025
595d320
test(shard distributor): add tests for shard metrics
AndreasHolt Oct 19, 2025
d9ba54d
fix(shard distributor): modify comment
AndreasHolt Oct 19, 2025
32d2ecd
fix(shard distributor): add atomic check to prevent metrics race
AndreasHolt Oct 19, 2025
b624a00
fix(shard distributor): apply shard metric updates in a second phase …
AndreasHolt Oct 19, 2025
aad7b2e
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
6360f8a
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
1536d0a
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
f316fbf
test(shard distributor): BuildShardPrefix, BuildShardKey, ParseShardKey
AndreasHolt Oct 22, 2025
4524da9
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
126f725
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc53f68
test(shard distributor): small changes to shard key tests s.t. they l…
AndreasHolt Oct 25, 2025
733bbcb
fix(shard distributor): no longer check for key type ShardStatisticsK…
AndreasHolt Oct 25, 2025
6816b8e
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
f97e0cf
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
513e88c
feat(shard distributor): clean up the shard statistics
AndreasHolt Oct 29, 2025
9833525
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
0332fe5
fix(shard distributor): add mapping (new metric)
AndreasHolt Oct 29, 2025
d5a13d9
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
634bc02
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
812e854
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
b9813e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
dfb7448
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
36ec08f
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
38a6e81
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Nov 6, 2025
af733e6
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
a52e86f
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
abfc80e
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
df0feaf
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
8546a26
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
dde87ef
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
415e80c
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
c67d5c3
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
9ffcefb
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc769bf
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
8c22663
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
5ac3c5d
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
3973b82
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
3830d5e
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
443c0b1
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
9d159e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
18e63b7
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
e08a286
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
08eb635
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
f63664a
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
10e2ffa
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
8c6b0c8
chore: removed an old struct that appeared during rebase
Theis-Mathiassen Nov 11, 2025
158e030
feat(shard distributor): throttle shard-stat writes
AndreasHolt Nov 13, 2025
dd45ff0
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Nov 13, 2025
05e0d1d
fix(shard distributor): linter error
AndreasHolt Nov 13, 2025
e0779ec
feat(shard distributor): decouple shard stats write-throttling decisi…
AndreasHolt Nov 18, 2025
9546f24
Merge branch 'master' into heartbeat-shard-statistics
AndreasHolt Nov 19, 2025
db70702
fix(shard-distributor): inverted condition in shard stats cleanup loop
AndreasHolt Nov 19, 2025
481f9c6
chore(shard-distributor): did some formatting, and use current load i…
Theis-Mathiassen Nov 19, 2025
f754dd6
Merge branch 'master' into heartbeat-shard-statistics
AndreasHolt Dec 2, 2025
3366828
fix(shard distributor): decouple shard assignment from stats writes
AndreasHolt Dec 2, 2025
39381b9
feat: added initial load balancing, during rebalancing
Theis-Mathiassen Nov 28, 2025
ace9ddc
fix: logical errors and error handling
Theis-Mathiassen Dec 1, 2025
ecc8ef6
fix: a logical flaw that allowed the move budget to be 0
Theis-Mathiassen Dec 2, 2025
db4cb6d
test: added multiple tests for load_balance
Theis-Mathiassen Dec 2, 2025
1c01604
feat: add new ExecutorKeyType for stats, add case to GetState and rem…
AndreasHolt Dec 3, 2025
e3294b9
feat: update etcdstore.go to support new way of storing stats
AndreasHolt Dec 4, 2025
915da0f
test: persistence of new stats
AndreasHolt Dec 4, 2025
e3bd964
test: update test, remove batching
AndreasHolt Dec 4, 2025
352df29
chore: ExecutorShardStatisticsKey is not significant
AndreasHolt Dec 4, 2025
a22908e
fix: linter
AndreasHolt Dec 4, 2025
49ed5cc
chore: delete shard key related helpers and tests
AndreasHolt Dec 4, 2025
1ddad41
chore: add logs to cleanup
AndreasHolt Dec 4, 2025
c0bc604
fix: comment
AndreasHolt Dec 5, 2025
d761bdc
fix(shard distributor): remove heartbeat write cooldown
AndreasHolt Dec 7, 2025
d1e2df5
chore: use current assignment in finding shards
Theis-Mathiassen Dec 8, 2025
d6ec929
Merge branch 'executor-bug' into load_balance
Theis-Mathiassen Dec 8, 2025
2e81f28
chore: remove debug logs
AndreasHolt Dec 8, 2025
e36f2a1
CI rerun
AndreasHolt Dec 8, 2025
3753640
fix: remove "len(ops) == 0" check
AndreasHolt Dec 8, 2025
e11b0d9
Merge remote-tracking branch 'origin/stats-etcd-refactor' into heartb…
Theis-Mathiassen Dec 8, 2025
b88ff69
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 9, 2025
e943951
feat: changed heartbeat record statistics in new format
Theis-Mathiassen Dec 9, 2025
03a655a
fix: potential fix to smoothed load not working as expected
AndreasHolt Dec 9, 2025
1c0b0fb
feat: implemented getting shard statistics, using cache
Theis-Mathiassen Dec 9, 2025
84976c3
test: added and altered tests to work with statistics from cache
Theis-Mathiassen Dec 9, 2025
c779090
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
Theis-Mathiassen Dec 9, 2025
cb27553
feat: locks and optimization
AndreasHolt Dec 10, 2025
746095a
chore: comments
AndreasHolt Dec 10, 2025
0ca3bda
chore: added comment describing a complicated function
Theis-Mathiassen Dec 10, 2025
bfd1973
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Dec 10, 2025
9605a66
Merge branch 'heartbeat-shard-statistics' into load_balance
Theis-Mathiassen Dec 10, 2025
7975639
Merge remote-tracking branch 'origin/master' into heartbeat-shard-sta…
Theis-Mathiassen Dec 10, 2025
cf6d55d
Merge branch 'heartbeat-shard-statistics' into load_balance
Theis-Mathiassen Dec 10, 2025
ece4cd4
chore: refactored a function, only calling 1 other function
Theis-Mathiassen Dec 10, 2025
590f8ab
Merge branch 'heartbeat-shard-statistics' into load_balance
Theis-Mathiassen Dec 10, 2025
7b82b9e
chore: set LastMoveTime to zero for new shards
AndreasHolt Dec 10, 2025
daf4aa4
Merge branches 'heartbeat-shard-statistics' and 'heartbeat-shard-stat…
AndreasHolt Dec 10, 2025
863424b
Merge branch 'heartbeat-shard-statistics' into load_balance
Theis-Mathiassen Dec 10, 2025
51e9480
fix: event type
AndreasHolt Dec 10, 2025
90cfc58
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 11, 2025
8daa183
chore: removed unused test helper functions causing build errors
Theis-Mathiassen Dec 11, 2025
c03c9b3
chore: consistent naming
AndreasHolt Dec 11, 2025
4aa7ab2
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Dec 11, 2025
d2f2cfd
Merge branch 'heartbeat-shard-statistics' into load_balance
AndreasHolt Dec 11, 2025
c090d77
fix: added expected call to get shardowner now required
Theis-Mathiassen Dec 11, 2025
a51de1e
fix: guard to prevent division by 0 in edge case
AndreasHolt Dec 11, 2025
ff95bdc
chore: delete rev guard for now, as we already have checks and load d…
AndreasHolt Dec 11, 2025
467e39d
fix(shard-distributor): keep periodic rebalance and loads accurate
AndreasHolt Dec 11, 2025
8938199
chore: return error directly, reference ewma as smoothed load
AndreasHolt Dec 11, 2025
8ae5baa
chore: Removed no longer needed statistics from test, and magic number
Theis-Mathiassen Dec 11, 2025
2a7e86d
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
Theis-Mathiassen Dec 11, 2025
9a83dfe
feat: initial placement of ephemeral shard. Prefer active executors w…
AndreasHolt Dec 11, 2025
56051e5
fix: initial LastMoveTime to 0
AndreasHolt Dec 11, 2025
0df27c2
feat: metrics, lb changes
AndreasHolt Dec 11, 2025
939a5e7
feat: plan multiple moves per cycle (within budget) with early stop i…
AndreasHolt Dec 11, 2025
d388ea1
tests: load balancing and initial ephemeral placement
AndreasHolt Dec 11, 2025
b939a5d
refactor: config values for move budget and hysteresis bands
AndreasHolt Dec 11, 2025
f81fe7a
chore: cleanup
AndreasHolt Dec 11, 2025
938d553
feat: loadbalance.go file for lb related code
AndreasHolt Dec 12, 2025
1558cd4
chore: moved large switch case into separate function
Theis-Mathiassen Dec 12, 2025
5a9404c
chore: refactored GetExecutorStatistics, fixed some concurrency probl…
Theis-Mathiassen Dec 12, 2025
afd2755
chore: moved statistics relevant data to separate struct
Theis-Mathiassen Dec 12, 2025
ba8b858
refactor: separate struct for config vals
AndreasHolt Dec 12, 2025
fef9cb8
chore: moved and renamed ewmasmoothload calculation
Theis-Mathiassen Dec 12, 2025
aa9553e
chore: renamed mutable variable to something more descriptive, and ma…
Theis-Mathiassen Dec 12, 2025
9e944e5
chore: returning error from applyStatistics as we now treat statistic…
Theis-Mathiassen Dec 12, 2025
718c161
chore: responding on applyShardStatistics now returning error
Theis-Mathiassen Dec 12, 2025
b38fa88
chore: continues best effort to update statistics, and return multipl…
Theis-Mathiassen Dec 12, 2025
77b754a
feat: cost-benefit for move
AndreasHolt Dec 12, 2025
3f0e893
chore: renamed recordShardStatistics in order to reflect what is happ…
Theis-Mathiassen Dec 12, 2025
0ae823b
chore: moved the load check to calculate smoothload function
Theis-Mathiassen Dec 12, 2025
4f649dd
chore: extract statistic update from etcdstore
Theis-Mathiassen Dec 12, 2025
ad01a8a
refactor: move functions
AndreasHolt Dec 12, 2025
eba5943
chore: added a timesource to namespaceShardToExecutor and ShardToExec…
Theis-Mathiassen Dec 12, 2025
e9461bf
chore: refactored refreshExecutorState to adhere closer to MVC archit…
Theis-Mathiassen Dec 12, 2025
2e51154
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 12, 2025
e97e9b9
feat: escape hatch if no destinations but large imbalance
AndreasHolt Dec 13, 2025
5cf8ae0
Merge remote-tracking branch 'origin/heartbeat-shard-statistics' into…
AndreasHolt Dec 13, 2025
4104636
fix: use timesource
AndreasHolt Dec 13, 2025
cd8bccf
refactor: lb
AndreasHolt Dec 13, 2025
676b309
refactor: calculateExecutorLoadsForCandidates function
AndreasHolt Dec 13, 2025
2447691
refactor: function name, extra comment
AndreasHolt Dec 13, 2025
ec2656f
refactor: handler and loadbalance
AndreasHolt Dec 14, 2025
e25be12
refactor: update executor loads after move without recomputation
AndreasHolt Dec 14, 2025
91f2818
chore: move comment
AndreasHolt Dec 14, 2025
480d48e
refactor: draining doesn't need to be handled in loadBalance function
AndreasHolt Dec 14, 2025
92dd2c3
chore: moved config to near first usage
Theis-Mathiassen Dec 14, 2025
dff7f07
chore: kept the lock while cloning, and moved right before ok created
Theis-Mathiassen Dec 14, 2025
4625397
fix: changed the way we wait using timesource in tests
Theis-Mathiassen Dec 15, 2025
34daa65
fix: namespaceshardcache tests fail randomly
AndreasHolt Dec 15, 2025
648e14c
Merge branch 'load_balance' of github.com:AndreasHolt/cadencefork int…
AndreasHolt Dec 15, 2025
44e8a25
Merge remote-tracking branch 'origin/heartbeat-shard-statistics' into…
AndreasHolt Dec 15, 2025
ecbab71
fix: changed some time.Now to use mocktime source
Theis-Mathiassen Dec 15, 2025
037ffda
fix: test
AndreasHolt Dec 15, 2025
440ccb3
Merge remote-tracking branch 'origin/heartbeat-shard-statistics' into…
AndreasHolt Dec 15, 2025
9b9c3c9
feat: load cv and num executors metrics (cherry picked from other bra…
AndreasHolt Dec 9, 2025
f795f65
feat: basic metrics setup for canary and shard distributor
AndreasHolt Dec 9, 2025
e4d33a8
fix: DI
AndreasHolt Dec 9, 2025
fb58ce6
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 15, 2025
0e783e0
fix: removed duplicate timesource
Theis-Mathiassen Dec 15, 2025
5000ca9
feat: active executors metrics from other branch
AndreasHolt Dec 10, 2025
06ba0e6
feat: emit churn
AndreasHolt Dec 15, 2025
080d041
refactor: emit churn per minute instead
AndreasHolt Dec 15, 2025
c27cbd2
chore: removed global last move to skip load balance
Theis-Mathiassen Dec 15, 2025
8edf22a
refactor: structural change
AndreasHolt Dec 15, 2025
e830cdc
Merge branch 'load_balance' of github.com:AndreasHolt/cadencefork int…
AndreasHolt Dec 15, 2025
a294f2a
fix: remove test
AndreasHolt Dec 15, 2025
8ca66c2
fix: lint
AndreasHolt Dec 15, 2025
7576c8f
Revert "fix: DI"
AndreasHolt Dec 15, 2025
ff6551f
Revert "feat: basic metrics setup for canary and shard distributor"
AndreasHolt Dec 15, 2025
cdc40a8
Merge branch 'heartbeat-shard-statistics' into load_balance
Theis-Mathiassen Dec 15, 2025
fe06ac8
feat: basic metrics setup for canary and shard distributor
AndreasHolt Dec 9, 2025
81f2021
fix: DI
AndreasHolt Dec 9, 2025
465c722
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 15, 2025
0d3f7e0
feat: replay
AndreasHolt Dec 15, 2025
7f3b9a5
chore: moved CalculateSmoothedLoad out of executor store
Theis-Mathiassen Dec 16, 2025
83af565
chore: added alias to metadata
Theis-Mathiassen Dec 16, 2025
441bdd2
chore: refactored GetExecutorStatistics
Theis-Mathiassen Dec 16, 2025
bbff684
feat: benefit gating
AndreasHolt Dec 16, 2025
c3b7709
refactor: clean up
AndreasHolt Dec 16, 2025
abcb6d0
chore: added error logging statements to parse executor key
Theis-Mathiassen Dec 16, 2025
f7c9d77
chore: removed functions only used once
Theis-Mathiassen Dec 16, 2025
671661c
chore: simplified return in RecordHeartbeat
Theis-Mathiassen Dec 16, 2025
20456d6
chore: refactored calcUpdatedStatistics
Theis-Mathiassen Dec 16, 2025
92fafe5
chore: fixed duplicate error messages
Theis-Mathiassen Dec 16, 2025
e8711b1
chore: renaming
AndreasHolt Dec 16, 2025
6cddd47
refactor: reflect that we only find a single shard per iteration
AndreasHolt Dec 16, 2025
7bce072
refactor: move initializations
AndreasHolt Dec 16, 2025
86f06ba
refactor: clean up benefit gating config
AndreasHolt Dec 16, 2025
0598692
Merge branch 'load_balance' of github.com:AndreasHolt/cadencefork int…
AndreasHolt Dec 16, 2025
bfa46ae
Merge remote-tracking branch 'origin/heartbeat-shard-statistics' into…
AndreasHolt Dec 16, 2025
def2336
Revert "feat: replay"
AndreasHolt Dec 16, 2025
c50f892
Revert "fix: DI"
AndreasHolt Dec 16, 2025
27a68cb
Revert "feat: basic metrics setup for canary and shard distributor"
AndreasHolt Dec 16, 2025
38cf3d9
chore: added logging to parseExecutorData
Theis-Mathiassen Dec 16, 2025
69fbba0
refactor: make things look nicer
AndreasHolt Dec 16, 2025
5d3b545
chore: refactored executorData to be for a single executor
Theis-Mathiassen Dec 16, 2025
09a4a40
chore: added some spacing
Theis-Mathiassen Dec 16, 2025
260af7e
feat: added helper functions for namespaceExecutorStatistics
Theis-Mathiassen Dec 16, 2025
6db1110
chore: refactored introducing maps.clone
Theis-Mathiassen Dec 16, 2025
537dab9
chore: refactored switch in watch in namespaceshardcache
Theis-Mathiassen Dec 16, 2025
7e93d0d
chore: made reason for deleting statistics more clear in handleExecut…
Theis-Mathiassen Dec 16, 2025
26ba443
refactor: slice optimization
AndreasHolt Dec 16, 2025
e4bef40
refactor: rename
AndreasHolt Dec 16, 2025
457997a
chore: refactored namespaceShardsCache
Theis-Mathiassen Dec 17, 2025
c5bb576
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 17, 2025
1f8aed7
feat: basic metrics setup for canary and shard distributor
AndreasHolt Dec 9, 2025
8c1d602
fix: DI
AndreasHolt Dec 9, 2025
f61d4ea
feat: replay
AndreasHolt Dec 15, 2025
3fdc134
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 17, 2025
d3e660e
fix: tests failing
AndreasHolt Dec 18, 2025
4389eab
fix: same fix but for shardcache
AndreasHolt Dec 18, 2025
c40dac8
Merge remote-tracking branch 'origin/master' into heartbeat-shard-sta…
AndreasHolt Dec 18, 2025
d4bd2f9
Merge remote-tracking branch 'origin/heartbeat-shard-statistics' into…
AndreasHolt Dec 18, 2025
44cc86a
fix: move LoadBalance struct
AndreasHolt Dec 18, 2025
e1a77e1
temp: for tests
AndreasHolt Dec 22, 2025
f8a3362
temp: cv metric
AndreasHolt Dec 22, 2025
c6e5cfe
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Jan 12, 2026
f4fab6b
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Feb 2, 2026
a874d8d
fix: test match functionality, and watch error not checked before pro…
Theis-Mathiassen Feb 3, 2026
f72f03b
Merge branch 'heartbeat-shard-statistics' into load_balance
Theis-Mathiassen Feb 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 108 additions & 18 deletions cmd/sharddistributor-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/uber-go/tally"
"github.com/uber-go/tally/prometheus"
"github.com/urfave/cli/v2"
"go.uber.org/fx"
"go.uber.org/yarpc"
Expand All @@ -16,13 +17,15 @@ import (

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/common/clock"
cadenceconfig "github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/service/sharddistributor/canary"
"github.com/uber/cadence/service/sharddistributor/canary/executors"
"github.com/uber/cadence/service/sharddistributor/canary/replay"
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
"github.com/uber/cadence/service/sharddistributor/config"
sdconfig "github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/tools/common/commoncli"
)

Expand All @@ -32,6 +35,8 @@ const (
defaultFixedNamespace = "shard-distributor-canary"
defaultEphemeralNamespace = "shard-distributor-canary-ephemeral"
defaultCanaryGRPCPort = 7953 // Port for canary to receive ping requests
defaultCanaryMetricsPort = 9098
defaultReplayNamespace = "shard-distributor-replay"

shardDistributorServiceName = "cadence-shard-distributor"
)
Expand All @@ -41,21 +46,52 @@ func runApp(c *cli.Context) {
fixedNamespace := c.String("fixed-namespace")
ephemeralNamespace := c.String("ephemeral-namespace")
canaryGRPCPort := c.Int("canary-grpc-port")
canaryMetricsPort := c.Int("canary-metrics-port")

fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort)).Run()
replayOpts := replay.Options{
CSVPath: c.String("replay-csv"),
Speed: c.Float64("replay-speed"),
Namespace: c.String("replay-namespace"),
NumFixedExecutors: c.Int("replay-num-fixed-executors"),
}

fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort, canaryMetricsPort, replayOpts)).Run()
}

func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int) fx.Option {
configuration := clientcommon.Config{
Namespaces: []clientcommon.NamespaceConfig{
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
{Namespace: ephemeralNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
{Namespace: executors.LocalPassthroughNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeLOCALPASSTHROUGH},
{Namespace: executors.LocalPassthroughShadowNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeLOCALPASSTHROUGHSHADOW},
{Namespace: executors.DistributedPassthroughNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeDISTRIBUTEDPASSTHROUGH},
{Namespace: executors.ExternalAssignmentNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeDISTRIBUTEDPASSTHROUGH},
func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int, canaryMetricsPort int, replayOpts replay.Options) fx.Option {
logger, _ := zap.NewDevelopment()
cadenceLogger := log.NewLogger(logger)

metricsConfig := cadenceconfig.Metrics{
Prometheus: &prometheus.Configuration{
ListenAddress: fmt.Sprintf("127.0.0.1:%d", canaryMetricsPort),
TimerType: "histogram",
},
}
metricsScope := metricsConfig.NewScope(cadenceLogger, "shard-distributor-canary")

if replayOpts.Namespace == "" {
replayOpts.Namespace = defaultReplayNamespace
}
if replayOpts.NumFixedExecutors <= 0 {
replayOpts.NumFixedExecutors = 3
}

configuration := clientcommon.Config{}
if replayOpts.Enabled() {
configuration.Namespaces = []clientcommon.NamespaceConfig{
{Namespace: replayOpts.Namespace, HeartBeatInterval: 1 * time.Second, MigrationMode: sdconfig.MigrationModeONBOARDED},
}
} else {
configuration.Namespaces = []clientcommon.NamespaceConfig{
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: sdconfig.MigrationModeONBOARDED},
{Namespace: ephemeralNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: sdconfig.MigrationModeONBOARDED},
{Namespace: executors.LocalPassthroughNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: sdconfig.MigrationModeLOCALPASSTHROUGH},
{Namespace: executors.LocalPassthroughShadowNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: sdconfig.MigrationModeLOCALPASSTHROUGHSHADOW},
{Namespace: executors.DistributedPassthroughNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: sdconfig.MigrationModeDISTRIBUTEDPASSTHROUGH},
{Namespace: executors.ExternalAssignmentNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: sdconfig.MigrationModeDISTRIBUTEDPASSTHROUGH},
}
}

canaryGRPCAddress := fmt.Sprintf("127.0.0.1:%d", canaryGRPCPort)

Expand All @@ -71,16 +107,35 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort in
clientcommon.GrpcAddressMetadataKey: canaryGRPCAddress,
}

return fx.Options(
options := []fx.Option{
fx.Supply(
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
fx.Annotate(metricsScope, fx.As(new(tally.Scope))),
fx.Annotate(clock.NewRealTimeSource(), fx.As(new(clock.TimeSource))),
configuration,
transport,
executorMetadata,
logger,
replayOpts,
),
}

fx.Provide(func(peerChooser spectatorclient.SpectatorPeerChooserInterface) yarpc.Config {
if replayOpts.Enabled() {
options = append(options, fx.Provide(func() yarpc.Config {
return yarpc.Config{
Name: "shard-distributor-canary",
Inbounds: yarpc.Inbounds{
transport.NewInbound(listener), // Listen for incoming ping requests
},
Outbounds: yarpc.Outbounds{
shardDistributorServiceName: {
Unary: transport.NewSingleOutbound(endpoint),
Stream: transport.NewSingleOutbound(endpoint),
},
},
}
}))
} else {
options = append(options, fx.Provide(func(peerChooser spectatorclient.SpectatorPeerChooserInterface) yarpc.Config {
return yarpc.Config{
Name: "shard-distributor-canary",
Inbounds: yarpc.Inbounds{
Expand All @@ -98,17 +153,18 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort in
},
},
}
}),
}))
}

options = append(options,
fx.Provide(
func(t *grpc.Transport) peer.Transport { return t },
),
fx.Provide(
yarpc.NewDispatcher,
func(d *yarpc.Dispatcher) yarpc.ClientConfig { return d }, // Reprovide the dispatcher as a client config
func(l *zap.Logger) log.Logger { return log.NewLogger(l) },
),
fx.Provide(zap.NewDevelopment),
fx.Provide(log.NewLogger),

// We do decorate instead of Invoke because we want to start and stop the dispatcher at the
// correct time.
Expand All @@ -130,8 +186,18 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort in
}),

// Include the canary module - it will set up spectator peer choosers and canary client
canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}),
canary.ModuleWithReplay(
canary.NamespacesNames{
FixedNamespace: fixedNamespace,
EphemeralNamespace: ephemeralNamespace,
ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace,
SharddistributorServiceName: shardDistributorServiceName,
},
replayOpts,
),
)

return fx.Options(options...)
}

func buildCLI() *cli.App {
Expand Down Expand Up @@ -166,6 +232,30 @@ func buildCLI() *cli.App {
Value: defaultCanaryGRPCPort,
Usage: "port for canary to receive ping requests",
},
&cli.IntFlag{
Name: "canary-metrics-port",
Value: defaultCanaryMetricsPort,
Usage: "port for canary Prometheus metrics",
},
&cli.StringFlag{
Name: "replay-csv",
Usage: "enable CSV load replay (path to CSV: timestamp,load0,...,loadN-1)",
},
&cli.StringFlag{
Name: "replay-namespace",
Value: defaultReplayNamespace,
Usage: "fixed namespace used for CSV replay",
},
&cli.IntFlag{
Name: "replay-num-fixed-executors",
Value: 3,
Usage: "number of fixed-namespace executors to run in-process during replay",
},
&cli.Float64Flag{
Name: "replay-speed",
Value: 1.0,
Usage: "CSV replay speed multiplier (timestamp-following mode)",
},
},
Action: func(c *cli.Context) error {
runApp(c)
Expand Down
68 changes: 67 additions & 1 deletion common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2984,6 +2984,24 @@ const (
ShardDistributorAssignLoopFail

ShardDistributorActiveShards
ShardDistributorActiveExecutors
// ShardDistributorAssignmentLoadMaxOverMean measures max/mean across executor loads
ShardDistributorAssignmentLoadMaxOverMean
// ShardDistributorAssignmentLoadCV measures coefficient of variation across executor loads
ShardDistributorAssignmentLoadCV
// ShardDistributorAssignmentSmoothedLoadMaxOverMean measures max/mean across executor smoothed loads
ShardDistributorAssignmentSmoothedLoadMaxOverMean
// ShardDistributorAssignmentSmoothedLoadCV measures coefficient of variation across executor smoothed loads
ShardDistributorAssignmentSmoothedLoadCV
// ShardDistributorAssignmentReportedLoadMissingRatio measures the fraction of assigned shards that have no
// per-shard load report on the currently assigned executor.
ShardDistributorAssignmentReportedLoadMissingRatio
// ShardDistributorAssignmentSmoothedLoadMissingRatio measures the fraction of assigned shards that are missing
// smoothed load statistics (ShardStats entry absent or no update time).
ShardDistributorAssignmentSmoothedLoadMissingRatio
// ShardDistributorAssignmentSmoothedLoadStaleRatio measures the fraction of assigned shards whose smoothed load
// statistics are stale relative to the leader's staleness threshold.
ShardDistributorAssignmentSmoothedLoadStaleRatio
ShardDistributorTotalExecutors
ShardDistributorOldestExecutorHeartbeatLag

Expand All @@ -2992,6 +3010,19 @@ const (
ShardDistributorStoreRequestsPerNamespace
ShardDistributorStoreLatencyHistogramPerNamespace

// ShardDistributorLoadBalanceCycles reports number of load-balance cycles executed (counter).
ShardDistributorLoadBalanceCycles
// ShardDistributorLoadBalanceMoves reports number of shards moved by the load balancer (counter).
ShardDistributorLoadBalanceMoves
// ShardDistributorLoadBalanceSourceExecutorsInitial reports the count of source executors (above upper band)
// at the start of the load-balance cycle.
ShardDistributorLoadBalanceSourceExecutorsInitial
// ShardDistributorLoadBalanceDestinationExecutorsInitial reports the count of destination executors (below lower band)
// at the start of the load-balance cycle.
ShardDistributorLoadBalanceDestinationExecutorsInitial
// ShardDistributorLoadBalanceStopReason increments once per cycle with a low-cardinality `reason` tag describing why load-balance stopped.
ShardDistributorLoadBalanceStopReason

// ShardDistributorShardAssignmentDistributionLatency measures the time taken between assignment of a shard
// and the time it is fully distributed to executors
ShardDistributorShardAssignmentDistributionLatency
Expand Down Expand Up @@ -3789,7 +3820,25 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ShardDistributorAssignLoopSuccess: {metricName: "shard_distrubutor_shard_assign_success", metricType: Counter},
ShardDistributorAssignLoopFail: {metricName: "shard_distrubutor_shard_assign_fail", metricType: Counter},

ShardDistributorActiveShards: {metricName: "shard_distributor_active_shards", metricType: Gauge},
ShardDistributorActiveShards: {metricName: "shard_distributor_active_shards", metricType: Gauge},
ShardDistributorActiveExecutors: {metricName: "shard_distributor_active_executors", metricType: Gauge},
ShardDistributorAssignmentLoadMaxOverMean: {metricName: "shard_distributor_assignment_load_max_over_mean", metricType: Gauge},
ShardDistributorAssignmentLoadCV: {metricName: "shard_distributor_assignment_load_cv", metricType: Gauge},
ShardDistributorAssignmentSmoothedLoadMaxOverMean: {metricName: "shard_distributor_assignment_smoothed_load_max_over_mean", metricType: Gauge},
ShardDistributorAssignmentSmoothedLoadCV: {metricName: "shard_distributor_assignment_smoothed_load_cv", metricType: Gauge},
ShardDistributorAssignmentReportedLoadMissingRatio: {
metricName: "shard_distributor_assignment_reported_load_missing_ratio",
metricType: Gauge,
},
ShardDistributorAssignmentSmoothedLoadMissingRatio: {
metricName: "shard_distributor_assignment_smoothed_load_missing_ratio",
metricType: Gauge,
},
ShardDistributorAssignmentSmoothedLoadStaleRatio: {
metricName: "shard_distributor_assignment_smoothed_load_stale_ratio",
metricType: Gauge,
},

ShardDistributorTotalExecutors: {metricName: "shard_distributor_total_executors", metricType: Gauge},
ShardDistributorOldestExecutorHeartbeatLag: {metricName: "shard_distributor_oldest_executor_heartbeat_lag", metricType: Gauge},

Expand All @@ -3798,6 +3847,23 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ShardDistributorStoreRequestsPerNamespace: {metricName: "shard_distributor_store_requests_per_namespace", metricType: Counter},
ShardDistributorStoreLatencyHistogramPerNamespace: {metricName: "shard_distributor_store_latency_histogram_per_namespace", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},

ShardDistributorLoadBalanceCycles: {metricName: "shard_distributor_load_balance_cycles", metricType: Counter},
ShardDistributorLoadBalanceMoves: {metricName: "shard_distributor_load_balance_moves", metricType: Counter},
ShardDistributorLoadBalanceSourceExecutorsInitial: {
metricName: "shard_distributor_load_balance_source_executors_initial",
metricType: Gauge,
},
ShardDistributorLoadBalanceDestinationExecutorsInitial: {
metricName: "shard_distributor_load_balance_destination_executors_initial",
metricType: Gauge,
},
ShardDistributorLoadBalanceStopReason: {
metricName: "shard_distributor_load_balance_stop_reason",
metricType: Counter,
},

ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: Default1ms100s.buckets()},
ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: Default1ms100s.buckets()},
ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
},
Expand Down
12 changes: 12 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ shardDistribution:
type: fixed
mode: onboarded
shardNum: 32
- name: shard-distributor-replay
type: fixed
mode: onboarded
# Must match the CSV column count when using canary replay mode.
shardNum: 1000
- name: shard-distributor-canary-ephemeral
mode: onboarded
type: ephemeral
Expand Down Expand Up @@ -186,6 +191,12 @@ shardDistribution:
process:
period: 1s
heartbeatTTL: 2s
loadBalance:
disableBenefitGating: false
severeImbalanceRatio: 1.3
moveBudgetProportion: 0.05
hysteresisUpperBand: 1.1
hysteresisLowerBand: 0.95

shard-distributor-matching:
namespaces:
Expand All @@ -194,3 +205,4 @@ shard-distributor-matching:
migration_mode: local_pass
ttl_shard: 5m
ttl_report: 1m

4 changes: 4 additions & 0 deletions config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,7 @@ shardDistributor.migrationMode:
- value: "distributed_pass"
constraints:
namespace: "test-external-assignment"
shardDistributor.loadBalancingMode:
- value: "greedy"
constraints:
namespace: "shard-distributor-replay"
2 changes: 1 addition & 1 deletion docker/grafana/provisioning/datasources/default.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
datasources:
- name: Prometheus
type: prometheus
url: http://host.docker.internal:9090
url: http://prometheus:9090
version: 1
editable: true
5 changes: 3 additions & 2 deletions docker/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
global:
scrape_interval: 5s
scrape_interval: 1s
external_labels:
monitor: 'cadence-monitor'
scrape_configs:
Expand All @@ -11,4 +11,5 @@ scrape_configs:
- 'cadence:8001'
- 'cadence:8002'
- 'cadence:8003'
- 'host.docker.internal:8004' # Endpoint for Cadence samples running on localhost
- 'host.docker.internal:8004' # Cadence shard distributor metrics exposed on host
- 'host.docker.internal:9098' # Shard distributor canary metrics exposed on host
18 changes: 18 additions & 0 deletions service/sharddistributor/canary/executors/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,21 @@ func Module(fixedNamespace, ephemeralNamespace, externalAssignmentNamespace stri
fx.Invoke(NewExecutorsModule),
)
}

// ReplayFixedModule returns an fx module named "ReplayFixedExecutors" that
// registers fixedExecutorCount executor constructors, all bound to the given
// namespace, followed by a single NewExecutorsModule invocation.
// A non-positive fixedExecutorCount is treated as 1.
// It is used by the CSV replay canary to simulate multiple executors in a single process.
func ReplayFixedModule(namespace string, fixedExecutorCount int) fx.Option {
	executorCount := fixedExecutorCount
	if executorCount < 1 {
		executorCount = 1
	}

	// One Provide per executor plus the trailing Invoke.
	opts := make([]fx.Option, 0, executorCount+1)
	for remaining := executorCount; remaining > 0; remaining-- {
		// A fresh closure per executor, each building an executor for the same namespace.
		newExecutor := func(p executorclient.Params[*processor.ShardProcessor]) (ExecutorResult, error) {
			return NewExecutorWithFixedNamespace(p, namespace)
		}
		opts = append(opts, fx.Provide(newExecutor))
	}
	opts = append(opts, fx.Invoke(NewExecutorsModule))

	return fx.Module("ReplayFixedExecutors", fx.Options(opts...))
}
Loading