diff --git a/.mockery.yaml b/.mockery.yaml index 8f139231c..77930d950 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -58,6 +58,11 @@ packages: dir: ./block/internal/syncing pkgname: syncing filename: syncer_mock.go + HeightStore: + config: + dir: ./block/internal/syncing + pkgname: syncing + filename: height_store_mock.go github.com/evstack/ev-node/block/internal/common: interfaces: Broadcaster: diff --git a/apps/evm/cmd/rollback.go b/apps/evm/cmd/rollback.go index 552d296e8..a8f3ed645 100644 --- a/apps/evm/cmd/rollback.go +++ b/apps/evm/cmd/rollback.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/evstack/ev-node/pkg/sync" ds "github.com/ipfs/go-datastore" kt "github.com/ipfs/go-datastore/keytransform" "github.com/spf13/cobra" @@ -13,7 +14,6 @@ import ( "github.com/evstack/ev-node/node" rollcmd "github.com/evstack/ev-node/pkg/cmd" "github.com/evstack/ev-node/pkg/store" - "github.com/evstack/ev-node/types" ) // NewRollbackCmd creates a command to rollback ev-node state by one height. 
@@ -70,7 +70,7 @@ func NewRollbackCmd() *cobra.Command { } // rollback ev-node goheader state - headerStore, err := goheaderstore.NewStore[*types.SignedHeader]( + headerStore, err := goheaderstore.NewStore[*sync.SignedHeaderWithDAHint]( evolveDB, goheaderstore.WithStorePrefix("headerSync"), goheaderstore.WithMetrics(), @@ -79,7 +79,7 @@ func NewRollbackCmd() *cobra.Command { return err } - dataStore, err := goheaderstore.NewStore[*types.Data]( + dataStore, err := goheaderstore.NewStore[*sync.DataWithDAHint]( evolveDB, goheaderstore.WithStorePrefix("dataSync"), goheaderstore.WithMetrics(), diff --git a/apps/testapp/cmd/rollback.go b/apps/testapp/cmd/rollback.go index 5600eaeb9..22ee216b9 100644 --- a/apps/testapp/cmd/rollback.go +++ b/apps/testapp/cmd/rollback.go @@ -5,13 +5,12 @@ import ( "errors" "fmt" + goheaderstore "github.com/celestiaorg/go-header/store" kvexecutor "github.com/evstack/ev-node/apps/testapp/kv" "github.com/evstack/ev-node/node" rollcmd "github.com/evstack/ev-node/pkg/cmd" "github.com/evstack/ev-node/pkg/store" - "github.com/evstack/ev-node/types" - - goheaderstore "github.com/celestiaorg/go-header/store" + "github.com/evstack/ev-node/pkg/sync" ds "github.com/ipfs/go-datastore" kt "github.com/ipfs/go-datastore/keytransform" "github.com/spf13/cobra" @@ -76,7 +75,7 @@ func NewRollbackCmd() *cobra.Command { } // rollback ev-node goheader state - headerStore, err := goheaderstore.NewStore[*types.SignedHeader]( + headerStore, err := goheaderstore.NewStore[*sync.SignedHeaderWithDAHint]( evolveDB, goheaderstore.WithStorePrefix("headerSync"), goheaderstore.WithMetrics(), @@ -85,7 +84,7 @@ func NewRollbackCmd() *cobra.Command { return err } - dataStore, err := goheaderstore.NewStore[*types.Data]( + dataStore, err := goheaderstore.NewStore[*sync.DataWithDAHint]( evolveDB, goheaderstore.WithStorePrefix("dataSync"), goheaderstore.WithMetrics(), diff --git a/block/components.go b/block/components.go index bd29f9244..559d46913 100644 --- a/block/components.go 
+++ b/block/components.go @@ -5,10 +5,10 @@ import ( "errors" "fmt" + "github.com/evstack/ev-node/pkg/sync" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" - "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/executing" "github.com/evstack/ev-node/block/internal/reaping" "github.com/evstack/ev-node/block/internal/submitting" @@ -132,8 +132,8 @@ func NewSyncComponents( store store.Store, exec coreexecutor.Executor, da coreda.DA, - headerStore common.Broadcaster[*types.SignedHeader], - dataStore common.Broadcaster[*types.Data], + headerStore *sync.HeaderSyncService, + dataStore *sync.DataSyncService, logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, @@ -165,7 +165,7 @@ func NewSyncComponents( ) // Create submitter for sync nodes (no signer, only DA inclusion processing) - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerStore, dataStore) submitter := submitting.NewSubmitter( store, exec, @@ -198,8 +198,8 @@ func NewAggregatorComponents( sequencer coresequencer.Sequencer, da coreda.DA, signer signer.Signer, - headerBroadcaster common.Broadcaster[*types.SignedHeader], - dataBroadcaster common.Broadcaster[*types.Data], + headerBroadcaster *sync.HeaderSyncService, + dataBroadcaster *sync.DataSyncService, logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, @@ -256,7 +256,7 @@ func NewAggregatorComponents( // Create DA client and submitter for aggregator nodes (with signer for submission) daClient := NewDAClient(da, config, logger) - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerBroadcaster, dataBroadcaster) submitter := submitting.NewSubmitter( store, exec, diff --git 
a/block/internal/common/broadcaster_mock.go b/block/internal/common/broadcaster_mock.go index 298347807..39d748f5c 100644 --- a/block/internal/common/broadcaster_mock.go +++ b/block/internal/common/broadcaster_mock.go @@ -8,6 +8,7 @@ import ( "context" "github.com/celestiaorg/go-header" + "github.com/evstack/ev-node/types" "github.com/libp2p/go-libp2p-pubsub" mock "github.com/stretchr/testify/mock" ) @@ -39,48 +40,152 @@ func (_m *MockBroadcaster[H]) EXPECT() *MockBroadcaster_Expecter[H] { return &MockBroadcaster_Expecter[H]{mock: &_m.Mock} } -// Store provides a mock function for the type MockBroadcaster -func (_mock *MockBroadcaster[H]) Store() header.Store[H] { - ret := _mock.Called() +// AppendDAHint provides a mock function for the type MockBroadcaster +func (_mock *MockBroadcaster[H]) AppendDAHint(ctx context.Context, daHeight uint64, hashes ...types.Hash) error { + // types.Hash + _va := make([]interface{}, len(hashes)) + for _i := range hashes { + _va[_i] = hashes[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, daHeight) + _ca = append(_ca, _va...) + ret := _mock.Called(_ca...) if len(ret) == 0 { - panic("no return value specified for Store") + panic("no return value specified for AppendDAHint") } - var r0 header.Store[H] - if returnFunc, ok := ret.Get(0).(func() header.Store[H]); ok { - r0 = returnFunc() + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, ...types.Hash) error); ok { + r0 = returnFunc(ctx, daHeight, hashes...) 
+ } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBroadcaster_AppendDAHint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AppendDAHint' +type MockBroadcaster_AppendDAHint_Call[H header.Header[H]] struct { + *mock.Call +} + +// AppendDAHint is a helper method to define mock.On call +// - ctx context.Context +// - daHeight uint64 +// - hashes ...types.Hash +func (_e *MockBroadcaster_Expecter[H]) AppendDAHint(ctx interface{}, daHeight interface{}, hashes ...interface{}) *MockBroadcaster_AppendDAHint_Call[H] { + return &MockBroadcaster_AppendDAHint_Call[H]{Call: _e.mock.On("AppendDAHint", + append([]interface{}{ctx, daHeight}, hashes...)...)} +} + +func (_c *MockBroadcaster_AppendDAHint_Call[H]) Run(run func(ctx context.Context, daHeight uint64, hashes ...types.Hash)) *MockBroadcaster_AppendDAHint_Call[H] { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 uint64 + if args[1] != nil { + arg1 = args[1].(uint64) + } + var arg2 []types.Hash + variadicArgs := make([]types.Hash, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(types.Hash) + } + } + arg2 = variadicArgs + run( + arg0, + arg1, + arg2..., + ) + }) + return _c +} + +func (_c *MockBroadcaster_AppendDAHint_Call[H]) Return(err error) *MockBroadcaster_AppendDAHint_Call[H] { + _c.Call.Return(err) + return _c +} + +func (_c *MockBroadcaster_AppendDAHint_Call[H]) RunAndReturn(run func(ctx context.Context, daHeight uint64, hashes ...types.Hash) error) *MockBroadcaster_AppendDAHint_Call[H] { + _c.Call.Return(run) + return _c +} + +// GetByHeight provides a mock function for the type MockBroadcaster +func (_mock *MockBroadcaster[H]) GetByHeight(ctx context.Context, height uint64) (H, uint64, error) { + ret := _mock.Called(ctx, height) + + if len(ret) == 0 { + panic("no return value specified for GetByHeight") + } + + var r0 H + var r1 uint64 + 
var r2 error + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) (H, uint64, error)); ok { + return returnFunc(ctx, height) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) H); ok { + r0 = returnFunc(ctx, height) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(header.Store[H]) + r0 = ret.Get(0).(H) } } - return r0 + if returnFunc, ok := ret.Get(1).(func(context.Context, uint64) uint64); ok { + r1 = returnFunc(ctx, height) + } else { + r1 = ret.Get(1).(uint64) + } + if returnFunc, ok := ret.Get(2).(func(context.Context, uint64) error); ok { + r2 = returnFunc(ctx, height) + } else { + r2 = ret.Error(2) + } + return r0, r1, r2 } -// MockBroadcaster_Store_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Store' -type MockBroadcaster_Store_Call[H header.Header[H]] struct { +// MockBroadcaster_GetByHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetByHeight' +type MockBroadcaster_GetByHeight_Call[H header.Header[H]] struct { *mock.Call } -// Store is a helper method to define mock.On call -func (_e *MockBroadcaster_Expecter[H]) Store() *MockBroadcaster_Store_Call[H] { - return &MockBroadcaster_Store_Call[H]{Call: _e.mock.On("Store")} +// GetByHeight is a helper method to define mock.On call +// - ctx context.Context +// - height uint64 +func (_e *MockBroadcaster_Expecter[H]) GetByHeight(ctx interface{}, height interface{}) *MockBroadcaster_GetByHeight_Call[H] { + return &MockBroadcaster_GetByHeight_Call[H]{Call: _e.mock.On("GetByHeight", ctx, height)} } -func (_c *MockBroadcaster_Store_Call[H]) Run(run func()) *MockBroadcaster_Store_Call[H] { +func (_c *MockBroadcaster_GetByHeight_Call[H]) Run(run func(ctx context.Context, height uint64)) *MockBroadcaster_GetByHeight_Call[H] { _c.Call.Run(func(args mock.Arguments) { - run() + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 uint64 + if args[1] != 
nil { + arg1 = args[1].(uint64) + } + run( + arg0, + arg1, + ) }) return _c } -func (_c *MockBroadcaster_Store_Call[H]) Return(store header.Store[H]) *MockBroadcaster_Store_Call[H] { - _c.Call.Return(store) +func (_c *MockBroadcaster_GetByHeight_Call[H]) Return(v H, v1 uint64, err error) *MockBroadcaster_GetByHeight_Call[H] { + _c.Call.Return(v, v1, err) return _c } -func (_c *MockBroadcaster_Store_Call[H]) RunAndReturn(run func() header.Store[H]) *MockBroadcaster_Store_Call[H] { +func (_c *MockBroadcaster_GetByHeight_Call[H]) RunAndReturn(run func(ctx context.Context, height uint64) (H, uint64, error)) *MockBroadcaster_GetByHeight_Call[H] { _c.Call.Return(run) return _c } diff --git a/block/internal/common/event.go b/block/internal/common/event.go index 69d0300f9..f02a181de 100644 --- a/block/internal/common/event.go +++ b/block/internal/common/event.go @@ -20,4 +20,7 @@ type DAHeightEvent struct { DaHeight uint64 // Source indicates where this event originated from (DA or P2P) Source EventSource + + // Optional DA height hints from P2P. 
first is the DA height hint for the header, second is the DA height hint for the data + DaHeightHints [2]uint64 } diff --git a/block/internal/common/expected_interfaces.go b/block/internal/common/expected_interfaces.go index 8f36af624..0eeef6ab3 100644 --- a/block/internal/common/expected_interfaces.go +++ b/block/internal/common/expected_interfaces.go @@ -3,13 +3,20 @@ package common import ( "context" + "github.com/evstack/ev-node/types" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/celestiaorg/go-header" ) -// broadcaster interface for P2P broadcasting +type ( + HeaderP2PBroadcaster = Broadcaster[*types.SignedHeader] + DataP2PBroadcaster = Broadcaster[*types.Data] +) + +// Broadcaster interface for P2P broadcasting type Broadcaster[H header.Header[H]] interface { WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error - Store() header.Store[H] + AppendDAHint(ctx context.Context, daHeight uint64, hashes ...types.Hash) error + GetByHeight(ctx context.Context, height uint64) (H, uint64, error) } diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 263d59081..42609de56 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -38,8 +38,8 @@ type Executor struct { metrics *common.Metrics // Broadcasting - headerBroadcaster common.Broadcaster[*types.SignedHeader] - dataBroadcaster common.Broadcaster[*types.Data] + headerBroadcaster common.HeaderP2PBroadcaster + dataBroadcaster common.DataP2PBroadcaster // Configuration config config.Config @@ -79,8 +79,8 @@ func NewExecutor( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerBroadcaster common.Broadcaster[*types.SignedHeader], - dataBroadcaster common.Broadcaster[*types.Data], + headerBroadcaster common.HeaderP2PBroadcaster, + dataBroadcaster common.DataP2PBroadcaster, logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, diff --git 
a/block/internal/executing/executor_test.go b/block/internal/executing/executor_test.go index e310c6d40..7ef0f64e2 100644 --- a/block/internal/executing/executor_test.go +++ b/block/internal/executing/executor_test.go @@ -1,7 +1,6 @@ package executing import ( - "context" "testing" "time" @@ -120,48 +119,3 @@ func TestExecutor_NilBroadcasters(t *testing.T) { assert.Equal(t, cacheManager, executor.cache) assert.Equal(t, gen, executor.genesis) } - -func TestExecutor_BroadcastFlow(t *testing.T) { - // This test demonstrates how the broadcast flow works - // when an Executor produces a block - - // Create mock broadcasters - headerBroadcaster := common.NewMockBroadcaster[*types.SignedHeader](t) - dataBroadcaster := common.NewMockBroadcaster[*types.Data](t) - - // Create sample data that would be broadcast - sampleHeader := &types.SignedHeader{ - Header: types.Header{ - BaseHeader: types.BaseHeader{ - ChainID: "test-chain", - Height: 1, - Time: uint64(time.Now().UnixNano()), - }, - }, - } - - sampleData := &types.Data{ - Metadata: &types.Metadata{ - ChainID: "test-chain", - Height: 1, - Time: uint64(time.Now().UnixNano()), - }, - Txs: []types.Tx{}, - } - - // Test broadcast calls - ctx := context.Background() - - // Set up expectations - headerBroadcaster.EXPECT().WriteToStoreAndBroadcast(ctx, sampleHeader).Return(nil).Once() - dataBroadcaster.EXPECT().WriteToStoreAndBroadcast(ctx, sampleData).Return(nil).Once() - - // Simulate what happens in produceBlock() after block creation - err := headerBroadcaster.WriteToStoreAndBroadcast(ctx, sampleHeader) - require.NoError(t, err) - - err = dataBroadcaster.WriteToStoreAndBroadcast(ctx, sampleData) - require.NoError(t, err) - - // Verify expectations were met (automatically checked by testify mock on cleanup) -} diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 34d6951d8..2fc2c1c47 100644 --- a/block/internal/submitting/da_submitter.go +++ 
b/block/internal/submitting/da_submitter.go @@ -90,14 +90,20 @@ func clamp(v, min, max time.Duration) time.Duration { return v } +type DAHintAppender interface { + AppendDAHint(ctx context.Context, daHeight uint64, hash ...types.Hash) error +} + // DASubmitter handles DA submission operations type DASubmitter struct { - client da.Client - config config.Config - genesis genesis.Genesis - options common.BlockOptions - logger zerolog.Logger - metrics *common.Metrics + client da.Client + config config.Config + genesis genesis.Genesis + options common.BlockOptions + logger zerolog.Logger + metrics *common.Metrics + headerDAHintAppender DAHintAppender + dataDAHintAppender DAHintAppender // address selector for multi-account support addressSelector pkgda.AddressSelector @@ -111,6 +117,8 @@ func NewDASubmitter( options common.BlockOptions, metrics *common.Metrics, logger zerolog.Logger, + headerDAHintAppender DAHintAppender, + dataDAHintAppender DAHintAppender, ) *DASubmitter { daSubmitterLogger := logger.With().Str("component", "da_submitter").Logger() @@ -136,13 +144,15 @@ func NewDASubmitter( } return &DASubmitter{ - client: client, - config: config, - genesis: genesis, - options: options, - metrics: metrics, - logger: daSubmitterLogger, - addressSelector: addressSelector, + client: client, + config: config, + genesis: genesis, + options: options, + metrics: metrics, + logger: daSubmitterLogger, + addressSelector: addressSelector, + headerDAHintAppender: headerDAHintAppender, + dataDAHintAppender: dataDAHintAppender, } } @@ -182,8 +192,15 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er return proto.Marshal(headerPb) }, func(submitted []*types.SignedHeader, res *coreda.ResultSubmit) { - for _, header := range submitted { - cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height()) + hashes := make([]types.Hash, len(submitted)) + for i, header := range submitted { + headerHash := header.Hash() + 
cache.SetHeaderDAIncluded(headerHash.String(), res.Height, header.Height()) + hashes[i] = headerHash + } + if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, hashes...); err != nil { + s.logger.Error().Err(err).Msg("failed to append da height hint in header p2p store") + // ignoring the error here, since we don't want to block header submission } if l := len(submitted); l > 0 { lastHeight := submitted[l-1].Height() @@ -225,8 +242,14 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe return signedData.MarshalBinary() }, func(submitted []*types.SignedData, res *coreda.ResultSubmit) { - for _, sd := range submitted { + hashes := make([]types.Hash, len(submitted)) + for i, sd := range submitted { cache.SetDataDAIncluded(sd.Data.DACommitment().String(), res.Height, sd.Height()) + hashes[i] = sd.Hash() + } + if err := s.dataDAHintAppender.AppendDAHint(ctx, res.Height, hashes...); err != nil { + s.logger.Error().Err(err).Msg("failed to append da height hint in data p2p store") + // ignoring the error here, since we don't want to block data submission } if l := len(submitted); l > 0 { lastHeight := submitted[l-1].Height() diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index 5b768e1a5..943abbff7 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -93,7 +93,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( Namespace: cfg.DA.Namespace, DataNamespace: cfg.DA.DataNamespace, }) - daSubmitter := NewDASubmitter(daClient, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop()) + daSubmitter := NewDASubmitter(daClient, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop(), noopDAHintAppender{}, noopDAHintAppender{}) // Submit headers and data require.NoError(t, 
daSubmitter.SubmitHeaders(context.Background(), cm)) @@ -110,3 +110,9 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( assert.True(t, ok) } + +type noopDAHintAppender struct{} + +func (n noopDAHintAppender) AppendDAHint(ctx context.Context, daHeight uint64, hash ...types.Hash) error { + return nil +} diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index b215b0cf2..045cf821e 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -36,7 +36,7 @@ func newTestSubmitter(mockDA *mocks.MockDA, override func(*config.Config)) *DASu Namespace: cfg.DA.Namespace, DataNamespace: cfg.DA.DataNamespace, }) - return NewDASubmitter(daClient, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop()) + return NewDASubmitter(daClient, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop(), nil, nil) } // marshal helper for simple items diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index 214ab98db..02f06f057 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -65,6 +65,8 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop(), + noopDAHintAppender{}, + noopDAHintAppender{}, ) return daSubmitter, st, cm, dummyDA, gen @@ -115,6 +117,8 @@ func TestNewDASubmitterSetsVisualizerWhenEnabled(t *testing.T) { common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop(), + noopDAHintAppender{}, + noopDAHintAppender{}, ) require.NotNil(t, server.GetDAVisualizationServer()) diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index c1df11bf5..dc4c8e142 100644 --- 
a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -168,7 +168,7 @@ func TestSubmitter_setSequencerHeightToDAHeight(t *testing.T) { Namespace: cfg.DA.Namespace, DataNamespace: cfg.DA.DataNamespace, }) - daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) + daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop(), nil, nil) s := NewSubmitter(mockStore, nil, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) s.ctx = ctx @@ -253,7 +253,7 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) { Namespace: cfg.DA.Namespace, DataNamespace: cfg.DA.DataNamespace, }) - daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) + daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop(), nil, nil) s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) // prepare two consecutive blocks in store with DA included in cache @@ -444,7 +444,7 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { Namespace: cfg.DA.Namespace, DataNamespace: cfg.DA.DataNamespace, }) - daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) + daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop(), nil, nil) s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) // Create test blocks diff --git a/block/internal/syncing/async_da_retriever.go b/block/internal/syncing/async_da_retriever.go new file mode 100644 index 000000000..7c79a3751 --- /dev/null +++ b/block/internal/syncing/async_da_retriever.go @@ -0,0 +1,111 @@ +package syncing + +import ( + "context" + "sync" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/rs/zerolog" 
+) + +// AsyncDARetriever handles concurrent DA retrieval operations. +type AsyncDARetriever struct { + retriever DARetriever + resultCh chan<- common.DAHeightEvent + workCh chan uint64 + inFlight map[uint64]struct{} + mu sync.Mutex + logger zerolog.Logger + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +// NewAsyncDARetriever creates a new AsyncDARetriever. +func NewAsyncDARetriever( + retriever DARetriever, + resultCh chan<- common.DAHeightEvent, + logger zerolog.Logger, +) *AsyncDARetriever { + return &AsyncDARetriever{ + retriever: retriever, + resultCh: resultCh, + workCh: make(chan uint64, 100), // Buffer size 100 + inFlight: make(map[uint64]struct{}), + logger: logger.With().Str("component", "async_da_retriever").Logger(), + } +} + +// Start starts the worker pool. +func (r *AsyncDARetriever) Start(ctx context.Context) { + r.ctx, r.cancel = context.WithCancel(ctx) + // Start 5 workers + for i := 0; i < 5; i++ { + r.wg.Add(1) + go r.worker() + } + r.logger.Info().Msg("AsyncDARetriever started") +} + +// Stop stops the worker pool. +func (r *AsyncDARetriever) Stop() { + if r.cancel != nil { + r.cancel() + } + r.wg.Wait() + r.logger.Info().Msg("AsyncDARetriever stopped") +} + +// RequestRetrieval requests a DA retrieval for the given height. +// It is non-blocking and idempotent. 
+func (r *AsyncDARetriever) RequestRetrieval(height uint64) { + r.mu.Lock() + defer r.mu.Unlock() + + if _, exists := r.inFlight[height]; exists { + return + } + + select { + case r.workCh <- height: + r.inFlight[height] = struct{}{} + r.logger.Debug().Uint64("height", height).Msg("queued DA retrieval request") + default: + r.logger.Debug().Uint64("height", height).Msg("DA retrieval worker pool full, dropping request") + } +} + +func (r *AsyncDARetriever) worker() { + defer r.wg.Done() + + for { + select { + case <-r.ctx.Done(): + return + case height := <-r.workCh: + r.processRetrieval(height) + } + } +} + +func (r *AsyncDARetriever) processRetrieval(height uint64) { + defer func() { + r.mu.Lock() + delete(r.inFlight, height) + r.mu.Unlock() + }() + + events, err := r.retriever.RetrieveFromDA(r.ctx, height) + if err != nil { + r.logger.Debug().Err(err).Uint64("height", height).Msg("async DA retrieval failed") + return + } + + for _, event := range events { + select { + case r.resultCh <- event: + case <-r.ctx.Done(): + return + } + } +} diff --git a/block/internal/syncing/async_da_retriever_test.go b/block/internal/syncing/async_da_retriever_test.go new file mode 100644 index 000000000..dfaecc922 --- /dev/null +++ b/block/internal/syncing/async_da_retriever_test.go @@ -0,0 +1,141 @@ +package syncing + +import ( + "context" + "testing" + "time" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestAsyncDARetriever_RequestRetrieval(t *testing.T) { + logger := zerolog.Nop() + mockRetriever := NewMockDARetriever(t) + resultCh := make(chan common.DAHeightEvent, 10) + + asyncRetriever := NewAsyncDARetriever(mockRetriever, resultCh, logger) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + asyncRetriever.Start(ctx) + defer asyncRetriever.Stop() + + // 1. 
Test successful retrieval + height1 := uint64(100) + mockRetriever.EXPECT().RetrieveFromDA(mock.Anything, height1).Return([]common.DAHeightEvent{{DaHeight: height1}}, nil).Once() + + asyncRetriever.RequestRetrieval(height1) + + select { + case event := <-resultCh: + assert.Equal(t, height1, event.DaHeight) + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for result") + } + + // 2. Test deduplication (idempotency) + // We'll block the retriever to simulate a slow request, then send multiple requests for the same height + height2 := uint64(200) + + // Create a channel to signal when the mock is called + calledCh := make(chan struct{}) + // Create a channel to unblock the mock + unblockCh := make(chan struct{}) + + mockRetriever.EXPECT().RetrieveFromDA(mock.Anything, height2).RunAndReturn(func(ctx context.Context, h uint64) ([]common.DAHeightEvent, error) { + close(calledCh) + <-unblockCh + return []common.DAHeightEvent{{DaHeight: h}}, nil + }).Once() // Should be called only once despite multiple requests + + // Send first request + asyncRetriever.RequestRetrieval(height2) + + // Wait for the worker to pick it up + select { + case <-calledCh: + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for retriever call") + } + + // Send duplicate requests while the first one is still in flight + asyncRetriever.RequestRetrieval(height2) + asyncRetriever.RequestRetrieval(height2) + + // Unblock the worker + close(unblockCh) + + // We should receive exactly one result + select { + case event := <-resultCh: + assert.Equal(t, height2, event.DaHeight) + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for result") + } + + // Ensure no more results come through + select { + case <-resultCh: + t.Fatal("received duplicate result") + default: + } +} + +func TestAsyncDARetriever_WorkerPoolLimit(t *testing.T) { + logger := zerolog.Nop() + mockRetriever := NewMockDARetriever(t) + resultCh := make(chan common.DAHeightEvent, 100) + + 
asyncRetriever := NewAsyncDARetriever(mockRetriever, resultCh, logger) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + asyncRetriever.Start(ctx) + defer asyncRetriever.Stop() + + // We have 5 workers. We'll block them all. + unblockCh := make(chan struct{}) + + // Expect 5 calls that block + for i := 0; i < 5; i++ { + h := uint64(1000 + i) + mockRetriever.EXPECT().RetrieveFromDA(mock.Anything, h).RunAndReturn(func(ctx context.Context, h uint64) ([]common.DAHeightEvent, error) { + <-unblockCh + return []common.DAHeightEvent{{DaHeight: h}}, nil + }).Once() + asyncRetriever.RequestRetrieval(h) + } + + // Give workers time to pick up tasks + time.Sleep(100 * time.Millisecond) + + // Now send a 6th request. It should be queued but not processed yet. + height6 := uint64(1005) + processed6 := make(chan struct{}) + mockRetriever.EXPECT().RetrieveFromDA(mock.Anything, height6).RunAndReturn(func(ctx context.Context, h uint64) ([]common.DAHeightEvent, error) { + close(processed6) + return []common.DAHeightEvent{{DaHeight: h}}, nil + }).Once() + + asyncRetriever.RequestRetrieval(height6) + + // Ensure 6th request is NOT processed yet + select { + case <-processed6: + t.Fatal("6th request processed too early") + default: + } + + // Unblock workers + close(unblockCh) + + // Now 6th request should be processed + select { + case <-processed6: + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for 6th request") + } +} diff --git a/block/internal/syncing/height_store_mock.go b/block/internal/syncing/height_store_mock.go new file mode 100644 index 000000000..b4857accf --- /dev/null +++ b/block/internal/syncing/height_store_mock.go @@ -0,0 +1,113 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package syncing + +import ( + "context" + + "github.com/celestiaorg/go-header" + mock "github.com/stretchr/testify/mock" +) + +// NewMockHeightStore creates a new instance of MockHeightStore. 
It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockHeightStore[H header.Header[H]](t interface { + mock.TestingT + Cleanup(func()) +}) *MockHeightStore[H] { + mock := &MockHeightStore[H]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockHeightStore is an autogenerated mock type for the HeightStore type +type MockHeightStore[H header.Header[H]] struct { + mock.Mock +} + +type MockHeightStore_Expecter[H header.Header[H]] struct { + mock *mock.Mock +} + +func (_m *MockHeightStore[H]) EXPECT() *MockHeightStore_Expecter[H] { + return &MockHeightStore_Expecter[H]{mock: &_m.Mock} +} + +// GetByHeight provides a mock function for the type MockHeightStore +func (_mock *MockHeightStore[H]) GetByHeight(ctx context.Context, height uint64) (H, uint64, error) { + ret := _mock.Called(ctx, height) + + if len(ret) == 0 { + panic("no return value specified for GetByHeight") + } + + var r0 H + var r1 uint64 + var r2 error + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) (H, uint64, error)); ok { + return returnFunc(ctx, height) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) H); ok { + r0 = returnFunc(ctx, height) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(H) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, uint64) uint64); ok { + r1 = returnFunc(ctx, height) + } else { + r1 = ret.Get(1).(uint64) + } + if returnFunc, ok := ret.Get(2).(func(context.Context, uint64) error); ok { + r2 = returnFunc(ctx, height) + } else { + r2 = ret.Error(2) + } + return r0, r1, r2 +} + +// MockHeightStore_GetByHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetByHeight' +type MockHeightStore_GetByHeight_Call[H header.Header[H]] struct { + *mock.Call +} + +// GetByHeight is a helper method to define mock.On call 
+// - ctx context.Context +// - height uint64 +func (_e *MockHeightStore_Expecter[H]) GetByHeight(ctx interface{}, height interface{}) *MockHeightStore_GetByHeight_Call[H] { + return &MockHeightStore_GetByHeight_Call[H]{Call: _e.mock.On("GetByHeight", ctx, height)} +} + +func (_c *MockHeightStore_GetByHeight_Call[H]) Run(run func(ctx context.Context, height uint64)) *MockHeightStore_GetByHeight_Call[H] { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 uint64 + if args[1] != nil { + arg1 = args[1].(uint64) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockHeightStore_GetByHeight_Call[H]) Return(v H, v1 uint64, err error) *MockHeightStore_GetByHeight_Call[H] { + _c.Call.Return(v, v1, err) + return _c +} + +func (_c *MockHeightStore_GetByHeight_Call[H]) RunAndReturn(run func(ctx context.Context, height uint64) (H, uint64, error)) *MockHeightStore_GetByHeight_Call[H] { + _c.Call.Return(run) + return _c +} diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index d8c10bc4c..18c3e6c63 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -6,13 +6,13 @@ import ( "fmt" "sync/atomic" - goheader "github.com/celestiaorg/go-header" + "github.com/celestiaorg/go-header" + "github.com/evstack/ev-node/types" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/types" ) type p2pHandler interface { @@ -20,6 +20,11 @@ type p2pHandler interface { SetProcessedHeight(height uint64) } +// HeightStore is a subset of goheader.Store +type HeightStore[H header.Header[H]] interface { + GetByHeight(ctx context.Context, height uint64) (H, uint64, error) +} + // P2PHandler coordinates block retrieval from P2P stores for the syncer. 
// It waits for both header and data to be available at a given height, // validates their consistency, and emits events to the syncer for processing. @@ -27,8 +32,8 @@ type p2pHandler interface { // The handler maintains a processedHeight to track the highest block that has been // successfully validated and sent to the syncer, preventing duplicate processing. type P2PHandler struct { - headerStore goheader.Store[*types.SignedHeader] - dataStore goheader.Store[*types.Data] + headerStore HeightStore[*types.SignedHeader] + dataStore HeightStore[*types.Data] cache cache.CacheManager genesis genesis.Genesis logger zerolog.Logger @@ -38,8 +43,8 @@ type P2PHandler struct { // NewP2PHandler creates a new P2P handler. func NewP2PHandler( - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerStore HeightStore[*types.SignedHeader], + dataStore HeightStore[*types.Data], cache cache.CacheManager, genesis genesis.Genesis, logger zerolog.Logger, @@ -74,7 +79,7 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInC return nil } - header, err := h.headerStore.GetByHeight(ctx, height) + header, headerDAHint, err := h.headerStore.GetByHeight(ctx, height) if err != nil { if ctx.Err() == nil { h.logger.Debug().Uint64("height", height).Err(err).Msg("header unavailable in store") @@ -86,14 +91,13 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInC return err } - data, err := h.dataStore.GetByHeight(ctx, height) + data, dataDAHint, err := h.dataStore.GetByHeight(ctx, height) if err != nil { if ctx.Err() == nil { h.logger.Debug().Uint64("height", height).Err(err).Msg("data unavailable in store") } return err } - dataCommitment := data.DACommitment() if !bytes.Equal(header.DataHash[:], dataCommitment[:]) { err := fmt.Errorf("data hash mismatch: header %x, data %x", header.DataHash, dataCommitment) @@ -104,10 +108,10 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height 
uint64, heightInC // further header validation (signature) is done in validateBlock. // we need to be sure that the previous block n-1 was executed before validating block n event := common.DAHeightEvent{ - Header: header, - Data: data, - DaHeight: 0, - Source: common.SourceP2P, + Header: header, + Data: data, + Source: common.SourceP2P, + DaHeightHints: [2]uint64{headerDAHint, dataDAHint}, } select { diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index dfab41faa..597090047 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -18,7 +18,6 @@ import ( "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" - extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -57,8 +56,8 @@ func p2pMakeSignedHeader(t *testing.T, chainID string, height uint64, proposer [ // P2PTestData aggregates dependencies used by P2P handler tests. type P2PTestData struct { Handler *P2PHandler - HeaderStore *extmocks.MockStore[*types.SignedHeader] - DataStore *extmocks.MockStore[*types.Data] + HeaderStore *MockHeightStore[*types.SignedHeader] + DataStore *MockHeightStore[*types.Data] Cache cache.CacheManager Genesis genesis.Genesis ProposerAddr []byte @@ -73,8 +72,8 @@ func setupP2P(t *testing.T) *P2PTestData { gen := genesis.Genesis{ChainID: "p2p-test", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: proposerAddr} - headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) - dataStoreMock := extmocks.NewMockStore[*types.Data](t) + headerStoreMock := NewMockHeightStore[*types.SignedHeader](t) + dataStoreMock := NewMockHeightStore[*types.Data](t) cfg := config.Config{ RootDir: t.TempDir(), @@ -137,8 +136,8 @@ func TestP2PHandler_ProcessHeight_EmitsEventWhenHeaderAndDataPresent(t *testing. 
require.NoError(t, err) header.Signature = sig - p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(header, nil).Once() - p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data, nil).Once() + p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(header, 0, nil).Once() + p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data, 0, nil).Once() ch := make(chan common.DAHeightEvent, 1) err = p.Handler.ProcessHeight(ctx, 5, ch) @@ -163,8 +162,8 @@ func TestP2PHandler_ProcessHeight_SkipsWhenDataMissing(t *testing.T) { require.NoError(t, err) header.Signature = sig - p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(header, nil).Once() - p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("missing")).Once() + p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(header, 0, nil).Once() + p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, 0, errors.New("missing")).Once() ch := make(chan common.DAHeightEvent, 1) err = p.Handler.ProcessHeight(ctx, 7, ch) @@ -177,7 +176,7 @@ func TestP2PHandler_ProcessHeight_SkipsWhenHeaderMissing(t *testing.T) { p := setupP2P(t) ctx := context.Background() - p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("missing")).Once() + p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, 0, errors.New("missing")).Once() ch := make(chan common.DAHeightEvent, 1) err := p.Handler.ProcessHeight(ctx, 9, ch) @@ -198,7 +197,7 @@ func TestP2PHandler_ProcessHeight_SkipsOnProposerMismatch(t *testing.T) { header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 11, badAddr, pub, signer) header.DataHash = common.DataHashForEmptyTxs - p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(11)).Return(header, nil).Once() + p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(11)).Return(header, 0, nil).Once() ch := make(chan common.DAHeightEvent, 1) err = 
p.Handler.ProcessHeight(ctx, 11, ch) @@ -233,8 +232,8 @@ func TestP2PHandler_ProcessedHeightSkipsPreviouslyHandledBlocks(t *testing.T) { require.NoError(t, err) header.Signature = sig - p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header, nil).Once() - p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data, nil).Once() + p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header, 0, nil).Once() + p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data, 0, nil).Once() require.NoError(t, p.Handler.ProcessHeight(ctx, 6, ch)) @@ -256,8 +255,8 @@ func TestP2PHandler_SetProcessedHeightPreventsDuplicates(t *testing.T) { require.NoError(t, err) header.Signature = sig - p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(8)).Return(header, nil).Once() - p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(8)).Return(data, nil).Once() + p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(8)).Return(header, 0, nil).Once() + p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(8)).Return(data, 0, nil).Once() ch := make(chan common.DAHeightEvent, 1) require.NoError(t, p.Handler.ProcessHeight(ctx, 8, ch)) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 2c45fd163..0e8b61b64 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -12,20 +12,18 @@ import ( "sync/atomic" "time" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/rs/zerolog" - "golang.org/x/sync/errgroup" - - coreda "github.com/evstack/ev-node/core/da" - coreexecutor "github.com/evstack/ev-node/core/execution" - "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/da" + coreda "github.com/evstack/ev-node/core/da" + coreexecutor "github.com/evstack/ev-node/core/execution" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" 
"github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" ) // Syncer handles block synchronization from DA and P2P sources. @@ -52,8 +50,8 @@ type Syncer struct { daRetrieverHeight *atomic.Uint64 // P2P stores - headerStore common.Broadcaster[*types.SignedHeader] - dataStore common.Broadcaster[*types.Data] + headerStore common.HeaderP2PBroadcaster + dataStore common.DataP2PBroadcaster // Channels for coordination heightInCh chan common.DAHeightEvent @@ -74,6 +72,9 @@ type Syncer struct { // P2P wait coordination p2pWaitState atomic.Value // stores p2pWaitState + + // Async DA retriever + asyncDARetriever *AsyncDARetriever } // pendingForcedInclusionTx represents a forced inclusion transaction that hasn't been included yet @@ -93,8 +94,8 @@ func NewSyncer( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerStore common.Broadcaster[*types.SignedHeader], - dataStore common.Broadcaster[*types.Data], + headerStore common.HeaderP2PBroadcaster, + dataStore common.DataP2PBroadcaster, logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, @@ -131,8 +132,10 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) + s.asyncDARetriever = NewAsyncDARetriever(s.daRetriever, s.heightInCh, s.logger) + s.asyncDARetriever.Start(s.ctx) s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.genesis, s.logger) - s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) + s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") } else { @@ -162,6 +165,9 @@ func (s *Syncer) Stop() error { if s.cancel != 
nil { s.cancel() } + if s.asyncDARetriever != nil { + s.asyncDARetriever.Stop() + } s.cancelP2PWait(0) s.wg.Wait() s.logger.Info().Msg("syncer stopped") @@ -448,6 +454,48 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } + // If this is a P2P event with a DA height hint, trigger targeted DA retrieval + // This allows us to fetch the block directly from the specified DA height instead of sequential scanning + if event.Source == common.SourceP2P { + var daHeightHints []uint64 + switch { + case event.DaHeightHints == [2]uint64{0, 0}: + // empty, nothing to do + case event.DaHeightHints[0] == 0: + // check only data + if _, exists := s.cache.GetDataDAIncluded(event.Data.Hash().String()); !exists { + daHeightHints = []uint64{event.DaHeightHints[1]} + } + case event.DaHeightHints[1] == 0: + // check only header + if _, exists := s.cache.GetHeaderDAIncluded(event.Header.Hash().String()); !exists { + daHeightHints = []uint64{event.DaHeightHints[0]} + } + default: + // check both + if _, exists := s.cache.GetHeaderDAIncluded(event.Header.Hash().String()); !exists { + daHeightHints = []uint64{event.DaHeightHints[0]} + } + if _, exists := s.cache.GetDataDAIncluded(event.Data.Hash().String()); !exists { + daHeightHints = append(daHeightHints, event.DaHeightHints[1]) + } + if len(daHeightHints) == 2 && daHeightHints[0] == daHeightHints[1] { + daHeightHints = daHeightHints[0:1] + } + } + if len(daHeightHints) > 0 { + for _, daHeightHint := range daHeightHints { + s.logger.Debug(). + Uint64("height", height). + Uint64("da_height_hint", daHeightHint). + Msg("P2P event with DA height hint, triggering targeted DA retrieval") + + // Trigger targeted DA retrieval in background via worker pool + s.asyncDARetriever.RequestRetrieval(daHeightHint) + } + } + } + // Last data must be got from store if the event comes from DA and the data hash is empty. // When if the event comes from P2P, the sequencer and then all the full nodes contains the data. 
if event.Source == common.SourceDA && bytes.Equal(event.Header.DataHash, common.DataHashForEmptyTxs) && currentHeight > 0 { diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 65f258696..ed9cd4c40 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -83,14 +83,6 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { mockDataStore := extmocks.NewMockStore[*types.Data](t) mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() - headerStore := common.NewMockBroadcaster[*types.SignedHeader](t) - headerStore.EXPECT().Store().Return(mockHeaderStore).Maybe() - syncer.headerStore = headerStore - - dataStore := common.NewMockBroadcaster[*types.Data](t) - dataStore.EXPECT().Store().Return(mockDataStore).Maybe() - syncer.dataStore = dataStore - var callTimes []time.Time callCount := 0 @@ -179,14 +171,6 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { mockDataStore := extmocks.NewMockStore[*types.Data](t) mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() - headerStore := common.NewMockBroadcaster[*types.SignedHeader](t) - headerStore.EXPECT().Store().Return(mockHeaderStore).Maybe() - syncer.headerStore = headerStore - - dataStore := common.NewMockBroadcaster[*types.Data](t) - dataStore.EXPECT().Store().Return(mockDataStore).Maybe() - syncer.dataStore = dataStore - var callTimes []time.Time // First call - error (should trigger backoff) @@ -269,14 +253,6 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { mockDataStore := extmocks.NewMockStore[*types.Data](t) mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() - headerStore := common.NewMockBroadcaster[*types.SignedHeader](t) - headerStore.EXPECT().Store().Return(mockHeaderStore).Maybe() - syncer.headerStore = headerStore - - dataStore := common.NewMockBroadcaster[*types.Data](t) - dataStore.EXPECT().Store().Return(mockDataStore).Maybe() - syncer.dataStore = dataStore - var 
callTimes []time.Time p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 5c16da443..5132b37ba 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -347,10 +347,7 @@ func TestSyncLoopPersistState(t *testing.T) { mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() mockP2PHeaderStore := common.NewMockBroadcaster[*types.SignedHeader](t) - mockP2PHeaderStore.EXPECT().Store().Return(mockHeaderStore).Maybe() - mockP2PDataStore := common.NewMockBroadcaster[*types.Data](t) - mockP2PDataStore.EXPECT().Store().Return(mockDataStore).Maybe() errorCh := make(chan error, 1) syncerInst1 := NewSyncer( @@ -700,3 +697,79 @@ func TestSyncer_getHighestStoredDAHeight(t *testing.T) { highestDA = syncer.getHighestStoredDAHeight() assert.Equal(t, uint64(200), highestDA, "should return highest DA height from most recent included height") } + +func TestProcessHeightEvent_TriggersAsyncDARetrieval(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) + require.NoError(t, err) + + addr, _, _ := buildSyncTestSigner(t) + cfg := config.DefaultConfig() + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} + + mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once() + + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + cfg, + gen, + common.NewMockBroadcaster[*types.SignedHeader](t), + common.NewMockBroadcaster[*types.Data](t), + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + ) + require.NoError(t, s.initializeState()) + s.ctx = context.Background() + + // Mock AsyncDARetriever + mockRetriever := 
NewMockDARetriever(t) + asyncRetriever := NewAsyncDARetriever(mockRetriever, s.heightInCh, zerolog.Nop()) + // We don't start the async retriever to avoid race conditions in test, + // we just want to verify RequestRetrieval queues the request. + // However, RequestRetrieval writes to a channel, so we need a consumer or a buffered channel. + // The workCh is buffered (100), so we are good. + s.asyncDARetriever = asyncRetriever + + // Create event with DA height hint + evt := common.DAHeightEvent{ + Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: "c", Height: 2}}}, + Data: &types.Data{Metadata: &types.Metadata{ChainID: "c", Height: 2}}, + Source: common.SourceP2P, + DaHeightHints: [2]uint64{100, 100}, + } + + // Current height is 0 (from init), event height is 2. + // processHeightEvent checks: + // 1. height <= currentHeight (2 <= 0 -> false) + // 2. height != currentHeight+1 (2 != 1 -> true) -> stores as pending event + + // We need to simulate height 1 being processed first so height 2 is "next" + // OR we can just test that it DOES NOT trigger DA retrieval if it's pending. + // Wait, the logic for DA retrieval is BEFORE the "next block" check? + // Let's check syncer.go... + // Yes, "If this is a P2P event with a DA height hint, trigger targeted DA retrieval" block is AFTER "If this is not the next block in sequence... return" + + // So we need to be at height 1 to process height 2. + // Let's set the store height to 1. 
+ batch, err := st.NewBatch(context.Background()) + require.NoError(t, err) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + s.processHeightEvent(&evt) + + // Verify that the request was queued in the async retriever + select { + case h := <-asyncRetriever.workCh: + assert.Equal(t, uint64(100), h) + default: + t.Fatal("expected DA retrieval request to be queued") + } +} diff --git a/node/full.go b/node/full.go index 6d03a87c0..69be1d35b 100644 --- a/node/full.go +++ b/node/full.go @@ -31,7 +31,7 @@ import ( evsync "github.com/evstack/ev-node/pkg/sync" ) -// prefixes used in KV store to separate rollkit data from execution environment data (if the same data base is reused) +// EvPrefix used in KV store to separate rollkit data from execution environment data (if the same data base is reused) var EvPrefix = "0" const ( diff --git a/pkg/rpc/client/client_test.go b/pkg/rpc/client/client_test.go index 4b2b82e1b..05c8df4d0 100644 --- a/pkg/rpc/client/client_test.go +++ b/pkg/rpc/client/client_test.go @@ -8,6 +8,7 @@ import ( "time" goheader "github.com/celestiaorg/go-header" + "github.com/evstack/ev-node/pkg/sync" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" @@ -28,10 +29,11 @@ import ( func setupTestServer( t *testing.T, mockStore *mocks.MockStore, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerStore goheader.Store[*sync.SignedHeaderWithDAHint], + dataStore goheader.Store[*sync.DataWithDAHint], mockP2P *mocks.MockP2PRPC, ) (*httptest.Server, *Client) { + t.Helper() mux := http.NewServeMux() logger := zerolog.Nop() @@ -105,19 +107,19 @@ func TestClientGetMetadata(t *testing.T) { func TestClientGetP2PStoreInfo(t *testing.T) { mockStore := mocks.NewMockStore(t) mockP2P := mocks.NewMockP2PRPC(t) - headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t) - dataStore := headerstoremocks.NewMockStore[*types.Data](t) + 
headerStore := headerstoremocks.NewMockStore[*sync.SignedHeaderWithDAHint](t) + dataStore := headerstoremocks.NewMockStore[*sync.DataWithDAHint](t) now := time.Now().UTC() - headerHead := testSignedHeader(10, now) - headerTail := testSignedHeader(5, now.Add(-time.Minute)) + headerHead := &sync.SignedHeaderWithDAHint{Entry: testSignedHeader(10, now)} + headerTail := &sync.SignedHeaderWithDAHint{Entry: testSignedHeader(5, now.Add(-time.Minute))} headerStore.On("Height").Return(uint64(10)) headerStore.On("Head", mock.Anything).Return(headerHead, nil) headerStore.On("Tail", mock.Anything).Return(headerTail, nil) - dataHead := testData(8, now.Add(-30*time.Second)) - dataTail := testData(4, now.Add(-2*time.Minute)) + dataHead := &sync.DataWithDAHint{Entry: testData(8, now.Add(-30*time.Second))} + dataTail := &sync.DataWithDAHint{Entry: testData(4, now.Add(-2*time.Minute))} dataStore.On("Height").Return(uint64(8)) dataStore.On("Head", mock.Anything).Return(dataHead, nil) dataStore.On("Tail", mock.Anything).Return(dataTail, nil) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index e0abed2de..f113b52fb 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -14,6 +14,7 @@ import ( "connectrpc.com/grpcreflect" goheader "github.com/celestiaorg/go-header" coreda "github.com/evstack/ev-node/core/da" + "github.com/evstack/ev-node/pkg/sync" ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" "golang.org/x/net/http2" @@ -34,16 +35,16 @@ var _ rpc.StoreServiceHandler = (*StoreServer)(nil) // StoreServer implements the StoreService defined in the proto file type StoreServer struct { store store.Store - headerStore goheader.Store[*types.SignedHeader] - dataStore goheader.Store[*types.Data] + headerStore goheader.Store[*sync.SignedHeaderWithDAHint] + dataStore goheader.Store[*sync.DataWithDAHint] logger zerolog.Logger } // NewStoreServer creates a new StoreServer instance func NewStoreServer( store store.Store, - headerStore 
goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerStore goheader.Store[*sync.SignedHeaderWithDAHint], + dataStore goheader.Store[*sync.DataWithDAHint], logger zerolog.Logger, ) *StoreServer { return &StoreServer{ @@ -370,8 +371,8 @@ func (p *P2PServer) GetNetInfo( // NewServiceHandler creates a new HTTP handler for Store, P2P and Config services func NewServiceHandler( store store.Store, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerStore goheader.Store[*sync.SignedHeaderWithDAHint], + dataStore goheader.Store[*sync.DataWithDAHint], peerManager p2p.P2PRPC, proposerAddress []byte, logger zerolog.Logger, diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 32e9b0ebe..3a4dbd89b 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -11,6 +11,7 @@ import ( "time" "connectrpc.com/connect" + "github.com/evstack/ev-node/pkg/sync" ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -325,8 +326,8 @@ func TestGetGenesisDaHeight_InvalidLength(t *testing.T) { func TestGetP2PStoreInfo(t *testing.T) { t.Run("returns snapshots for configured stores", func(t *testing.T) { mockStore := mocks.NewMockStore(t) - headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t) - dataStore := headerstoremocks.NewMockStore[*types.Data](t) + headerStore := headerstoremocks.NewMockStore[*sync.SignedHeaderWithDAHint](t) + dataStore := headerstoremocks.NewMockStore[*sync.DataWithDAHint](t) logger := zerolog.Nop() server := NewStoreServer(mockStore, headerStore, dataStore, logger) @@ -354,10 +355,10 @@ func TestGetP2PStoreInfo(t *testing.T) { t.Run("returns error when a store edge fails", func(t *testing.T) { mockStore := mocks.NewMockStore(t) - headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t) + headerStore := 
headerstoremocks.NewMockStore[*sync.SignedHeaderWithDAHint](t) logger := zerolog.Nop() headerStore.On("Height").Return(uint64(0)) - headerStore.On("Head", mock.Anything).Return((*types.SignedHeader)(nil), fmt.Errorf("boom")) + headerStore.On("Head", mock.Anything).Return((*sync.SignedHeaderWithDAHint)(nil), fmt.Errorf("boom")) server := NewStoreServer(mockStore, headerStore, nil, logger) resp, err := server.GetP2PStoreInfo(context.Background(), connect.NewRequest(&emptypb.Empty{})) @@ -627,8 +628,8 @@ func TestHealthReadyEndpoint(t *testing.T) { }) } -func makeTestSignedHeader(height uint64, ts time.Time) *types.SignedHeader { - return &types.SignedHeader{ +func makeTestSignedHeader(height uint64, ts time.Time) *sync.SignedHeaderWithDAHint { + return &sync.SignedHeaderWithDAHint{Entry: &types.SignedHeader{ Header: types.Header{ BaseHeader: types.BaseHeader{ Height: height, @@ -639,15 +640,17 @@ func makeTestSignedHeader(height uint64, ts time.Time) *types.SignedHeader { DataHash: []byte{0x02}, AppHash: []byte{0x03}, }, + }, } } -func makeTestData(height uint64, ts time.Time) *types.Data { - return &types.Data{ +func makeTestData(height uint64, ts time.Time) *sync.DataWithDAHint { + return &sync.DataWithDAHint{Entry: &types.Data{ Metadata: &types.Metadata{ ChainID: "test-chain", Height: height, Time: uint64(ts.UnixNano()), }, + }, } } diff --git a/pkg/sync/da_hint_container.go b/pkg/sync/da_hint_container.go new file mode 100644 index 000000000..5d904d885 --- /dev/null +++ b/pkg/sync/da_hint_container.go @@ -0,0 +1,81 @@ +package sync + +import ( + "encoding/binary" + "fmt" + "time" + + "github.com/celestiaorg/go-header" + "github.com/evstack/ev-node/types" +) + +type SignedHeaderWithDAHint = DAHeightHintContainer[*types.SignedHeader] +type DataWithDAHint = DAHeightHintContainer[*types.Data] + +type DAHeightHintContainer[H header.Header[H]] struct { + Entry H + DAHeightHint uint64 +} + +func (s *DAHeightHintContainer[H]) ChainID() string { + return s.Entry.ChainID() 
+} + +func (s *DAHeightHintContainer[H]) Hash() header.Hash { + return s.Entry.Hash() +} + +func (s *DAHeightHintContainer[H]) Height() uint64 { + return s.Entry.Height() +} + +func (s *DAHeightHintContainer[H]) LastHeader() header.Hash { + return s.Entry.LastHeader() +} + +func (s *DAHeightHintContainer[H]) Time() time.Time { + return s.Entry.Time() +} + +func (s *DAHeightHintContainer[H]) Validate() error { + return s.Entry.Validate() +} + +func (s *DAHeightHintContainer[H]) New() *DAHeightHintContainer[H] { + var empty H + return &DAHeightHintContainer[H]{Entry: empty.New()} +} + +func (sh *DAHeightHintContainer[H]) Verify(untrstH *DAHeightHintContainer[H]) error { + return sh.Entry.Verify(untrstH.Entry) +} + +func (s *DAHeightHintContainer[H]) SetDAHint(daHeight uint64) { + s.DAHeightHint = daHeight +} +func (s *DAHeightHintContainer[H]) DAHint() uint64 { + return s.DAHeightHint +} + +func (s *DAHeightHintContainer[H]) IsZero() bool { + return s == nil +} + +func (s *DAHeightHintContainer[H]) MarshalBinary() ([]byte, error) { + bz, err := s.Entry.MarshalBinary() + if err != nil { + return nil, err + } + out := make([]byte, 8+len(bz)) + binary.BigEndian.PutUint64(out, s.DAHeightHint) + copy(out[8:], bz) + return out, nil +} + +func (s *DAHeightHintContainer[H]) UnmarshalBinary(data []byte) error { + if len(data) < 8 { + return fmt.Errorf("invalid length: %d", len(data)) + } + s.DAHeightHint = binary.BigEndian.Uint64(data) + return s.Entry.UnmarshalBinary(data[8:]) +} diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 9b018460e..22d7c3c90 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -36,10 +36,22 @@ const ( // TODO: when we add pruning we can remove this const ninetyNineYears = 99 * 365 * 24 * time.Hour +type EntityWithDAHint[H any] interface { + header.Header[H] + SetDAHint(daHeight uint64) + DAHint() uint64 +} + +// HeaderSyncService is the P2P Sync Service for headers. 
+type HeaderSyncService = SyncService[*types.SignedHeader] + +// DataSyncService is the P2P Sync Service for blocks. +type DataSyncService = SyncService[*types.Data] + // SyncService is the P2P Sync Service for blocks and headers. // // Uses the go-header library for handling all P2P logic. -type SyncService[H header.Header[H]] struct { +type SyncService[V header.Header[V]] struct { conf config.Config logger zerolog.Logger syncType syncType @@ -48,22 +60,16 @@ type SyncService[H header.Header[H]] struct { p2p *p2p.Client - ex *goheaderp2p.Exchange[H] - sub *goheaderp2p.Subscriber[H] - p2pServer *goheaderp2p.ExchangeServer[H] - store *goheaderstore.Store[H] - syncer *goheadersync.Syncer[H] + ex *goheaderp2p.Exchange[*DAHeightHintContainer[V]] + sub *goheaderp2p.Subscriber[*DAHeightHintContainer[V]] + p2pServer *goheaderp2p.ExchangeServer[*DAHeightHintContainer[V]] + store *goheaderstore.Store[*DAHeightHintContainer[V]] + syncer *goheadersync.Syncer[*DAHeightHintContainer[V]] syncerStatus *SyncerStatus - topicSubscription header.Subscription[H] + topicSubscription header.Subscription[*DAHeightHintContainer[V]] storeInitialized atomic.Bool } -// DataSyncService is the P2P Sync Service for blocks. -type DataSyncService = SyncService[*types.Data] - -// HeaderSyncService is the P2P Sync Service for headers. -type HeaderSyncService = SyncService[*types.SignedHeader] - // NewDataSyncService returns a new DataSyncService. 
func NewDataSyncService( store ds.Batching, @@ -86,19 +92,19 @@ func NewHeaderSyncService( return newSyncService[*types.SignedHeader](store, headerSync, conf, genesis, p2p, logger) } -func newSyncService[H header.Header[H]]( +func newSyncService[V header.Header[V]]( store ds.Batching, syncType syncType, conf config.Config, genesis genesis.Genesis, p2p *p2p.Client, logger zerolog.Logger, -) (*SyncService[H], error) { +) (*SyncService[V], error) { if p2p == nil { return nil, errors.New("p2p client cannot be nil") } - ss, err := goheaderstore.NewStore[H]( + ss, err := goheaderstore.NewStore[*DAHeightHintContainer[V]]( store, goheaderstore.WithStorePrefix(string(syncType)), goheaderstore.WithMetrics(), @@ -107,7 +113,7 @@ func newSyncService[H header.Header[H]]( return nil, fmt.Errorf("failed to initialize the %s store: %w", syncType, err) } - svc := &SyncService[H]{ + svc := &SyncService[V]{ conf: conf, genesis: genesis, p2p: p2p, @@ -121,21 +127,22 @@ func newSyncService[H header.Header[H]]( } // Store returns the store of the SyncService -func (syncService *SyncService[H]) Store() header.Store[H] { +func (syncService *SyncService[V]) Store() header.Store[*DAHeightHintContainer[V]] { return syncService.store } // WriteToStoreAndBroadcast initializes store if needed and broadcasts provided header or block. // Note: Only returns an error in case store can't be initialized. Logs error if there's one while broadcasting. 
-func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, headerOrData H, opts ...pubsub.PubOpt) error { +func (syncService *SyncService[V]) WriteToStoreAndBroadcast(ctx context.Context, payload V, opts ...pubsub.PubOpt) error { if syncService.genesis.InitialHeight == 0 { return fmt.Errorf("invalid initial height; cannot be zero") } - if headerOrData.IsZero() { + if payload.IsZero() { return fmt.Errorf("empty header/data cannot write to store or broadcast") } + headerOrData := &DAHeightHintContainer[V]{Entry: payload} storeInitialized := false if syncService.storeInitialized.CompareAndSwap(false, true) { var err error @@ -174,8 +181,33 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, return nil } +func (s *SyncService[V]) AppendDAHint(ctx context.Context, daHeight uint64, hashes ...types.Hash) error { + entries := make([]*DAHeightHintContainer[V], 0, len(hashes)) + for _, h := range hashes { + v, err := s.store.Get(ctx, h) + if err != nil { + if errors.Is(err, header.ErrNotFound) { + continue + } + return err + } + v.SetDAHint(daHeight) + entries = append(entries, v) + } + return s.store.Append(ctx, entries...) +} + +func (s *SyncService[V]) GetByHeight(ctx context.Context, height uint64) (V, uint64, error) { + c, err := s.store.GetByHeight(ctx, height) + if err != nil { + var zero V + return zero, 0, err + } + return c.Entry, c.DAHint(), nil +} + // Start is a part of Service interface. -func (syncService *SyncService[H]) Start(ctx context.Context) error { +func (syncService *SyncService[V]) Start(ctx context.Context) error { // setup P2P infrastructure, but don't start Subscriber yet. peerIDs, err := syncService.setupP2PInfrastructure(ctx) if err != nil { @@ -183,7 +215,7 @@ func (syncService *SyncService[H]) Start(ctx context.Context) error { } // create syncer, must be before initFromP2PWithRetry which calls startSyncer. 
- if syncService.syncer, err = newSyncer( + if syncService.syncer, err = newSyncer[V]( syncService.ex, syncService.store, syncService.sub, @@ -226,7 +258,7 @@ func (syncService *SyncService[H]) startSyncer(ctx context.Context) error { // initStore initializes the store with the given initial header. // it is a no-op if the store is already initialized. // Returns true when the store was initialized by this call. -func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) (bool, error) { +func (syncService *SyncService[V]) initStore(ctx context.Context, initial *DAHeightHintContainer[V]) (bool, error) { if initial.IsZero() { return false, errors.New("failed to initialize the store") } @@ -250,7 +282,7 @@ func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) (bo // setupP2PInfrastructure sets up the P2P infrastructure (Exchange, ExchangeServer, Store) // but does not start the Subscriber. Returns peer IDs for later use. -func (syncService *SyncService[H]) setupP2PInfrastructure(ctx context.Context) ([]peer.ID, error) { +func (syncService *SyncService[V]) setupP2PInfrastructure(ctx context.Context) ([]peer.ID, error) { ps := syncService.p2p.PubSub() _, _, chainID, err := syncService.p2p.Info() @@ -260,7 +292,7 @@ func (syncService *SyncService[H]) setupP2PInfrastructure(ctx context.Context) ( networkID := syncService.getNetworkID(chainID) // Create subscriber but DON'T start it yet - syncService.sub, err = goheaderp2p.NewSubscriber[H]( + syncService.sub, err = goheaderp2p.NewSubscriber[*DAHeightHintContainer[V]]( ps, pubsub.DefaultMsgIdFn, goheaderp2p.WithSubscriberNetworkID(networkID), @@ -283,7 +315,7 @@ func (syncService *SyncService[H]) setupP2PInfrastructure(ctx context.Context) ( peerIDs := syncService.getPeerIDs() - if syncService.ex, err = newP2PExchange[H](syncService.p2p.Host(), peerIDs, networkID, syncService.genesis.ChainID, syncService.p2p.ConnectionGater()); err != nil { + if syncService.ex, err = 
newP2PExchange[*DAHeightHintContainer[V]](syncService.p2p.Host(), peerIDs, networkID, syncService.genesis.ChainID, syncService.p2p.ConnectionGater()); err != nil { return nil, fmt.Errorf("error while creating exchange: %w", err) } if err := syncService.ex.Start(ctx); err != nil { @@ -311,14 +343,14 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error { // It inspects the local store to determine the first height to request: // - when the store already contains items, it reuses the latest height as the starting point; // - otherwise, it falls back to the configured genesis height. -func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, peerIDs []peer.ID) error { +func (syncService *SyncService[V]) initFromP2PWithRetry(ctx context.Context, peerIDs []peer.ID) error { if len(peerIDs) == 0 { return nil } tryInit := func(ctx context.Context) (bool, error) { var ( - trusted H + trusted *DAHeightHintContainer[V] err error heightToQuery uint64 ) @@ -382,7 +414,7 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee // Stop is a part of Service interface. // // `store` is closed last because it's used by other services. -func (syncService *SyncService[H]) Stop(ctx context.Context) error { +func (syncService *SyncService[V]) Stop(ctx context.Context) error { // unsubscribe from topic first so that sub.Stop() does not fail syncService.topicSubscription.Cancel() err := errors.Join( @@ -428,17 +460,17 @@ func newP2PExchange[H header.Header[H]]( // newSyncer constructs new Syncer for headers/blocks. 
func newSyncer[H header.Header[H]]( - ex header.Exchange[H], - store header.Store[H], - sub header.Subscriber[H], + ex header.Exchange[*DAHeightHintContainer[H]], + store header.Store[*DAHeightHintContainer[H]], + sub header.Subscriber[*DAHeightHintContainer[H]], opts []goheadersync.Option, -) (*goheadersync.Syncer[H], error) { +) (*goheadersync.Syncer[*DAHeightHintContainer[H]], error) { opts = append(opts, goheadersync.WithMetrics(), goheadersync.WithPruningWindow(ninetyNineYears), goheadersync.WithTrustingPeriod(ninetyNineYears), ) - return goheadersync.NewSyncer(ex, store, sub, opts...) + return goheadersync.NewSyncer[*DAHeightHintContainer[H]](ex, store, sub, opts...) } func (syncService *SyncService[H]) getNetworkID(network string) string { diff --git a/pkg/sync/sync_service_test.go b/pkg/sync/sync_service_test.go index 93603752a..b0e244a95 100644 --- a/pkg/sync/sync_service_test.go +++ b/pkg/sync/sync_service_test.go @@ -167,6 +167,181 @@ func TestHeaderSyncServiceInitFromHigherHeight(t *testing.T) { require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, signedHeader)) } +func TestDAHintStorageHeader(t *testing.T) { + mainKV := sync.MutexWrap(datastore.NewMapDatastore()) + pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader) + require.NoError(t, err) + noopSigner, err := noop.NewNoopSigner(pk) + require.NoError(t, err) + rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only + mn := mocknet.New() + + chainId := "test-chain-id" + + proposerAddr := []byte("test") + genesisDoc := genesispkg.Genesis{ + ChainID: chainId, + StartTime: time.Now(), + InitialHeight: 1, + ProposerAddress: proposerAddr, + } + conf := config.DefaultConfig() + conf.RootDir = t.TempDir() + nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(conf.ConfigPath())) + require.NoError(t, err) + logger := zerolog.Nop() + priv := nodeKey.PrivKey + p2pHost, err := mn.AddPeer(priv, nil) + require.NoError(t, err) + + p2pClient, err := p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, 
mainKV, chainId, logger, p2p.NopMetrics(), p2pHost) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + require.NoError(t, p2pClient.Start(ctx)) + + headerSvc, err := NewHeaderSyncService(mainKV, conf, genesisDoc, p2pClient, logger) + require.NoError(t, err) + require.NoError(t, headerSvc.Start(ctx)) + + headerConfig := types.HeaderConfig{ + Height: genesisDoc.InitialHeight, + DataHash: bytesN(rnd, 32), + AppHash: bytesN(rnd, 32), + Signer: noopSigner, + } + signedHeader, err := types.GetRandomSignedHeaderCustom(&headerConfig, genesisDoc.ChainID) + require.NoError(t, err) + require.NoError(t, signedHeader.Validate()) + + require.NoError(t, headerSvc.WriteToStoreAndBroadcast(ctx, signedHeader)) + + daHeight := uint64(100) + require.NoError(t, headerSvc.AppendDAHint(ctx, daHeight, signedHeader.Hash())) + + h, hint, err := headerSvc.GetByHeight(ctx, signedHeader.Height()) + require.NoError(t, err) + require.Equal(t, signedHeader.Hash(), h.Hash()) + require.Equal(t, daHeight, hint) + + _ = p2pClient.Close() + _ = headerSvc.Stop(ctx) + cancel() + + // Restart + h2, err := mn.AddPeer(priv, nil) + require.NoError(t, err) + p2pClient, err = p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, mainKV, chainId, logger, p2p.NopMetrics(), h2) + require.NoError(t, err) + + ctx, cancel = context.WithCancel(t.Context()) + defer cancel() + require.NoError(t, p2pClient.Start(ctx)) + t.Cleanup(func() { _ = p2pClient.Close() }) + + headerSvc, err = NewHeaderSyncService(mainKV, conf, genesisDoc, p2pClient, logger) + require.NoError(t, err) + require.NoError(t, headerSvc.Start(ctx)) + t.Cleanup(func() { _ = headerSvc.Stop(context.Background()) }) + + h, hint, err = headerSvc.GetByHeight(ctx, signedHeader.Height()) + require.NoError(t, err) + require.Equal(t, signedHeader.Hash(), h.Hash()) + require.Equal(t, daHeight, hint) +} + +func TestDAHintStorageData(t *testing.T) { + mainKV := sync.MutexWrap(datastore.NewMapDatastore()) + pk, _, err := 
crypto.GenerateEd25519Key(cryptoRand.Reader) + require.NoError(t, err) + noopSigner, err := noop.NewNoopSigner(pk) + require.NoError(t, err) + rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only + mn := mocknet.New() + + chainId := "test-chain-id" + + proposerAddr := []byte("test") + genesisDoc := genesispkg.Genesis{ + ChainID: chainId, + StartTime: time.Now(), + InitialHeight: 1, + ProposerAddress: proposerAddr, + } + conf := config.DefaultConfig() + conf.RootDir = t.TempDir() + nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(conf.ConfigPath())) + require.NoError(t, err) + logger := zerolog.Nop() + priv := nodeKey.PrivKey + p2pHost, err := mn.AddPeer(priv, nil) + require.NoError(t, err) + + p2pClient, err := p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, mainKV, chainId, logger, p2p.NopMetrics(), p2pHost) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + require.NoError(t, p2pClient.Start(ctx)) + + dataSvc, err := NewDataSyncService(mainKV, conf, genesisDoc, p2pClient, logger) + require.NoError(t, err) + require.NoError(t, dataSvc.Start(ctx)) + + // Need a valid header height for data metadata + headerConfig := types.HeaderConfig{ + Height: genesisDoc.InitialHeight, + DataHash: bytesN(rnd, 32), + AppHash: bytesN(rnd, 32), + Signer: noopSigner, + } + signedHeader, err := types.GetRandomSignedHeaderCustom(&headerConfig, genesisDoc.ChainID) + require.NoError(t, err) + + data := types.Data{ + Txs: types.Txs{[]byte("tx1")}, + Metadata: &types.Metadata{ + Height: signedHeader.Height(), + }, + } + + require.NoError(t, dataSvc.WriteToStoreAndBroadcast(ctx, &data)) + + daHeight := uint64(100) + require.NoError(t, dataSvc.AppendDAHint(ctx, daHeight, data.Hash())) + + d, hint, err := dataSvc.GetByHeight(ctx, signedHeader.Height()) + require.NoError(t, err) + require.Equal(t, data.Hash(), d.Hash()) + require.Equal(t, daHeight, hint) + + _ = p2pClient.Close() + _ = dataSvc.Stop(ctx) + cancel() + + // Restart + 
h2, err := mn.AddPeer(priv, nil) + require.NoError(t, err) + p2pClient, err = p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, mainKV, chainId, logger, p2p.NopMetrics(), h2) + require.NoError(t, err) + + ctx, cancel = context.WithCancel(t.Context()) + defer cancel() + require.NoError(t, p2pClient.Start(ctx)) + t.Cleanup(func() { _ = p2pClient.Close() }) + + dataSvc, err = NewDataSyncService(mainKV, conf, genesisDoc, p2pClient, logger) + require.NoError(t, err) + require.NoError(t, dataSvc.Start(ctx)) + t.Cleanup(func() { _ = dataSvc.Stop(context.Background()) }) + + d, hint, err = dataSvc.GetByHeight(ctx, signedHeader.Height()) + require.NoError(t, err) + require.Equal(t, data.Hash(), d.Hash()) + require.Equal(t, daHeight, hint) +} + func nextHeader(t *testing.T, previousHeader *types.SignedHeader, chainID string, noopSigner signer.Signer) *types.SignedHeader { newSignedHeader := &types.SignedHeader{ Header: types.GetRandomNextHeader(previousHeader.Header, chainID), @@ -178,8 +353,7 @@ func nextHeader(t *testing.T, previousHeader *types.SignedHeader, chainID string require.NoError(t, err) newSignedHeader.Signature = signature require.NoError(t, newSignedHeader.Validate()) - previousHeader = newSignedHeader - return previousHeader + return newSignedHeader } func bytesN(r *rand.Rand, n int) []byte { diff --git a/test/e2e/go.sum b/test/e2e/go.sum index cf58c8349..c5b535343 100644 --- a/test/e2e/go.sum +++ b/test/e2e/go.sum @@ -72,6 +72,8 @@ github.com/btcsuite/btcd/btcutil v1.1.6 h1:zFL2+c3Lb9gEgqKNzowKUPQNb8jV7v5Oaodi/ github.com/btcsuite/btcd/btcutil v1.1.6/go.mod h1:9dFymx8HpuLqBnsPELrImQeTQfKBQqzqGbbV3jK55aE= github.com/celestiaorg/go-header v0.7.4 h1:kQx3bVvKV+H2etxRi4IUuby5VQydBONx3giHFXDcZ/o= github.com/celestiaorg/go-header v0.7.4/go.mod h1:eX9iTSPthVEAlEDLux40ZT/olXPGhpxHd+mEzJeDhd0= +github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc= +github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod 
h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4= github.com/celestiaorg/go-square/v3 v3.0.2 h1:eSQOgNII8inK9IhiBZ+6GADQeWbRq4HYY72BOgcduA4= github.com/celestiaorg/go-square/v3 v3.0.2/go.mod h1:oFReMLsSDMRs82ICFEeFQFCqNvwdsbIM1BzCcb0f7dM= github.com/celestiaorg/tastora v0.8.0 h1:+FWAIsP2onwwqPTGzBLIBtx8B1h9sImdx4msv2N4DsI=