Skip to content

Commit

Permalink
cluster: maitain heartbeart between nodes in the same cluster (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbdshow authored Nov 11, 2022
1 parent 2de04ef commit abcf7c9
Show file tree
Hide file tree
Showing 8 changed files with 468 additions and 154 deletions.
86 changes: 80 additions & 6 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/lonng/nano/cluster/clusterpb"
"github.com/lonng/nano/internal/env"
"github.com/lonng/nano/internal/log"
)

Expand All @@ -42,15 +44,18 @@ type cluster struct {
}

func newCluster(currentNode *Node) *cluster {
return &cluster{currentNode: currentNode}
c := &cluster{currentNode: currentNode}
if currentNode.IsMaster {
c.checkMemberHeartbeat()
}
return c
}

// Register implements the MasterServer gRPC service
func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (*clusterpb.RegisterResponse, error) {
if req.MemberInfo == nil {
return nil, ErrInvalidRegisterReq
}

resp := &clusterpb.RegisterResponse{}
c.mu.Lock()
for k, m := range c.members {
Expand Down Expand Up @@ -90,12 +95,12 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (*
// Register services to current node
c.currentNode.handler.addRemoteService(req.MemberInfo)
c.mu.Lock()
c.members = append(c.members, &Member{isMaster: false, memberInfo: req.MemberInfo})
c.members = append(c.members, &Member{isMaster: false, memberInfo: req.MemberInfo, lastHeartbeatAt: time.Now()})
c.mu.Unlock()
return resp, nil
}

// Register implements the MasterServer gRPC service
// Unregister implements the MasterServer gRPC service
func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest) (*clusterpb.UnregisterResponse, error) {
if req.ServiceAddr == "" {
return nil, ErrInvalidRegisterReq
Expand All @@ -110,12 +115,17 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest
}
}
if index < 0 {
return nil, fmt.Errorf("address %s has notregistered", req.ServiceAddr)
return nil, fmt.Errorf("address %s has not registered", req.ServiceAddr)
}

// Notify registered node to update remote services
delMember := &clusterpb.DelMemberRequest{ServiceAddr: req.ServiceAddr}
for _, m := range c.members {
for i, m := range c.members {
if i == index {
// this node is down.
continue
}

if m.MemberInfo().ServiceAddr == c.currentNode.ServiceAddr {
continue
}
Expand All @@ -132,6 +142,10 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest

log.Println("Exists peer unregister to cluster", req.ServiceAddr)

if c.currentNode.UnregisterCallback != nil {
c.currentNode.UnregisterCallback(*c.members[index])
}

// Register services to current node
c.currentNode.handler.delMember(req.ServiceAddr)
c.mu.Lock()
Expand All @@ -141,9 +155,69 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest
c.members = append(c.members[:index], c.members[index+1:]...)
}
c.mu.Unlock()

return resp, nil
}

func (c *cluster) Heartbeat(_ context.Context, req *clusterpb.HeartbeatRequest) (*clusterpb.HeartbeatResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()

isHit := false
for i, m := range c.members {
if m.MemberInfo().GetServiceAddr() == req.GetMemberInfo().GetServiceAddr() {
c.members[i].lastHeartbeatAt = time.Now()
isHit = true
}
}
if !isHit {
// master local not binding this node, other members do not need to be notified, because this node registered.
// maybe the master process reload
m := &Member{
isMaster: false,
memberInfo: req.GetMemberInfo(),
lastHeartbeatAt: time.Now(),
}
c.members = append(c.members, m)
c.currentNode.handler.addRemoteService(req.MemberInfo)
log.Println("Heartbeat peer register to cluster", req.MemberInfo.ServiceAddr)
}
return &clusterpb.HeartbeatResponse{}, nil
}

func (c *cluster) checkMemberHeartbeat() {
check := func() {
unregisterMembers := make([]*Member, 0)
// check heartbeat time
for _, m := range c.members {
if time.Now().Sub(m.lastHeartbeatAt) > 4*env.Heartbeat && !m.isMaster {
unregisterMembers = append(unregisterMembers, m)
}
}

for _, m := range unregisterMembers {
if _, err := c.Unregister(context.Background(), &clusterpb.UnregisterRequest{
ServiceAddr: m.MemberInfo().ServiceAddr,
}); err != nil {
log.Println("Heartbeat unregister error", err)
}
}
}
go func() {
ticker := time.NewTicker(env.Heartbeat)
for {
select {
case <-ticker.C:
if !c.currentNode.IsMaster {
ticker.Stop()
return
}
check()
}
}
}()
}

func (c *cluster) setRpcClient(client *rpcClient) {
c.rpcClient = client
}
Expand Down
Loading

0 comments on commit abcf7c9

Please sign in to comment.