
Commit 9d0571b

Merge pull request #159886 from tbg/blathers/backport-release-26.1-158473
2 parents 6c30c24 + 8347e94 commit 9d0571b

17 files changed (+912, -121 lines)

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 125 additions & 21 deletions
@@ -509,6 +509,23 @@ func sortTargetCandidateSetAndPick(
 }
 slices.SortFunc(cands.candidates, func(a, b candidateInfo) int {
 if diversityScoresAlmostEqual(a.diversityScore, b.diversityScore) {
+// Note: Consider the case where the current leaseholder's LPI is
+// 3 (lower is better) and we have the following candidates:
+// - LPI=1 SLS=normal
+// - LPI=2 SLS=low
+// Currently we consider the low-SLS candidate first. This is in
+// contrast to the single-metric allocator, which only considers
+// candidates in the lowest-SLS class (i.e. wouldn't even consider
+// the low-SLS candidate since we have a candidate at LPI=1). If we
+// make the corresponding change in candidateToMoveLease, we would
+// match the single-metric allocator's behavior, but it's unclear
+// that that would be better. A good middle ground could be sorting
+// here by LPI first, then SLS. That should result in mma preferring
+// improving the lease preference, but if that is not possible, it
+// would settle for not making it worse (than the current
+// leaseholder), which the single-metric allocator won't.
+//
+// TODO(tbg): consider changing this to sort by LPI first, then SLS.
 return cmp.Or(cmp.Compare(a.sls, b.sls),
 cmp.Compare(a.leasePreferenceIndex, b.leasePreferenceIndex),
 cmp.Compare(a.StoreID, b.StoreID))
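To make the trade-off discussed in the comment above concrete, here is a minimal, runnable Go sketch. The candidate type and field names are simplified stand-ins (not the mma code); it only contrasts the current SLS-first ordering with the LPI-first ordering that the TODO suggests.

// Sketch: compares the two candidate orderings from the comment above.
// Lower is better for both SLS and LPI; types here are illustrative only.
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// candidate is a hypothetical stand-in for the allocator's candidateInfo.
type candidate struct {
	storeID int
	sls     int // store load summary class; lower is better
	lpi     int // lease preference index; lower is better
}

func main() {
	cands := []candidate{
		{storeID: 1, sls: 1 /* normal */, lpi: 1},
		{storeID: 2, sls: 0 /* low */, lpi: 2},
	}

	// Current ordering: SLS first, then LPI. The low-SLS candidate (LPI=2) wins.
	bySLS := slices.Clone(cands)
	slices.SortFunc(bySLS, func(a, b candidate) int {
		return cmp.Or(cmp.Compare(a.sls, b.sls), cmp.Compare(a.lpi, b.lpi), cmp.Compare(a.storeID, b.storeID))
	})

	// TODO'd ordering: LPI first, then SLS. The LPI=1 candidate wins, preferring
	// to improve the lease preference before considering load.
	byLPI := slices.Clone(cands)
	slices.SortFunc(byLPI, func(a, b candidate) int {
		return cmp.Or(cmp.Compare(a.lpi, b.lpi), cmp.Compare(a.sls, b.sls), cmp.Compare(a.storeID, b.storeID))
	})

	fmt.Println("SLS-first pick:", bySLS[0].storeID) // store 2
	fmt.Println("LPI-first pick:", byLPI[0].storeID) // store 1
}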
@@ -537,6 +554,9 @@ func sortTargetCandidateSetAndPick(
 }
 }
 // Diversity is the same. Include if not reaching disk capacity.
+// TODO(tbg): remove highDiskSpaceUtilization check here. These candidates
+// should instead be filtered out by retainReadyLeaseTargetStoresOnly (which
+// filters down the initial candidate set before computing the mean).
 if !cand.highDiskSpaceUtilization {
 cands.candidates[j] = cand
 j++
@@ -796,10 +816,10 @@ func (cs *clusterState) ensureAnalyzedConstraints(rstate *rangeState) {
 // - Need diversity change for each candidate.
 //
 // The first 3 bullets are encapsulated in the helper function
-// computeCandidatesForRange. It works for both replica additions and
+// computeCandidatesForReplicaTransfer. It works for both replica additions and
 // rebalancing.
 //
-// For the last bullet (diversity), the caller of computeCandidatesForRange
+// For the last bullet (diversity), the caller of computeCandidatesForReplicaTransfer
 // needs to populate candidateInfo.diversityScore for each candidate in
 // candidateSet. It does so via diversityScoringMemo. Then the (loadSummary,
 // diversityScore) pair can be used to order candidates for attempts to add.
@@ -827,41 +847,125 @@ func (cs *clusterState) ensureAnalyzedConstraints(rstate *rangeState) {
 
 // loadSheddingStore is only specified if this candidate computation is
 // happening because of overload.
-func (cs *clusterState) computeCandidatesForRange(
+//
+// postMeansExclusions are filtered post-means: their load is included in the
+// mean (they're viable locations in principle) but they're not candidates for
+// this specific transfer (the classic case: already have a replica).
+func (cs *clusterState) computeCandidatesForReplicaTransfer(
 ctx context.Context,
-expr constraintsDisj,
-storesToExclude storeSet,
+conj constraintsConj,
+existingReplicas storeSet,
+postMeansExclusions storeSet,
 loadSheddingStore roachpb.StoreID,
 passObs *rebalancingPassMetricsAndLogger,
 ) (_ candidateSet, sheddingSLS storeLoadSummary) {
-means := cs.meansMemo.getMeans(expr)
-if loadSheddingStore > 0 {
-sheddingSS := cs.stores[loadSheddingStore]
-sheddingSLS = cs.meansMemo.getStoreLoadSummary(ctx, means, loadSheddingStore, sheddingSS.loadSeqNum)
-if sheddingSLS.sls <= loadNoChange && sheddingSLS.nls <= loadNoChange {
-// In this set of stores, this store no longer looks overloaded.
-passObs.replicaShed(notOverloaded)
-return candidateSet{}, sheddingSLS
-}
+// Start with computing the stores (and corresponding means) that satisfy
+// the constraint expression. If we don't see a need to filter out any of
+// these stores before computing the means, we can use it verbatim, otherwise
+// we will recompute the means again below.
+cs.scratchDisj[0] = conj
+means := cs.meansMemo.getMeans(cs.scratchDisj[:1])
+
+// Pre-means filtering: copy to scratch, then filter in place.
+// Filter out stores that have a non-OK replica disposition.
+cs.scratchStoreSet = append(cs.scratchStoreSet[:0], means.stores...)
+filteredStores := retainReadyReplicaTargetStoresOnly(ctx, cs.scratchStoreSet, cs.stores, existingReplicas)
+
+// Determine which means to use.
+//
+// TODO(tbg): unit testing.
+var effectiveMeans *meansLoad
+if len(filteredStores) == len(means.stores) {
+// Common case: nothing was filtered, use cached means.
+effectiveMeans = &means.meansLoad
+} else if len(filteredStores) == 0 {
+// No viable candidates at all.
+return candidateSet{}, sheddingSLS
+} else {
+// Some stores were filtered; recompute means over filtered set.
+cs.scratchMeans = computeMeansForStoreSet(
+cs, filteredStores, cs.meansMemo.scratchNodes, cs.meansMemo.scratchStores)
+effectiveMeans = &cs.scratchMeans
+log.KvDistribution.VEventf(ctx, 2,
+"pre-means filtered %d stores → remaining %v, means: store=%v node=%v",
+len(means.stores)-len(filteredStores), filteredStores,
+effectiveMeans.storeLoad, effectiveMeans.nodeLoad)
 }
-// We only filter out stores that are not fdOK. The rest of the filtering
-// happens later.
+
+sheddingSLS = cs.computeLoadSummary(ctx, loadSheddingStore, &effectiveMeans.storeLoad, &effectiveMeans.nodeLoad)
+if sheddingSLS.sls <= loadNoChange && sheddingSLS.nls <= loadNoChange {
+// In this set of stores, this store no longer looks overloaded.
+passObs.replicaShed(notOverloaded)
+return candidateSet{}, sheddingSLS
+}
+
 var cset candidateSet
-for _, storeID := range means.stores {
-if storesToExclude.contains(storeID) {
+for _, storeID := range filteredStores {
+if postMeansExclusions.contains(storeID) {
+// This store's load is included in the mean, but it's not a viable
+// target for this specific transfer (e.g. it already has a replica).
 continue
 }
-ss := cs.stores[storeID]
-csls := cs.meansMemo.getStoreLoadSummary(ctx, means, storeID, ss.loadSeqNum)
+csls := cs.computeLoadSummary(ctx, storeID, &effectiveMeans.storeLoad, &effectiveMeans.nodeLoad)
 cset.candidates = append(cset.candidates, candidateInfo{
 StoreID: storeID,
 storeLoadSummary: csls,
 })
 }
-cset.means = &means.meansLoad
+cset.means = effectiveMeans
 return cset, sheddingSLS
 }
 
+// retainReadyReplicaTargetStoresOnly filters the input set to only those stores
+// that are ready to accept a replica. A store is not ready if it has a non-OK
+// replica disposition. In practice, the input set is already filtered by
+// constraints.
+//
+// Stores already housing a replica (on top of being in the input storeSet)
+// bypass this disposition check since they already have the replica - its load
+// should be in the mean regardless of its disposition, as we'll pick candidates
+// based on improving clustering around the mean.
+//
+// The input storeSet is mutated (and returned as the result).
+func retainReadyReplicaTargetStoresOnly(
+ctx context.Context,
+in storeSet,
+stores map[roachpb.StoreID]*storeState,
+existingReplicas storeSet,
+) storeSet {
+out := in[:0]
+for _, storeID := range in {
+if existingReplicas.contains(storeID) {
+// Stores on existing replicas already have the load and we want to
+// include them in the mean, even if they are not accepting new replicas
+// or even try to shed.
+//
+// TODO(tbg): health might play into this, though. For example, when
+// a store is dead, whatever load we have from it is stale and we
+// are better off not including it. For now, we ignore this problem
+// because the mma only handles rebalancing, whereas a replica on a
+// dead store would be removed by the single-metric allocator after
+// the TimeUntilStoreDead and so would disappear from our view.
+out = append(out, storeID)
+continue
+}
+ss := stores[storeID]
+switch {
+case ss.status.Disposition.Replica != ReplicaDispositionOK:
+log.KvDistribution.VEventf(ctx, 2, "skipping s%d for replica transfer: replica disposition %v (health %v)", storeID, ss.status.Disposition.Replica, ss.status.Health)
+case highDiskSpaceUtilization(ss.reportedLoad[ByteSize], ss.capacity[ByteSize]):
+// TODO(tbg): remove this from mma and just let the caller set this
+// disposition based on the following cluster settings:
+// - kv.allocator.max_disk_utilization_threshold
+// - kv.allocator.rebalance_to_max_disk_utilization_threshold
+log.KvDistribution.VEventf(ctx, 2, "skipping s%d for replica transfer: high disk utilization (health %v)", storeID, ss.status.Health)
+default:
+out = append(out, storeID)
+}
+}
+return out
+}
+
 // Diversity scoring is very amenable to caching, since the set of unique
 // locality tiers for range replicas is likely to be small. And the cache does
 // not need to be cleared after every allocator pass. This caching is done via
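The pre-means filtering flow added above can be summarized with a small, self-contained Go sketch. The store type and helpers here are invented for illustration and are not the mmaprototype API; the point is only the shape of the logic: filter the constraint-satisfying stores first, reuse the cached mean when nothing was dropped, and recompute it over the filtered set otherwise.

// Sketch of the pre-means filtering pattern (illustrative types, not mma code).
package main

import "fmt"

type store struct {
	id    int
	load  float64
	ready bool // stand-in for "replica disposition OK and not out of disk"
}

func meanLoad(stores []store) float64 {
	var sum float64
	for _, s := range stores {
		sum += s.load
	}
	return sum / float64(len(stores))
}

func main() {
	all := []store{{1, 10, true}, {2, 50, true}, {3, 90, false}} // s3 not ready
	cachedMean := meanLoad(all)                                  // what a means memo would have cached

	// Pre-means filter: keep ready stores (existing replicas would bypass this).
	filtered := all[:0:0]
	for _, s := range all {
		if s.ready {
			filtered = append(filtered, s)
		}
	}

	effectiveMean := cachedMean
	if len(filtered) == 0 {
		fmt.Println("no viable candidates")
		return
	} else if len(filtered) != len(all) {
		// Some stores were filtered; recompute the mean over the filtered set.
		effectiveMean = meanLoad(filtered)
	}
	fmt.Printf("cached mean %.1f, effective mean %.1f\n", cachedMean, effectiveMean)
}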

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 81 additions & 36 deletions
@@ -1262,6 +1262,9 @@ type clusterState struct {
 ranges map[roachpb.RangeID]*rangeState
 
 scratchRangeMap map[roachpb.RangeID]struct{}
+scratchStoreSet storeSet // scratch space for pre-means filtering
+scratchMeans meansLoad // scratch space for recomputed means
+scratchDisj [1]constraintsConj // scratch space for getMeans call
 
 // Added to when a change is proposed. Will also add to corresponding
 // rangeState.pendingChanges and to the affected storeStates.
@@ -1619,6 +1622,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 topk.dim = WriteBandwidth
 }
 if sls.highDiskSpaceUtilization {
+// If disk space is running out, shedding bytes becomes the top priority.
 topk.dim = ByteSize
 } else if sls.sls > loadNoChange {
 // If multiple dimensions are contributing the same loadSummary, we will pick
@@ -2187,15 +2191,42 @@ func (cs *clusterState) getNodeReportedLoad(nodeID roachpb.NodeID) *NodeLoad {
 return nil
 }
 
-// canShedAndAddLoad returns true if the delta can be added to the target
-// store and removed from the src store, such that the relative load summaries
-// will not get worse.
+// canShedAndAddLoad returns true if the delta can be added to the target store
+// and removed from the src store. It does not change any state between the call
+// and return.
 //
-// It does not change any state between the call and return.
+// overloadDim represents the dimension that is overloaded in the source and the
+// function requires that along that dimension, the target is < loadNoChange and
+// the source is > loadNoChange.
 //
-// overloadDim represents the dimension that is overloaded in the source and
-// the function requires that the target must be currently < loadNoChange
-// along that dimension.
+// Broadly speaking, the method tries to ascertain that the target wouldn't be
+// worse off than the source following the transfer. To do this, the method
+// looks at a load summary for the target that would result from the load
+// transfer (targetLoadSummary).
+//
+// When onlyConsiderTargetCPUSummary is true, the targetLoadSummary derives from
+// the target's post-transfer CPU dimension only. This is appropriate when a lease is
+// transferred, as this should only affect the CPU dimension, and we don't want
+// lease transfers to be subject to stricter checks related to other dimensions.
+// When onlyConsiderTargetCPUSummary is false, targetLoadSummary is the target's
+// worst post-transfer load summary. In both cases, the node load summary is also
+// considered.
+//
+// TODO(tbg): understand and explain why the node load summary is in the mix here.
+//
+// In either case, if the targetLoadSummary is < loadNoChange, the change is
+// permitted right away. Otherwise, stricter checks apply: After the transfer,
+// - the target must not be overloadUrgent,
+// - the target has no pending changes (to delay making a potentially non-ideal
+// choice of the target),
+// - the target's overloaded dimension's summary must not be worse than the
+// source's ("overloadedDimPermitsChange"),
+// - along each of the other (!=overloadedDim) dimensions, the percentage
+// increase in load is at most a third of that of the overloaded dimension.
+// (e.g. if CPU goes up by 30%, WriteBandwidth can go up by at most 10%).
+// - the target's node load summary must not be worse than the target's store
+// load summary. See inline comment for more details.
+
 func (cs *clusterState) canShedAndAddLoad(
 ctx context.Context,
 srcSS *storeState,
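A compressed sketch of the acceptance gate described in the doc comment above. The type, constant, and function names below are illustrative assumptions, not the actual canShedAndAddLoad signature; it only shows how targetSummary is derived (CPU-only or worst dimension, folded with the node summary) and how loadNoChange acts as the fast-accept threshold before the stricter checks.

// Sketch of the targetSummary gate (invented names; higher summaries are worse).
package main

import "fmt"

type loadSummary int

const (
	loadLow loadSummary = iota
	loadNormal
	loadNoChange
	overloadSlow
	overloadUrgent
)

func targetSummaryFor(cpu, worst, node loadSummary, onlyCPU bool) loadSummary {
	s := worst
	if onlyCPU {
		s = cpu // lease transfers only move CPU load
	}
	return max(s, node) // the node-level summary is always folded in
}

func main() {
	// Lease transfer: only the CPU dimension (plus the node summary) matters.
	s := targetSummaryFor(loadLow, overloadSlow, loadNormal, true /* onlyCPU */)
	if s < loadNoChange {
		fmt.Println("accept immediately")
	} else {
		fmt.Println("apply stricter checks (pending changes, one-third rule, nls <= sls, ...)")
	}
}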
@@ -2212,6 +2243,11 @@ func (cs *clusterState) canShedAndAddLoad(
 // the load delta addition flips the loadSummary for either the target or the
 // source, which suggests it might be useful to add this to verbose logging.
 
+// Compute srcSLS and targetSLS, which are the load summaries of the source
+// and target that would result from moving the lease.
+//
+// TODO(tbg): extract this into a helper and set it up so that it doesn't
+// temporarily modify the cluster state.
 targetNS := cs.nodes[targetSS.NodeID]
 // Add the delta.
 deltaToAdd := loadVectorToAdd(delta)
@@ -2250,28 +2286,16 @@ func (cs *clusterState) canShedAndAddLoad(
 reason.WriteString("targetSLS.highDiskSpaceUtilization")
 return false
 }
-// We define targetSummary as a summarization across all dimensions of the
-// target. A targetSummary < loadNoChange always accepts the change. When
-// the targetSummary >= loadNoChange, we are stricter and require both that
-// there are no pending changes in the target, and the target is "not worse"
-// in a way that will cause thrashing, where the details are defined below.
-// The no pending changes requirement is to delay making a potentially
-// non-ideal choice of the target.
-//
-// NB: The target's overload dimension summary must have been <
-// loadNoChange, and the source must have been > loadNoChange.
+
+// We define targetSummary as a "worst" of the considered load dimensions
+// (only CPU, or all).
 var targetSummary loadSummary
 if onlyConsiderTargetCPUSummary {
 targetSummary = targetSLS.dimSummary[CPURate]
-if targetSummary < targetSLS.nls {
-targetSummary = targetSLS.nls
-}
 } else {
 targetSummary = targetSLS.sls
-if targetSummary < targetSLS.nls {
-targetSummary = targetSLS.nls
-}
 }
+targetSummary = max(targetSummary, targetSLS.nls)
 
 if targetSummary < loadNoChange {
 return true
@@ -2280,6 +2304,7 @@ func (cs *clusterState) canShedAndAddLoad(
 reason.WriteString("overloadUrgent")
 return false
 }
+
 // Need to consider additional factors.
 //
 // It is possible that both are overloadSlow in aggregate. We want to make
@@ -2308,7 +2333,7 @@ func (cs *clusterState) canShedAndAddLoad(
 // That boolean predicate can also be too strict, in that we should permit
 // transitions to overloadSlow along one dimension, to allow for an
 // exchange.
-overloadedDimFractionIncrease := math.MaxFloat64
+var overloadedDimFractionIncrease float64
 if targetSS.adjusted.load[overloadedDim] > 0 {
 overloadedDimFractionIncrease = float64(deltaToAdd[overloadedDim]) /
 float64(targetSS.adjusted.load[overloadedDim])
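The "one third" rule mentioned in the doc comment above can be illustrated numerically. This sketch uses invented names and made-up numbers; it is not the allocator's code, just the arithmetic of the check: every non-overloaded dimension's fractional increase on the target must stay within a third of the overloaded dimension's.

// Numeric illustration of the one-third rule (hypothetical values).
package main

import "fmt"

func main() {
	// Fractional increases on the target from the proposed transfer.
	overloadedDimIncrease := 0.30 // e.g. CPU goes up 30%
	otherDimIncreases := map[string]float64{
		"WriteBandwidth": 0.08, // ok: <= 10%
		"ByteSize":       0.12, // not ok: > 10%
	}

	for dim, inc := range otherDimIncreases {
		ok := inc <= overloadedDimIncrease/3
		fmt.Printf("%s: increase %.0f%% allowed=%v (limit %.0f%%)\n",
			dim, inc*100, ok, overloadedDimIncrease/3*100)
	}
}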
@@ -2349,14 +2374,33 @@ func (cs *clusterState) canShedAndAddLoad(
 targetSLS.maxFractionPendingIncrease < epsilon &&
 targetSLS.maxFractionPendingDecrease < epsilon &&
 // NB: targetSLS.nls <= targetSLS.sls is not a typo, in that we are
-// comparing targetSLS with itself. The nls only captures node-level
-// CPU, so if a store that is overloaded wrt WriteBandwidth wants to
-// shed to a store that is overloaded wrt CPURate, we need to permit
-// that. However, the nls of the former will be less than the that of
-// the latter. By looking at the nls of the target here, we are making
-// sure that it is no worse than the sls of the target, since if it
-// is, the node is overloaded wrt CPU due to some other store on that
-// node, and we should be shedding that load first.
+// comparing targetSLS with itself.
+//
+// Consider a node that has two stores:
+// - s1 is low on CPU
+// - s2 is very high on CPU, resulting in a node load summary of
+// overloadSlow or overloadUrgent.
+//
+// In this code path, targetSLS is >= loadNoChange, so there must be
+// some overload dimension in targetSLS. If it comes from write bandwidth
+// (or any other non-CPU dimension), without this check, s1 might be
+// considered an acceptable target for adding CPU load. But it is clearly
+// not a good target, since the node housing s1 is CPU overloaded - s2
+// should be shedding CPU load first.
+// This example motivates the condition below. If we reach this code,
+// we know that targetSLS >= loadNoChange, and we decide:
+// - at sls=loadNoChange, we require nls <= loadNoChange
+// - at sls=overloadSlow, we require nls <= overloadSlow
+// - at sls=overloadUrgent, we require nls <= overloadUrgent.
+// In other words, whenever a node level summary was "bumped up" beyond
+// the target's by some other local store, we reject the change.
+//
+// TODO(tbg): While the example illustrates that "something had to be
+// done", I don't understand why it makes sense to solve this exactly
+// as it was done. The node level summary is based on node-wide CPU
+// utilization as well as its distance from the mean (across the
+// candidate set). Store summaries a) reflect the worst dimension, and
+// b) on the CPU dimension are based on the store-apportioned capacity.
 targetSLS.nls <= targetSLS.sls
 if canAddLoad {
 return true
@@ -2447,10 +2491,11 @@ func computeLoadSummary(
 }
 nls := loadSummaryForDimension(ctx, storeIDForLogging, ns.NodeID, CPURate, ns.adjustedCPU, ns.CapacityCPU, mnl.loadCPU, mnl.utilCPU)
 return storeLoadSummary{
-worstDim: worstDim,
-sls: sls,
-nls: nls,
-dimSummary: dimSummary,
+worstDim: worstDim,
+sls: sls,
+nls: nls,
+dimSummary: dimSummary,
+// TODO(tbg): remove highDiskSpaceUtilization.
 highDiskSpaceUtilization: highDiskSpaceUtil,
 maxFractionPendingIncrease: ss.maxFractionPendingIncrease,
 maxFractionPendingDecrease: ss.maxFractionPendingDecrease,
