Skip to content

CCIP-5425 Introduce ConfigPollerSyncFreq #751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions commit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (p *PluginFactory) NewReportingPlugin(ctx context.Context, config ocr3types
p.ocrConfig.Config.ChainSelector,
p.ocrConfig.Config.OfframpAddress,
p.addrCodec,
offchainConfig.ConfigPollerSyncFreq,
)

// The node supports the chain that the token prices are on.
Expand Down
1 change: 1 addition & 0 deletions execute/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (p PluginFactory) NewReportingPlugin(
p.ocrConfig.Config.ChainSelector,
p.ocrConfig.Config.OfframpAddress,
p.addrCodec,
offchainConfig.ConfigPollerSyncFreq,
)

tokenDataObserver, err := tokendata.NewConfigBasedCompositeObservers(
Expand Down
8 changes: 4 additions & 4 deletions pkg/reader/ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
mapset "github.com/deckarep/golang-set/v2"
"golang.org/x/exp/maps"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
Expand All @@ -32,9 +33,6 @@ import (
plugintypes2 "github.com/smartcontractkit/chainlink-ccip/plugintypes"
)

// Default refresh period for cache if not specified
const defaultRefreshPeriod = 30 * time.Second

// TODO: unit test the implementation when the actual contract reader and writer interfaces are finalized and mocks
// can be generated.
type ccipChainReader struct {
Expand All @@ -55,6 +53,8 @@ func newCCIPChainReaderInternal(
destChain cciptypes.ChainSelector,
offrampAddress []byte,
addrCodec cciptypes.AddressCodec,
configPollerSyncFreq commonconfig.Duration,

) *ccipChainReader {
var crs = make(map[cciptypes.ChainSelector]contractreader.Extended)
for chainSelector, cr := range contractReaders {
Expand All @@ -76,7 +76,7 @@ func newCCIPChainReaderInternal(
}

// Initialize cache with readers
reader.configPoller = newConfigPoller(lggr, reader, defaultRefreshPeriod)
reader.configPoller = newConfigPoller(lggr, reader, configPollerSyncFreq)

contracts := ContractAddresses{
consts.ContractNameOffRamp: {
Expand Down
14 changes: 13 additions & 1 deletion pkg/reader/ccip_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
Expand Down Expand Up @@ -129,6 +130,7 @@ func NewCCIPChainReader(
destChain cciptypes.ChainSelector,
offrampAddress []byte,
addrCodec cciptypes.AddressCodec,
configPollerSyncFreq commonconfig.Duration,
) CCIPReader {
return NewObservedCCIPReader(
newCCIPChainReaderInternal(
Expand All @@ -139,6 +141,7 @@ func NewCCIPChainReader(
destChain,
offrampAddress,
addrCodec,
configPollerSyncFreq,
),
lggr,
destChain,
Expand All @@ -154,8 +157,17 @@ func NewCCIPReaderWithExtendedContractReaders(
destChain cciptypes.ChainSelector,
offrampAddress []byte,
addrCodec cciptypes.AddressCodec,
configPollerSyncFreq commonconfig.Duration,
) CCIPReader {
cr := newCCIPChainReaderInternal(ctx, lggr, nil, contractWriters, destChain, offrampAddress, addrCodec)
cr := newCCIPChainReaderInternal(
ctx,
lggr,
nil,
contractWriters,
destChain,
offrampAddress,
addrCodec,
configPollerSyncFreq)
for ch, extendedCr := range contractReaders {
cr.WithExtendedContractReader(ch, extendedCr)
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/reader/ccip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"

"github.com/smartcontractkit/chainlink-ccip/internal"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -33,10 +35,11 @@ import (
)

var (
chainA = cciptypes.ChainSelector(1)
chainB = cciptypes.ChainSelector(2)
chainC = cciptypes.ChainSelector(3)
chainD = cciptypes.ChainSelector(4)
chainA = cciptypes.ChainSelector(1)
chainB = cciptypes.ChainSelector(2)
chainC = cciptypes.ChainSelector(3)
chainD = cciptypes.ChainSelector(4)
defaultConfigPollerSyncDuration = *commonconfig.MustNewDuration(30 * time.Second)
)

func TestCCIPChainReader_CreateExecutedMessagesKeyFilter(t *testing.T) {
Expand Down Expand Up @@ -353,7 +356,7 @@ func TestCCIPChainReader_getSourceChainsConfig(t *testing.T) {
chainA: sourceCRs[chainA],
chainB: sourceCRs[chainB],
chainC: destCR,
}, nil, chainC, offrampAddress, mockAddrCodec,
}, nil, chainC, offrampAddress, mockAddrCodec, defaultConfigPollerSyncDuration,
)

// Add cleanup to ensure resources are released
Expand Down Expand Up @@ -1085,7 +1088,7 @@ func TestCCIPChainReader_getFeeQuoterTokenPriceUSD(t *testing.T) {
logger.Test(t),
map[cciptypes.ChainSelector]contractreader.ContractReaderFacade{
chainC: destCR,
}, nil, chainC, offrampAddress, mockAddrCodec,
}, nil, chainC, offrampAddress, mockAddrCodec, defaultConfigPollerSyncDuration,
)

// Add cleanup to properly shut down the background polling
Expand Down Expand Up @@ -1130,6 +1133,7 @@ func TestCCIPFeeComponents_HappyPath(t *testing.T) {
chainC,
[]byte{0x3},
internal.NewMockAddressCodecHex(t),
defaultConfigPollerSyncDuration,
)

// Add cleanup to ensure resources are released
Expand Down Expand Up @@ -1167,6 +1171,7 @@ func TestCCIPFeeComponents_NotFoundErrors(t *testing.T) {
chainC,
[]byte{0x3},
internal.NewMockAddressCodecHex(t),
defaultConfigPollerSyncDuration,
)

// Add cleanup to ensure resources are released
Expand Down
8 changes: 5 additions & 3 deletions pkg/reader/config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"

"github.com/smartcontractkit/chainlink-ccip/pkg/consts"
"github.com/smartcontractkit/chainlink-ccip/pkg/contractreader"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
Expand Down Expand Up @@ -41,7 +43,7 @@ type configPoller struct {

sync.RWMutex
chainCaches map[cciptypes.ChainSelector]*chainCache
refreshPeriod time.Duration
refreshPeriod commonconfig.Duration
reader ccipReaderInternal
lggr logger.Logger

Expand Down Expand Up @@ -75,7 +77,7 @@ type chainCache struct {
func newConfigPoller(
lggr logger.Logger,
reader ccipReaderInternal,
refreshPeriod time.Duration,
refreshPeriod commonconfig.Duration,
) *configPoller {
return &configPoller{
chainCaches: make(map[cciptypes.ChainSelector]*chainCache),
Expand All @@ -92,7 +94,7 @@ func (c *configPoller) startBackgroundPolling() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ticker := time.NewTicker(c.refreshPeriod)
ticker := time.NewTicker(c.refreshPeriod.Duration())
defer ticker.Stop()

for {
Expand Down
23 changes: 12 additions & 11 deletions pkg/reader/config_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand All @@ -32,7 +33,7 @@ func setupBasicCache(t *testing.T) (*configPoller, *reader_mocks.MockExtended) {
destChain: chainA,
}

cache := newConfigPoller(logger.Test(t), reader, 1*time.Second)
cache := newConfigPoller(logger.Test(t), reader, *commonconfig.MustNewDuration(1 * time.Second))
return cache, mockReader
}

Expand Down Expand Up @@ -215,7 +216,7 @@ func TestConfigPoller_StartStop(t *testing.T) {
require.NoError(t, err, "Starting config poller should not error")

// Verify it's running by letting it execute at least once
time.Sleep(2 * cache.refreshPeriod)
time.Sleep(2 * cache.refreshPeriod.Duration())

// Stop the poller
err = cache.Close()
Expand All @@ -229,7 +230,7 @@ func TestConfigPoller_StartStop(t *testing.T) {
Maybe().Return(nil, nil, nil)

// Sleep for refresh period again - no calls should occur
time.Sleep(2 * cache.refreshPeriod)
time.Sleep(2 * cache.refreshPeriod.Duration())

// Verify no calls occurred after stopping
reader.AssertNumberOfCalls(t, "ExtendedBatchGetLatestValues", 0)
Expand Down Expand Up @@ -373,7 +374,7 @@ func TestConfigPoller_TrackSourceChain(t *testing.T) {
require.NoError(t, err)

// Let it run for a refresh cycle (increased duration)
time.Sleep(3 * cache.refreshPeriod)
time.Sleep(3 * cache.refreshPeriod.Duration())

// Stop the poller
err = cache.Close()
Expand Down Expand Up @@ -402,7 +403,7 @@ func TestConfigPoller_BackgroundErrorHandling(t *testing.T) {
require.NoError(t, err)

// Let it run and encounter the error
time.Sleep(2 * cache.refreshPeriod)
time.Sleep(2 * cache.refreshPeriod.Duration())

// Stop the poller
err = cache.Close()
Expand Down Expand Up @@ -746,7 +747,7 @@ func TestConfigCache_Initialization(t *testing.T) {
testCases := []struct {
name string
setupReader func() *ccipChainReader
refreshPeriod time.Duration
refreshPeriod commonconfig.Duration
chainToTest cciptypes.ChainSelector
expectedErr string
}{
Expand All @@ -759,7 +760,7 @@ func TestConfigCache_Initialization(t *testing.T) {
destChain: chainA,
}
},
refreshPeriod: time.Second,
refreshPeriod: *commonconfig.MustNewDuration(1 * time.Second),
chainToTest: chainA,
expectedErr: "no contract reader for chain",
},
Expand All @@ -772,7 +773,7 @@ func TestConfigCache_Initialization(t *testing.T) {
destChain: chainA,
}
},
refreshPeriod: time.Second,
refreshPeriod: *commonconfig.MustNewDuration(1 * time.Second),
chainToTest: chainA,
expectedErr: "no contract reader for chain",
},
Expand All @@ -787,7 +788,7 @@ func TestConfigCache_Initialization(t *testing.T) {
destChain: chainA,
}
},
refreshPeriod: time.Second,
refreshPeriod: *commonconfig.MustNewDuration(1 * time.Second),
chainToTest: chainA,
expectedErr: "no contract reader for chain",
},
Expand Down Expand Up @@ -958,7 +959,7 @@ func TestConfigCache_MultipleChains(t *testing.T) {
destChain: chainA,
}

cache := newConfigPoller(logger.Test(t), reader, 1*time.Second)
cache := newConfigPoller(logger.Test(t), reader, *commonconfig.MustNewDuration(1 * time.Second))
ctx := tests.Context(t)

// Setup mock response for both chains
Expand Down Expand Up @@ -1046,7 +1047,7 @@ func TestConfigCache_BackgroundRefreshPeriod(t *testing.T) {
destChain: chainA,
}

cache := newConfigPoller(logger.Test(t), reader, tc.refreshPeriod)
cache := newConfigPoller(logger.Test(t), reader, *commonconfig.MustNewDuration(tc.refreshPeriod))
ctx := tests.Context(t)

mockConfig := OCRConfigResponse{
Expand Down
12 changes: 12 additions & 0 deletions pluginconfig/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
defaultInflightPriceCheckRetries = 5
defaultAsyncObserverSyncFreq = 5 * time.Second
defaultAsyncObserverSyncTimeout = 10 * time.Second
defaultConfigPollerSyncFreq = 30 * time.Second
)

type TokenInfo struct {
Expand Down Expand Up @@ -162,6 +163,9 @@ type CommitOffchainConfig struct {
// in order to avoid delays when there are reports from multiple sources.
// NOTE: this can only be used if RMNEnabled == false.
MultipleReportsEnabled bool `json:"multipleReports"`

// ConfigPollerSyncFreq is the frequency at which the config poller should sync.
ConfigPollerSyncFreq commonconfig.Duration `json:"configPollerSyncFreq"`
}

//nolint:gocyclo // it is considered ok since we don't have complicated logic here
Expand Down Expand Up @@ -225,6 +229,10 @@ func (c *CommitOffchainConfig) applyDefaults() {
c.TokenPriceAsyncObserverSyncTimeout = *commonconfig.MustNewDuration(defaultAsyncObserverSyncTimeout)
}
}

if c.ConfigPollerSyncFreq.Duration() == 0 {
c.ConfigPollerSyncFreq = *commonconfig.MustNewDuration(defaultConfigPollerSyncFreq)
}
}

//nolint:gocyclo // it is considered ok since we don't have complicated logic here
Expand Down Expand Up @@ -281,6 +289,10 @@ func (c *CommitOffchainConfig) Validate() error {
c.ChainFeeAsyncObserverSyncFreq, c.ChainFeeAsyncObserverSyncTimeout)
}

if c.ConfigPollerSyncFreq.Duration() == 0 {
return errors.New("ConfigPollerSyncFreq not set")
}

// Options for multiple reports. These settings were added so that Solana can be configured
// to split merkle roots across multiple reports. The functions do not support RMN, so it is
// an error to use them unless RMNEnabled == false.
Expand Down
Loading
Loading