diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 0f7f6eb26721..2f687c618779 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -137,6 +137,9 @@ func NewFromDeps(deps Deps) *FSM { state: deps.NewStateStore(), } + // Set the logger for the state store + fsm.state.SetLogger(fsm.logger) + // Build out the apply dispatch table based on the registered commands. for msg, fn := range commands { thisFn := fn @@ -231,6 +234,9 @@ func (c *FSM) Restore(old io.ReadCloser) error { stateNew := c.deps.NewStateStore() + // Set the logger for the new state store + stateNew.SetLogger(c.logger) + // Set up a new restore transaction restore := stateNew.Restore() defer restore.Abort() diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 428025ea189f..5d8cc6568c96 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -10,6 +10,7 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" @@ -20,6 +21,25 @@ import ( "github.com/hashicorp/consul/lib/maps" ) +// discoveryChainCacheKey is the key for caching compiled discovery chains +type discoveryChainCacheKey struct { + ServiceName string + Namespace string + Partition string + Datacenter string + OverrideProtocol string + // Hash of other override parameters (mesh gateway, connect timeout) + OverridesHash uint64 +} + +// discoveryChainCacheEntry contains a cached compiled discovery chain +type discoveryChainCacheEntry struct { + Chain *structs.CompiledDiscoveryChain + Entries *configentry.DiscoveryChainSet + // Index is the config entry index this chain was compiled at + Index uint64 +} + var ( permissiveModeNotAllowedError = errors.New("cannot set MutualTLSMode=permissive because AllowEnablingPermissiveMutualTLS=false in the mesh config entry") ) @@ -879,7 +899,7 @@ var serviceGraphKinds = []string{ // discoveryChainTargets will return a list of services listed as a target for the input's discovery chain func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) { - idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, service, entMeta) + idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, service, entMeta) if err != nil { return 0, nil, err } @@ -897,7 +917,7 @@ func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, serv return idx, resp, nil } -func discoveryChainOriginalTargetsTxn( +func (s *Store) discoveryChainOriginalTargetsTxn( tx ReadTxn, ws memdb.WatchSet, dc, service string, @@ -910,7 +930,7 @@ func discoveryChainOriginalTargetsTxn( EvaluateInPartition: source.PartitionOrDefault(), EvaluateInDatacenter: dc, } - idx, chain, _, err := serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req) + idx, chain, _, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req) if err != nil { return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err) } @@ -957,7 +977,7 @@ func (s *Store) discoveryChainSourcesTxn(tx ReadTxn, ws memdb.WatchSet, dc strin EvaluateInPartition: sn.PartitionOrDefault(), EvaluateInDatacenter: dc, } - idx, chain, _, err := serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req) + idx, chain, _, err := s.serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req) if err != nil { return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err) } @@ -1463,10 +1483,10 @@ func (s *Store) ServiceDiscoveryChain( tx := s.db.ReadTxn() defer tx.Abort() - return serviceDiscoveryChainTxn(tx, ws, serviceName, entMeta, req) + return s.serviceDiscoveryChainTxn(tx, ws, serviceName, entMeta, req) } -func serviceDiscoveryChainTxn( +func (s *Store) serviceDiscoveryChainTxn( tx ReadTxn, ws memdb.WatchSet, serviceName string, @@ -1478,6 +1498,63 @@ func serviceDiscoveryChainTxn( if err != nil { return 0, nil, nil, err } + + // Generate cache key + overridesHash, err := hashstructure_v2.Hash(struct { + MeshGateway structs.MeshGatewayConfig + ConnectTimeout any + }{ + MeshGateway: req.OverrideMeshGateway, + ConnectTimeout: req.OverrideConnectTimeout, + }, hashstructure_v2.FormatV2, nil) + if err != nil { + return 0, nil, nil, fmt.Errorf("error hashing compile overrides: %w", err) + } + + cacheKey := discoveryChainCacheKey{ + ServiceName: serviceName, + Namespace: entMeta.NamespaceOrDefault(), + Partition: entMeta.PartitionOrDefault(), + Datacenter: req.EvaluateInDatacenter, + OverrideProtocol: req.OverrideProtocol, + OverridesHash: overridesHash, + } + + // Check cache + s.discoveryChainCacheLock.RLock() + cached, ok := s.discoveryChainCache.Get(cacheKey) + s.discoveryChainCacheLock.RUnlock() + + // Cache hit: return cached chain if index matches + if ok && cached.Index == index { + s.logger.Trace("discovery chain cache hit", + "service", serviceName, + "namespace", entMeta.NamespaceOrDefault(), + "partition", entMeta.PartitionOrDefault(), + "datacenter", req.EvaluateInDatacenter, + "index", index, + ) + return index, cached.Chain, cached.Entries, nil + } + + // Cache miss or stale + if ok { + s.logger.Debug("discovery chain cache stale, recompiling", + "service", serviceName, + "namespace", entMeta.NamespaceOrDefault(), + "partition", entMeta.PartitionOrDefault(), + "cached_index", cached.Index, + "current_index", index, + ) + } else { + s.logger.Debug("discovery chain cache miss, compiling", + "service", serviceName, + "namespace", entMeta.NamespaceOrDefault(), + "partition", entMeta.PartitionOrDefault(), + ) + } + + // Cache miss or stale: compile the chain req.Entries = entries _, config, err := caConfigTxn(tx, ws) @@ -1512,12 +1589,32 @@ func serviceDiscoveryChainTxn( copy(req.ManualVirtualIPs, serviceVIPEntry.ManualIPs) } - // Then we compile it into something useful. + // Compile the discovery chain. chain, err := discoverychain.Compile(req) if err != nil { return 0, nil, nil, fmt.Errorf("failed to compile discovery chain: %v", err) } + // Store in cache + cacheEntry := &discoveryChainCacheEntry{ + Chain: chain, + Entries: entries, + Index: index, + } + + s.discoveryChainCacheLock.Lock() + s.discoveryChainCache.Add(cacheKey, cacheEntry) + cacheSize := s.discoveryChainCache.Len() + s.discoveryChainCacheLock.Unlock() + + s.logger.Trace("discovery chain compiled and cached", + "service", serviceName, + "namespace", entMeta.NamespaceOrDefault(), + "partition", entMeta.PartitionOrDefault(), + "index", index, + "cache_size", cacheSize, + ) + return index, chain, entries, nil } diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 08e19a62d730..fc3aeb33050c 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -727,7 +727,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc str return 0, &structs.ExportedServiceList{}, nil } - return exportedServicesForPeerTxn(ws, tx, peering, dc) + return s.exportedServicesForPeerTxn(ws, tx, peering, dc) } func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) { @@ -741,7 +741,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, out := make(map[string]structs.ServiceList) for _, peering := range peerings { - idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, dc) + idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering, dc) if err != nil { return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err) } @@ -763,7 +763,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, // specific peering, and optionally include information about discovery chain // reachable targets for these exported services if the "dc" parameter is // specified. -func exportedServicesForPeerTxn( +func (s *Store) exportedServicesForPeerTxn( ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering, @@ -970,7 +970,7 @@ func exportedServicesForPeerTxn( } info.Protocol = protocol - idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta) + idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta) if err != nil { return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err) } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index dff3441535bb..4e8a26ab51b4 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -6,8 +6,11 @@ package state import ( "errors" "fmt" + "sync" + "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + lru "github.com/hashicorp/golang-lru/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" @@ -115,6 +118,14 @@ type Store struct { // lockDelay holds expiration times for locks associated with keys. lockDelay *Delay + + // logger is used for logging state store operations. + logger hclog.Logger + + // discoveryChainCache caches compiled discovery chains to avoid expensive + // recompilation when config entries haven't changed. + discoveryChainCacheLock sync.RWMutex + discoveryChainCache *lru.Cache[discoveryChainCacheKey, *discoveryChainCacheEntry] } // Snapshot is used to provide a point-in-time snapshot. It @@ -155,11 +166,20 @@ func NewStateStore(gc *TombstoneGC) *Store { // be a programming error, which should panic. panic(fmt.Sprintf("failed to create state store: %v", err)) } + // Create discovery chain cache with a reasonable size + // Each entry contains a compiled chain which can be large, so we limit to 1000 entries + discoveryCache, err := lru.New[discoveryChainCacheKey, *discoveryChainCacheEntry](1000) + if err != nil { + panic(fmt.Sprintf("failed to create discovery chain cache: %v", err)) + } + s := &Store{ - schema: schema, - abandonCh: make(chan struct{}), - kvsGraveyard: NewGraveyard(gc), - lockDelay: NewDelay(), + schema: schema, + abandonCh: make(chan struct{}), + kvsGraveyard: NewGraveyard(gc), + lockDelay: NewDelay(), + logger: hclog.NewNullLogger(), + discoveryChainCache: discoveryCache, db: &changeTrackerDB{ db: db, publisher: stream.NoOpEventPublisher{}, @@ -176,6 +196,13 @@ func NewStateStoreWithEventPublisher(gc *TombstoneGC, publisher EventPublisher) return store } +// SetLogger sets the logger for the state store. +func (s *Store) SetLogger(logger hclog.Logger) { + if logger != nil { + s.logger = logger.Named("state") + } +} + // Snapshot is used to create a point-in-time snapshot of the entire db. func (s *Store) Snapshot() *Snapshot { tx := s.db.Txn(false)