146 changes: 125 additions & 21 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
@@ -509,6 +509,23 @@ func sortTargetCandidateSetAndPick(
}
slices.SortFunc(cands.candidates, func(a, b candidateInfo) int {
if diversityScoresAlmostEqual(a.diversityScore, b.diversityScore) {
// Note: Consider the case where the current leaseholder's LPI is
// 3 (lower is better) and we have the following candidates:
// - LPI=1 SLS=normal
// - LPI=2 SLS=low
// Currently we consider the low-SLS candidate first. This is in
// contrast to the single-metric allocator, which only considers
// candidates in the lowest-LPI class (i.e. wouldn't even consider
// the low-SLS candidate since we have a candidate at LPI=1). If we
// make the corresponding change in candidateToMoveLease, we would
// match the single-metric allocator's behavior, but it's unclear
// that that would be better. A good middle ground could be sorting
// here by LPI first, then SLS. That should result in mma preferring
// improving the lease preference, but if that is not possible, it
// would settle for not making it worse (than the current
// leaseholder), which the single-metric allocator won't.
//
// TODO(tbg): consider changing this to sort by LPI first, then SLS.
return cmp.Or(cmp.Compare(a.sls, b.sls),
cmp.Compare(a.leasePreferenceIndex, b.leasePreferenceIndex),
cmp.Compare(a.StoreID, b.StoreID))
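
A hedged sketch of the TODO above (not part of this change): an LPI-first ordering, reusing the candidateInfo fields referenced in this hunk, could be dropped into the slices.SortFunc call in place of the current cmp.Or:

// Sketch only: prefer improving the lease preference index (lower is better)
// first, then fall back to the store load summary, with StoreID as a
// deterministic tie-breaker.
func compareLPIFirstThenSLS(a, b candidateInfo) int {
	return cmp.Or(
		cmp.Compare(a.leasePreferenceIndex, b.leasePreferenceIndex),
		cmp.Compare(a.sls, b.sls),
		cmp.Compare(a.StoreID, b.StoreID))
}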
@@ -537,6 +554,9 @@ func sortTargetCandidateSetAndPick(
}
}
// Diversity is the same. Include if not reaching disk capacity.
// TODO(tbg): remove highDiskSpaceUtilization check here. These candidates
// should instead be filtered out by retainReadyLeaseTargetStoresOnly (which
// filters down the initial candidate set before computing the mean).
if !cand.highDiskSpaceUtilization {
cands.candidates[j] = cand
j++
@@ -815,10 +835,10 @@ func (cs *clusterState) ensureAnalyzedConstraints(rstate *rangeState) {
// - Need diversity change for each candidate.
//
// The first 3 bullets are encapsulated in the helper function
// computeCandidatesForRange. It works for both replica additions and
// computeCandidatesForReplicaTransfer. It works for both replica additions and
// rebalancing.
//
// For the last bullet (diversity), the caller of computeCandidatesForRange
// For the last bullet (diversity), the caller of computeCandidatesForReplicaTransfer
// needs to populate candidateInfo.diversityScore for each candidate in
// candidateSet. It does so via diversityScoringMemo. Then the (loadSummary,
// diversityScore) pair can be used to order candidates for attempts to add.
@@ -846,41 +866,125 @@ func (cs *clusterState) ensureAnalyzedConstraints(rstate *rangeState) {

// loadSheddingStore is only specified if this candidate computation is
// happening because of overload.
func (cs *clusterState) computeCandidatesForRange(
//
// postMeansExclusions are filtered post-means: their load is included in the
// mean (they're viable locations in principle) but they're not candidates for
// this specific transfer (the classic case: already have a replica).
func (cs *clusterState) computeCandidatesForReplicaTransfer(
ctx context.Context,
expr constraintsDisj,
storesToExclude storeSet,
conj constraintsConj,
existingReplicas storeSet,
postMeansExclusions storeSet,
loadSheddingStore roachpb.StoreID,
passObs *rebalancingPassMetricsAndLogger,
) (_ candidateSet, sheddingSLS storeLoadSummary) {
means := cs.meansMemo.getMeans(expr)
if loadSheddingStore > 0 {
sheddingSS := cs.stores[loadSheddingStore]
sheddingSLS = cs.meansMemo.getStoreLoadSummary(ctx, means, loadSheddingStore, sheddingSS.loadSeqNum)
if sheddingSLS.sls <= loadNoChange && sheddingSLS.nls <= loadNoChange {
// In this set of stores, this store no longer looks overloaded.
passObs.replicaShed(notOverloaded)
return candidateSet{}, sheddingSLS
}
// Start by computing the stores (and corresponding means) that satisfy the
// constraint expression. If none of these stores needs to be filtered out
// before computing the means, we can use the cached means verbatim; otherwise
// we recompute the means below.
cs.scratchDisj[0] = conj
means := cs.meansMemo.getMeans(cs.scratchDisj[:1])

// Pre-means filtering: copy to scratch, then filter in place.
// Filter out stores that have a non-OK replica disposition.
cs.scratchStoreSet = append(cs.scratchStoreSet[:0], means.stores...)
filteredStores := retainReadyReplicaTargetStoresOnly(ctx, cs.scratchStoreSet, cs.stores, existingReplicas)

// Determine which means to use.
//
// TODO(tbg): unit testing.
var effectiveMeans *meansLoad
if len(filteredStores) == len(means.stores) {
// Common case: nothing was filtered, use cached means.
effectiveMeans = &means.meansLoad
} else if len(filteredStores) == 0 {
// No viable candidates at all.
return candidateSet{}, sheddingSLS
} else {
// Some stores were filtered; recompute means over filtered set.
cs.scratchMeans = computeMeansForStoreSet(
cs, filteredStores, cs.meansMemo.scratchNodes, cs.meansMemo.scratchStores)
effectiveMeans = &cs.scratchMeans
log.KvDistribution.VEventf(ctx, 2,
"pre-means filtered %d stores → remaining %v, means: store=%v node=%v",
len(means.stores)-len(filteredStores), filteredStores,
effectiveMeans.storeLoad, effectiveMeans.nodeLoad)
}
// Only store readiness (replica disposition and disk fullness) is filtered
// out here; the rest of the filtering happens later.

sheddingSLS = cs.computeLoadSummary(ctx, loadSheddingStore, &effectiveMeans.storeLoad, &effectiveMeans.nodeLoad)
if sheddingSLS.sls <= loadNoChange && sheddingSLS.nls <= loadNoChange {
// In this set of stores, this store no longer looks overloaded.
passObs.replicaShed(notOverloaded)
return candidateSet{}, sheddingSLS
}

var cset candidateSet
for _, storeID := range means.stores {
if storesToExclude.contains(storeID) {
for _, storeID := range filteredStores {
if postMeansExclusions.contains(storeID) {
// This store's load is included in the mean, but it's not a viable
// target for this specific transfer (e.g. it already has a replica).
continue
}
ss := cs.stores[storeID]
csls := cs.meansMemo.getStoreLoadSummary(ctx, means, storeID, ss.loadSeqNum)
csls := cs.computeLoadSummary(ctx, storeID, &effectiveMeans.storeLoad, &effectiveMeans.nodeLoad)
cset.candidates = append(cset.candidates, candidateInfo{
StoreID: storeID,
storeLoadSummary: csls,
})
}
cset.means = &means.meansLoad
cset.means = effectiveMeans
return cset, sheddingSLS
}
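
To make the pre-means vs. post-means distinction concrete, here is a small self-contained sketch with made-up store names and loads (none of this is the allocator's actual code): a store that is not ready (s4) is filtered out before the mean is computed, so its load never influences the mean, while a store that already holds a replica (s3) is counted in the mean but skipped as a candidate.

package main

import "fmt"

func main() {
	type store struct {
		name       string
		load       float64
		notReady   bool // pre-means exclusion (disposition, disk fullness)
		hasReplica bool // post-means exclusion
	}
	stores := []store{
		{"s1", 10, false, false},
		{"s2", 20, false, false},
		{"s3", 90, false, true},
		{"s4", 500, true, false},
	}

	var sum float64
	var ready []store
	for _, s := range stores {
		if s.notReady {
			continue // filtered pre-means: load never counted
		}
		sum += s.load
		ready = append(ready, s)
	}
	mean := sum / float64(len(ready)) // (10+20+90)/3 = 40
	fmt.Printf("mean over ready stores: %.0f\n", mean)

	for _, s := range ready {
		if s.hasReplica {
			continue // in the mean, but not a candidate for this transfer
		}
		fmt.Printf("candidate %s: load %.0f vs mean %.0f\n", s.name, s.load, mean)
	}
}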

// retainReadyReplicaTargetStoresOnly filters the input set to only those stores
// that are ready to accept a replica. A store is not ready if it has a non-OK
// replica disposition or high disk space utilization. In practice, the input
// set is already filtered by constraints.
//
// Stores already housing a replica (on top of being in the input storeSet)
// bypass this readiness check: they already have the replica, so its load
// should be in the mean regardless of the disposition, as we'll pick candidates
// based on improving clustering around the mean.
//
// The input storeSet is mutated (and returned as the result).
func retainReadyReplicaTargetStoresOnly(
ctx context.Context,
in storeSet,
stores map[roachpb.StoreID]*storeState,
existingReplicas storeSet,
) storeSet {
out := in[:0]
for _, storeID := range in {
if existingReplicas.contains(storeID) {
// Stores holding existing replicas already carry the load and we want to
// include them in the mean, even if they are not accepting new replicas
// or are even trying to shed them.
//
// TODO(tbg): health might play into this, though. For example, when
// a store is dead, whatever load we have from it is stale and we
// are better off not including it. For now, we ignore this problem
// because the mma only handles rebalancing, whereas a replica on a
// dead store would be removed by the single-metric allocator after
// the TimeUntilStoreDead and so would disappear from our view.
out = append(out, storeID)
continue
}
ss := stores[storeID]
switch {
case ss.status.Disposition.Replica != ReplicaDispositionOK:
log.KvDistribution.VEventf(ctx, 2, "skipping s%d for replica transfer: replica disposition %v (health %v)", storeID, ss.status.Disposition.Replica, ss.status.Health)
case highDiskSpaceUtilization(ss.reportedLoad[ByteSize], ss.capacity[ByteSize]):
// TODO(tbg): remove this from mma and just let the caller set this
// disposition based on the following cluster settings:
// - kv.allocator.max_disk_utilization_threshold
// - kv.allocator.rebalance_to_max_disk_utilization_threshold
log.KvDistribution.VEventf(ctx, 2, "skipping s%d for replica transfer: high disk utilization (health %v)", storeID, ss.status.Health)
default:
out = append(out, storeID)
}
}
return out
}

// Diversity scoring is very amenable to caching, since the set of unique
// locality tiers for range replicas is likely to be small. And the cache does
// not need to be cleared after every allocator pass. This caching is done via
117 changes: 81 additions & 36 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
@@ -1268,6 +1268,9 @@ type clusterState struct {
ranges map[roachpb.RangeID]*rangeState

scratchRangeMap map[roachpb.RangeID]struct{}
scratchStoreSet storeSet // scratch space for pre-means filtering
scratchMeans meansLoad // scratch space for recomputed means
scratchDisj [1]constraintsConj // scratch space for getMeans call

// Added to when a change is proposed. Will also add to corresponding
// rangeState.pendingChanges and to the affected storeStates.
@@ -1623,6 +1626,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
topk.dim = WriteBandwidth
}
if sls.highDiskSpaceUtilization {
// If disk space is running out, shedding bytes becomes the top priority.
topk.dim = ByteSize
} else if sls.sls > loadNoChange {
// If multiple dimensions are contributing the same loadSummary, we will pick
@@ -2191,15 +2195,42 @@ func (cs *clusterState) getNodeReportedLoad(nodeID roachpb.NodeID) *NodeLoad {
return nil
}

// canShedAndAddLoad returns true if the delta can be added to the target
// store and removed from the src store, such that the relative load summaries
// will not get worse.
// canShedAndAddLoad returns true if the delta can be added to the target store
// and removed from the src store. It does not change any state between the call
// and return.
//
// It does not change any state between the call and return.
// overloadDim represents the dimension that is overloaded in the source and the
// function requires that along that dimension, the target is < loadNoChange and
// the source is > loadNoChange.
//
// overloadDim represents the dimension that is overloaded in the source and
// the function requires that the target must be currently < loadNoChange
// along that dimension.
// Broadly speaking, the method tries to ascertain that the target wouldn't be
// worse off than the source following the transfer. To do this, the method
// looks at a load summary for the target that would result from the load
// transfer (targetLoadSummary).
//
// When onlyConsiderTargetCPUSummary is true, the targetLoadSummary derives from
// the target's post-transfer CPU dimension only. This is appropriate when a lease is
// transferred, as this should only affect the CPU dimension, and we don't want
// lease transfers to be subject to stricter checks related to other dimensions.
// When onlyConsiderTargetCPUSummary is false, targetLoadSummary is the target's
// worst post-transfer load summary. In both cases, the node load summary is also
// considered.
//
// TODO(tbg): understand and explain why the node load summary is in the mix here.
//
// In either case, if the targetLoadSummary is < loadNoChange, the change is
// permitted right away. Otherwise, stricter checks apply: After the transfer,
// - the target must not be overloadUrgent,
// - the target has no pending changes (to delay making a potentially non-ideal
// choice of the target),
// - the target's overloaded dimension's summary must not be worse than the
// source's ("overloadedDimPermitsChange"),
// - along each of the other (!=overloadedDim) dimensions, the percentage
// increase in load is at most a third of that of the overloaded dimension.
// (e.g. if CPU goes up by 30%, WriteBandwidth can go up by at most 10%).
// - the target's node load summary must not be worse than the target's store
// load summary. See inline comment for more details.

func (cs *clusterState) canShedAndAddLoad(
ctx context.Context,
srcSS *storeState,
Expand All @@ -2216,6 +2247,11 @@ func (cs *clusterState) canShedAndAddLoad(
// the load delta addition flips the loadSummary for either the target or the
// source, which suggests it might be useful to add this to verbose logging.

// Compute srcSLS and targetSLS, which are the load summaries of the source
// and target that would result from moving the lease.
//
// TODO(tbg): extract this into a helper and set it up so that it doesn't
// temporarily modify the cluster state.
targetNS := cs.nodes[targetSS.NodeID]
// Add the delta.
deltaToAdd := loadVectorToAdd(delta)
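
The "at most a third" rule from the function's doc comment can be illustrated with a standalone sketch (hypothetical helper, made-up types; the real check appears further down in this function, and zero-load edge cases are ignored here for brevity):

// fractionIncreasePermits reports whether, for every non-overloaded dimension,
// the fractional load increase on the target stays within a third of the
// fractional increase along the overloaded dimension. For example, if CPU
// (the overloaded dimension) goes up by 30%, WriteBandwidth may go up by at
// most 10%.
func fractionIncreasePermits(delta, targetLoad []float64, overloadedDim int) bool {
	var overloadedFrac float64
	if targetLoad[overloadedDim] > 0 {
		overloadedFrac = delta[overloadedDim] / targetLoad[overloadedDim]
	}
	for dim := range delta {
		if dim == overloadedDim {
			continue
		}
		var frac float64
		if targetLoad[dim] > 0 {
			frac = delta[dim] / targetLoad[dim]
		}
		if frac > overloadedFrac/3 {
			return false
		}
	}
	return true
}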
Expand Down Expand Up @@ -2254,28 +2290,16 @@ func (cs *clusterState) canShedAndAddLoad(
reason.WriteString("targetSLS.highDiskSpaceUtilization")
return false
}
// We define targetSummary as a summarization across all dimensions of the
// target. A targetSummary < loadNoChange always accepts the change. When
// the targetSummary >= loadNoChange, we are stricter and require both that
// there are no pending changes in the target, and the target is "not worse"
// in a way that will cause thrashing, where the details are defined below.
// The no pending changes requirement is to delay making a potentially
// non-ideal choice of the target.
//
// NB: The target's overload dimension summary must have been <
// loadNoChange, and the source must have been > loadNoChange.

// We define targetSummary as the "worst" of the considered load dimensions
// (only CPU, or all).
var targetSummary loadSummary
if onlyConsiderTargetCPUSummary {
targetSummary = targetSLS.dimSummary[CPURate]
if targetSummary < targetSLS.nls {
targetSummary = targetSLS.nls
}
} else {
targetSummary = targetSLS.sls
if targetSummary < targetSLS.nls {
targetSummary = targetSLS.nls
}
}
targetSummary = max(targetSummary, targetSLS.nls)

if targetSummary < loadNoChange {
return true
@@ -2284,6 +2308,7 @@
reason.WriteString("overloadUrgent")
return false
}

// Need to consider additional factors.
//
// It is possible that both are overloadSlow in aggregate. We want to make
Expand Down Expand Up @@ -2312,7 +2337,7 @@ func (cs *clusterState) canShedAndAddLoad(
// That boolean predicate can also be too strict, in that we should permit
// transitions to overloadSlow along one dimension, to allow for an
// exchange.
overloadedDimFractionIncrease := math.MaxFloat64
var overloadedDimFractionIncrease float64
if targetSS.adjusted.load[overloadedDim] > 0 {
overloadedDimFractionIncrease = float64(deltaToAdd[overloadedDim]) /
float64(targetSS.adjusted.load[overloadedDim])
Expand Down Expand Up @@ -2353,14 +2378,33 @@ func (cs *clusterState) canShedAndAddLoad(
targetSLS.maxFractionPendingIncrease < epsilon &&
targetSLS.maxFractionPendingDecrease < epsilon &&
// NB: targetSLS.nls <= targetSLS.sls is not a typo, in that we are
// comparing targetSLS with itself. The nls only captures node-level
// CPU, so if a store that is overloaded wrt WriteBandwidth wants to
// shed to a store that is overloaded wrt CPURate, we need to permit
// that. However, the nls of the former will be less than the that of
// the latter. By looking at the nls of the target here, we are making
// sure that it is no worse than the sls of the target, since if it
// is, the node is overloaded wrt CPU due to some other store on that
// node, and we should be shedding that load first.
// comparing targetSLS with itself.
//
// Consider a node that has two stores:
// - s1 is low on CPU
// - s2 is very high on CPU, resulting in a node load summary of
//   overloadSlow or overloadUrgent.
//
// In this code path, targetSLS is >= loadNoChange, so there must be
// some overload dimension in targetSLS. If it comes from write bandwidth
// (or any other non-CPU dimension), without this check, s1 might be
// considered an acceptable target for adding CPU load. But it is clearly
// not a good target, since the node housing s1 is CPU overloaded; s2
// should be shedding CPU load first.
//
// This example motivates the condition below. If we reach this code,
// we know that targetSLS >= loadNoChange, and we decide:
// - at sls=loadNoChange, we require nls <= loadNoChange
// - at sls=overloadSlow, we require nls <= overloadSlow
// - at sls=overloadUrgent, we require nls <= overloadUrgent.
// In other words, whenever the target's node-level summary has been "bumped
// up" beyond its store-level summary by some other store on the node, we
// reject the change.
//
// TODO(tbg): While the example illustrates that "something had to be
// done", I don't understand why it makes sense to solve this exactly
// as it was done. The node level summary is based on node-wide CPU
// utilization as well as its distance from the mean (across the
// candidate set). Store summaries a) reflect the worst dimension, and
// b) on the CPU dimension are based on the store-apportioned capacity.
targetSLS.nls <= targetSLS.sls
if canAddLoad {
return true
Expand Down Expand Up @@ -2451,10 +2495,11 @@ func computeLoadSummary(
}
nls := loadSummaryForDimension(ctx, storeIDForLogging, ns.NodeID, CPURate, ns.adjustedCPU, ns.CapacityCPU, mnl.loadCPU, mnl.utilCPU)
return storeLoadSummary{
worstDim: worstDim,
sls: sls,
nls: nls,
dimSummary: dimSummary,
worstDim: worstDim,
sls: sls,
nls: nls,
dimSummary: dimSummary,
// TODO(tbg): remove highDiskSpaceUtilization.
highDiskSpaceUtilization: highDiskSpaceUtil,
maxFractionPendingIncrease: ss.maxFractionPendingIncrease,
maxFractionPendingDecrease: ss.maxFractionPendingDecrease,