Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add sync readiness checker #1499

Merged
merged 3 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions server/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,25 @@ package health

import (
"errors"
"fmt"
"time"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/alarm"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/rpc"
)

var healthChecker Checker = &NullChecker{}
var readinessChecker Checker = &DefaultHealthChecker{}
var (
healthChecker Checker = &NullChecker{}
readinessChecker Checker = &DefaultHealthChecker{}
syncReadinessChecker Checker = &SyncReadinessChecker{}
)

var (
scNotReadyError = errors.New("sc api server is not ready")
syncerNotReadyError = errors.New("the syncer module is not ready")
)

type Checker interface {
Healthy() error
Expand All @@ -40,7 +53,41 @@ func (n NullChecker) Healthy() error {
type DefaultHealthChecker struct {
}

type SyncReadinessChecker struct {
startupTime time.Time
}

func SetStartupTime(startupTime time.Time) {
syncReadinessChecker.(*SyncReadinessChecker).startupTime = startupTime
}

func (src *SyncReadinessChecker) Healthy() error {
err := defaultHealth()
if err != nil {
return err
}
if src.startupTime.IsZero() {
return scNotReadyError
}
passTime := src.startupTime.Add(30 * time.Second)
if !rpc.IsNotReceiveSyncRequest() && rpc.GetFirstReceiveTime().Sub(src.startupTime) < 30*time.Second {
passTime = passTime.Add(rpc.GetFirstReceiveTime().Sub(src.startupTime))
} else {
log.Warn(fmt.Sprintf("first sync request is not received or received 30 seconds after startup,%s,%s", rpc.GetFirstReceiveTime(), src.startupTime))
passTime = passTime.Add(30 * time.Second)
}
nowTime := time.Now()
if nowTime.After(passTime) {
return nil
}
return syncerNotReadyError
}

func (hc *DefaultHealthChecker) Healthy() error {
return defaultHealth()
}

func defaultHealth() error {
for _, a := range alarm.ListAll() {
if a.Status == alarm.Cleared {
continue
Expand All @@ -65,5 +112,8 @@ func SetGlobalReadinessChecker(hc Checker) {
}

func GlobalReadinessChecker() Checker {
if config.GetConfig().Sync.EnableOnStart {
return syncReadinessChecker
}
return readinessChecker
}
40 changes: 40 additions & 0 deletions server/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/apache/servicecomb-service-center/server/alarm"
"github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/syncer/rpc"
)

func TestDefaultHealthChecker_Healthy(t *testing.T) {
Expand Down Expand Up @@ -55,3 +58,40 @@ func TestDefaultHealthChecker_Healthy(t *testing.T) {
t.Fatal("TestDefaultHealthChecker_Healthy failed")
}
}

func TestHealthy(t *testing.T) {
now := time.Now()
t.Run("sync_not_start", func(t *testing.T) {
err := syncReadinessChecker.Healthy()
assert.ErrorIs(t, err, scNotReadyError)
})

t.Run("no_sync", func(t *testing.T) {
// 未接受到同步请求,并且nowTime.Before(passTime)
src := &SyncReadinessChecker{startupTime: now}
err := src.Healthy()
assert.ErrorIs(t, err, syncerNotReadyError)
})

t.Run("no_sync_but_exceeds_60s", func(t *testing.T) {
// 未接受到同步请求,但是超出最大等待时间
src := &SyncReadinessChecker{startupTime: now.Add(-60 * time.Second)}
err := src.Healthy()
assert.Nil(t, err)
})

t.Run("sync_and_before", func(t *testing.T) {
// 30s内接受到第一次sync请求,并且nowTime.Before(passTime)
src := &SyncReadinessChecker{startupTime: now}
err := src.Healthy()
rpc.RecordFirstReceivedRequestTime()
assert.ErrorIs(t, err, syncerNotReadyError)
})

t.Run("31s_sync_and_before", func(t *testing.T) {
// 30s后接受到第一次sync请求,并且nowTime.Before(passTime)
src := &SyncReadinessChecker{startupTime: now.Add(-31 * time.Second)}
err := src.Healthy()
assert.ErrorIs(t, err, syncerNotReadyError)
})
}
24 changes: 23 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ package server
import (
"context"
"crypto/tls"
"github.com/apache/servicecomb-service-center/pkg/protect"
"os"
"time"

"github.com/apache/servicecomb-service-center/pkg/protect"
"github.com/apache/servicecomb-service-center/server/health"

"github.com/gofiber/fiber/v2"

Expand All @@ -47,6 +50,11 @@ import (
"github.com/apache/servicecomb-service-center/server/service/rbac"
)

const (
apiServerStartCheckInterval = 1 * time.Second
apiServerStartCheckTimes = 120 // 共检查2分钟
)

var sc ServiceCenterServer

func Run() {
Expand Down Expand Up @@ -76,9 +84,23 @@ func (s *ServiceCenterServer) Run() {

signal.RegisterListener()

go initScStartupTime()

s.waitForQuit()
}

func initScStartupTime() {
i := 1
for ; i <= apiServerStartCheckTimes; i++ {
time.Sleep(apiServerStartCheckInterval)
// 等待sc api server初始化完成
if !GetAPIServer().IsClose() {
health.SetStartupTime(time.Now())
break
}
}
}

func (s *ServiceCenterServer) startChassis() {
go func() {
mask := make([]string, 0)
Expand Down
18 changes: 18 additions & 0 deletions syncer/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
RbacAllowedRoleName = "sync-admin"
)

var firstReceiveTime time.Time

func NewServer() *Server {
return &Server{
replicator: replicator.Manager(),
Expand All @@ -53,6 +55,7 @@ type Server struct {
}

func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
RecordFirstReceivedRequestTime()
err := auth(ctx)
if err != nil {
log.Error("auth failed", err)
Expand All @@ -66,6 +69,21 @@ func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Re
return s.toResults(res), nil
}

func RecordFirstReceivedRequestTime() {
if IsNotReceiveSyncRequest() {
firstReceiveTime = time.Now()
log.Info(fmt.Sprintf("receive first received request time: %s", firstReceiveTime))
}
}

func IsNotReceiveSyncRequest() bool {
return firstReceiveTime.IsZero()
}

func GetFirstReceiveTime() time.Time {
return firstReceiveTime
}

func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
if events == nil || len(events.Events) == 0 {
return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil
Expand Down
6 changes: 6 additions & 0 deletions syncer/service/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ package sync

import (
"fmt"
"time"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/service/event"
"github.com/apache/servicecomb-service-center/syncer/service/replicator"
"github.com/apache/servicecomb-service-center/syncer/service/task"
)

const (
apiServerStartCheckInterval = 1 * time.Second
apiServerStartCheckTimes = 120 // 共检查2分钟
)

func Init() {
err := replicator.Work()
if err != nil {
Expand Down
Loading