Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions managed/AGENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Multiple code generation tools are used:
- Don't commit test binaries or test artifacts (add to `.gitignore` if needed)
- Don't comment on every single line of code unnecessarily, only where clarity is needed
- Don't inline comments (i.e. `code // comment`), always put comments on separate lines
- Don't use named return values in functions

### Error Handling
- Use `status.Error()` for gRPC errors with proper codes
Expand Down
107 changes: 94 additions & 13 deletions managed/services/ha/haservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package ha

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -113,6 +116,56 @@ func (f *fsmSnapshot) Release() {
// Nothing to release for stateless FSM
}

// memberlistLogWriter is an io.Writer that converts memberlist's standard log format to structured output.
type memberlistLogWriter struct {
logger *logrus.Entry
logRegex *regexp.Regexp
}

// newMemberlistLogWriter creates a new log writer for memberlist.
func newMemberlistLogWriter(logger *logrus.Entry) *memberlistLogWriter {
return &memberlistLogWriter{
logger: logger,
logRegex: regexp.MustCompile(`^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(\w+)\] (.+)$`),
}
}

// Write implements io.Writer interface and converts memberlist logs to logrus format.
func (w *memberlistLogWriter) Write(p []byte) (int, error) {
n := len(p)

// Remove trailing newline for parsing
msg := string(bytes.TrimRight(p, "\n"))

// Parse memberlist log format: "2025/12/22 21:43:27 [DEBUG] message"
matches := w.logRegex.FindStringSubmatch(msg)
if len(matches) == 3 { //nolint:mnd
level := strings.ToLower(matches[1])
message := matches[2]

// Log with appropriate level
switch level {
case "debug":
w.logger.Debug(message)
case "info":
w.logger.Info(message)
case "warn", "warning":
w.logger.Warn(message)
case "error", "err":
w.logger.Error(message)
default:
w.logger.Info(message)
}
} else {
// Fallback for unparseable logs
w.logger.Info(msg)
}

return n, nil
}

var _ io.Writer = (*memberlistLogWriter)(nil)

// setupRaftStorage sets up persistent storage for Raft.
func setupRaftStorage(nodeID string, l *logrus.Entry) (*raftboltdb.BoltStore, *raftboltdb.BoltStore, *raft.FileSnapshotStore, error) {
// Create the Raft data directory for this node
Expand Down Expand Up @@ -215,12 +268,17 @@ func (s *Service) Run(ctx context.Context) error {
raftConfig.SnapshotThreshold = defaultSnapshotThreshold
raftConfig.TrailingLogs = defaultTrailingLogs

// Create a new Raft transport
raa, err := net.ResolveTCPAddr("", net.JoinHostPort(s.params.AdvertiseAddress, strconv.Itoa(s.params.RaftPort)))
tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(s.params.AdvertiseAddress, strconv.Itoa(s.params.RaftPort)))
if err != nil {
return err
}
raftTrans, err := raft.NewTCPTransport(net.JoinHostPort("0.0.0.0", strconv.Itoa(s.params.RaftPort)), raa, defaultRaftRetries, defaultTransportTimeout, nil)

raftTrans, err := raft.NewTCPTransport(
net.JoinHostPort("0.0.0.0", strconv.Itoa(s.params.RaftPort)),
tcpAddr,
defaultRaftRetries,
defaultTransportTimeout,
nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -266,9 +324,10 @@ func (s *Service) Run(ctx context.Context) error {
memberlistConfig.Name = s.params.NodeID
memberlistConfig.BindAddr = "0.0.0.0"
memberlistConfig.BindPort = s.params.GossipPort
memberlistConfig.AdvertiseAddr = raa.IP.String()
memberlistConfig.AdvertiseAddr = s.params.AdvertiseAddress
memberlistConfig.AdvertisePort = s.params.GossipPort
memberlistConfig.Events = &memberlist.ChannelEventDelegate{Ch: s.nodeCh}
memberlistConfig.LogOutput = newMemberlistLogWriter(s.l.WithField("subsystem", "memberlist"))

// Create the memberlist
s.memberlist, err = memberlist.Create(memberlistConfig)
Expand All @@ -293,7 +352,7 @@ func (s *Service) Run(ctx context.Context) error {
{
Suffrage: raft.Voter,
ID: raft.ServerID(s.params.NodeID),
Address: raft.ServerAddress(raa.String()),
Address: raft.ServerAddress(net.JoinHostPort(s.lookupFQDN(ctx, s.params.AdvertiseAddress), strconv.Itoa(s.params.RaftPort))),
},
},
}
Expand Down Expand Up @@ -340,7 +399,7 @@ func (s *Service) runRaftNodesSynchronizer(ctx context.Context) {
node := event.Node
switch event.Event {
case memberlist.NodeJoin:
s.addMemberlistNodeToRaft(node)
s.addMemberlistNodeToRaft(ctx, node)
case memberlist.NodeLeave:
s.removeMemberlistNodeFromRaft(node)
case memberlist.NodeUpdate:
Expand All @@ -364,10 +423,10 @@ func (s *Service) runRaftNodesSynchronizer(ctx context.Context) {
raftServers[string(server.ID)] = struct{}{}
}
members := s.memberlist.Members()
s.l.Infof("HA memberlist: %v", members)
s.l.Debugf("HA memberlist: %v", members)
for _, node := range members {
if _, ok := raftServers[node.Name]; !ok {
s.addMemberlistNodeToRaft(node)
s.addMemberlistNodeToRaft(ctx, node)
}
}
case <-ctx.Done():
Expand All @@ -385,16 +444,38 @@ func (s *Service) removeMemberlistNodeFromRaft(node *memberlist.Node) {
}
}

func (s *Service) addMemberlistNodeToRaft(node *memberlist.Node) {
func (s *Service) addMemberlistNodeToRaft(ctx context.Context, node *memberlist.Node) {
s.rw.RLock()
defer s.rw.RUnlock()
serverAddress := raft.ServerAddress(fmt.Sprintf("%s:%d", node.Addr.String(), s.params.RaftPort))

hostname := s.lookupFQDN(ctx, node.Addr.String())
serverAddress := raft.ServerAddress(fmt.Sprintf("%s:%d", hostname, s.params.RaftPort))

err := s.raftNode.AddVoter(raft.ServerID(node.Name), serverAddress, 0, defaultServerOpTimeout).Error()
if err != nil {
s.l.Errorf("couldn't add a server node %s: %q", node.Name, err)
s.l.Errorf("Couldn't add a server node %s (address: %s): %s", node.Name, serverAddress, err)
} else {
s.l.Infof("Added node %s to Raft cluster with address: %s", node.Name, serverAddress)
}
}

// lookupFQDN performs reverse DNS lookup to get FQDN from IP address.
func (s *Service) lookupFQDN(ctx context.Context, address string) string {
if net.ParseIP(address) == nil {
return address
}

names, err := net.DefaultResolver.LookupAddr(ctx, address)
if err != nil || len(names) == 0 {
s.l.Warnf("Failed to lookup FQDN for %s, using IP: %s", address, err)
return address
}

fqdn := strings.TrimSuffix(names[0], ".")
s.l.Debugf("Resolved %s to FQDN: %s", address, fqdn)
return fqdn
}

func (s *Service) runLeaderObserver(ctx context.Context) {
t := time.NewTicker(defaultTickerInterval)
defer t.Stop()
Expand All @@ -413,7 +494,7 @@ func (s *Service) runLeaderObserver(ctx context.Context) {
if peer.Name == s.params.NodeID {
continue
}
s.addMemberlistNodeToRaft(peer)
s.addMemberlistNodeToRaft(ctx, peer)
}
} else {
s.l.Info("I am not a leader!")
Expand All @@ -422,7 +503,7 @@ func (s *Service) runLeaderObserver(ctx context.Context) {
case <-t.C:
address, serverID := node.LeaderWithID()
if serverID != "" {
s.l.Infof("Leader is %s on %s", serverID, address)
s.l.Debugf("Leader is %s on %s", serverID, address)
}
case <-ctx.Done():
return
Expand Down
5 changes: 3 additions & 2 deletions managed/services/victoriametrics/victoriametrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,10 @@ func (svc *Service) populateConfig(cfg *config.Config) error {
if cfg.GlobalConfig.ScrapeTimeout == 0 {
cfg.GlobalConfig.ScrapeTimeout = ScrapeTimeout(resolutions.LR)
}
if !svc.params.ExternalVM() {
if !svc.params.ExternalVM() && !svc.haService.Params().Enabled {
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfigForVictoriaMetrics(svc.l, resolutions.HR, svc.params))
} else {
}
if svc.params.ExternalVM() && !svc.haService.Params().Enabled {
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfigForInternalVMAgent(resolutions.HR, svc.baseURL.Host))
}
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfigForVMAlert(resolutions.HR, pmmServerNodeName))
Expand Down
3 changes: 3 additions & 0 deletions managed/utils/envvars/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ func ParseEnvVars(envs []string) (*models.ChangeSettingsParams, []error, []strin
case "PMM_INSTALL_METHOD", "PMM_DISTRIBUTION_METHOD":
continue

case "PMM_ENCRYPTION_KEY_PATH":
continue

case pkgenv.EnableAccessControl:
b, err := strconv.ParseBool(v)
if err != nil {
Expand Down
Loading