diff --git a/execute/costlymessages/costly_messages.go b/execute/costlymessages/costly_messages.go index 90ba6c46df..51127a4e17 100644 --- a/execute/costlymessages/costly_messages.go +++ b/execute/costlymessages/costly_messages.go @@ -116,6 +116,10 @@ func (o *observer) Observe( costlyMessages := make([]cciptypes.Bytes32, 0) for _, msg := range messages { + if msg.IsEmpty() { + continue + } + fee, ok := messageFees[msg.Header.MessageID] if !ok { return nil, fmt.Errorf("missing fee for message %s", msg.Header.MessageID) @@ -335,6 +339,9 @@ func (n *StaticMessageExecCostUSD18Calculator) MessageExecCostUSD18( messageExecCosts := make(map[cciptypes.Bytes32]plugintypes.USD18) for _, msg := range messages { + if msg.IsEmpty() { + continue + } cost, ok := n.costs[msg.Header.MessageID] if !ok { return nil, fmt.Errorf("missing exec cost for message %s", msg.Header.MessageID) @@ -397,6 +404,9 @@ func (c *CCIPMessageFeeUSD18Calculator) MessageFeeUSD18( messageFees := make(map[cciptypes.Bytes32]plugintypes.USD18) for _, msg := range messages { + if msg.IsEmpty() { + continue + } feeUSD18 := mathslib.CalculateUsdPerUnitGas(msg.FeeValueJuels.Int, linkPriceUSD.Int) timestamp, ok := messageTimeStamps[msg.Header.MessageID] if !ok { diff --git a/execute/factory.go b/execute/factory.go index 04aa05b9bb..ab008b87be 100644 --- a/execute/factory.go +++ b/execute/factory.go @@ -190,6 +190,7 @@ func (p PluginFactory) NewReportingPlugin( lggr, costlyMessageObserver, metricsReporter, + p.chainWriters[p.ocrConfig.Config.ChainSelector].GetTransactionStatus, // dest writer can get status. ), ocr3types.ReportingPluginInfo{ Name: "CCIPRoleExecute", Limits: ocr3types.ReportingPluginLimits{ diff --git a/execute/observation.go b/execute/observation.go index 9ebbff9b89..9bc54ec4ed 100644 --- a/execute/observation.go +++ b/execute/observation.go @@ -8,12 +8,12 @@ import ( "golang.org/x/exp/maps" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-ccip/execute/exectypes" "github.com/smartcontractkit/chainlink-ccip/execute/optimizers" + "github.com/smartcontractkit/chainlink-ccip/execute/report" typeconv "github.com/smartcontractkit/chainlink-ccip/internal/libs/typeconv" dt "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/discovery/discoverytypes" "github.com/smartcontractkit/chainlink-ccip/pkg/logutil" @@ -273,6 +273,47 @@ func readAllMessages( return messageObs, availableReports, messageTimestamps } +// filterMessages according to various caches that we know about. +func (p *Plugin) filterMessages( + ctx context.Context, lggr logger.Logger, messageObs exectypes.MessageObservations, +) exectypes.MessageObservations { + + // TXM Status checking only supports singleton execution because the msgID is used for the transactionID. + // Arbitrary txm checking would be much more complex because we don't know which messages are in the same report. + isSingletonExecute := p.offchainCfg.MaxReportMessages == 1 && p.offchainCfg.MaxSingleChainReports == 1 + if isSingletonExecute { + // Reset cache during message observation. It will be referenced again when the report is generated. + p.statusCache = report.NewMessageStatusCache(p.statusGetter) + } + txmStatusChecker := report.NewTXMCheck(p.statusCache, p.offchainCfg.MaxTxmStatusChecks) + for chainSelector, msgs := range messageObs { + for seqNum, msg := range msgs { + // Inflight messages do not need to be observed. + if p.inflightMessageCache != nil && p.inflightMessageCache.IsInflight(chainSelector, msg.Header.MessageID) { + messageObs[chainSelector][seqNum] = cciptypes.Message{} + //delete(messageObs[chainSelector], seqNum) + lggr.Infow("skipping message observation - inflight", "msg", msg) + continue + } + // Messages with fatal txm statuses do not need to be observed. + if p.statusGetter != nil && isSingletonExecute { + status, err := txmStatusChecker(ctx, lggr, msg, 0, exectypes.CommitData{}) + if err != nil { + lggr.Errorw("txm status check error", "msg", msg, "err", err) + } else if status != report.None { + messageObs[chainSelector][seqNum] = cciptypes.Message{} + //delete(messageObs[chainSelector], seqNum) + lggr.Infow(fmt.Sprintf("skipping message observation - txm status %s", status), + "msg", msg) + continue + } + } + } + } + + return messageObs +} + func (p *Plugin) getMessagesObservation( ctx context.Context, lggr logger.Logger, @@ -296,6 +337,18 @@ func (p *Plugin) getMessagesObservation( previousOutcome.CommitReports, ) + // Compute hashes to allow deleting messages from the observation. + hashes, err := exectypes.GetHashes(ctx, messageObs, p.msgHasher) + if err != nil { + return exectypes.Observation{}, fmt.Errorf("unable to get message hashes: %w", err) + } + + // Remove messages which have failed txm statuses. + // This is a roundabout way to remove messages from the observation because it will continue to be + // executed by other nodes until >F nodes have failed to execute it. + // We cannot form consensus on the TXM state because the fatal status is not recorded onchain. + messageObs = p.filterMessages(ctx, lggr, messageObs) + tkData, err1 := p.tokenDataObserver.Observe(ctx, messageObs) if err1 != nil { return exectypes.Observation{}, fmt.Errorf("unable to process token data %w", err1) @@ -312,11 +365,6 @@ func (p *Plugin) getMessagesObservation( return exectypes.Observation{}, fmt.Errorf("unable to observe costly messages: %w", err) } - hashes, err := exectypes.GetHashes(ctx, messageObs, p.msgHasher) - if err != nil { - return exectypes.Observation{}, fmt.Errorf("unable to get message hashes: %w", err) - } - observation.CommitReports = commitReportCache observation.Messages = messageObs observation.Hashes = hashes diff --git a/execute/observation_test.go b/execute/observation_test.go index c85751a5ff..992104fcb4 100644 --- a/execute/observation_test.go +++ b/execute/observation_test.go @@ -9,11 +9,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" "github.com/smartcontractkit/chainlink-ccip/execute/costlymessages" "github.com/smartcontractkit/chainlink-ccip/execute/exectypes" "github.com/smartcontractkit/chainlink-ccip/execute/internal/cache" + "github.com/smartcontractkit/chainlink-ccip/execute/report" "github.com/smartcontractkit/chainlink-ccip/execute/tokendata" "github.com/smartcontractkit/chainlink-ccip/internal/mocks" "github.com/smartcontractkit/chainlink-ccip/mocks/internal_/reader" @@ -104,31 +106,13 @@ func Test_Observation_CacheUpdate(t *testing.T) { } func Test_getMessagesObservation(t *testing.T) { - ctx := context.Background() - - // Create mock objects - ccipReader := readerpkg_mock.NewMockCCIPReader(t) - msgHasher := mocks.NewMessageHasher() - tokenDataObserver := tokendata.NoopTokenDataObserver{} - costlyMessageObserver := costlymessages.NoopObserver{} - - //emptyMsgHash, err := msgHasher.Hash(ctx, cciptypes.Message{}) - //require.NoError(t, err) - // Set up the plugin with mock objects - plugin := &Plugin{ - lggr: mocks.NullLogger, - ccipReader: ccipReader, - msgHasher: msgHasher, - tokenDataObserver: &tokenDataObserver, - costlyMessageObserver: &costlyMessageObserver, - ocrTypeCodec: jsonOcrTypeCodec, - } - tests := []struct { - name string - previousOutcome exectypes.Outcome - expectedObs exectypes.Observation - expectedError bool + name string + previousOutcome exectypes.Outcome + expectedObs exectypes.Observation + inflightMessageCache func() *cache.InflightMessageCache + statusGetter func() report.StatusGetter + expectedError bool }{ { name: "no commit reports", @@ -230,10 +214,129 @@ func Test_getMessagesObservation(t *testing.T) { }, expectedError: false, }, + { + name: "skip inflight messages", + + inflightMessageCache: func() *cache.InflightMessageCache { + cache := cache.NewInflightMessageCache(10 * time.Minute) + cache.MarkInflight(1, cciptypes.Bytes32{}) + //cache.MarkInflight(1, ) + return cache + }, + //inflightMessageCache func() *cache.InflightMessageCache + //statusGetter func() report.StatusGetter + previousOutcome: exectypes.Outcome{ + CommitReports: []exectypes.CommitData{ + { + SourceChain: 1, + SequenceNumberRange: cciptypes.NewSeqNumRange(1, 3), + }, + }, + }, + expectedObs: exectypes.Observation{ + CommitReports: exectypes.CommitObservations{ + 1: []exectypes.CommitData{ + { + SourceChain: 1, + SequenceNumberRange: cciptypes.NewSeqNumRange(1, 3), + }, + }, + }, + Messages: exectypes.MessageObservations{ + 1: { + 1: cciptypes.Message{}, // skipped message is truncated + 2: cciptypes.Message{}, // skipped message is truncated + 3: cciptypes.Message{}, // skipped message is truncated + }, + }, + CostlyMessages: []cciptypes.Bytes32{}, + TokenData: exectypes.TokenDataObservations{ + 1: { + 1: exectypes.NewMessageTokenData(), + 2: exectypes.NewMessageTokenData(), + 3: exectypes.NewMessageTokenData(), + }, + }, + }, + expectedError: false, + }, + { + name: "bad message status", + + statusGetter: func() report.StatusGetter { + return func(ctx context.Context, transactionID string) (types.TransactionStatus, error) { + return types.Fatal, nil + } + }, + previousOutcome: exectypes.Outcome{ + CommitReports: []exectypes.CommitData{ + { + SourceChain: 1, + SequenceNumberRange: cciptypes.NewSeqNumRange(1, 3), + }, + }, + }, + expectedObs: exectypes.Observation{ + CommitReports: exectypes.CommitObservations{ + 1: []exectypes.CommitData{ + { + SourceChain: 1, + SequenceNumberRange: cciptypes.NewSeqNumRange(1, 3), + }, + }, + }, + Messages: exectypes.MessageObservations{ + 1: { + 1: cciptypes.Message{}, // skipped message is truncated + 2: cciptypes.Message{}, // skipped message is truncated + 3: cciptypes.Message{}, // skipped message is truncated + }, + }, + CostlyMessages: []cciptypes.Bytes32{}, + TokenData: exectypes.TokenDataObservations{ + 1: { + 1: exectypes.NewMessageTokenData(), + 2: exectypes.NewMessageTokenData(), + 3: exectypes.NewMessageTokenData(), + }, + }, + }, + expectedError: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + // Create mock objects + ccipReader := readerpkg_mock.NewMockCCIPReader(t) + msgHasher := mocks.NewMessageHasher() + tokenDataObserver := tokendata.NoopTokenDataObserver{} + costlyMessageObserver := costlymessages.NoopObserver{} + + // Set up the plugin with mock objects + plugin := &Plugin{ + lggr: mocks.NullLogger, + ccipReader: ccipReader, + msgHasher: msgHasher, + tokenDataObserver: &tokenDataObserver, + costlyMessageObserver: &costlyMessageObserver, + ocrTypeCodec: jsonOcrTypeCodec, + } + + if tt.inflightMessageCache != nil { + plugin.inflightMessageCache = tt.inflightMessageCache() + } + + if tt.statusGetter != nil { + plugin.statusGetter = tt.statusGetter() + + // If the status getter is provided, set the offchain configuration to use it. + plugin.offchainCfg.MaxSingleChainReports = 1 + plugin.offchainCfg.MaxReportMessages = 1 + } + // Set up mock expectations ccipReader.On("MsgsBetweenSeqNums", ctx, cciptypes.ChainSelector(1), cciptypes.NewSeqNumRange(1, 3)).Return([]cciptypes.Message{ diff --git a/execute/plugin.go b/execute/plugin.go index dbed4c4ce3..e75c8005bb 100644 --- a/execute/plugin.go +++ b/execute/plugin.go @@ -62,6 +62,7 @@ type Plugin struct { discovery ContractDiscoveryInterface chainSupport plugincommon.ChainSupport observer metrics.Reporter + statusGetter report.StatusGetter oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID tokenDataObserver tokendata.TokenDataObserver @@ -77,6 +78,8 @@ type Plugin struct { commitRootsCache cache.CommitsRootsCache // inflightMessageCache prevents duplicate reports from being sent for the same message. inflightMessageCache inflightMessageCache + // statusCache remembers message status to optimize DB lookups. It is reset for each round. + statusCache *report.MessageStatusCache } func NewPlugin( @@ -94,6 +97,7 @@ func NewPlugin( lggr logger.Logger, costlyMessageObserver costlymessages.Observer, metricsReporter metrics.Reporter, + getter report.StatusGetter, ) *Plugin { lggr.Infow("creating new plugin instance", "p2pID", oracleIDToP2pID[reportingCfg.OracleID]) @@ -111,6 +115,7 @@ func NewPlugin( estimateProvider: estimateProvider, lggr: logutil.WithComponent(lggr, "ExecutePlugin"), costlyMessageObserver: costlyMessageObserver, + statusGetter: getter, discovery: discovery.NewContractDiscoveryProcessor( logutil.WithComponent(lggr, "Discovery"), &ccipReader, @@ -459,6 +464,18 @@ func (p *Plugin) Reports( } reportInfo := extractReportInfo(decodedOutcome) + + // Add the TxID if we have a single chain report with a single message. + if p.offchainCfg.MaxSingleChainReports == 1 && p.offchainCfg.MaxReportMessages == 1 { + if len(decodedOutcome.Report.ChainReports) == 1 { + if len(decodedOutcome.Report.ChainReports[0].Messages) == 1 { + msg := decodedOutcome.Report.ChainReports[0].Messages[0] + reportInfo.TxID = p.statusCache.NextTransactionID( + msg.Header.SourceChainSelector, msg.Header.MessageID.String()) + } + } + } + encodedInfo, err := reportInfo.Encode() if err != nil { return nil, err diff --git a/execute/plugin_e2e_test.go b/execute/plugin_e2e_test.go index 8a28b03954..539ade57e2 100644 --- a/execute/plugin_e2e_test.go +++ b/execute/plugin_e2e_test.go @@ -99,9 +99,13 @@ func Test_ExcludingCostlyMessages(t *testing.T) { // Message1 cost=40000, fee=10000 // Message2 cost=200000, fee=20000 // Message3 cost=200000, fee=30000 - for i := 0; i < 3; i++ { - outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) - } + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.GetCommitReports, outcome.State) + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.GetMessages, outcome.State) + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + // empty outcome, everything is too costly + require.Equal(t, exectypes.Initialized, outcome.State) require.Len(t, outcome.Report.ChainReports, 0) // 4 hours later, we agree to pay higher fee, but only for the first message @@ -109,9 +113,12 @@ func Test_ExcludingCostlyMessages(t *testing.T) { // Message2 cost=200000, fee=20000 // Message3 cost=200000, fee=30000 tm.SetNow(time.Now()) - for i := 0; i < 3; i++ { - outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) - } + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.GetCommitReports, outcome.State) + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.GetMessages, outcome.State) + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.Filter, outcome.State) sequenceNumbers := extractSequenceNumbers(outcome.Report.ChainReports[0].Messages) require.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{100}) @@ -121,9 +128,12 @@ func Test_ExcludingCostlyMessages(t *testing.T) { // Message2 cost=40000, fee=100000 // Message3 cost=200000, fee=150000 intTest.UpdateExecutionCost(messages[1].Header.MessageID, 40000) - for i := 0; i < 3; i++ { - outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) - } + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.GetCommitReports, outcome.State) + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.GetMessages, outcome.State) + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.Filter, outcome.State) sequenceNumbers = extractSequenceNumbers(outcome.Report.ChainReports[0].Messages) require.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{101}) @@ -133,9 +143,10 @@ func Test_ExcludingCostlyMessages(t *testing.T) { // Message2 cost=40000, fee=160000 // Message3 cost=200000, fee=240000 tm.SetNow(time.Now().Add(3 * time.Hour)) - for i := 0; i < 3; i++ { - outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) - } + require.Equal(t, exectypes.GetCommitReports, runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner).State) + require.Equal(t, exectypes.GetMessages, runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner).State) + outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner) + require.Equal(t, exectypes.Filter, outcome.State) sequenceNumbers = extractSequenceNumbers(outcome.Report.ChainReports[0].Messages) require.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{102}) } diff --git a/execute/report/report.go b/execute/report/report.go index 451e8bbe96..5b0d9177ca 100644 --- a/execute/report/report.go +++ b/execute/report/report.go @@ -134,6 +134,9 @@ const ( MissingNonce messageStatus = "missing_nonce" InvalidNonce messageStatus = "invalid_nonce" TooCostly messageStatus = "tooCostly" + TXMCheckError messageStatus = "txm_check_error" + TXMFatalStatus messageStatus = "txm_fatal_status" + SkippedInflight messageStatus = "skipped_inflight" /* SenderAlreadySkipped messageStatus = "sender_already_skipped" MessageMaxGasCalcError messageStatus = "message_max_gas_calc_error" @@ -148,7 +151,13 @@ const ( ) // Check for the messages. -type Check func(lggr logger.Logger, msg ccipocr3.Message, idx int, report exectypes.CommitData) (messageStatus, error) +type Check func( + ctx context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + idx int, + report exectypes.CommitData, +) (messageStatus, error) /* // Template @@ -165,7 +174,13 @@ func Template() Check { // CheckIfPseudoDeleted checks if the message has been removed, typically done to reduce observation size. // This check should happen early because other checks are likely to fail if the message has been deleted. func CheckIfPseudoDeleted() Check { - return func(lggr logger.Logger, msg ccipocr3.Message, idx int, report exectypes.CommitData) (messageStatus, error) { + return func( + _ context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + idx int, + report exectypes.CommitData, + ) (messageStatus, error) { if msg.IsEmpty() { lggr.Errorw("message pseudo deleted", "index", idx) return PseudoDeleted, nil @@ -176,7 +191,13 @@ func CheckIfPseudoDeleted() Check { // CheckAlreadyExecuted checks the report executed list to see if the message has been executed. func CheckAlreadyExecuted() Check { - return func(lggr logger.Logger, msg ccipocr3.Message, idx int, report exectypes.CommitData) (messageStatus, error) { + return func( + _ context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + idx int, + report exectypes.CommitData, + ) (messageStatus, error) { if slices.Contains(report.ExecutedMessages, msg.Header.SequenceNumber) { lggr.Infow( "message already executed", @@ -193,7 +214,13 @@ func CheckAlreadyExecuted() Check { // CheckTokenData rejects messages which are missing their token data (attestations, i.e. CCTP). func CheckTokenData() Check { - return func(lggr logger.Logger, msg ccipocr3.Message, idx int, report exectypes.CommitData) (messageStatus, error) { + return func( + _ context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + idx int, + report exectypes.CommitData, + ) (messageStatus, error) { if idx >= len(report.MessageTokenData) { lggr.Errorw("token data index out of range", "index", idx, "messageTokensData", len(report.MessageTokenData)) return Error, fmt.Errorf("token data index out of range") @@ -224,7 +251,13 @@ func CheckTokenData() Check { // CheckTooCostly compares the costly list for a given message. func CheckTooCostly() Check { - return func(lggr logger.Logger, msg ccipocr3.Message, idx int, report exectypes.CommitData) (messageStatus, error) { + return func( + _ context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + idx int, + report exectypes.CommitData, + ) (messageStatus, error) { // 4. Check if the message is too costly to execute. if slices.Contains(report.CostlyMessages, msg.Header.MessageID) { lggr.Infow( @@ -249,7 +282,13 @@ func CheckNonces(sendersNonce map[ccipocr3.ChainSelector]map[string]uint64) Chec // temporary map to store state between nonce checks for this round. expectedNonce := make(map[ccipocr3.ChainSelector]map[string]uint64) - return func(lggr logger.Logger, msg ccipocr3.Message, idx int, report exectypes.CommitData) (messageStatus, error) { + return func( + _ context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + idx int, + report exectypes.CommitData, + ) (messageStatus, error) { // Setting the Nonce to zero (or omitting it) indicates that the message // can be executed out of order. We allow this in the plugin by skipping // the nonce check. @@ -305,7 +344,13 @@ func CheckNonces(sendersNonce map[ccipocr3.ChainSelector]map[string]uint64) Chec type IsInflight func(src ccipocr3.ChainSelector, msgID ccipocr3.Bytes32) bool func CheckIfInflight(inflight IsInflight) Check { - return func(lggr logger.Logger, msg ccipocr3.Message, idx int, report exectypes.CommitData) (messageStatus, error) { + return func( + _ context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + idx int, + report exectypes.CommitData, + ) (messageStatus, error) { if inflight(report.SourceChain, msg.Header.MessageID) { lggr.Infow( "message already in flight", @@ -338,7 +383,7 @@ func (b *execReportBuilder) checkMessages(ctx context.Context, report exectypes. // checkMessage for execution readiness. func (b *execReportBuilder) checkMessage( - _ context.Context, idx int, execReport exectypes.CommitData, + ctx context.Context, idx int, execReport exectypes.CommitData, ) (exectypes.CommitData, messageStatus, error) { result := execReport @@ -351,7 +396,7 @@ func (b *execReportBuilder) checkMessage( msg := execReport.Messages[idx] for _, check := range b.checks { - status, err := check(b.lggr, msg, idx, execReport) + status, err := check(ctx, b.lggr, msg, idx, execReport) if err != nil { return execReport, Error, err } diff --git a/execute/report/txm_checker.go b/execute/report/txm_checker.go new file mode 100644 index 0000000000..4a817734dc --- /dev/null +++ b/execute/report/txm_checker.go @@ -0,0 +1,121 @@ +package report + +import ( + "context" + "fmt" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types" + + "github.com/smartcontractkit/chainlink-ccip/execute/exectypes" + "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) + +// StatusGetter is a narrowed interface to the contract writer that only exposes the GetTransactionStatus method. +type StatusGetter func(ctx context.Context, transactionID string) (types.TransactionStatus, error) + +type MessageStatusDetails struct { + numAttempts uint64 + hasFatalStatus bool +} + +// MessageStatusCache is a cache of status metadata needed for each message. It is multi-purpose, intended +// to support report checking and for computing the next txID for the transmitter. +// +// This object needs to be recreated for each round in order to refresh the cache. +type MessageStatusCache struct { + statusGetter StatusGetter + + cache map[string]MessageStatusDetails +} + +// NewMessageStatusCache creates a new cache for message statuses. +func NewMessageStatusCache(statusGetter StatusGetter) *MessageStatusCache { + return &MessageStatusCache{ + statusGetter: statusGetter, + cache: make(map[string]MessageStatusDetails), + } +} + +func toID(selector ccipocr3.ChainSelector, msgID string, attempt uint64) string { + return fmt.Sprintf("%d-%s-%d", selector, msgID, attempt) +} + +// NextTransactionID returns the next transaction ID for the given message ID. +func (m *MessageStatusCache) NextTransactionID(selector ccipocr3.ChainSelector, msgID string) string { + // this works with a zero value, so no need to hceck if cache[msgID] exists. + return toID(selector, msgID, m.cache[msgID].numAttempts) +} + +// statuses fetches the statusu details for a given message ID. Results are cached for subsequent calls. +func (m *MessageStatusCache) statuses( + ctx context.Context, + selector ccipocr3.ChainSelector, + msgID string, +) (MessageStatusDetails, error) { + if details, ok := m.cache[msgID]; ok { + return details, nil + } + + var details MessageStatusDetails + for { + transactionID := toID(selector, msgID, details.numAttempts) + status, err := m.statusGetter(ctx, transactionID) + + if err != nil && status == types.Unknown { + // TODO: Would be nice to have a typed error here as a more deliberate check. + // By code inspection we see that if the status is unknown and err not nil, + // it means the transaction was not found. It will be the next txID. + break + } else if err != nil { + // TODO: Why wasn't this case wasn't handled in 1.5? + return MessageStatusDetails{}, err + } + if status == types.Fatal { + details.hasFatalStatus = true + break + } + details.numAttempts++ + } + + m.cache[msgID] = details + return details, nil +} + +// NewTXMCheck creates a new check that queries the TXM for the status of a transaction. +// +// Algorithm ported from OCR2 version: chainlink/core/services/relay/evm/statuschecker/txm_status_checker.go#L31 +// +// TODO: this is used during observation, not report building. So it no longer needs to implement the Check interface. +func NewTXMCheck(statuses *MessageStatusCache, maxAttempts uint64) Check { + return func( + ctx context.Context, + lggr logger.Logger, + msg ccipocr3.Message, + _ int, + _ exectypes.CommitData, + ) (messageStatus, error) { + details, err := statuses.statuses(ctx, msg.Header.SourceChainSelector, msg.Header.MessageID.String()) + if err != nil { + return None, err + } + + if details.hasFatalStatus { + lggr.Infow("Skipping message - found a fatal TXM status", + "message", msg, + ) + return TXMFatalStatus, nil + } + + if details.numAttempts >= maxAttempts { + lggr.Infow( + "skipping message due to maximum number of statuses reached, possible infinite loop", + "message", msg, + "maxAttempts", maxAttempts, + ) + return TXMCheckError, nil + } + + return None, nil + } +} diff --git a/execute/test_utils.go b/execute/test_utils.go index b4631b4547..d634869ca0 100644 --- a/execute/test_utils.go +++ b/execute/test_utils.go @@ -343,6 +343,7 @@ func (it *IntTest) newNode( it.lggr, costlyMessageObserver, &metrics.Noop{}, + nil, ) return nodeSetup{ diff --git a/pkg/types/ccipocr3/plugin_execute_types.go b/pkg/types/ccipocr3/plugin_execute_types.go index 5e70b7d43c..7c2a91f910 100644 --- a/pkg/types/ccipocr3/plugin_execute_types.go +++ b/pkg/types/ccipocr3/plugin_execute_types.go @@ -23,6 +23,9 @@ type ExecutePluginReportSingleChain struct { type ExecuteReportInfo struct { AbstractReports []ExecutePluginReportSingleChain MerkleRoots []MerkleRootChain + + // TxID to use when submitting the report. + TxID string `json:",omitempty"` } // Encode v1 execute report info. Very basic versioning in the first byte to diff --git a/pkg/types/ccipocr3/plugin_execute_types_test.go b/pkg/types/ccipocr3/plugin_execute_types_test.go index fa1acb17c4..304c4c56c1 100644 --- a/pkg/types/ccipocr3/plugin_execute_types_test.go +++ b/pkg/types/ccipocr3/plugin_execute_types_test.go @@ -69,12 +69,38 @@ func TestExecuteReportInfo_EncodeDecode(t *testing.T) { want: append([]byte{1}, []byte(`{"AbstractReports":[{"sourceChainSelector":1,"messages":[],"offchainTokenData":[],"proofs":[],"proofFlagBits":null}],"MerkleRoots":[{"chain":10,"onRampAddress":"0x04d4cc5972ad487f71b85654d48b27d32b13a22f","seqNumsRange":[100,200],"merkleRoot":"0x0000000000000000000000000000000000000000000000000000000000000000"}]}`)...), wantErr: require.NoError, }, + { + name: "object with txid", + reportInfo: ExecuteReportInfo{ + AbstractReports: []ExecutePluginReportSingleChain{ + { + SourceChainSelector: ChainSelector(1), + Messages: []Message{}, + OffchainTokenData: [][][]byte{}, + Proofs: []Bytes32{}, + ProofFlagBits: BigInt{}, + }, + }, + MerkleRoots: []MerkleRootChain{ + { + ChainSel: 10, + OnRampAddress: mustNewUnknownAddress(t, "0x04D4cC5972ad487F71b85654d48b27D32b13a22F"), + SeqNumsRange: NewSeqNumRange(100, 200), + MerkleRoot: Bytes32{}, + }, + }, + TxID: "asdf", + }, + //nolint:lll + want: append([]byte{1}, []byte(`{"AbstractReports":[{"sourceChainSelector":1,"messages":[],"offchainTokenData":[],"proofs":[],"proofFlagBits":null}],"MerkleRoots":[{"chain":10,"onRampAddress":"0x04d4cc5972ad487f71b85654d48b27d32b13a22f","seqNumsRange":[100,200],"merkleRoot":"0x0000000000000000000000000000000000000000000000000000000000000000"}],"TxID":"asdf"}`)...), + wantErr: require.NoError, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := tt.reportInfo.Encode() tt.wantErr(t, err, "Encode()") - require.Equalf(t, tt.want, got, "Encode()") + require.Equalf(t, string(tt.want), string(got), "Encode()") eri2, err := DecodeExecuteReportInfo(got) tt.wantErr(t, err, "Decode()") diff --git a/pluginconfig/execute.go b/pluginconfig/execute.go index 28719bab01..7b09c19484 100644 --- a/pluginconfig/execute.go +++ b/pluginconfig/execute.go @@ -50,6 +50,10 @@ type ExecuteOffchainConfig struct { // MaxSingleChainReports is the maximum number of single chain reports that can be included in a report. // When set to 0, this setting is ignored. MaxSingleChainReports uint64 `json:"maxSingleChainReports"` + + // MaxTxmStatusChecks is the maximum number of times to check the transaction manager status for a fatal + // transaction result. When set to 0 Txm status checking is disabled. + MaxTxmStatusChecks uint64 `json:"maxTxmStatusChecks"` } func (e *ExecuteOffchainConfig) ApplyDefaultsAndValidate() error {