18 changes: 18 additions & 0 deletions common/metrics/defs.go
@@ -2977,6 +2977,11 @@ const (
ShardDistributorAssignLoopAttempts
ShardDistributorAssignLoopSuccess
ShardDistributorAssignLoopFail
ShardDistributorAssignLoopGetStateLatency
ShardDistributorAssignLoopCalculateLatency
ShardDistributorAssignLoopWriteLatency
ShardDistributorAssignLoopNumExecutors
ShardDistributorAssignLoopNumShards

ShardDistributorActiveShards

@@ -2985,6 +2990,11 @@ const (
ShardDistributorStoreRequestsPerNamespace
ShardDistributorStoreLatencyHistogramPerNamespace

// GetState internal breakdown metrics
ShardDistributorGetStateEtcdFetchLatency
ShardDistributorGetStateDeserializeLatency
ShardDistributorGetStateNumKeys

// ShardDistributorShardAssignmentDistributionLatency measures the time taken between assignment of a shard
// and the time it is fully distributed to executors
ShardDistributorShardAssignmentDistributionLatency
@@ -3776,13 +3786,21 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ShardDistributorAssignLoopAttempts: {metricName: "shard_distrubutor_shard_assign_attempt", metricType: Counter},
ShardDistributorAssignLoopSuccess: {metricName: "shard_distrubutor_shard_assign_success", metricType: Counter},
ShardDistributorAssignLoopFail: {metricName: "shard_distrubutor_shard_assign_fail", metricType: Counter},
ShardDistributorAssignLoopGetStateLatency: {metricName: "shard_distributor_assign_loop_get_state_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorAssignLoopCalculateLatency: {metricName: "shard_distributor_assign_loop_calculate_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorAssignLoopWriteLatency: {metricName: "shard_distributor_assign_loop_write_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorAssignLoopNumExecutors: {metricName: "shard_distributor_assign_loop_num_executors", metricType: Gauge},
ShardDistributorAssignLoopNumShards: {metricName: "shard_distributor_assign_loop_num_shards", metricType: Gauge},

ShardDistributorActiveShards: {metricName: "shard_distributor_active_shards", metricType: Gauge},

ShardDistributorStoreExecutorNotFound: {metricName: "shard_distributor_store_executor_not_found", metricType: Counter},
ShardDistributorStoreFailuresPerNamespace: {metricName: "shard_distributor_store_failures_per_namespace", metricType: Counter},
ShardDistributorStoreRequestsPerNamespace: {metricName: "shard_distributor_store_requests_per_namespace", metricType: Counter},
ShardDistributorStoreLatencyHistogramPerNamespace: {metricName: "shard_distributor_store_latency_histogram_per_namespace", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorGetStateEtcdFetchLatency: {metricName: "shard_distributor_get_state_etcd_fetch_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorGetStateDeserializeLatency: {metricName: "shard_distributor_get_state_deserialize_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorGetStateNumKeys: {metricName: "shard_distributor_get_state_num_keys", metricType: Gauge},

ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
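The new entries follow the file's existing two-part pattern: an index constant in the const block and a matching definition in MetricDefs keyed by that index. Below is a minimal sketch of how a caller would emit them through a metrics.Scope; the scope itself is assumed to be supplied by the caller, and only the emission calls mirror this diff.

package example

import (
	"time"

	"github.com/uber/cadence/common/metrics"
)

// emitAssignLoopMetrics is illustrative only: it shows how the histogram and
// gauge definitions added above are expected to be used by the assign loop.
func emitAssignLoopMetrics(scope metrics.Scope, getStateStart time.Time, numExecutors, numShards int) {
	// Histogram: duration of the GetState phase, bucketed by
	// ShardDistributorExecutorStoreLatencyBuckets.
	scope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopGetStateLatency, time.Since(getStateStart))

	// Gauges: current size of the rebalancing problem.
	scope.UpdateGauge(metrics.ShardDistributorAssignLoopNumExecutors, float64(numExecutors))
	scope.UpdateGauge(metrics.ShardDistributorAssignLoopNumShards, float64(numShards))
}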
31 changes: 30 additions & 1 deletion service/sharddistributor/leader/process/processor.go
@@ -354,7 +354,10 @@ func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
}

func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoopScope metrics.Scope) (err error) {
// Phase 1: Get state from store
getStateStart := p.timeSource.Now()
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopGetStateLatency, p.timeSource.Now().Sub(getStateStart))
if err != nil {
return fmt.Errorf("get state: %w", err)
}
@@ -365,6 +368,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
}
p.lastAppliedRevision = namespaceState.GlobalRevision

// Phase 2: Calculate new assignments
calculateStart := p.timeSource.Now()

// Identify stale executors that need to be removed
staleExecutors := p.identifyStaleExecutors(namespaceState)
if len(staleExecutors) > 0 {
@@ -374,6 +380,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
activeExecutors := p.getActiveExecutors(namespaceState, staleExecutors)
if len(activeExecutors) == 0 {
p.logger.Info("No active executors found. Cannot assign shards.")
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopCalculateLatency, p.timeSource.Now().Sub(calculateStart))
return nil
}
p.logger.Info("Active executors", tag.ShardExecutors(activeExecutors))
@@ -382,6 +389,14 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
shardsToReassign, currentAssignments := p.findShardsToReassign(activeExecutors, namespaceState, deletedShards, staleExecutors)

metricsLoopScope.UpdateGauge(metrics.ShardDistributorAssignLoopNumRebalancedShards, float64(len(shardsToReassign)))
metricsLoopScope.UpdateGauge(metrics.ShardDistributorAssignLoopNumExecutors, float64(len(activeExecutors)))

// Count total shards across all executors
totalShards := 0
for _, shards := range currentAssignments {
totalShards += len(shards)
}
metricsLoopScope.UpdateGauge(metrics.ShardDistributorAssignLoopNumShards, float64(totalShards))

// If there are deleted shards or stale executors, the distribution has changed.
assignedToEmptyExecutors := assignShardsToEmptyExecutors(currentAssignments)
@@ -391,24 +406,38 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
distributionChanged := len(deletedShards) > 0 || len(staleExecutors) > 0 || assignedToEmptyExecutors || updatedAssignments || isRebalancedByShardLoad
if !distributionChanged {
p.logger.Info("No changes to distribution detected. Skipping rebalance.")
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopCalculateLatency, p.timeSource.Now().Sub(calculateStart))
return nil
}

newState := p.getNewAssignmentsState(namespaceState, currentAssignments)
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopCalculateLatency, p.timeSource.Now().Sub(calculateStart))

if p.sdConfig.GetMigrationMode(p.namespaceCfg.Name) != types.MigrationModeONBOARDED {
p.logger.Info("Running rebalancing in shadow mode", tag.Dynamic("old_assignments", namespaceState.ShardAssignments), tag.Dynamic("new_assignments", newState))
p.emitActiveShardMetric(namespaceState.ShardAssignments, metricsLoopScope)

// Even in shadow mode, cleanup stale executors to prevent data accumulation
if len(staleExecutors) > 0 {
p.logger.Info("Cleaning up stale executors in shadow mode", tag.ShardExecutors(slices.Collect(maps.Keys(staleExecutors))))
if err := p.shardStore.DeleteExecutors(ctx, p.namespaceCfg.Name, slices.Collect(maps.Keys(staleExecutors)), p.election.Guard()); err != nil {
p.logger.Error("Failed to delete stale executors in shadow mode", tag.Error(err))
// Don't return error - shadow mode should be resilient
}
}
return nil
}

namespaceState.ShardAssignments = newState
p.logger.Info("Applying new shard distribution.")

// Use the leader guard for the assign and delete operation.
// Phase 3: Write new assignments to store
writeStart := p.timeSource.Now()
err = p.shardStore.AssignShards(ctx, p.namespaceCfg.Name, store.AssignShardsRequest{
NewState: namespaceState,
ExecutorsToDelete: staleExecutors,
}, p.election.Guard())
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopWriteLatency, p.timeSource.Now().Sub(writeStart))
if err != nil {
return fmt.Errorf("assign shards: %w", err)
}
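rebalanceShardsImpl above times each phase by taking a start timestamp from p.timeSource and recording the elapsed duration on every exit path. A hypothetical helper, not part of this change, that factors out that pattern is sketched below; it assumes the metrics and clock imports already present in this package, and the int index parameter simply follows how the scope is called in the diff.

// timePhase is a hypothetical helper: it captures a start time from the given
// time source and returns a stop function that records the elapsed duration
// against the supplied histogram metric index.
func timePhase(scope metrics.Scope, ts clock.TimeSource, idx int) (stop func()) {
	start := ts.Now()
	return func() {
		scope.RecordHistogramDuration(idx, ts.Now().Sub(start))
	}
}

// Illustrative use, mirroring the Phase 1 measurement above:
//
//	stop := timePhase(metricsLoopScope, p.timeSource, metrics.ShardDistributorAssignLoopGetStateLatency)
//	namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
//	stop()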
55 changes: 35 additions & 20 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
@@ -14,6 +14,7 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
@@ -29,13 +30,14 @@
)

type executorStoreImpl struct {
client etcdclient.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
recordWriter *common.RecordWriter
cfg *config.Config
client etcdclient.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
recordWriter *common.RecordWriter
cfg *config.Config
metricsClient metrics.Client
}

// shardStatisticsUpdate holds the staged statistics for a shard so we can write them
@@ -49,12 +51,13 @@ type shardStatisticsUpdate struct {
type ExecutorStoreParams struct {
fx.In

Client etcdclient.Client `name:"executorstore"`
ETCDConfig ETCDConfig
Lifecycle fx.Lifecycle
Logger log.Logger
TimeSource clock.TimeSource
Config *config.Config
Client etcdclient.Client `name:"executorstore"`
ETCDConfig ETCDConfig
Lifecycle fx.Lifecycle
Logger log.Logger
TimeSource clock.TimeSource
Config *config.Config
MetricsClient metrics.Client
}

// NewStore creates a new etcd-backed store and provides it to the fx application.
@@ -72,13 +75,14 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
}

store := &executorStoreImpl{
client: p.Client,
prefix: p.ETCDConfig.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
recordWriter: recordWriter,
cfg: p.Config,
client: p.Client,
prefix: p.ETCDConfig.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
recordWriter: recordWriter,
cfg: p.Config,
metricsClient: p.MetricsClient,
}

p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
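Because ExecutorStoreParams uses fx.In, the new MetricsClient field is resolved from the fx graph like the other dependencies, so a metrics.Client provider must exist in the application. The sketch below shows one possible wiring from within the service's setup (package clauses elided); metrics.NewClient, the ShardDistributor service index, and tally.NoopScope are assumptions about available helpers, and the real wiring may differ.

// newExampleApp is illustrative only: it shows a metrics.Client being provided
// so that the MetricsClient field on ExecutorStoreParams can be injected.
func newExampleApp() *fx.App {
	return fx.New(
		fx.Provide(func() metrics.Client {
			// NoopScope keeps the sketch self-contained; production wiring
			// would pass the service's real tally scope.
			return metrics.NewClient(tally.NoopScope, metrics.ShardDistributor)
		}),
		fx.Provide(NewStore),
		// ... providers for the etcd client, ETCDConfig, Logger, TimeSource, and Config ...
	)
}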
@@ -201,16 +205,26 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string,
// --- ShardStore Implementation ---

func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*store.NamespaceState, error) {
metricsScope := s.metricsClient.Scope(metrics.ShardDistributorGetShardOwnerScope, metrics.NamespaceTag(namespace))

heartbeatStates := make(map[string]store.HeartbeatState)
assignedStates := make(map[string]store.AssignedState)
shardStats := make(map[string]store.ShardStatistics)

// Phase 1: Fetch from etcd
etcdFetchStart := s.timeSource.Now()
executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace)
resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
metricsScope.RecordHistogramDuration(metrics.ShardDistributorGetStateEtcdFetchLatency, s.timeSource.Now().Sub(etcdFetchStart))
if err != nil {
return nil, fmt.Errorf("get executor data: %w", err)
}

// Record number of keys fetched
metricsScope.UpdateGauge(metrics.ShardDistributorGetStateNumKeys, float64(len(resp.Kvs)))

// Phase 2: Deserialize all keys
deserializeStart := s.timeSource.Now()
for _, kv := range resp.Kvs {
key := string(kv.Key)
value := string(kv.Value)
@@ -258,6 +272,7 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
heartbeatStates[executorID] = heartbeat
assignedStates[executorID] = assigned
}
metricsScope.RecordHistogramDuration(metrics.ShardDistributorGetStateDeserializeLatency, s.timeSource.Now().Sub(deserializeStart))

return &store.NamespaceState{
Executors: heartbeatStates,
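GetState above now splits its work into an etcd fetch phase and a deserialization phase, each reported as a histogram, plus a gauge for the number of keys returned. For reference, here is a minimal sketch of the fetch half condensed into a standalone function; the concrete etcd client type, scope, and key prefix are assumed to be supplied by the caller and may differ from the store's own wiring.

package example

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"

	"github.com/uber/cadence/common/clock"
	"github.com/uber/cadence/common/metrics"
)

// timedPrefixGet is illustrative: it wraps a prefix read against etcd with the
// fetch-latency histogram and key-count gauge introduced in this change.
func timedPrefixGet(ctx context.Context, cli *clientv3.Client, scope metrics.Scope, ts clock.TimeSource, prefix string) (*clientv3.GetResponse, error) {
	fetchStart := ts.Now()
	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
	scope.RecordHistogramDuration(metrics.ShardDistributorGetStateEtcdFetchLatency, ts.Now().Sub(fetchStart))
	if err != nil {
		return nil, fmt.Errorf("get executor data: %w", err)
	}

	// Gauge: how many keys the prefix scan returned, a proxy for namespace size.
	scope.UpdateGauge(metrics.ShardDistributorGetStateNumKeys, float64(len(resp.Kvs)))
	return resp, nil
}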