Skip to content

Commit ae35471

Browse files
Refactored syncing tx error messages by separated core and created separated engine for tx error messages, updated tests
1 parent 9ce9d22 commit ae35471

9 files changed

Lines changed: 1019 additions & 502 deletions

File tree

cmd/access/node_builder/access_node_builder.go

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/onflow/flow-go/engine"
4646
"github.com/onflow/flow-go/engine/access/index"
4747
"github.com/onflow/flow-go/engine/access/ingestion"
48+
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
4849
pingeng "github.com/onflow/flow-go/engine/access/ping"
4950
"github.com/onflow/flow-go/engine/access/rest"
5051
"github.com/onflow/flow-go/engine/access/rest/routes"
@@ -56,6 +57,7 @@ import (
5657
"github.com/onflow/flow-go/engine/access/subscription"
5758
followereng "github.com/onflow/flow-go/engine/common/follower"
5859
"github.com/onflow/flow-go/engine/common/requester"
60+
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
5961
"github.com/onflow/flow-go/engine/common/stop"
6062
synceng "github.com/onflow/flow-go/engine/common/synchronization"
6163
"github.com/onflow/flow-go/engine/common/version"
@@ -352,6 +354,8 @@ type FlowAccessNodeBuilder struct {
352354

353355
stateStreamBackend *statestreambackend.StateStreamBackend
354356
nodeBackend *backend.Backend
357+
358+
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
355359
}
356360

357361
func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
@@ -1804,13 +1808,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
18041808
processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
18051809
return nil
18061810
}).
1807-
Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
1808-
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
1809-
builder.DB,
1810-
module.ConsumeProgressIngestionEngineTxErrorMessagesBlockHeight,
1811-
)
1812-
return nil
1813-
}).
18141811
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
18151812
rootBlockHeight := node.State.Params().FinalizedRoot().Height
18161813

@@ -2052,6 +2049,28 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
20522049
return nil, fmt.Errorf("could not create requester engine: %w", err)
20532050
}
20542051

2052+
preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs)
2053+
if err != nil {
2054+
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
2055+
}
2056+
2057+
fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs)
2058+
if err != nil {
2059+
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
2060+
}
2061+
2062+
if builder.storeTxResultErrorMessages {
2063+
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
2064+
node.Logger,
2065+
node.State,
2066+
builder.nodeBackend,
2067+
node.Storage.Receipts,
2068+
node.Storage.TransactionResultErrorMessages,
2069+
preferredENIdentifiers,
2070+
fixedENIdentifiers,
2071+
)
2072+
}
2073+
20552074
builder.IngestEng, err = ingestion.New(
20562075
node.Logger,
20572076
node.EngineRegistry,
@@ -2064,14 +2083,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
20642083
node.Storage.Transactions,
20652084
node.Storage.Results,
20662085
node.Storage.Receipts,
2067-
node.Storage.TransactionResultErrorMessages,
20682086
builder.collectionExecutedMetric,
20692087
processedFinalizedBlockHeight,
2070-
processedTxErrorMessagesBlockHeight,
20712088
lastFullBlockHeight,
2072-
builder.nodeBackend,
2073-
builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs,
2074-
builder.rpcConf.BackendConfig.FixedExecutionNodeIDs,
2089+
builder.TxResultErrorMessagesCore,
20752090
)
20762091
if err != nil {
20772092
return nil, err
@@ -2089,6 +2104,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
20892104
return builder.RequestEng, nil
20902105
})
20912106

2107+
if builder.storeTxResultErrorMessages {
2108+
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
2109+
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
2110+
builder.DB,
2111+
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
2112+
)
2113+
return nil
2114+
})
2115+
builder.Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
2116+
engine, err := tx_error_messages.New(
2117+
node.Logger,
2118+
node.State,
2119+
node.Storage.Headers,
2120+
processedTxErrorMessagesBlockHeight,
2121+
builder.TxResultErrorMessagesCore,
2122+
)
2123+
if err != nil {
2124+
return nil, err
2125+
}
2126+
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)
2127+
2128+
return engine, nil
2129+
})
2130+
}
2131+
20922132
if builder.supportsObserver {
20932133
builder.Component("public sync request handler", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
20942134
syncRequestHandler, err := synceng.NewRequestHandlerEngine(

engine/access/access_test.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,6 @@ func (suite *Suite) TestGetSealedTransaction() {
683683

684684
// create the ingest engine
685685
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
686-
processedTxErrorMessagesBlockHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineTxErrorMessagesBlockHeight)
687686

688687
ingestEng, err := ingestion.New(
689688
suite.log,
@@ -697,13 +696,9 @@ func (suite *Suite) TestGetSealedTransaction() {
697696
transactions,
698697
results,
699698
receipts,
700-
nil,
701699
collectionExecutedMetric,
702700
processedHeight,
703-
processedTxErrorMessagesBlockHeight,
704701
lastFullBlockHeight,
705-
bnd,
706-
enNodeIDs.Strings(),
707702
nil,
708703
)
709704
require.NoError(suite.T(), err)
@@ -796,6 +791,9 @@ func (suite *Suite) TestGetTransactionResult() {
796791
allIdentities := append(colIdentities, enIdentities...)
797792
finalSnapshot.On("Identities", mock.Anything).Return(allIdentities, nil)
798793

794+
suite.state.On("AtBlockID", blockNegativeId).Return(suite.sealedSnapshot)
795+
suite.sealedSnapshot.On("Identities", mock.Anything).Return(allIdentities, nil)
796+
799797
// assume execution node returns an empty list of events
800798
suite.execClient.On("GetTransactionResult", mock.Anything, mock.Anything).Return(&execproto.GetTransactionResultResponse{
801799
Events: nil,
@@ -857,7 +855,6 @@ func (suite *Suite) TestGetTransactionResult() {
857855
require.NoError(suite.T(), err)
858856

859857
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
860-
processedTxErrorMessagesBlockHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineTxErrorMessagesBlockHeight)
861858

862859
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
863860
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
@@ -878,13 +875,9 @@ func (suite *Suite) TestGetTransactionResult() {
878875
transactions,
879876
results,
880877
receipts,
881-
nil,
882878
collectionExecutedMetric,
883879
processedHeight,
884-
processedTxErrorMessagesBlockHeight,
885880
lastFullBlockHeight,
886-
bnd,
887-
enNodeIDs.Strings(),
888881
nil,
889882
)
890883
require.NoError(suite.T(), err)
@@ -982,6 +975,7 @@ func (suite *Suite) TestGetTransactionResult() {
982975
}
983976
resp, err := handler.GetTransactionResult(context.Background(), getReq)
984977
require.Error(suite.T(), err)
978+
require.Contains(suite.T(), err.Error(), "failed to find: transaction not in block")
985979
require.Nil(suite.T(), resp)
986980
})
987981

@@ -1106,7 +1100,6 @@ func (suite *Suite) TestExecuteScript() {
11061100
Once()
11071101

11081102
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
1109-
processedTxErrorMessagesBlockHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineTxErrorMessagesBlockHeight)
11101103

11111104
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
11121105
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
@@ -1127,14 +1120,10 @@ func (suite *Suite) TestExecuteScript() {
11271120
transactions,
11281121
results,
11291122
receipts,
1130-
nil,
11311123
collectionExecutedMetric,
11321124
processedHeight,
1133-
processedTxErrorMessagesBlockHeight,
11341125
lastFullBlockHeight,
1135-
suite.backend,
11361126
nil,
1137-
identities.NodeIDs().Strings(),
11381127
)
11391128
require.NoError(suite.T(), err)
11401129

0 commit comments

Comments
 (0)