Skip to content

Commit 989dfd1

Browse files
committed
cleanup and fix tests
1 parent 68d602d commit 989dfd1

20 files changed

+352
-303
lines changed

cmd/access/node_builder/access_node_builder.go

+25-25
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ import (
7070
"github.com/onflow/flow-go/module/metrics/unstaked"
7171
"github.com/onflow/flow-go/module/state_synchronization"
7272
"github.com/onflow/flow-go/module/state_synchronization/indexer"
73-
"github.com/onflow/flow-go/module/state_synchronization/proxies"
7473
edrequester "github.com/onflow/flow-go/module/state_synchronization/requester"
7574
"github.com/onflow/flow-go/network"
7675
alspmgr "github.com/onflow/flow-go/network/alsp/manager"
@@ -279,8 +278,8 @@ type FlowAccessNodeBuilder struct {
279278
ExecutionIndexer *indexer.Indexer
280279
ExecutionIndexerCore *indexer.IndexerCore
281280
ScriptExecutor *backend.ScriptExecutor
282-
RegistersStoreProxy *proxies.RegistersStore
283-
IndexReporterProxy *proxies.IndexReporter
281+
RegistersAsyncStore *execution.RegistersAsyncStore
282+
EventsIndex *backend.EventsIndex
284283
IndexerDependencies *cmd.DependencyList
285284

286285
// The sync engine participants provider is the libp2p peer store for the access node
@@ -775,16 +774,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
775774
return nil, err
776775
}
777776

778-
err = builder.IndexReporterProxy.Initialize(builder.ExecutionIndexer)
779-
if err != nil {
780-
return nil, err
781-
}
782-
783-
err = builder.RegistersStoreProxy.Initialize(registers)
784-
if err != nil {
785-
return nil, err
786-
}
787-
788777
// setup requester to notify indexer when new execution data is received
789778
execDataDistributor.AddOnExecutionDataReceivedConsumer(builder.ExecutionIndexer.OnExecutionData)
790779

@@ -803,7 +792,20 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
803792
return nil, err
804793
}
805794

806-
builder.ScriptExecutor.InitReporter(builder.ExecutionIndexer, scripts)
795+
err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
796+
if err != nil {
797+
return nil, err
798+
}
799+
800+
err = builder.EventsIndex.Initialize(builder.ExecutionIndexer)
801+
if err != nil {
802+
return nil, err
803+
}
804+
805+
err = builder.RegistersAsyncStore.Initialize(registers)
806+
if err != nil {
807+
return nil, err
808+
}
807809

808810
return builder.ExecutionIndexer, nil
809811
}, builder.IndexerDependencies)
@@ -844,16 +846,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
844846
builder.stateStreamConf,
845847
node.State,
846848
node.Storage.Headers,
847-
node.Storage.Events,
848849
node.Storage.Seals,
849850
node.Storage.Results,
850851
builder.ExecutionDataStore,
851852
executionDataStoreCache,
852853
broadcaster,
853854
builder.executionDataConfig.InitialBlockHeight,
854855
highestAvailableHeight,
855-
builder.RegistersStoreProxy,
856-
builder.IndexReporterProxy,
856+
builder.RegistersAsyncStore,
857+
builder.EventsIndex,
857858
useIndex,
858859
)
859860
if err != nil {
@@ -1445,18 +1446,18 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
14451446
builder.ScriptExecutor = backend.NewScriptExecutor(builder.Logger, builder.scriptExecMinBlock, builder.scriptExecMaxBlock)
14461447
return nil
14471448
}).
1448-
Module("register store proxy", func(node *cmd.NodeConfig) error {
1449-
builder.RegistersStoreProxy = proxies.NewRegistersStore()
1450-
return nil
1451-
}).
1452-
Module("index reporter proxy", func(node *cmd.NodeConfig) error {
1453-
builder.IndexReporterProxy = proxies.NewIndexReporter()
1449+
Module("async register store", func(node *cmd.NodeConfig) error {
1450+
builder.RegistersAsyncStore = execution.NewRegistersAsyncStore()
14541451
return nil
14551452
}).
14561453
Module("events storage", func(node *cmd.NodeConfig) error {
14571454
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
14581455
return nil
14591456
}).
1457+
Module("events index", func(node *cmd.NodeConfig) error {
1458+
builder.EventsIndex = backend.NewEventsIndex(builder.Storage.Events)
1459+
return nil
1460+
}).
14601461
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
14611462
config := builder.rpcConf
14621463
backendConfig := config.BackendConfig
@@ -1508,7 +1509,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
15081509
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
15091510
Blocks: node.Storage.Blocks,
15101511
Headers: node.Storage.Headers,
1511-
Events: node.Storage.Events,
15121512
Collections: node.Storage.Collections,
15131513
Transactions: node.Storage.Transactions,
15141514
ExecutionReceipts: node.Storage.Receipts,
@@ -1528,7 +1528,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
15281528
ScriptExecutor: builder.ScriptExecutor,
15291529
ScriptExecutionMode: scriptExecMode,
15301530
EventQueryMode: eventQueryMode,
1531-
IndexReporter: builder.IndexReporterProxy,
1531+
EventsIndex: builder.EventsIndex,
15321532
})
15331533
if err != nil {
15341534
return nil, fmt.Errorf("could not initialize backend: %w", err)

engine/access/integration_unsecure_grpc_server_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
243243
conf,
244244
suite.state,
245245
suite.headers,
246-
suite.events,
247246
suite.seals,
248247
suite.results,
249248
nil,
@@ -252,6 +251,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
252251
rootBlock.Header.Height,
253252
rootBlock.Header.Height,
254253
suite.registers,
254+
backend.NewEventsIndex(suite.events),
255255
false,
256256
)
257257
assert.NoError(suite.T(), err)

engine/access/rpc/backend/backend.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/onflow/flow-go/model/flow/filter"
1919
"github.com/onflow/flow-go/module"
2020
"github.com/onflow/flow-go/module/execution"
21-
"github.com/onflow/flow-go/module/state_synchronization"
2221
"github.com/onflow/flow-go/state/protocol"
2322
"github.com/onflow/flow-go/storage"
2423
)
@@ -84,7 +83,6 @@ type Params struct {
8483
HistoricalAccessNodes []accessproto.AccessAPIClient
8584
Blocks storage.Blocks
8685
Headers storage.Headers
87-
Events storage.Events
8886
Collections storage.Collections
8987
Transactions storage.Transactions
9088
ExecutionReceipts storage.ExecutionReceipts
@@ -105,7 +103,7 @@ type Params struct {
105103
ScriptExecutor execution.ScriptExecutor
106104
ScriptExecutionMode IndexQueryMode
107105
EventQueryMode IndexQueryMode
108-
IndexReporter state_synchronization.IndexReporter
106+
EventsIndex *EventsIndex
109107
}
110108

111109
// New creates backend instance
@@ -185,13 +183,12 @@ func New(params Params) (*Backend, error) {
185183
chain: params.ChainID.Chain(),
186184
state: params.State,
187185
headers: params.Headers,
188-
events: params.Events,
189186
executionReceipts: params.ExecutionReceipts,
190187
connFactory: params.ConnFactory,
191188
maxHeightRange: params.MaxHeightRange,
192189
nodeCommunicator: params.Communicator,
193190
queryMode: params.EventQueryMode,
194-
indexReporter: params.IndexReporter,
191+
eventsIndex: params.EventsIndex,
195192
},
196193
backendBlockHeaders: backendBlockHeaders{
197194
headers: params.Headers,

engine/access/rpc/backend/backend_accounts_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
1616
"github.com/onflow/flow-go/engine/common/rpc/convert"
1717
"github.com/onflow/flow-go/model/flow"
18-
"github.com/onflow/flow-go/module/execution"
1918
execmock "github.com/onflow/flow-go/module/execution/mock"
2019
"github.com/onflow/flow-go/module/irrecoverable"
2120
protocol "github.com/onflow/flow-go/state/protocol/mock"
@@ -222,7 +221,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromStorage_Fails() {
222221
statusCode codes.Code
223222
}{
224223
{
225-
err: execution.ErrDataNotAvailable,
224+
err: storage.ErrHeightNotIndexed,
226225
statusCode: codes.OutOfRange,
227226
},
228227
{
@@ -267,7 +266,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromFailover_HappyPath() {
267266
backend.scriptExecMode = IndexQueryModeFailover
268267
backend.scriptExecutor = scriptExecutor
269268

270-
for _, errToReturn := range []error{execution.ErrDataNotAvailable, storage.ErrNotFound} {
269+
for _, errToReturn := range []error{storage.ErrHeightNotIndexed, storage.ErrNotFound} {
271270
scriptExecutor.On("GetAccountAtBlockHeight", mock.Anything, s.account.Address, s.block.Header.Height).
272271
Return(nil, errToReturn).Times(3)
273272

@@ -299,7 +298,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromFailover_ReturnsENErrors() {
299298

300299
scriptExecutor := execmock.NewScriptExecutor(s.T())
301300
scriptExecutor.On("GetAccountAtBlockHeight", mock.Anything, s.failingAddress, s.block.Header.Height).
302-
Return(nil, execution.ErrDataNotAvailable)
301+
Return(nil, storage.ErrHeightNotIndexed)
303302

304303
backend := s.defaultBackend()
305304
backend.scriptExecMode = IndexQueryModeFailover

engine/access/rpc/backend/backend_events.go

+6-43
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,14 @@ import (
2020
"github.com/onflow/flow-go/engine/common/rpc/convert"
2121
"github.com/onflow/flow-go/model/events"
2222
"github.com/onflow/flow-go/model/flow"
23-
"github.com/onflow/flow-go/module/execution"
2423
"github.com/onflow/flow-go/module/irrecoverable"
25-
"github.com/onflow/flow-go/module/state_synchronization"
24+
"github.com/onflow/flow-go/module/state_synchronization/indexer"
2625
"github.com/onflow/flow-go/state/protocol"
2726
"github.com/onflow/flow-go/storage"
2827
)
2928

3029
type backendEvents struct {
3130
headers storage.Headers
32-
events storage.Events
3331
executionReceipts storage.ExecutionReceipts
3432
state protocol.State
3533
chain flow.Chain
@@ -38,7 +36,7 @@ type backendEvents struct {
3836
maxHeightRange uint
3937
nodeCommunicator Communicator
4038
queryMode IndexQueryMode
41-
indexReporter state_synchronization.IndexReporter
39+
eventsIndex *EventsIndex
4240
}
4341

4442
// blockMetadata is used to capture information about requested blocks to avoid repeated blockID
@@ -230,25 +228,16 @@ func (b *backendEvents) getBlockEventsFromStorage(
230228
missing := make([]blockMetadata, 0)
231229
resp := make([]flow.BlockEvents, 0)
232230

233-
lowestHeight, highestHeight, err := b.getIndexerHeights()
234-
if err != nil {
235-
return nil, nil, err
236-
}
237-
238231
for _, blockInfo := range blockInfos {
239232
if ctx.Err() != nil {
240233
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled)
241234
}
242235

243-
if blockInfo.Height < lowestHeight || blockInfo.Height > highestHeight {
244-
missing = append(missing, blockInfo)
245-
continue
246-
}
247-
248-
events, err := b.events.ByBlockID(blockInfo.ID)
236+
events, err := b.eventsIndex.GetEvents(blockInfo.ID, blockInfo.Height)
249237
if err != nil {
250-
// Note: if there are no events for a block, an empty slice is returned
251-
if errors.Is(err, storage.ErrNotFound) {
238+
if errors.Is(err, storage.ErrNotFound) ||
239+
errors.Is(err, storage.ErrHeightNotIndexed) ||
240+
errors.Is(err, indexer.ErrIndexNotInitialized) {
252241
missing = append(missing, blockInfo)
253242
continue
254243
}
@@ -443,29 +432,3 @@ func (b *backendEvents) tryGetEvents(ctx context.Context,
443432

444433
return execRPCClient.GetEventsForBlockIDs(ctx, req)
445434
}
446-
447-
// getIndexerHeights returns the lowest and highest indexed block heights
448-
// Expected errors during normal operation:
449-
// - codes.FailedPrecondition: if the index reporter is not ready yet.
450-
// - codes.Internal: if there was any other error getting the heights.
451-
func (b *backendEvents) getIndexerHeights() (uint64, uint64, error) {
452-
lowestHeight, err := b.indexReporter.LowestIndexedHeight()
453-
if err != nil {
454-
if errors.Is(err, execution.ErrDataNotAvailable) {
455-
// if the index is not ready yet, but likely will be eventually
456-
return 0, 0, status.Errorf(codes.FailedPrecondition, "failed to get lowest indexed height: %v", err)
457-
}
458-
return 0, 0, rpc.ConvertError(err, "failed to get lowest indexed height", codes.Internal)
459-
}
460-
461-
highestHeight, err := b.indexReporter.HighestIndexedHeight()
462-
if err != nil {
463-
if errors.Is(err, execution.ErrDataNotAvailable) {
464-
// if the index is not ready yet, but likely will be eventually
465-
return 0, 0, status.Errorf(codes.FailedPrecondition, "failed to get highest indexed height: %v", err)
466-
}
467-
return 0, 0, rpc.ConvertError(err, "failed to get highest indexed height", codes.Internal)
468-
}
469-
470-
return lowestHeight, highestHeight, nil
471-
}

0 commit comments

Comments
 (0)