Skip to content

Commit

Permalink
Feature: add sync readiness checker (#1499)
Browse files Browse the repository at this point in the history
* Feature: add sync readiness checker

---------

Co-authored-by: sunhaidong <[email protected]>
  • Loading branch information
holden-cpu and sunhaidong1 authored Feb 20, 2025
1 parent ff84385 commit 47b2e16
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 3 deletions.
54 changes: 52 additions & 2 deletions server/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,25 @@ package health

import (
"errors"
"fmt"
"time"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/alarm"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/rpc"
)

var healthChecker Checker = &NullChecker{}
var readinessChecker Checker = &DefaultHealthChecker{}
// Package-level checker instances.
var (
// healthChecker defaults to a NullChecker.
healthChecker Checker = &NullChecker{}
// readinessChecker defaults to a DefaultHealthChecker; it is what
// GlobalReadinessChecker returns when sync-on-start is not enabled.
readinessChecker Checker = &DefaultHealthChecker{}
// syncReadinessChecker is what GlobalReadinessChecker returns when
// sync-on-start is enabled. SetStartupTime type-asserts it to
// *SyncReadinessChecker, so it must keep that concrete type.
syncReadinessChecker Checker = &SyncReadinessChecker{}
)

// Sentinel errors returned by SyncReadinessChecker.Healthy.
var (
// scNotReadyError is returned while no startup time has been recorded.
scNotReadyError = errors.New("sc api server is not ready")
// syncerNotReadyError is returned while the post-startup wait window
// has not elapsed yet.
syncerNotReadyError = errors.New("the syncer module is not ready")
)

type Checker interface {
Healthy() error
Expand All @@ -40,7 +53,41 @@ func (n NullChecker) Healthy() error {
// DefaultHealthChecker is a stateless Checker whose Healthy method delegates
// to defaultHealth.
type DefaultHealthChecker struct {
}

// SyncReadinessChecker is a Checker that, on top of the default health check,
// holds readiness back for a wait window after startup.
type SyncReadinessChecker struct {
// startupTime is the instant recorded through SetStartupTime. While it is
// the zero time, Healthy returns scNotReadyError.
startupTime time.Time
}

// SetStartupTime stores startupTime on the package-level sync readiness
// checker. The type assertion panics if syncReadinessChecker does not hold a
// *SyncReadinessChecker.
func SetStartupTime(startupTime time.Time) {
	checker := syncReadinessChecker.(*SyncReadinessChecker)
	checker.startupTime = startupTime
}

// Healthy reports whether the service center may be considered ready while
// the syncer is enabled.
//
// It returns, in order of precedence:
//   - the error from defaultHealth, if any;
//   - scNotReadyError while no startup time has been recorded;
//   - syncerNotReadyError while the wait window has not elapsed;
//   - nil otherwise.
//
// The wait window ends at:
//   - firstReceive + 30s when the first sync request was received less than
//     30s after startup;
//   - startup + 60s when no sync request was received yet, or the first one
//     arrived 30s or more after startup.
func (src *SyncReadinessChecker) Healthy() error {
	// Length of a single wait window.
	const waitWindow = 30 * time.Second

	if err := defaultHealth(); err != nil {
		return err
	}
	if src.startupTime.IsZero() {
		return scNotReadyError
	}

	// Read the first-receive time exactly once so that every comparison and
	// the log line below work on the same value, even if a sync request is
	// being recorded while this check runs.
	firstReceive := rpc.GetFirstReceiveTime()
	delay := firstReceive.Sub(src.startupTime)
	// The zero check must come first: Sub on a zero time yields a huge
	// negative duration that would otherwise pass the "< waitWindow" test.
	receivedInWindow := !firstReceive.IsZero() && delay < waitWindow

	passTime := src.startupTime.Add(waitWindow)
	if receivedInWindow {
		passTime = passTime.Add(delay)
	} else {
		passTime = passTime.Add(waitWindow)
	}

	if time.Now().After(passTime) {
		return nil
	}
	if !receivedInWindow {
		// Only warn while readiness is actually being held back; warning on
		// every probe after the window has elapsed would flood the log.
		log.Warn(fmt.Sprintf("first sync request is not received or received 30 seconds after startup,%s,%s", firstReceive, src.startupTime))
	}
	return syncerNotReadyError
}

// Healthy returns the result of defaultHealth unchanged.
func (hc *DefaultHealthChecker) Healthy() error {
return defaultHealth()
}

func defaultHealth() error {
for _, a := range alarm.ListAll() {
if a.Status == alarm.Cleared {
continue
Expand All @@ -65,5 +112,8 @@ func SetGlobalReadinessChecker(hc Checker) {
}

// GlobalReadinessChecker returns the Checker used for readiness probing.
// When the syncer is configured to start with the server
// (Sync.EnableOnStart), the sync-aware checker is returned; otherwise the
// package-level readinessChecker is returned.
func GlobalReadinessChecker() Checker {
	if !config.GetConfig().Sync.EnableOnStart {
		return readinessChecker
	}
	return syncReadinessChecker
}
40 changes: 40 additions & 0 deletions server/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/apache/servicecomb-service-center/server/alarm"
"github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/syncer/rpc"
)

func TestDefaultHealthChecker_Healthy(t *testing.T) {
Expand Down Expand Up @@ -55,3 +58,40 @@ func TestDefaultHealthChecker_Healthy(t *testing.T) {
t.Fatal("TestDefaultHealthChecker_Healthy failed")
}
}

// TestHealthy exercises SyncReadinessChecker.Healthy across its branches.
//
// The subtests share package-level state in the rpc package (the first
// received sync request time), so their order matters: the "no sync" cases
// must run before any subtest records a sync request.
func TestHealthy(t *testing.T) {
	now := time.Now()

	t.Run("sync_not_start", func(t *testing.T) {
		// The package-level checker has no startup time recorded.
		err := syncReadinessChecker.Healthy()
		assert.ErrorIs(t, err, scNotReadyError)
	})

	t.Run("no_sync", func(t *testing.T) {
		// No sync request received and the wait window has not elapsed.
		src := &SyncReadinessChecker{startupTime: now}
		err := src.Healthy()
		assert.ErrorIs(t, err, syncerNotReadyError)
	})

	t.Run("no_sync_but_exceeds_60s", func(t *testing.T) {
		// No sync request received, but the maximum wait time has passed.
		src := &SyncReadinessChecker{startupTime: now.Add(-60 * time.Second)}
		err := src.Healthy()
		assert.NoError(t, err)
	})

	t.Run("sync_and_before", func(t *testing.T) {
		// First sync request received within 30s of startup and the wait
		// window has not elapsed. The request must be recorded before
		// Healthy is called, otherwise this only repeats the "no_sync" path.
		rpc.RecordFirstReceivedRequestTime()
		src := &SyncReadinessChecker{startupTime: now}
		err := src.Healthy()
		assert.ErrorIs(t, err, syncerNotReadyError)
	})

	t.Run("31s_sync_and_before", func(t *testing.T) {
		// First sync request received more than 30s after startup and the
		// wait window has not elapsed. Recording is a no-op if a previous
		// subtest already did it; calling it here keeps this case
		// self-contained.
		rpc.RecordFirstReceivedRequestTime()
		src := &SyncReadinessChecker{startupTime: now.Add(-31 * time.Second)}
		err := src.Healthy()
		assert.ErrorIs(t, err, syncerNotReadyError)
	})
}
24 changes: 23 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ package server
import (
"context"
"crypto/tls"
"github.com/apache/servicecomb-service-center/pkg/protect"
"os"
"time"

"github.com/apache/servicecomb-service-center/pkg/protect"
"github.com/apache/servicecomb-service-center/server/health"

"github.com/gofiber/fiber/v2"

Expand All @@ -47,6 +50,11 @@ import (
"github.com/apache/servicecomb-service-center/server/service/rbac"
)

// Polling parameters used by initScStartupTime while waiting for the API
// server to come up.
const (
// apiServerStartCheckInterval is the pause before each check.
apiServerStartCheckInterval = 1 * time.Second
// apiServerStartCheckTimes is the maximum number of checks.
apiServerStartCheckTimes = 120 // checks for 2 minutes in total
)

var sc ServiceCenterServer

func Run() {
Expand Down Expand Up @@ -76,9 +84,23 @@ func (s *ServiceCenterServer) Run() {

signal.RegisterListener()

go initScStartupTime()

s.waitForQuit()
}

// initScStartupTime polls the API server and records the startup time in the
// health package the first time the server does not report itself closed.
//
// It sleeps apiServerStartCheckInterval before each check and gives up after
// apiServerStartCheckTimes checks; in that case no startup time is recorded.
func initScStartupTime() {
	for attempt := 0; attempt < apiServerStartCheckTimes; attempt++ {
		time.Sleep(apiServerStartCheckInterval)
		// Wait for the sc api server to finish initializing.
		if GetAPIServer().IsClose() {
			continue
		}
		health.SetStartupTime(time.Now())
		return
	}
}

func (s *ServiceCenterServer) startChassis() {
go func() {
mask := make([]string, 0)
Expand Down
18 changes: 18 additions & 0 deletions syncer/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
RbacAllowedRoleName = "sync-admin"
)

// firstReceiveTime is the time at which RecordFirstReceivedRequestTime first
// ran; it stays at the zero time until then.
// NOTE(review): no synchronization is visible around this variable. It is
// written from Server.Sync and read through the getters below; if those can
// run concurrently this is a data race — confirm and guard it if so.
var firstReceiveTime time.Time

func NewServer() *Server {
return &Server{
replicator: replicator.Manager(),
Expand All @@ -53,6 +55,7 @@ type Server struct {
}

func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
RecordFirstReceivedRequestTime()
err := auth(ctx)
if err != nil {
log.Error("auth failed", err)
Expand All @@ -66,6 +69,21 @@ func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Re
return s.toResults(res), nil
}

// RecordFirstReceivedRequestTime stores the current time as the first
// received sync request time and logs it. Calls made after a time has
// already been recorded do nothing.
func RecordFirstReceivedRequestTime() {
	if !IsNotReceiveSyncRequest() {
		return
	}
	firstReceiveTime = time.Now()
	log.Info(fmt.Sprintf("receive first received request time: %s", firstReceiveTime))
}

// IsNotReceiveSyncRequest reports whether no first-receive time has been
// recorded yet, i.e. firstReceiveTime is still the zero time.
func IsNotReceiveSyncRequest() bool {
return firstReceiveTime.IsZero()
}

// GetFirstReceiveTime returns the recorded first-receive time, or the zero
// time if none has been recorded.
func GetFirstReceiveTime() time.Time {
return firstReceiveTime
}

func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
if events == nil || len(events.Events) == 0 {
return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil
Expand Down
6 changes: 6 additions & 0 deletions syncer/service/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ package sync

import (
"fmt"
"time"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/service/event"
"github.com/apache/servicecomb-service-center/syncer/service/replicator"
"github.com/apache/servicecomb-service-center/syncer/service/task"
)

// Polling parameters for waiting on the API server.
// NOTE(review): their use is not visible in this hunk; identically named
// constants are also declared in server/server.go — confirm both are needed.
const (
apiServerStartCheckInterval = 1 * time.Second
apiServerStartCheckTimes = 120 // checks for 2 minutes in total
)

func Init() {
err := replicator.Work()
if err != nil {
Expand Down

0 comments on commit 47b2e16

Please sign in to comment.