diff --git a/datasource/etcd/cache/filter_instances.go b/datasource/etcd/cache/filter_instances.go index 9b49c2c57..c2ed4288a 100644 --- a/datasource/etcd/cache/filter_instances.go +++ b/datasource/etcd/cache/filter_instances.go @@ -29,6 +29,7 @@ import ( "github.com/apache/servicecomb-service-center/datasource/etcd/util" "github.com/apache/servicecomb-service-center/pkg/cache" "github.com/apache/servicecomb-service-center/pkg/log" + util2 "github.com/apache/servicecomb-service-center/pkg/util" ) type InstancesFilter struct { @@ -91,14 +92,22 @@ func (f *InstancesFilter) findInstances(ctx context.Context, domainProject, serv return } + requiredProperties, _ := ctx.Value(util2.CtxRequiredInstancePropertiesOnDisco).(map[string]string) for _, kv := range resp.Kvs { + inst := kv.Value.(*pb.MicroServiceInstance) + if inst == nil { + continue + } + if !util2.IsMapFullyMatch(inst.Properties, requiredProperties) { + continue + } if i, ok := getOrCreateClustersIndex()[kv.ClusterName]; ok { if kv.ModRevision > maxRevs[i] { maxRevs[i] = kv.ModRevision } counts[i]++ } - instances = append(instances, kv.Value.(*pb.MicroServiceInstance)) + instances = append(instances, inst) } return } diff --git a/pkg/util/common.go b/pkg/util/common.go index b9661f9ac..3de7dcb9d 100644 --- a/pkg/util/common.go +++ b/pkg/util/common.go @@ -22,13 +22,14 @@ import "os" type CtxKey string const ( - HeaderRev = "X-Resource-Revision" - CtxGlobal CtxKey = "global" - CtxNocache CtxKey = "noCache" - CtxCacheOnly CtxKey = "cacheOnly" - CtxRequestRevision CtxKey = "requestRev" - CtxResponseRevision CtxKey = "responseRev" - CtxEnableSync CtxKey = "enableSync" + HeaderRev = "X-Resource-Revision" + CtxGlobal CtxKey = "global" + CtxNocache CtxKey = "noCache" + CtxCacheOnly CtxKey = "cacheOnly" + CtxRequestRevision CtxKey = "requestRev" + CtxResponseRevision CtxKey = "responseRev" + CtxEnableSync CtxKey = "enableSync" + CtxRequiredInstancePropertiesOnDisco CtxKey = "requiredInstancePropertiesOnDisco" ) func GetAppRoot() string { diff --git a/pkg/util/util.go b/pkg/util/util.go index 525518697..391a32fbf 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -194,3 +194,19 @@ func GeneratePassword() (string, error) { } return pass, nil } + +func IsMapFullyMatch(source, required map[string]string) bool { + if len(required) == 0 { + return true + } + if len(source) < len(required) { + return false + } + + for key, value := range required { + if resourceValue, exists := source[key]; !exists || resourceValue != value { + return false + } + } + return true +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 4950ea5b3..56b551f70 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -184,3 +184,86 @@ func TestGeneratePassword(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 8, len(password), password) } + +func TestIsMapFullyMatch(t *testing.T) { + type args struct { + resource map[string]string + required map[string]string + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "source/required nil", + args: args{}, + want: true, + }, + { + name: "required nil", + args: args{ + resource: map[string]string{ + "k1": "v1", + }, + required: nil, + }, + want: true, + }, + { + name: "source nil", + args: args{ + resource: nil, + required: map[string]string{ + "k1": "v1", + }, + }, + want: false, + }, + { + name: "source is larger", + args: args{ + resource: map[string]string{ + "k1": "v1", + "k2": "v2", + }, + required: map[string]string{ + "k1": "v1", + }, + }, + want: true, + }, + { + name: "source is smaller", + args: args{ + resource: map[string]string{ + "k1": "v1", + }, + required: map[string]string{ + "k1": "v1", + "k2": "v2", + }, + }, + want: false, + }, + { + name: "source equals required", + args: args{ + resource: map[string]string{ + "k1": "v1", + "k2": "v2", + }, + required: map[string]string{ + "k1": "v1", + "k2": "v2", + }, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, IsMapFullyMatch(tt.args.resource, tt.args.required), "IsMapFullyMatch(%v, %v)", tt.args.resource, tt.args.required) + }) + } +} diff --git a/server/service/disco/instance.go b/server/service/disco/instance.go index 992ff8b67..bb24b69d3 100644 --- a/server/service/disco/instance.go +++ b/server/service/disco/instance.go @@ -50,7 +50,7 @@ var ( propertiesMap map[string]string ) -func getInnerProperties() map[string]string { +func GetInnerProperties() map[string]string { once.Do(func() { propertiesMap = config.GetStringMap("registry.instance.properties") }) @@ -139,7 +139,7 @@ func appendInnerPropertiesToInstance(instance *pb.MicroServiceInstance) { instance.Properties = make(map[string]string) } - innerProps := getInnerProperties() + innerProps := GetInnerProperties() if len(innerProps) <= 0 { return } @@ -210,7 +210,7 @@ func appendInnerProperties(ctx context.Context, serviceID string, instanceID str func shouldAppendInnerProperties(instance *pb.MicroServiceInstance) bool { instProps := instance.Properties - innerProps := getInnerProperties() + innerProps := GetInnerProperties() if len(innerProps) == 0 { return false } @@ -452,7 +452,7 @@ func PutInstanceProperties(ctx context.Context, in *pb.UpdateInstancePropsReques return pb.NewError(pb.ErrInvalidParams, err.Error()) } - properties := getInnerProperties() + properties := GetInnerProperties() if in.Properties == nil { in.Properties = make(map[string]string, len(properties)) } diff --git a/server/service/sync/sync.go b/server/service/sync/sync.go index 5b6f26e09..e327c6357 100644 --- a/server/service/sync/sync.go +++ b/server/service/sync/sync.go @@ -23,6 +23,8 @@ import ( "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/config" + "github.com/apache/servicecomb-service-center/server/service/disco" + "github.com/apache/servicecomb-service-center/syncer/service/admin" ) var ( @@ -40,6 +42,9 @@ func Enable() bool { } func SetContext(ctx context.Context) context.Context { + if Enable() && !admin.ShouldTrustPeerServer() { // 不信任对端SC,则服务发现时,只保留在本SC注册的实例 + util.SetContext(ctx, util.CtxRequiredInstancePropertiesOnDisco, disco.GetInnerProperties()) + } var val string if Enable() { val = "1" diff --git a/syncer/service/admin/check.go b/syncer/service/admin/check.go new file mode 100644 index 000000000..3934631a4 --- /dev/null +++ b/syncer/service/admin/check.go @@ -0,0 +1,82 @@ +package admin + +import ( + "context" + "fmt" + "time" + + "github.com/go-chassis/foundation/gopool" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/syncer/rpc" +) + +func ShouldTrustPeerServer() bool { + return globalHealthChecker.ShouldTrustPeerServer() +} + +type HealthChecker struct { + checkIntervalBySecond uint + latestCheckResult *Resp + latestCheckErr error + + // 为了容忍网络抖动,使用滑动窗口判断状态 + failureWindow *HealthCheckWindow + // 恢复窗口,成功率要求更高,用于数据同步的恢复 + recoveryWindow *HealthCheckWindow + shouldTrustPeerServer bool +} + +func (h *HealthChecker) check() { + h.latestCheckResult, h.latestCheckErr = checkPeerStatus() + if h.latestCheckErr != nil || h.latestCheckResult == nil || len(h.latestCheckResult.Peers) == 0 { + h.AddResult(false) + return + } + if h.latestCheckResult.Peers[0].Status != rpc.HealthStatusConnected { + h.AddResult(false) + return + } + h.AddResult(true) +} + +func (h *HealthChecker) ShouldTrustPeerServer() bool { + return h.shouldTrustPeerServer +} + +func (h *HealthChecker) AddResult(pass bool) { + h.failureWindow.AddResult(pass) + h.recoveryWindow.AddResult(pass) + + shouldTrustPeerServerNew := true + if h.shouldTrustPeerServer { + // 健康 > 不健康 + shouldTrustPeerServerNew = h.failureWindow.IsHealthy() + } else { + // 不健康 > 健康 + shouldTrustPeerServerNew = h.recoveryWindow.IsHealthy() + } + if h.shouldTrustPeerServer != shouldTrustPeerServerNew { + log.Info(fmt.Sprintf("should trust peer server changed, old: %v, new: %v", h.shouldTrustPeerServer, shouldTrustPeerServerNew)) + h.shouldTrustPeerServer = shouldTrustPeerServerNew + return + } +} + +func (h *HealthChecker) LatestHealthCheckResult() (*Resp, error) { + return h.latestCheckResult, h.latestCheckErr +} + +func (h *HealthChecker) RunChecker() { + h.check() + gopool.Go(func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(h.checkIntervalBySecond) * time.Second): + h.check() + } + } + }) +} diff --git a/syncer/service/admin/check_test.go b/syncer/service/admin/check_test.go new file mode 100644 index 000000000..f1a22dc9d --- /dev/null +++ b/syncer/service/admin/check_test.go @@ -0,0 +1,68 @@ +package admin + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHealthChecker_AddResult(t *testing.T) { + h := &HealthChecker{ + checkIntervalBySecond: 1, + failureWindow: NewHealthCheckWindow(8, 5), + recoveryWindow: NewHealthCheckWindow(4, 2), + shouldTrustPeerServer: true, + } + // 全部true + for i := 0; i < 10; i++ { + h.AddResult(true) + assert.True(t, h.failureWindow.IsHealthy()) + assert.True(t, h.recoveryWindow.IsHealthy()) + assert.True(t, h.ShouldTrustPeerServer()) + } + + // t t t t f f f f + h.AddResult(false) + h.AddResult(false) + h.AddResult(false) + h.AddResult(false) + assert.True(t, h.failureWindow.IsHealthy()) + assert.False(t, h.recoveryWindow.IsHealthy()) // recoveryWindow 首先变成失败 + assert.True(t, h.ShouldTrustPeerServer()) // 结果还是健康,因为 健康 > 不健康,看 failureWindow + + // t t t f f f f f + h.AddResult(false) + assert.False(t, h.failureWindow.IsHealthy()) + assert.False(t, h.recoveryWindow.IsHealthy()) + assert.False(t, h.ShouldTrustPeerServer()) // 不健康 + + // 全false + for i := 0; i < 10; i++ { + h.AddResult(false) + assert.False(t, h.failureWindow.IsHealthy()) + assert.False(t, h.recoveryWindow.IsHealthy()) + assert.False(t, h.ShouldTrustPeerServer()) + } + assert.ElementsMatch(t, []bool{false, false, false, false, false, false, false, false}, h.failureWindow.checkPassResults) + assert.ElementsMatch(t, []bool{false, false, false, false}, h.recoveryWindow.checkPassResults) + + h.AddResult(true) + h.AddResult(false) + h.AddResult(true) + h.AddResult(false) + h.AddResult(true) + h.AddResult(false) + h.AddResult(true) + h.AddResult(false) + assert.ElementsMatch(t, []bool{true, false, true, false, true, false, true, false}, h.failureWindow.checkPassResults) + assert.ElementsMatch(t, []bool{true, false, true, false}, h.recoveryWindow.checkPassResults) + assert.True(t, h.failureWindow.IsHealthy()) // failureWindow 恢复 + assert.False(t, h.recoveryWindow.IsHealthy()) // 但是 recoveryWindow 还没恢复 + assert.False(t, h.ShouldTrustPeerServer()) // 结果是不健康,因为不健康 > 健康,看 recoveryWindow + + h.AddResult(true) + h.AddResult(true) + assert.True(t, h.failureWindow.IsHealthy()) + assert.True(t, h.recoveryWindow.IsHealthy()) + assert.True(t, h.ShouldTrustPeerServer()) // 结果健康 +} diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go index c47ae4204..134ac1a5b 100644 --- a/syncer/service/admin/health.go +++ b/syncer/service/admin/health.go @@ -30,6 +30,8 @@ import ( "google.golang.org/grpc/status" "github.com/apache/servicecomb-service-center/client" + "github.com/apache/servicecomb-service-center/datasource/etcd/event" + "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore" "github.com/apache/servicecomb-service-center/pkg/log" pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc" "github.com/apache/servicecomb-service-center/server/plugin/security/cipher" @@ -43,11 +45,16 @@ import ( const ( scheme = "grpc" serviceName = "syncer" + + // eventPushCheckIntervalBySecond is suggested to be checkPeerHealthIntervalBySecond*1/3 + checkPeerHealthIntervalBySecond = 15 + eventPushCheckIntervalBySecond = 5 ) var ( - peerInfos []*PeerInfo - ErrConfigIsEmpty = errors.New("sync config is empty") + peerInfos []*PeerInfo + ErrConfigIsEmpty = errors.New("sync config is empty") + globalHealthChecker *HealthChecker ) type Resp struct { @@ -98,9 +105,21 @@ func Init() { } peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn}) } + + globalHealthChecker = &HealthChecker{ + checkIntervalBySecond: checkPeerHealthIntervalBySecond, + // 失败阈值,最近8次检查,5次失败即视为不健康,120s。 + failureWindow: NewHealthCheckWindow(8, 5), + // 恢复阈值,最近6次检查,最多2次失败即视为不健康,即最多1次失败,90s。 + recoveryWindow: NewHealthCheckWindow(6, 2), + shouldTrustPeerServer: true, // 默认信任对端,只有通过检查,确认对端无法连接,即两个SC同步异常,才认为对端数据不可信任 + } + globalHealthChecker.RunChecker() + h := NewInstanceEventHandler(globalHealthChecker, eventPushCheckIntervalBySecond*time.Second, event.NewInstanceEventHandler()) + kvstore.AddEventHandler(h) } -func Health() (*Resp, error) { +func checkPeerStatus() (*Resp, error) { if len(peerInfos) <= 0 { return nil, ErrConfigIsEmpty } @@ -124,6 +143,10 @@ func Health() (*Resp, error) { return resp, nil } +func Health() (*Resp, error) { + return checkPeerStatus() +} + func getPeerStatus(peerInfo *PeerInfo) string { if peerInfo.ClientConn == nil { log.Warn("clientConn is nil") diff --git a/syncer/service/admin/instance_event_handler.go b/syncer/service/admin/instance_event_handler.go new file mode 100644 index 000000000..928f055a8 --- /dev/null +++ b/syncer/service/admin/instance_event_handler.go @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package admin + +import ( + "context" + "fmt" + "sync" + "time" + + pb "github.com/go-chassis/cari/discovery" + "github.com/go-chassis/foundation/gopool" + + "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/datasource/etcd/sd" + "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore" + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/service/disco" +) + +// InstanceEventHandler 对端SC同步异常期间,无法查询到在对端SC注册的实例,因此这期间对端SC的实例的事件,对客户端而言是无效事件, +// 需要在对端SC同步恢复后,处理一次 +type InstanceEventHandler struct { + handler kvstore.EventHandler + interval time.Duration + healthChecker *HealthChecker + + events map[string]kvstore.Event + eventsLock sync.Mutex +} + +func (h *InstanceEventHandler) Type() kvstore.Type { + return sd.TypeInstance +} + +func (h *InstanceEventHandler) OnEvent(evt kvstore.Event) { + if h.healthChecker.ShouldTrustPeerServer() { + return + } + instance, ignore := checkShouldIgnoreEvent(evt) + if ignore { + return + } + h.add(evt, instance) +} + +func (h *InstanceEventHandler) exportAndClearEvents() []kvstore.Event { + h.eventsLock.Lock() + defer h.eventsLock.Unlock() + result := make([]kvstore.Event, 0, len(h.events)) + for _, evt := range h.events { + result = append(result, evt) + } + h.events = make(map[string]kvstore.Event) + return result +} + +func (h *InstanceEventHandler) add(evt kvstore.Event, instance *pb.MicroServiceInstance) { + h.eventsLock.Lock() + defer h.eventsLock.Unlock() + _, ok := h.events[instance.ServiceId] // 每个服务仅保留一个事件 + if !ok { + evt.Type = pb.EVT_UPDATE // 统一改为刷新事件,避免影响配额 + h.events[instance.ServiceId] = evt + } +} + +func (h *InstanceEventHandler) run() { + gopool.Go(func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(h.interval): + if !h.healthChecker.ShouldTrustPeerServer() { + continue + } + // 对端SC恢复后,再重新处理事件 + events := h.exportAndClearEvents() + h.handleAgain(events) + } + } + }) +} +func (h *InstanceEventHandler) handleAgain(events []kvstore.Event) { + for _, evt := range events { + h.handler.OnEvent(evt) + } +} + +func NewInstanceEventHandler(healthChecker *HealthChecker, interval time.Duration, handler kvstore.EventHandler) *InstanceEventHandler { + h := &InstanceEventHandler{ + handler: handler, + events: make(map[string]kvstore.Event), + eventsLock: sync.Mutex{}, + interval: interval, + healthChecker: healthChecker, + } + h.run() + return h +} + +func checkShouldIgnoreEvent(evt kvstore.Event) (storedInstance *pb.MicroServiceInstance, ignore bool) { + action := evt.Type + // 初始化是内部事件,忽略 + // 本来就查不到,推了删除事件后还是查不到,因此删除事件也忽略 + if action == pb.EVT_INIT || action == pb.EVT_DELETE { + return nil, true + } + instance, ok := evt.KV.Value.(*pb.MicroServiceInstance) + if !ok { + log.Error("failed to assert microServiceInstance", datasource.ErrAssertFail) + return nil, true + } + // 连接在本SC的实例,忽略 + if util.IsMapFullyMatch(instance.Properties, disco.GetInnerProperties()) { + return nil, true + } + log.Info(fmt.Sprintf("caught [%s] service[%s] instance[%s] event, endpoints %v, will handle it again when peer sc recovery", + action, instance.ServiceId, instance.InstanceId, instance.Endpoints)) + return instance, false +} diff --git a/syncer/service/admin/instance_event_handler_test.go b/syncer/service/admin/instance_event_handler_test.go new file mode 100644 index 000000000..fd4ac0d16 --- /dev/null +++ b/syncer/service/admin/instance_event_handler_test.go @@ -0,0 +1,158 @@ +package admin + +import ( + "sync" + "testing" + "time" + + pb "github.com/go-chassis/cari/discovery" + "github.com/stretchr/testify/assert" + + "github.com/apache/servicecomb-service-center/datasource/etcd/sd" + "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore" +) + +type mockEventHandler struct { + processedEvts []kvstore.Event +} + +func (h *mockEventHandler) Type() kvstore.Type { + return sd.TypeInstance +} + +func (h *mockEventHandler) OnEvent(evt kvstore.Event) { + h.processedEvts = append(h.processedEvts, evt) +} + +func TestInstanceEventHandler_OnEvent(t *testing.T) { + h := NewInstanceEventHandler(&HealthChecker{ + shouldTrustPeerServer: false, + }, 100*time.Millisecond, kvstore.EventHandler(nil)) + + evt := kvstore.Event{ + Type: pb.EVT_CREATE, + KV: &kvstore.KeyValue{ + Value: &pb.MicroServiceInstance{ + InstanceId: "test_instnace", + ServiceId: "test_service", + }, + }, + } + h.OnEvent(evt) + assert.Equal(t, 1, len(h.events)) + + // 信任对端SC,不处理 + h.events = make(map[string]kvstore.Event) + h.healthChecker.shouldTrustPeerServer = true + h.OnEvent(evt) + assert.Equal(t, 0, len(h.events)) + + h.events = make(map[string]kvstore.Event) + // 忽略的事件,不处理 + evt.Type = pb.EVT_INIT + h.healthChecker.shouldTrustPeerServer = false + h.OnEvent(evt) + assert.Equal(t, 0, len(h.events)) + + // 处理 + evt.Type = pb.EVT_CREATE + h.OnEvent(evt) + assert.Equal(t, 1, len(h.events)) +} + +func Test_checkShouldIgnoreEvent(t *testing.T) { + instanceDef := &pb.MicroServiceInstance{ + InstanceId: "test_instnace", + ServiceId: "test_service", + Properties: map[string]string{ + "engineID": "test_engineID_fake", + "engineName": "test_engineName", + }, + } + + evt := kvstore.Event{ + Type: pb.EVT_CREATE, + KV: &kvstore.KeyValue{ + Value: instanceDef, + }, + } + instance, ignore := checkShouldIgnoreEvent(evt) + assert.False(t, ignore) + assert.Equal(t, instanceDef, instance) + + evt.Type = pb.EVT_INIT + _, ignore = checkShouldIgnoreEvent(evt) + assert.True(t, ignore) + evt.Type = pb.EVT_DELETE + _, ignore = checkShouldIgnoreEvent(evt) + assert.True(t, ignore) + evt.Type = pb.EVT_UPDATE + _, ignore = checkShouldIgnoreEvent(evt) + assert.False(t, ignore) + + evt.KV.Value = 1 + _, ignore = checkShouldIgnoreEvent(evt) + assert.True(t, ignore) + evt.KV.Value = instanceDef + _, ignore = checkShouldIgnoreEvent(evt) + assert.False(t, ignore) + + instanceDef.Properties["engineID"] = "test_engineID" + _, ignore = checkShouldIgnoreEvent(evt) + assert.True(t, ignore) + instanceDef.Properties["engineID"] = "test_engineID_fake" + _, ignore = checkShouldIgnoreEvent(evt) + assert.False(t, ignore) +} + +func TestInstanceEventHandler_run(t *testing.T) { + mockHandler := &mockEventHandler{} + h := NewInstanceEventHandler(&HealthChecker{ + shouldTrustPeerServer: false, + }, 100*time.Millisecond, mockHandler) + + var wg sync.WaitGroup + numGoroutines := 50 + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + h.OnEvent(kvstore.Event{ + Type: pb.EVT_CREATE, + KV: &kvstore.KeyValue{ + Value: &pb.MicroServiceInstance{ + InstanceId: "test_instance", + ServiceId: "test_service", + }, + }, + }) + h.OnEvent(kvstore.Event{ + Type: pb.EVT_CREATE, + KV: &kvstore.KeyValue{ + Value: &pb.MicroServiceInstance{ + InstanceId: "test_instance1", + ServiceId: "test_service1", + }, + }, + }) + }() + } + wg.Wait() + assert.Equal(t, 2, len(h.events)) // 一个服务多个事件仅保留一个 + assert.Equal(t, 0, len(mockHandler.processedEvts)) + + time.Sleep(110 * time.Millisecond) + assert.Equal(t, 2, len(h.events)) + assert.Equal(t, 0, len(mockHandler.processedEvts)) // 对端不可信,只保存,不处理 + + h.healthChecker.shouldTrustPeerServer = true + time.Sleep(110 * time.Millisecond) + assert.Equal(t, 0, len(h.events)) // 对端可信,处理事件,事件处理完成清空 + assert.Equal(t, 2, len(mockHandler.processedEvts)) // 处理事件 + assert.ElementsMatch(t, []string{"test_service", "test_service1"}, // 处理服务为设置的服务 + []string{mockHandler.processedEvts[0].KV.Value.(*pb.MicroServiceInstance).ServiceId, mockHandler.processedEvts[1].KV.Value.(*pb.MicroServiceInstance).ServiceId}) + + for _, evt := range mockHandler.processedEvts { + assert.Equal(t, pb.EVT_UPDATE, evt.Type) + } +} diff --git a/syncer/service/admin/window.go b/syncer/service/admin/window.go new file mode 100644 index 000000000..04c450c47 --- /dev/null +++ b/syncer/service/admin/window.go @@ -0,0 +1,32 @@ +package admin + +type HealthCheckWindow struct { + windowSize int + checkPassResults []bool + failureThreshold int +} + +func NewHealthCheckWindow(windowSize int, failureThreshold int) *HealthCheckWindow { + return &HealthCheckWindow{ + windowSize: windowSize, + checkPassResults: make([]bool, 0, windowSize), + failureThreshold: failureThreshold, + } +} + +func (h *HealthCheckWindow) AddResult(pass bool) { + h.checkPassResults = append(h.checkPassResults, pass) + if len(h.checkPassResults) > h.windowSize { + h.checkPassResults = h.checkPassResults[1:] + } +} + +func (h *HealthCheckWindow) IsHealthy() bool { + failureCount := 0 + for _, pass := range h.checkPassResults { + if !pass { + failureCount++ + } + } + return failureCount < h.failureThreshold +} diff --git a/syncer/service/admin/window_test.go b/syncer/service/admin/window_test.go new file mode 100644 index 000000000..3f7004675 --- /dev/null +++ b/syncer/service/admin/window_test.go @@ -0,0 +1,71 @@ +package admin + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHealthCheckWindow_AddResult(t *testing.T) { + c := NewHealthCheckWindow(5, 2) + for i := 0; i < 5; i++ { + c.AddResult(true) + assert.Equal(t, i+1, len(c.checkPassResults)) + } + for i := 0; i < 5; i++ { + c.AddResult(true) + assert.Equal(t, c.windowSize, len(c.checkPassResults)) + } + c.AddResult(false) + c.AddResult(false) + assert.ElementsMatch(t, []bool{true, true, true, false, false}, c.checkPassResults) +} + +func TestHealthCheckWindow_IsHealthy(t *testing.T) { + c := NewHealthCheckWindow(4, 2) + + // f 健康 + c.AddResult(false) + assert.True(t, c.IsHealthy()) + + // 达到2次失败,不健康 + for i := 0; i < 5; i++ { + c.AddResult(false) + assert.False(t, c.IsHealthy()) + } + // 结束状态:f f f f + + // f f f t 3次失败 + c.AddResult(true) + assert.False(t, c.IsHealthy()) + + // f f t t 2次失败 + c.AddResult(true) + assert.False(t, c.IsHealthy()) + + // f t t t 1次失败,变为健康 + c.AddResult(true) + assert.True(t, c.IsHealthy()) + + // t t t f 1次失败 + c.AddResult(false) + assert.True(t, c.IsHealthy()) + + // t t f f 2次失败,变为不健康 + c.AddResult(false) + assert.False(t, c.IsHealthy()) + + // t f f t 2次失败 + c.AddResult(true) + assert.False(t, c.IsHealthy()) + + // f f t t 2次失败 + c.AddResult(true) + assert.False(t, c.IsHealthy()) + + // 小于2次失败,变为成功 + for i := 0; i < 4; i++ { + c.AddResult(true) + assert.True(t, c.IsHealthy()) + } +}