Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not return instances from the peer if the sync health check failed. #1503

Merged
merged 3 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion datasource/etcd/cache/filter_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/pkg/cache"
"github.com/apache/servicecomb-service-center/pkg/log"
util2 "github.com/apache/servicecomb-service-center/pkg/util"
)

type InstancesFilter struct {
Expand Down Expand Up @@ -91,14 +92,22 @@ func (f *InstancesFilter) findInstances(ctx context.Context, domainProject, serv
return
}

requiredProperties, _ := ctx.Value(util2.CtxRequiredInstancePropertiesOnDisco).(map[string]string)
for _, kv := range resp.Kvs {
inst := kv.Value.(*pb.MicroServiceInstance)
if inst == nil {
continue
}
if !util2.IsMapFullyMatch(inst.Properties, requiredProperties) {
continue
}
if i, ok := getOrCreateClustersIndex()[kv.ClusterName]; ok {
if kv.ModRevision > maxRevs[i] {
maxRevs[i] = kv.ModRevision
}
counts[i]++
}
instances = append(instances, kv.Value.(*pb.MicroServiceInstance))
instances = append(instances, inst)
}
return
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import "os"
type CtxKey string

const (
HeaderRev = "X-Resource-Revision"
CtxGlobal CtxKey = "global"
CtxNocache CtxKey = "noCache"
CtxCacheOnly CtxKey = "cacheOnly"
CtxRequestRevision CtxKey = "requestRev"
CtxResponseRevision CtxKey = "responseRev"
CtxEnableSync CtxKey = "enableSync"
HeaderRev = "X-Resource-Revision"
CtxGlobal CtxKey = "global"
CtxNocache CtxKey = "noCache"
CtxCacheOnly CtxKey = "cacheOnly"
CtxRequestRevision CtxKey = "requestRev"
CtxResponseRevision CtxKey = "responseRev"
CtxEnableSync CtxKey = "enableSync"
CtxRequiredInstancePropertiesOnDisco CtxKey = "requiredInstancePropertiesOnDisco"
)

func GetAppRoot() string {
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,19 @@ func GeneratePassword() (string, error) {
}
return pass, nil
}

// IsMapFullyMatch reports whether every key/value pair in required is also
// present, with an equal value, in source. Extra entries in source are
// ignored. A nil or empty required map always matches.
func IsMapFullyMatch(source, required map[string]string) bool {
	// A source with fewer entries cannot contain all required pairs. When
	// required is empty this guard never triggers and the loop below is a
	// no-op, so the result is true.
	if len(required) > len(source) {
		return false
	}

	for k, want := range required {
		got, ok := source[k]
		if !ok || got != want {
			return false
		}
	}
	return true
}
83 changes: 83 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,86 @@ func TestGeneratePassword(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 8, len(password), password)
}

func TestIsMapFullyMatch(t *testing.T) {
type args struct {
resource map[string]string
required map[string]string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "source/required nil",
args: args{},
want: true,
},
{
name: "required nil",
args: args{
resource: map[string]string{
"k1": "v1",
},
required: nil,
},
want: true,
},
{
name: "source nil",
args: args{
resource: nil,
required: map[string]string{
"k1": "v1",
},
},
want: false,
},
{
name: "source is larger",
args: args{
resource: map[string]string{
"k1": "v1",
"k2": "v2",
},
required: map[string]string{
"k1": "v1",
},
},
want: true,
},
{
name: "source is smaller",
args: args{
resource: map[string]string{
"k1": "v1",
},
required: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
want: false,
},
{
name: "source equals required",
args: args{
resource: map[string]string{
"k1": "v1",
"k2": "v2",
},
required: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, IsMapFullyMatch(tt.args.resource, tt.args.required), "IsMapFullyMatch(%v, %v)", tt.args.resource, tt.args.required)
})
}
}
8 changes: 4 additions & 4 deletions server/service/disco/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
propertiesMap map[string]string
)

func getInnerProperties() map[string]string {
func GetInnerProperties() map[string]string {
once.Do(func() {
propertiesMap = config.GetStringMap("registry.instance.properties")
})
Expand Down Expand Up @@ -139,7 +139,7 @@ func appendInnerPropertiesToInstance(instance *pb.MicroServiceInstance) {
instance.Properties = make(map[string]string)
}

innerProps := getInnerProperties()
innerProps := GetInnerProperties()
if len(innerProps) <= 0 {
return
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func appendInnerProperties(ctx context.Context, serviceID string, instanceID str

func shouldAppendInnerProperties(instance *pb.MicroServiceInstance) bool {
instProps := instance.Properties
innerProps := getInnerProperties()
innerProps := GetInnerProperties()
if len(innerProps) == 0 {
return false
}
Expand Down Expand Up @@ -452,7 +452,7 @@ func PutInstanceProperties(ctx context.Context, in *pb.UpdateInstancePropsReques
return pb.NewError(pb.ErrInvalidParams, err.Error())
}

properties := getInnerProperties()
properties := GetInnerProperties()
if in.Properties == nil {
in.Properties = make(map[string]string, len(properties))
}
Expand Down
5 changes: 5 additions & 0 deletions server/service/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/apache/servicecomb-service-center/server/service/disco"
"github.com/apache/servicecomb-service-center/syncer/service/admin"
)

var (
Expand All @@ -40,6 +42,9 @@ func Enable() bool {
}

func SetContext(ctx context.Context) context.Context {
if Enable() && !admin.ShouldTrustPeerServer() { // 不信任对端SC,则服务发现时,只保留在本SC注册的实例
util.SetContext(ctx, util.CtxRequiredInstancePropertiesOnDisco, disco.GetInnerProperties())
}
var val string
if Enable() {
val = "1"
Expand Down
82 changes: 82 additions & 0 deletions syncer/service/admin/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package admin

import (
"context"
"fmt"
"time"

"github.com/go-chassis/foundation/gopool"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/rpc"
)

// ShouldTrustPeerServer returns the result of calling ShouldTrustPeerServer
// on the package-level globalHealthChecker.
func ShouldTrustPeerServer() bool {
return globalHealthChecker.ShouldTrustPeerServer()
}

// HealthChecker keeps the outcome of the most recent peer status check and
// feeds each pass/fail result into two sliding windows that together decide
// whether the peer server should be trusted.
type HealthChecker struct {
// checkIntervalBySecond is the delay, in seconds, between periodic checks.
checkIntervalBySecond uint
// latestCheckResult and latestCheckErr hold what the last check returned.
latestCheckResult *Resp
latestCheckErr error

// To tolerate network jitter, the status is judged with a sliding window.
failureWindow *HealthCheckWindow
// Recovery window: requires a higher success rate and is used to decide
// when data synchronization can be considered recovered.
recoveryWindow *HealthCheckWindow
// shouldTrustPeerServer is the current trust decision, updated by AddResult.
shouldTrustPeerServer bool
}

// check queries the peer status once, stores the response and error on the
// checker, and records a pass/fail result. A check passes only when the call
// returned no error, the response is non-nil with at least one peer, and the
// first peer's status is rpc.HealthStatusConnected. Only the first entry of
// Peers is examined.
func (h *HealthChecker) check() {
	resp, err := checkPeerStatus()
	h.latestCheckResult, h.latestCheckErr = resp, err

	connected := err == nil &&
		resp != nil &&
		len(resp.Peers) != 0 &&
		resp.Peers[0].Status == rpc.HealthStatusConnected
	h.AddResult(connected)
}

// ShouldTrustPeerServer returns the current value of the shouldTrustPeerServer
// flag, which AddResult updates after each recorded check result.
// NOTE(review): the flag is read here without any locking while AddResult
// writes it; if callers run on a different goroutine than the checker loop
// this would be a data race — confirm and guard if needed.
func (h *HealthChecker) ShouldTrustPeerServer() bool {
return h.shouldTrustPeerServer
}

// AddResult records one check outcome in both sliding windows and then
// re-evaluates whether the peer server should be trusted.
//
// The decision uses a different window depending on the current state:
// while the peer is trusted, failureWindow decides whether it stays trusted
// (healthy -> unhealthy); while it is not trusted, recoveryWindow decides
// whether trust is restored (unhealthy -> healthy). A log line is written
// only when the decision changes.
func (h *HealthChecker) AddResult(pass bool) {
	h.failureWindow.AddResult(pass)
	h.recoveryWindow.AddResult(pass)

	// Pick the window that governs the transition out of the current state.
	deciding := h.recoveryWindow
	if h.shouldTrustPeerServer {
		deciding = h.failureWindow
	}

	trusted := deciding.IsHealthy()
	if trusted == h.shouldTrustPeerServer {
		return
	}
	log.Info(fmt.Sprintf("should trust peer server changed, old: %v, new: %v", h.shouldTrustPeerServer, trusted))
	h.shouldTrustPeerServer = trusted
}

// LatestHealthCheckResult returns the response and error stored by the most
// recent call to check.
func (h *HealthChecker) LatestHealthCheckResult() (*Resp, error) {
return h.latestCheckResult, h.latestCheckErr
}

// RunChecker performs one check synchronously and then starts a background
// task via gopool that repeats the check until the task's context is done.
//
// The wait is re-armed after each check finishes, so the interval is measured
// from the end of one check to the start of the next. checkIntervalBySecond
// is re-read on every iteration; a zero value makes the timer fire
// immediately, so checks would run back-to-back.
func (h *HealthChecker) RunChecker() {
	h.check()
	gopool.Go(func(ctx context.Context) {
		for {
			wait := time.NewTimer(time.Duration(h.checkIntervalBySecond) * time.Second)
			select {
			case <-wait.C:
				h.check()
			case <-ctx.Done():
				wait.Stop()
				return
			}
		}
	})
}
68 changes: 68 additions & 0 deletions syncer/service/admin/check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package admin

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestHealthChecker_AddResult walks a HealthChecker through a sequence of
// pass/fail results and asserts, at each stage, the health of both windows
// and the resulting trust decision, covering the trusted -> untrusted and
// untrusted -> trusted transitions.
func TestHealthChecker_AddResult(t *testing.T) {
h := &HealthChecker{
checkIntervalBySecond: 1,
failureWindow: NewHealthCheckWindow(8, 5),
recoveryWindow: NewHealthCheckWindow(4, 2),
shouldTrustPeerServer: true,
}
// All results true.
for i := 0; i < 10; i++ {
h.AddResult(true)
assert.True(t, h.failureWindow.IsHealthy())
assert.True(t, h.recoveryWindow.IsHealthy())
assert.True(t, h.ShouldTrustPeerServer())
}

// t t t t f f f f
h.AddResult(false)
h.AddResult(false)
h.AddResult(false)
h.AddResult(false)
assert.True(t, h.failureWindow.IsHealthy())
assert.False(t, h.recoveryWindow.IsHealthy()) // recoveryWindow turns unhealthy first
assert.True(t, h.ShouldTrustPeerServer()) // still trusted: the healthy -> unhealthy transition is decided by failureWindow

// t t t f f f f f
h.AddResult(false)
assert.False(t, h.failureWindow.IsHealthy())
assert.False(t, h.recoveryWindow.IsHealthy())
assert.False(t, h.ShouldTrustPeerServer()) // not trusted

// All results false.
for i := 0; i < 10; i++ {
h.AddResult(false)
assert.False(t, h.failureWindow.IsHealthy())
assert.False(t, h.recoveryWindow.IsHealthy())
assert.False(t, h.ShouldTrustPeerServer())
}
assert.ElementsMatch(t, []bool{false, false, false, false, false, false, false, false}, h.failureWindow.checkPassResults)
assert.ElementsMatch(t, []bool{false, false, false, false}, h.recoveryWindow.checkPassResults)

h.AddResult(true)
h.AddResult(false)
h.AddResult(true)
h.AddResult(false)
h.AddResult(true)
h.AddResult(false)
h.AddResult(true)
h.AddResult(false)
assert.ElementsMatch(t, []bool{true, false, true, false, true, false, true, false}, h.failureWindow.checkPassResults)
assert.ElementsMatch(t, []bool{true, false, true, false}, h.recoveryWindow.checkPassResults)
assert.True(t, h.failureWindow.IsHealthy()) // failureWindow has recovered
assert.False(t, h.recoveryWindow.IsHealthy()) // but recoveryWindow has not recovered yet
assert.False(t, h.ShouldTrustPeerServer()) // still not trusted: the unhealthy -> healthy transition is decided by recoveryWindow

h.AddResult(true)
h.AddResult(true)
assert.True(t, h.failureWindow.IsHealthy())
assert.True(t, h.recoveryWindow.IsHealthy())
assert.True(t, h.ShouldTrustPeerServer()) // trusted again
}
Loading
Loading