Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2
github.com/hashicorp/go-version v1.8.0
github.com/hashicorp/raft v1.7.0
github.com/hashicorp/raft v1.7.3
github.com/hashicorp/raft-boltdb/v2 v2.3.1
github.com/jmoiron/sqlx v1.4.0
github.com/jotaen/kong-completion v0.0.5
Expand Down Expand Up @@ -122,7 +122,7 @@ require (
github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/go-metrics v0.5.4 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/miekg/dns v1.1.68 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ github.com/hashicorp/go-metrics v0.5.4/go.mod h1:CG5yz4NZ/AI/aQt9Ucm/vdBnbh7fvmv
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
github.com/hashicorp/go-msgpack/v2 v2.1.1 h1:xQEY9yB2wnHitoSzk/B9UjXWRQ67QKu5AOm8aFp8N3I=
github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4=
github.com/hashicorp/go-msgpack/v2 v2.1.2 h1:4Ee8FTp834e+ewB71RDrQ0VKpyFdrKOjvYtnQ/ltVj0=
github.com/hashicorp/go-msgpack/v2 v2.1.2/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
Expand All @@ -338,8 +338,8 @@ github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM=
github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
github.com/hashicorp/raft v1.7.0 h1:4u24Qn6lQ6uwziM++UgsyiT64Q8GyRn43CV41qPiz1o=
github.com/hashicorp/raft v1.7.0/go.mod h1:N1sKh6Vn47mrWvEArQgILTyng8GoDRNYlgKyK7PMjs0=
github.com/hashicorp/raft v1.7.3 h1:DxpEqZJysHN0wK+fviai5mFcSYsCkNpFUl1xpAW8Rbo=
github.com/hashicorp/raft v1.7.3/go.mod h1:DfvCGFxpAUPE0L4Uc8JLlTPtc3GzSbdH0MTJCLgnmJQ=
github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702 h1:RLKEcCuKcZ+qp2VlaaZsYZfLOmIiuJNpEi48Rl8u9cQ=
github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702/go.mod h1:nTakvJ4XYq45UXtn0DbwR4aU9ZdjlnIenpbs6Cd+FM0=
github.com/hashicorp/raft-boltdb/v2 v2.3.1 h1:ackhdCNPKblmOhjEU9+4lHSJYFkJd6Jqyvj6eW9pwkc=
Expand Down
1 change: 1 addition & 0 deletions managed/AGENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Multiple code generation tools are used:
- Don't commit test binaries or test artifacts (add to `.gitignore` if needed)
- Don't comment on every single line of code unnecessarily, only where clarity is needed
- Don't write inline comments (e.g. `code // comment`); always put comments on separate lines
- Don't use named return values in functions

### Error Handling
- Use `status.Error()` for gRPC errors with proper codes
Expand Down
105 changes: 92 additions & 13 deletions managed/services/ha/haservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package ha

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -113,6 +116,54 @@ func (f *fsmSnapshot) Release() {
// Nothing to release for stateless FSM
}

// memberlistLogWriter is an io.Writer that converts memberlist's standard log format to structured output.
// Each Write call is matched against logRegex and forwarded to logger.
type memberlistLogWriter struct {
// logger receives the converted log entries.
logger *logrus.Entry
// logRegex captures the bracketed level tag and the message text of a log line.
logRegex *regexp.Regexp
}

// newMemberlistLogWriter builds a memberlistLogWriter that forwards parsed log lines to logger.
// The line pattern is compiled here, once per writer; an invalid pattern would panic.
func newMemberlistLogWriter(logger *logrus.Entry) *memberlistLogWriter {
	// Timestamp, bracketed level tag, optional "memberlist: " prefix, then the message.
	const linePattern = `^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(\w+)\] (?:memberlist: )?(.+)$`

	writer := new(memberlistLogWriter)
	writer.logger = logger
	writer.logRegex = regexp.MustCompile(linePattern)

	return writer
}

// Write implements io.Writer: it parses a single memberlist log line and re-emits it through logrus.
//
// Expected input looks like "2025/12/22 21:43:27 [DEBUG|INFO|WARN|ERR] message".
// The level tag selects the logrus level; unknown tags are logged at Info.
// Lines that do not match the expected format are logged unchanged at Info.
// It always reports len(p) bytes written and a nil error.
func (w *memberlistLogWriter) Write(p []byte) (int, error) {
	written := len(p)

	// Trailing newlines are stripped before parsing
	line := string(bytes.TrimRight(p, "\n"))

	parts := w.logRegex.FindStringSubmatch(line)
	if len(parts) != 3 { //nolint:mnd
		// Unparseable line: pass it through as-is
		w.logger.Info(line)
		return written, nil
	}

	text := parts[2]
	switch strings.ToLower(parts[1]) {
	case "debug":
		w.logger.Debug(text)
	case "warn":
		w.logger.Warn(text)
	case "err":
		w.logger.Error(text)
	default:
		// Covers "info" as well as any unrecognized level tag
		w.logger.Info(text)
	}

	return written, nil
}

// Compile-time check that *memberlistLogWriter satisfies io.Writer.
var _ io.Writer = (*memberlistLogWriter)(nil)

// setupRaftStorage sets up persistent storage for Raft.
func setupRaftStorage(nodeID string, l *logrus.Entry) (*raftboltdb.BoltStore, *raftboltdb.BoltStore, *raft.FileSnapshotStore, error) {
// Create the Raft data directory for this node
Expand Down Expand Up @@ -215,12 +266,17 @@ func (s *Service) Run(ctx context.Context) error {
raftConfig.SnapshotThreshold = defaultSnapshotThreshold
raftConfig.TrailingLogs = defaultTrailingLogs

// Create a new Raft transport
raa, err := net.ResolveTCPAddr("", net.JoinHostPort(s.params.AdvertiseAddress, strconv.Itoa(s.params.RaftPort)))
tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(s.params.AdvertiseAddress, strconv.Itoa(s.params.RaftPort)))
if err != nil {
return err
}
raftTrans, err := raft.NewTCPTransport(net.JoinHostPort("0.0.0.0", strconv.Itoa(s.params.RaftPort)), raa, defaultRaftRetries, defaultTransportTimeout, nil)

raftTrans, err := raft.NewTCPTransport(
net.JoinHostPort("0.0.0.0", strconv.Itoa(s.params.RaftPort)),
tcpAddr,
defaultRaftRetries,
defaultTransportTimeout,
nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -266,9 +322,10 @@ func (s *Service) Run(ctx context.Context) error {
memberlistConfig.Name = s.params.NodeID
memberlistConfig.BindAddr = "0.0.0.0"
memberlistConfig.BindPort = s.params.GossipPort
memberlistConfig.AdvertiseAddr = raa.IP.String()
memberlistConfig.AdvertiseAddr = s.params.AdvertiseAddress
memberlistConfig.AdvertisePort = s.params.GossipPort
memberlistConfig.Events = &memberlist.ChannelEventDelegate{Ch: s.nodeCh}
memberlistConfig.LogOutput = newMemberlistLogWriter(s.l.WithField("subsystem", "memberlist"))

// Create the memberlist
s.memberlist, err = memberlist.Create(memberlistConfig)
Expand All @@ -293,7 +350,7 @@ func (s *Service) Run(ctx context.Context) error {
{
Suffrage: raft.Voter,
ID: raft.ServerID(s.params.NodeID),
Address: raft.ServerAddress(raa.String()),
Address: raft.ServerAddress(net.JoinHostPort(s.lookupFQDN(ctx, s.params.AdvertiseAddress), strconv.Itoa(s.params.RaftPort))),
},
},
}
Expand Down Expand Up @@ -340,7 +397,7 @@ func (s *Service) runRaftNodesSynchronizer(ctx context.Context) {
node := event.Node
switch event.Event {
case memberlist.NodeJoin:
s.addMemberlistNodeToRaft(node)
s.addMemberlistNodeToRaft(ctx, node)
case memberlist.NodeLeave:
s.removeMemberlistNodeFromRaft(node)
case memberlist.NodeUpdate:
Expand All @@ -364,10 +421,10 @@ func (s *Service) runRaftNodesSynchronizer(ctx context.Context) {
raftServers[string(server.ID)] = struct{}{}
}
members := s.memberlist.Members()
s.l.Infof("HA memberlist: %v", members)
s.l.Debugf("HA memberlist: %v", members)
for _, node := range members {
if _, ok := raftServers[node.Name]; !ok {
s.addMemberlistNodeToRaft(node)
s.addMemberlistNodeToRaft(ctx, node)
}
}
case <-ctx.Done():
Expand All @@ -385,16 +442,38 @@ func (s *Service) removeMemberlistNodeFromRaft(node *memberlist.Node) {
}
}

func (s *Service) addMemberlistNodeToRaft(node *memberlist.Node) {
func (s *Service) addMemberlistNodeToRaft(ctx context.Context, node *memberlist.Node) {
s.rw.RLock()
defer s.rw.RUnlock()
serverAddress := raft.ServerAddress(fmt.Sprintf("%s:%d", node.Addr.String(), s.params.RaftPort))

hostname := s.lookupFQDN(ctx, node.Addr.String())
serverAddress := raft.ServerAddress(fmt.Sprintf("%s:%d", hostname, s.params.RaftPort))

err := s.raftNode.AddVoter(raft.ServerID(node.Name), serverAddress, 0, defaultServerOpTimeout).Error()
if err != nil {
s.l.Errorf("couldn't add a server node %s: %q", node.Name, err)
s.l.Errorf("Couldn't add a server node %s (address: %s): %s", node.Name, serverAddress, err)
} else {
s.l.Infof("Added node %s to Raft cluster with address: %s", node.Name, serverAddress)
}
}

// lookupFQDN performs a reverse DNS lookup to get the FQDN for an IP address.
//
// If address does not parse as an IP, it is returned unchanged.
// If the lookup fails or returns no names, a warning is logged and address is
// returned as-is. Otherwise the first returned name is used, with its trailing
// dot removed.
func (s *Service) lookupFQDN(ctx context.Context, address string) string {
	if net.ParseIP(address) == nil {
		return address
	}

	names, err := net.DefaultResolver.LookupAddr(ctx, address)
	if err != nil {
		s.l.Warnf("Failed to lookup FQDN for %s, using IP: %s", address, err)
		return address
	}
	if len(names) == 0 {
		// The error is nil here; formatting it with %s would log "%!s(<nil>)"
		s.l.Warnf("Failed to lookup FQDN for %s, using IP: no names returned", address)
		return address
	}

	fqdn := strings.TrimSuffix(names[0], ".")
	s.l.Debugf("Resolved %s to FQDN: %s", address, fqdn)
	return fqdn
}

func (s *Service) runLeaderObserver(ctx context.Context) {
t := time.NewTicker(defaultTickerInterval)
defer t.Stop()
Expand All @@ -413,7 +492,7 @@ func (s *Service) runLeaderObserver(ctx context.Context) {
if peer.Name == s.params.NodeID {
continue
}
s.addMemberlistNodeToRaft(peer)
s.addMemberlistNodeToRaft(ctx, peer)
}
} else {
s.l.Info("I am not a leader!")
Expand All @@ -422,7 +501,7 @@ func (s *Service) runLeaderObserver(ctx context.Context) {
case <-t.C:
address, serverID := node.LeaderWithID()
if serverID != "" {
s.l.Infof("Leader is %s on %s", serverID, address)
s.l.Debugf("Leader is %s on %s", serverID, address)
}
case <-ctx.Done():
return
Expand Down
5 changes: 3 additions & 2 deletions managed/services/victoriametrics/victoriametrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,10 @@ func (svc *Service) populateConfig(cfg *config.Config) error {
if cfg.GlobalConfig.ScrapeTimeout == 0 {
cfg.GlobalConfig.ScrapeTimeout = ScrapeTimeout(resolutions.LR)
}
if !svc.params.ExternalVM() {
if !svc.params.ExternalVM() && !svc.haService.Params().Enabled {
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfigForVictoriaMetrics(svc.l, resolutions.HR, svc.params))
} else {
}
if svc.params.ExternalVM() && !svc.haService.Params().Enabled {
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfigForInternalVMAgent(resolutions.HR, svc.baseURL.Host))
}
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfigForVMAlert(resolutions.HR, pmmServerNodeName))
Expand Down
3 changes: 3 additions & 0 deletions managed/utils/envvars/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ func ParseEnvVars(envs []string) (*models.ChangeSettingsParams, []error, []strin
case "PMM_INSTALL_METHOD", "PMM_DISTRIBUTION_METHOD":
continue

case "PMM_ENCRYPTION_KEY_PATH":
continue

case pkgenv.EnableAccessControl:
b, err := strconv.ParseBool(v)
if err != nil {
Expand Down
Loading