Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions managed/services/ha/haservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -215,12 +216,18 @@
raftConfig.SnapshotThreshold = defaultSnapshotThreshold
raftConfig.TrailingLogs = defaultTrailingLogs

// Create a new Raft transport
raa, err := net.ResolveTCPAddr("", net.JoinHostPort(s.params.AdvertiseAddress, strconv.Itoa(s.params.RaftPort)))
tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(s.params.AdvertiseAddress, strconv.Itoa(s.params.RaftPort)))
if err != nil {
return err
}
raftTrans, err := raft.NewTCPTransport(net.JoinHostPort("0.0.0.0", strconv.Itoa(s.params.RaftPort)), raa, defaultRaftRetries, defaultTransportTimeout, nil)

raftTrans, err := raft.NewTCPTransport(

Check failure on line 224 in managed/services/ha/haservice.go

View workflow job for this annotation

GitHub Actions / Checks

arg list parens: align `)` to a same line with last argument
net.JoinHostPort("0.0.0.0", strconv.Itoa(s.params.RaftPort)),
tcpAddr,
defaultRaftRetries,
defaultTransportTimeout,
nil,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -266,9 +273,10 @@
memberlistConfig.Name = s.params.NodeID
memberlistConfig.BindAddr = "0.0.0.0"
memberlistConfig.BindPort = s.params.GossipPort
memberlistConfig.AdvertiseAddr = raa.IP.String()
memberlistConfig.AdvertiseAddr = s.params.AdvertiseAddress
memberlistConfig.AdvertisePort = s.params.GossipPort
memberlistConfig.Events = &memberlist.ChannelEventDelegate{Ch: s.nodeCh}
memberlistConfig.LogOutput = s.l.Logger.Out

// Create the memberlist
s.memberlist, err = memberlist.Create(memberlistConfig)
Expand All @@ -293,7 +301,7 @@
{
Suffrage: raft.Voter,
ID: raft.ServerID(s.params.NodeID),
Address: raft.ServerAddress(raa.String()),
Address: raft.ServerAddress(net.JoinHostPort(s.lookupFQDN(ctx, s.params.AdvertiseAddress), strconv.Itoa(s.params.RaftPort))),
},
},
}
Expand Down Expand Up @@ -340,7 +348,7 @@
node := event.Node
switch event.Event {
case memberlist.NodeJoin:
s.addMemberlistNodeToRaft(node)
s.addMemberlistNodeToRaft(ctx, node)
case memberlist.NodeLeave:
s.removeMemberlistNodeFromRaft(node)
case memberlist.NodeUpdate:
Expand All @@ -364,10 +372,10 @@
raftServers[string(server.ID)] = struct{}{}
}
members := s.memberlist.Members()
s.l.Infof("HA memberlist: %v", members)
s.l.Debugf("HA memberlist: %v", members)
for _, node := range members {
if _, ok := raftServers[node.Name]; !ok {
s.addMemberlistNodeToRaft(node)
s.addMemberlistNodeToRaft(ctx, node)
}
}
case <-ctx.Done():
Expand All @@ -385,14 +393,36 @@
}
}

func (s *Service) addMemberlistNodeToRaft(node *memberlist.Node) {
func (s *Service) addMemberlistNodeToRaft(ctx context.Context, node *memberlist.Node) {
s.rw.RLock()
defer s.rw.RUnlock()
serverAddress := raft.ServerAddress(fmt.Sprintf("%s:%d", node.Addr.String(), s.params.RaftPort))

hostname := s.lookupFQDN(ctx, node.Addr.String())
serverAddress := raft.ServerAddress(fmt.Sprintf("%s:%d", hostname, s.params.RaftPort))

err := s.raftNode.AddVoter(raft.ServerID(node.Name), serverAddress, 0, defaultServerOpTimeout).Error()
if err != nil {
s.l.Errorf("couldn't add a server node %s: %q", node.Name, err)
s.l.Errorf("Couldn't add a server node %s (address: %s): %s", node.Name, serverAddress, err)
} else {
s.l.Infof("Added node %s to Raft cluster with address: %s", node.Name, serverAddress)
}
}

// lookupFQDN performs reverse DNS lookup to get FQDN from IP address.
func (s *Service) lookupFQDN(ctx context.Context, address string) string {
if net.ParseIP(address) == nil {
return address
}

names, err := net.DefaultResolver.LookupAddr(ctx, address)
if err != nil || len(names) == 0 {
s.l.Warnf("Failed to lookup FQDN for %s, using IP: %s", address, err)
return address
}

fqdn := strings.TrimSuffix(names[0], ".")
s.l.Debugf("Resolved %s to FQDN: %s", address, fqdn)
return fqdn
}

func (s *Service) runLeaderObserver(ctx context.Context) {
Expand All @@ -413,7 +443,7 @@
if peer.Name == s.params.NodeID {
continue
}
s.addMemberlistNodeToRaft(peer)
s.addMemberlistNodeToRaft(ctx, peer)
}
} else {
s.l.Info("I am not a leader!")
Expand All @@ -422,7 +452,7 @@
case <-t.C:
address, serverID := node.LeaderWithID()
if serverID != "" {
s.l.Infof("Leader is %s on %s", serverID, address)
s.l.Debugf("Leader is %s on %s", serverID, address)
}
case <-ctx.Done():
return
Expand Down
3 changes: 3 additions & 0 deletions managed/utils/envvars/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ func ParseEnvVars(envs []string) (*models.ChangeSettingsParams, []error, []strin
case "PMM_INSTALL_METHOD", "PMM_DISTRIBUTION_METHOD":
continue

case "PMM_ENCRYPTION_KEY_PATH":
continue

case pkgenv.EnableAccessControl:
b, err := strconv.ParseBool(v)
if err != nil {
Expand Down
Loading