Skip to content
Merged
83 changes: 60 additions & 23 deletions internal/xds/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
Expand All @@ -52,6 +53,24 @@ var (
// Name is the name of the outlier detection balancer.
// NOTE(review): presumably the name the bb builder reports when it is
// registered — the builder's Name method is not visible here; confirm.
const Name = "outlier_detection_experimental"

// Metrics describing the outcome of outlier ejection decisions. Both are
// int64 counters registered through the experimental stats package.
var (
// ejectionsEnforcedMetric is incremented by ejectEndpoint each time an
// endpoint is actually ejected. It is recorded with the balancer's target
// and the detection method that triggered the ejection ("success_rate" or
// "failure_percentage").
ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.outlier_detection.ejections_enforced",
Description: "EXPERIMENTAL. Number of outlier ejections enforced by detection method",
Unit: "{ejection}",
Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"},
// NOTE(review): presumably means the metric is not enabled by default —
// confirm against the estats.MetricDescriptor documentation.
Default: false,
})

// ejectionsUnenforcedMetric is incremented when an endpoint is detected as
// an outlier but is not ejected. The third label carries the reason:
// "max_ejection_overflow" when the share of already-ejected endpoints has
// reached MaxEjectionPercent, or "enforcement_percentage" when the random
// draw was not below the algorithm's EnforcementPercentage.
ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.outlier_detection.ejections_unenforced",
Description: "EXPERIMENTAL. Number of unenforced outlier ejections due to either `max_ejection_percentage` or `enforcement_percentage`",
Unit: "{ejection}",
Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"},
// NOTE(review): presumably means the metric is not enabled by default —
// confirm against the estats.MetricDescriptor documentation.
Default: false,
})
)

func init() {
balancer.Register(bb{})
}
Expand All @@ -60,14 +79,16 @@ type bb struct{}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &outlierDetectionBalancer{
ClientConn: cc,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
addrs: make(map[string]*endpointInfo),
scUpdateCh: buffer.NewUnbounded(),
pickerUpdateCh: buffer.NewUnbounded(),
channelzParent: bOpts.ChannelzParent,
endpoints: resolver.NewEndpointMap[*endpointInfo](),
ClientConn: cc,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
addrs: make(map[string]*endpointInfo),
scUpdateCh: buffer.NewUnbounded(),
pickerUpdateCh: buffer.NewUnbounded(),
channelzParent: bOpts.ChannelzParent,
endpoints: resolver.NewEndpointMap[*endpointInfo](),
metricsRecorder: cc.MetricsRecorder(), // we use an explicit field instead of using cc.MetricsRecorder() so we can override the metric recorder in tests.
target: bOpts.Target.String(),
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
Expand Down Expand Up @@ -169,10 +190,12 @@ type outlierDetectionBalancer struct {
// to suppress redundant picker updates.
recentPickerNoop bool

closed *grpcsync.Event
done *grpcsync.Event
logger *grpclog.PrefixLogger
channelzParent channelz.Identifier
closed *grpcsync.Event
done *grpcsync.Event
logger *grpclog.PrefixLogger
channelzParent channelz.Identifier
metricsRecorder estats.MetricsRecorder
target string

child synchronizingBalancerWrapper

Expand Down Expand Up @@ -788,18 +811,24 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() {
return
}
mean, stddev := b.meanAndStdDev(endpointsToConsider)
ejectionCfg := b.cfg.SuccessRateEjection
for _, epInfo := range endpointsToConsider {
bucket := epInfo.callCounter.inactiveBucket
ejectionCfg := b.cfg.SuccessRateEjection
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
return
}
successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures)
requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)
if successRate < requiredSuccessRate {
channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", epInfo, successRate, mean, stddev, requiredSuccessRate)
// Check if max ejection percentage would prevent ejection.
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
// Record unenforced ejection due to max ejection percentage.
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow")
continue
}
if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage {
b.ejectEndpoint(epInfo)
b.ejectEndpoint(epInfo, "success_rate")
} else {
// Record unenforced ejection due to enforcement percentage.
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage")
}
}
}
Expand All @@ -816,24 +845,30 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() {
return
}

ejectionCfg := b.cfg.FailurePercentageEjection
for _, epInfo := range endpointsToConsider {
bucket := epInfo.callCounter.inactiveBucket
ejectionCfg := b.cfg.FailurePercentageEjection
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
return
}
failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100
if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) {
channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", epInfo, failurePercentage)
// Check if max ejection percentage would prevent ejection.
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
// Record unenforced ejection due to max ejection percentage.
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow")
continue
}
if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage {
b.ejectEndpoint(epInfo)
b.ejectEndpoint(epInfo, "failure_percentage")
} else {
// Record unenforced ejection due to enforcement percentage.
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage")
}
}
}
}

// Caller must hold b.mu.
func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) {
func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo, detectionMethod string) {
b.numEndpointsEjected++
epInfo.latestEjectionTimestamp = b.timerStartTime
epInfo.ejectionTimeMultiplier++
Expand All @@ -842,6 +877,8 @@ func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) {
channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw)
}

// Record the enforced ejection metric.
ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod)
}

// Caller must hold b.mu.
Expand Down
Loading