Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
fix: add node state check in transfer leader procedure & support multi-node concurrent shard opening. (#134)
Browse files Browse the repository at this point in the history

* add node state check in transfer leader procedure

add node parallelism in scheduler

add getShardNodes interface

* fix reopen shard

* add todo issue
  • Loading branch information
ZuLiangWang authored Feb 22, 2023
1 parent d408707 commit 8caee7f
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 16 deletions.
3 changes: 2 additions & 1 deletion server/cluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
)

const (
MinShardID = 0
MinShardID = 0
HeartbeatKeepAliveIntervalSec uint64 = 15
)

type TableInfo struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"sync"
"time"

"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster"
Expand Down Expand Up @@ -264,6 +265,12 @@ func closeOldLeaderCallback(event *fsm.Event) {
}
ctx := request.ctx

// If the node is expired, skip close old leader shard.
oldLeaderNode, exists := request.cluster.GetRegisteredNodeByName(request.oldLeaderNodeName)
if !exists || oldLeaderNode.IsExpired(uint64(time.Now().Unix()), cluster.HeartbeatKeepAliveIntervalSec) {
return
}

closeShardRequest := eventdispatch.CloseShardRequest{
ShardID: uint32(request.shardID),
}
Expand Down
35 changes: 20 additions & 15 deletions server/coordinator/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
heartbeatCheckInterval = 10 * time.Second
heartbeatKeepAliveIntervalSec uint64 = 15
heartbeatCheckInterval = 10 * time.Second
)

type Scheduler struct {
Expand Down Expand Up @@ -102,18 +102,27 @@ func (s *Scheduler) checkNode(ctx context.Context, ticker *time.Ticker) {
}

func (s *Scheduler) processNodes(ctx context.Context, nodes []cluster.RegisteredNode, t time.Time, nodeShardsMapping map[string][]cluster.ShardInfo) {
group := new(errgroup.Group)
for _, node := range nodes {
// Refer to: https://go.dev/doc/faq#closures_and_goroutines
node := node
// Determines whether node is expired.
if !node.IsExpired(uint64(t.Unix()), heartbeatKeepAliveIntervalSec) {
if !node.IsExpired(uint64(t.Unix()), cluster.HeartbeatKeepAliveIntervalSec) {
// Shard versions of CeresDB and CeresMeta may be inconsistent. And close extra shards and open missing shards if so.
realShards := node.ShardInfos
expectShards := nodeShardsMapping[node.Node.Name]
err := s.applyMetadataShardInfo(ctx, node.Node.Name, realShards, expectShards)
if err != nil {
log.Error("apply metadata failed", zap.Error(err))
}
group.Go(func() error {
realShards := node.ShardInfos
expectShards := nodeShardsMapping[node.Node.Name]
err := s.applyMetadataShardInfo(ctx, node.Node.Name, realShards, expectShards)
if err != nil {
log.Error("apply metadata failed", zap.Error(err))
}
return nil
})
}
}
if err := group.Wait(); err != nil {
log.Error("error group wait", zap.Error(err))
}
}

// applyMetadataShardInfo verify shardInfo in heartbeats and metadata, they are forcibly synchronized to the latest version if they are inconsistent.
Expand Down Expand Up @@ -147,13 +156,9 @@ func (s *Scheduler) applyMetadataShardInfo(ctx context.Context, node string, rea
}

// 3. Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node.
// TODO: In the current implementation mode, the scheduler will close and reopen Shard during the table creation process, which will cause Shard to be unavailable for a short time, temporarily close the detection of version inconsistency, and then open it after repair.
// Related issue: https://github.com/CeresDB/ceresmeta/issues/140
log.Info("Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node.", zap.String("node", node), zap.Uint32("shardID", uint32(expectShard.ID)))
if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(expectShard.ID)}); err != nil {
return errors.WithMessagef(err, "close shard failed, shardInfo:%d", expectShard.ID)
}
if err := s.dispatch.OpenShard(ctx, node, eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil {
return errors.WithMessagef(err, "reopen shard failed, shardInfo:%d", expectShard.ID)
}
}

// 4. Shard exists in node and not exists in metadata, close extra shard on node.
Expand Down
35 changes: 35 additions & 0 deletions server/service/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (a *API) NewAPIRouter() *Router {
router.Post("/split", a.split)
router.Post("/route", a.route)
router.Post("/dropTable", a.dropTable)
router.Post("/getNodeShards", a.getNodeShards)

return router
}
Expand Down Expand Up @@ -242,6 +243,40 @@ func (a *API) route(writer http.ResponseWriter, req *http.Request) {
a.respond(writer, result)
}

// NodeShardsRequest is the JSON request body of the getNodeShards HTTP API;
// it names the cluster whose shard distribution should be returned.
type NodeShardsRequest struct {
	ClusterName string `json:"clusterName"`
}

// getNodeShards handles the getNodeShards HTTP API. Non-leader nodes relay
// the request to the leader and echo back its response; the leader decodes
// the NodeShardsRequest body and responds with the shard distribution of the
// named cluster.
func (a *API) getNodeShards(writer http.ResponseWriter, req *http.Request) {
	resp, isLeader, err := a.forwardClient.forwardToLeader(req)
	if err != nil {
		log.Error("forward to leader failed", zap.Error(err))
		a.respondError(writer, ErrForwardToLeader, "forward to leader failed")
		return
	}

	// Only the leader serves the request directly.
	if !isLeader {
		a.respondForward(writer, resp)
		return
	}

	var nodeShardsRequest NodeShardsRequest
	if err := json.NewDecoder(req.Body).Decode(&nodeShardsRequest); err != nil {
		log.Error("decode request body failed", zap.Error(err))
		a.respondError(writer, ErrParseRequest, "decode request body failed")
		return
	}

	// Use the request's context (instead of a detached context.Background())
	// so the lookup is cancelled when the client disconnects.
	result, err := a.clusterManager.GetNodeShards(req.Context(), nodeShardsRequest.ClusterName)
	if err != nil {
		log.Error("get node shards failed", zap.Error(err))
		a.respondError(writer, ErrGetNodeShards, "get node shards failed")
		return
	}

	a.respond(writer, result)
}

type DropTableRequest struct {
ClusterName string `json:"clusterName"`
SchemaName string `json:"schemaName"`
Expand Down
1 change: 1 addition & 0 deletions server/service/http/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var (
ErrParseRequest = coderr.NewCodeError(coderr.BadRequest, "parse request params")
ErrDropTable = coderr.NewCodeError(coderr.Internal, "drop table")
ErrRouteTable = coderr.NewCodeError(coderr.Internal, "route table")
ErrGetNodeShards = coderr.NewCodeError(coderr.Internal, "get node shards")
ErrCreateProcedure = coderr.NewCodeError(coderr.Internal, "create procedure")
ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit procedure")
ErrGetCluster = coderr.NewCodeError(coderr.Internal, "get cluster")
Expand Down

0 comments on commit 8caee7f

Please sign in to comment.