Skip to content
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1d10848
feat: stream events flush with additional data
jeffwoooo Feb 25, 2025
5febcbb
add mq config
0xF0D0 Feb 25, 2025
de48fa7
fix typo
0xF0D0 Feb 25, 2025
6e850ee
feat: new event manager inside sdk context
jeffwoooo Feb 27, 2025
07c18c7
feat: flush stream events on commit
jeffwoooo Feb 27, 2025
730182b
feat: abci event placeholder
jeffwoooo Mar 5, 2025
b69b60d
feat: remove app event type
jeffwoooo Mar 5, 2025
49152fe
fix: nil added while emit events
jeffwoooo Mar 5, 2025
b4ddb21
feat: put event placeholder into abci event manager
jeffwoooo Mar 6, 2025
a304533
feat: unit test for publish events
jeffwoooo Mar 6, 2025
0bb11d7
feat: separate event channel
jeffwoooo Mar 11, 2025
4edb934
feat: temporary implement serializable for PublishEventFlush
jeffwoooo Mar 12, 2025
68a80f9
feat: temporary implement serializable for PublishEventFlush
jeffwoooo Mar 12, 2025
87ac127
feat: remove mq interface implementation from publish event flush
jeffwoooo Mar 12, 2025
cb7f9b9
feat: remove ToString from publish event interface
jeffwoooo Mar 13, 2025
2e74318
feat: distinct block events and tx events
jeffwoooo Mar 13, 2025
c7bca73
feat: const placeholder event type
jeffwoooo Mar 13, 2025
f861738
feat: event true order typing
jeffwoooo Mar 13, 2025
2d7453b
refactor: filter and order events after block execution
jeffwoooo Mar 13, 2025
11ec3f0
fix: flush chain stream event
jeffwoooo Mar 26, 2025
9c567c9
Merge branch 'v0.50.x-comet1-inj-nm' into v0.50.9-inj-2-data-pipeline
albertchon Mar 26, 2025
45068f8
fix: test import
albertchon Mar 26, 2025
c647364
fix(test): apply abci update
jeffwoooo Mar 27, 2025
e86e4cf
chore: remove test logging
jeffwoooo Mar 27, 2025
8072ad3
add config for control port
0xF0D0 May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,8 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
var events []abci.Event
var publishEvents sdk.PublishEvents
var txEventSet []EventSet

if err := app.checkHalt(req.Height, req.Time); err != nil {
Comment on lines 711 to 717

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change potentially affects state.

Call sequence:

(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).internalFinalizeBlock (baseapp/abci.go:712)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).FinalizeBlock (baseapp/abci.go:885)

return nil, err
Expand Down Expand Up @@ -803,6 +805,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// continue
}

publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...)
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
Expand All @@ -818,6 +821,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager()))

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
Expand All @@ -841,13 +846,22 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// continue
}

filtered, order := filterOutPublishEvents(response.Events)
response.Events = filtered
txResults = append(txResults, response)
txEventSet = append(txEventSet, EventSet{
AbciEvents: response.Events,
PublishEvents: app.finalizeBlockState.Context().PublishEventManager().Events(),
TrueOrder: order,
})
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager()))

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
if err != nil {
return nil, err
Expand All @@ -861,9 +875,25 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// continue
}

publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...)

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())

events, trueOrder := filterOutPublishEvents(events)
app.flushData = PublishEventFlush{
Height: header.Height,
PrevAppHash: header.AppHash,
BlockEvents: EventSet{
AbciEvents: events,
PublishEvents: publishEvents,
TrueOrder: trueOrder,
},
TxEvents: txEventSet,
}

fmt.Println("events in finalize block", events)

return &abci.FinalizeBlockResponse{
Events: events,
TxResults: txResults,
Expand All @@ -872,6 +902,22 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}, nil
}

func filterOutPublishEvents(events []abci.Event) ([]abci.Event, []EventType) {
var filteredEvents []abci.Event
var trueOrder []EventType

for _, e := range events {
if e.Type == sdk.PlaceholderEventType {
trueOrder = append(trueOrder, EventTypePublish)
continue
}
filteredEvents = append(filteredEvents, e)
trueOrder = append(trueOrder, EventTypeAbci)
}

return filteredEvents, trueOrder
}

// FinalizeBlock will execute the block proposal provided by FinalizeBlockRequest.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down Expand Up @@ -959,7 +1005,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
rms.SetCommitHeader(header)
}

app.cms.Commit()
commitId := app.cms.Commit()

app.flushData.NewAppHash = commitId.Hash
app.PublishBlockEvents(app.flushData)

resp := &abci.CommitResponse{
RetainHeight: retainHeight,
Expand Down
6 changes: 3 additions & 3 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func TestABCI_CheckTx(t *testing.T) {
anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, counterKey)) }
suite := NewBaseAppSuite(t, anteOpt)

baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey, false})

nTxs := int64(5)
_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) {
require.NoError(t, err)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

nBlocks := 3
txPerHeight := 5
Expand Down Expand Up @@ -668,7 +668,7 @@ func TestABCI_FinalizeBlock_MultiMsg(t *testing.T) {
require.NoError(t, err)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

deliverKey2 := []byte("deliver-key2")
baseapptestutil.RegisterCounter2Server(suite.baseApp.MsgServiceRouter(), Counter2ServerImpl{t, capKey1, deliverKey2})
Expand Down
5 changes: 5 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ type BaseApp struct {
EnableStreamer bool
StreamEvents chan StreamEvents

EnablePublish bool
PublishEvents chan PublishEventFlush
flushData PublishEventFlush

traceFlightRecorder *metrics.TraceRecorder
}

Expand All @@ -227,6 +231,7 @@ func NewBaseApp(
sigverifyTx: true,
queryGasLimit: math.MaxUint64,
StreamEvents: make(chan StreamEvents),
PublishEvents: make(chan PublishEventFlush),
}

for _, option := range options {
Expand Down
6 changes: 3 additions & 3 deletions baseapp/baseapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestAnteHandlerGasMeter(t *testing.T) {
require.NoError(t, err)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

tx := newTxCounter(t, suite.txConfig, 0, 0)
txBytes, err := suite.txConfig.TxEncoder()(tx)
Expand Down Expand Up @@ -562,7 +562,7 @@ func TestBaseAppAnteHandler(t *testing.T) {
suite := NewBaseAppSuite(t, anteOpt)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
ConsensusParams: &cmtproto.ConsensusParams{},
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestBaseAppPostHandler(t *testing.T) {
}

suite := NewBaseAppSuite(t, anteOpt)
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo")})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo"), false})

_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
ConsensusParams: &cmtproto.ConsensusParams{},
Expand Down
3 changes: 2 additions & 1 deletion baseapp/chain_stream.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package baseapp

import (
abci "github.com/cometbft/cometbft/abci/types"
"time"

abci "github.com/cometbft/cometbft/abci/types"
)

type StreamEvents struct {
Expand Down
33 changes: 33 additions & 0 deletions baseapp/publish_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package baseapp

import (
abci "github.com/cometbft/cometbft/abci/types"
types "github.com/cosmos/cosmos-sdk/types"
)

type EventType byte

const (
EventTypeAbci EventType = iota
EventTypePublish
)

type EventSet struct {
AbciEvents []abci.Event
PublishEvents types.PublishEvents
TrueOrder []EventType
}

type PublishEventFlush struct {
Height int64
PrevAppHash []byte
NewAppHash []byte
BlockEvents EventSet
TxEvents []EventSet
}

func (app *BaseApp) PublishBlockEvents(flush PublishEventFlush) {
if app.EnablePublish {
app.PublishEvents <- flush
}
}
Loading
Loading