Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func NewFromDeps(deps Deps) *FSM {
state: deps.NewStateStore(),
}

// Set the logger for the state store
fsm.state.SetLogger(fsm.logger)

// Build out the apply dispatch table based on the registered commands.
for msg, fn := range commands {
thisFn := fn
Expand Down Expand Up @@ -231,6 +234,9 @@ func (c *FSM) Restore(old io.ReadCloser) error {

stateNew := c.deps.NewStateStore()

// Set the logger for the new state store
stateNew.SetLogger(c.logger)

// Set up a new restore transaction
restore := stateNew.Restore()
defer restore.Abort()
Expand Down
111 changes: 104 additions & 7 deletions agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
Expand All @@ -20,6 +21,25 @@ import (
"github.com/hashicorp/consul/lib/maps"
)

// discoveryChainCacheKey is the key for caching compiled discovery chains
type discoveryChainCacheKey struct {
ServiceName string
Namespace string
Partition string
Datacenter string
OverrideProtocol string
// Hash of other override parameters (mesh gateway, connect timeout)
OverridesHash uint64
}

// discoveryChainCacheEntry contains a cached compiled discovery chain
type discoveryChainCacheEntry struct {
Chain *structs.CompiledDiscoveryChain
Entries *configentry.DiscoveryChainSet
// Index is the config entry index this chain was compiled at
Index uint64
}

var (
permissiveModeNotAllowedError = errors.New("cannot set MutualTLSMode=permissive because AllowEnablingPermissiveMutualTLS=false in the mesh config entry")
)
Expand Down Expand Up @@ -879,7 +899,7 @@ var serviceGraphKinds = []string{

// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain
func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, service, entMeta)
idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, service, entMeta)
if err != nil {
return 0, nil, err
}
Expand All @@ -897,7 +917,7 @@ func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, serv
return idx, resp, nil
}

func discoveryChainOriginalTargetsTxn(
func (s *Store) discoveryChainOriginalTargetsTxn(
tx ReadTxn,
ws memdb.WatchSet,
dc, service string,
Expand All @@ -910,7 +930,7 @@ func discoveryChainOriginalTargetsTxn(
EvaluateInPartition: source.PartitionOrDefault(),
EvaluateInDatacenter: dc,
}
idx, chain, _, err := serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req)
idx, chain, _, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err)
}
Expand Down Expand Up @@ -957,7 +977,7 @@ func (s *Store) discoveryChainSourcesTxn(tx ReadTxn, ws memdb.WatchSet, dc strin
EvaluateInPartition: sn.PartitionOrDefault(),
EvaluateInDatacenter: dc,
}
idx, chain, _, err := serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req)
idx, chain, _, err := s.serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
}
Expand Down Expand Up @@ -1463,10 +1483,10 @@ func (s *Store) ServiceDiscoveryChain(
tx := s.db.ReadTxn()
defer tx.Abort()

return serviceDiscoveryChainTxn(tx, ws, serviceName, entMeta, req)
return s.serviceDiscoveryChainTxn(tx, ws, serviceName, entMeta, req)
}

func serviceDiscoveryChainTxn(
func (s *Store) serviceDiscoveryChainTxn(
tx ReadTxn,
ws memdb.WatchSet,
serviceName string,
Expand All @@ -1478,6 +1498,63 @@ func serviceDiscoveryChainTxn(
if err != nil {
return 0, nil, nil, err
}

// Generate cache key
overridesHash, err := hashstructure_v2.Hash(struct {
MeshGateway structs.MeshGatewayConfig
ConnectTimeout any
}{
MeshGateway: req.OverrideMeshGateway,
ConnectTimeout: req.OverrideConnectTimeout,
}, hashstructure_v2.FormatV2, nil)
if err != nil {
return 0, nil, nil, fmt.Errorf("error hashing compile overrides: %w", err)
}

cacheKey := discoveryChainCacheKey{
ServiceName: serviceName,
Namespace: entMeta.NamespaceOrDefault(),
Partition: entMeta.PartitionOrDefault(),
Datacenter: req.EvaluateInDatacenter,
OverrideProtocol: req.OverrideProtocol,
OverridesHash: overridesHash,
}

// Check cache
s.discoveryChainCacheLock.RLock()
cached, ok := s.discoveryChainCache.Get(cacheKey)
s.discoveryChainCacheLock.RUnlock()

// Cache hit: return cached chain if index matches
if ok && cached.Index == index {
s.logger.Trace("discovery chain cache hit",
"service", serviceName,
"namespace", entMeta.NamespaceOrDefault(),
"partition", entMeta.PartitionOrDefault(),
"datacenter", req.EvaluateInDatacenter,
"index", index,
)
return index, cached.Chain, cached.Entries, nil
}

// Cache miss or stale
if ok {
s.logger.Debug("discovery chain cache stale, recompiling",
"service", serviceName,
"namespace", entMeta.NamespaceOrDefault(),
"partition", entMeta.PartitionOrDefault(),
"cached_index", cached.Index,
"current_index", index,
)
} else {
s.logger.Debug("discovery chain cache miss, compiling",
"service", serviceName,
"namespace", entMeta.NamespaceOrDefault(),
"partition", entMeta.PartitionOrDefault(),
)
}

// Cache miss or stale: compile the chain
req.Entries = entries

_, config, err := caConfigTxn(tx, ws)
Expand Down Expand Up @@ -1512,12 +1589,32 @@ func serviceDiscoveryChainTxn(
copy(req.ManualVirtualIPs, serviceVIPEntry.ManualIPs)
}

// Then we compile it into something useful.
// Compile the discovery chain.
chain, err := discoverychain.Compile(req)
if err != nil {
return 0, nil, nil, fmt.Errorf("failed to compile discovery chain: %v", err)
}

// Store in cache
cacheEntry := &discoveryChainCacheEntry{
Chain: chain,
Entries: entries,
Index: index,
}

s.discoveryChainCacheLock.Lock()
s.discoveryChainCache.Add(cacheKey, cacheEntry)
cacheSize := s.discoveryChainCache.Len()
s.discoveryChainCacheLock.Unlock()

s.logger.Trace("discovery chain compiled and cached",
"service", serviceName,
"namespace", entMeta.NamespaceOrDefault(),
"partition", entMeta.PartitionOrDefault(),
"index", index,
"cache_size", cacheSize,
)

return index, chain, entries, nil
}

Expand Down
8 changes: 4 additions & 4 deletions agent/consul/state/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc str
return 0, &structs.ExportedServiceList{}, nil
}

return exportedServicesForPeerTxn(ws, tx, peering, dc)
return s.exportedServicesForPeerTxn(ws, tx, peering, dc)
}

func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
Expand All @@ -741,7 +741,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string,

out := make(map[string]structs.ServiceList)
for _, peering := range peerings {
idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, dc)
idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering, dc)
if err != nil {
return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
}
Expand All @@ -763,7 +763,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string,
// specific peering, and optionally include information about discovery chain
// reachable targets for these exported services if the "dc" parameter is
// specified.
func exportedServicesForPeerTxn(
func (s *Store) exportedServicesForPeerTxn(
ws memdb.WatchSet,
tx ReadTxn,
peering *pbpeering.Peering,
Expand Down Expand Up @@ -970,7 +970,7 @@ func exportedServicesForPeerTxn(
}
info.Protocol = protocol

idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err)
}
Expand Down
35 changes: 31 additions & 4 deletions agent/consul/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ package state
import (
"errors"
"fmt"
"sync"

"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
lru "github.com/hashicorp/golang-lru/v2"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
Expand Down Expand Up @@ -115,6 +118,14 @@ type Store struct {

// lockDelay holds expiration times for locks associated with keys.
lockDelay *Delay

// logger is used for logging state store operations.
logger hclog.Logger

// discoveryChainCache caches compiled discovery chains to avoid expensive
// recompilation when config entries haven't changed.
discoveryChainCacheLock sync.RWMutex
discoveryChainCache *lru.Cache[discoveryChainCacheKey, *discoveryChainCacheEntry]
}

// Snapshot is used to provide a point-in-time snapshot. It
Expand Down Expand Up @@ -155,11 +166,20 @@ func NewStateStore(gc *TombstoneGC) *Store {
// be a programming error, which should panic.
panic(fmt.Sprintf("failed to create state store: %v", err))
}
// Create discovery chain cache with a reasonable size
// Each entry contains a compiled chain which can be large, so we limit to 1000 entries
discoveryCache, err := lru.New[discoveryChainCacheKey, *discoveryChainCacheEntry](1000)
if err != nil {
panic(fmt.Sprintf("failed to create discovery chain cache: %v", err))
}

s := &Store{
schema: schema,
abandonCh: make(chan struct{}),
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
schema: schema,
abandonCh: make(chan struct{}),
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
logger: hclog.NewNullLogger(),
discoveryChainCache: discoveryCache,
db: &changeTrackerDB{
db: db,
publisher: stream.NoOpEventPublisher{},
Expand All @@ -176,6 +196,13 @@ func NewStateStoreWithEventPublisher(gc *TombstoneGC, publisher EventPublisher)
return store
}

// SetLogger sets the logger for the state store.
func (s *Store) SetLogger(logger hclog.Logger) {
if logger != nil {
s.logger = logger.Named("state")
}
}

// Snapshot is used to create a point-in-time snapshot of the entire db.
func (s *Store) Snapshot() *Snapshot {
tx := s.db.Txn(false)
Expand Down
Loading