Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions cmd/stellar-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@
InsertEvents(lcm xdr.LedgerCloseMeta) error
}

// EventOrder represents the order in which events are returned
type EventOrder string

const (
// EventOrderAsc returns events in ascending order (oldest first)
EventOrderAsc EventOrder = "asc"
// EventOrderDesc returns events in descending order (newest first)
EventOrderDesc EventOrder = "desc"
)

// EventReader has all the public methods to fetch events from DB
type EventReader interface {
GetEvents(
Expand All @@ -38,6 +48,7 @@
contractIDs [][]byte,
topics NestedTopicArray,
eventTypes []int,
order EventOrder,
f ScanFunction,
) error
}
Expand Down Expand Up @@ -292,7 +303,7 @@

// GetEvents applies f on all the events occurring in the given range with
// specified contract IDs if provided. The events are returned in sorted
// ascending Cursor order.
// order based on the order parameter (ascending or descending).
//
// If f returns false, the scan terminates early (f will not be applied on
// remaining events in the range).
Expand All @@ -304,16 +315,23 @@
contractIDs [][]byte,
topics NestedTopicArray,
eventTypes []int,
order EventOrder,
scanner ScanFunction,
) error {
start := time.Now()

// Determine sort order
orderDirection := "ASC"
if order == EventOrderDesc {
orderDirection = "DESC"
}

rowQ := sq.
Select("id", "event_data", "transaction_hash", "ledger_close_time").
From(eventTableName).
Where(sq.GtOrEq{"id": cursorRange.Start.String()}).
Where(sq.Lt{"id": cursorRange.End.String()}).
OrderBy("id ASC")
OrderBy("id " + orderDirection)

if len(contractIDs) > 0 {
rowQ = rowQ.Where(sq.Eq{"contract_id": contractIDs})
Expand Down Expand Up @@ -444,7 +462,7 @@
writer: &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),

Check failure on line 465 in cmd/stellar-rpc/internal/db/event.go

View workflow job for this annotation

GitHub Actions / golangci-lint

db.GetTx undefined (type *DB has no field or method GetTx) (typecheck)
passphrase: passphrase,
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/stellar-rpc/internal/db/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,6 @@ func TestInsertEvents(t *testing.T) {
end := protocol.Cursor{Ledger: 100}
cursorRange := protocol.CursorRange{Start: start, End: end}

err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, nil)
err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, EventOrderAsc, nil)
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion cmd/stellar-rpc/internal/db/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestTransactionFound(t *testing.T) {
end := protocol.Cursor{Ledger: 1000}
cursorRange := protocol.CursorRange{Start: start, End: end}

err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, nil)
err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, EventOrderAsc, nil)
require.NoError(t, err)

// check all 200 cases
Expand Down
118 changes: 95 additions & 23 deletions cmd/stellar-rpc/internal/methods/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,30 +117,75 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
}
}

start := protocol.Cursor{Ledger: request.StartLedger}
order := protocol.EventOrderAsc
if request.Pagination != nil && request.Pagination.Order != "" {
order = request.Pagination.Order
}
isDescending := order == protocol.EventOrderDesc

limit := h.defaultLimit
if request.Pagination != nil {
if request.Pagination.Cursor != nil {
if request.Pagination != nil && request.Pagination.Limit > 0 {
limit = request.Pagination.Limit
}

// Build cursor range based on order direction
// For ASC: startLedger is lower bound, endLedger is upper bound
// For DESC: startLedger is upper bound, endLedger is lower bound
var cursorRange protocol.CursorRange
var validationLedger uint32 // The ledger to validate against retention window

if isDescending {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not like the complexity that this "feature" introduces for to an already complicated and fragile method. Can you provide a use case where it is valuable to see events in descending order? By their very nature events are meant to be consumed in chronological order.

// DESC order: startLedger is upper bound, scan backwards
// Calculate lower bound
lowerBound := uint32(0)
if request.StartLedger > LedgerScanLimit {
lowerBound = request.StartLedger - LedgerScanLimit
}
// lowerBound should not be before ledger retention window
lowerBound = max(ledgerRange.FirstLedger.Sequence, lowerBound)
if request.EndLedger != 0 {
lowerBound = max(request.EndLedger, lowerBound)
}

// Handle cursor-based pagination for DESC
upperCursor := protocol.Cursor{Ledger: request.StartLedger + 1} // +1 because end is exclusive
if request.Pagination != nil && request.Pagination.Cursor != nil {
upperCursor = *request.Pagination.Cursor
// For descending order, we move backwards from the cursor
if upperCursor.Event > 0 {
upperCursor.Event--
} else {
upperCursor = decrementCursor(upperCursor)
}
}

cursorRange = protocol.CursorRange{
Start: protocol.Cursor{Ledger: lowerBound},
End: upperCursor,
}
validationLedger = request.StartLedger
} else {
// ASC order: startLedger is lower bound, scan forwards (original behavior)
start := protocol.Cursor{Ledger: request.StartLedger}
if request.Pagination != nil && request.Pagination.Cursor != nil {
start = *request.Pagination.Cursor
// increment event index because, when paginating, we start with the
// item right after the cursor
start.Event++
}
if request.Pagination.Limit > 0 {
limit = request.Pagination.Limit

endLedger := start.Ledger + LedgerScanLimit
endLedger = min(ledgerRange.LastLedger.Sequence+1, endLedger)
if request.EndLedger != 0 {
endLedger = min(request.EndLedger, endLedger)
}
}
endLedger := start.Ledger + LedgerScanLimit
// endLedger should not exceed ledger retention window
endLedger = min(ledgerRange.LastLedger.Sequence+1, endLedger)
if request.EndLedger != 0 {
endLedger = min(request.EndLedger, endLedger)
}

end := protocol.Cursor{Ledger: endLedger}
cursorRange := protocol.CursorRange{Start: start, End: end}
cursorRange = protocol.CursorRange{
Start: start,
End: protocol.Cursor{Ledger: endLedger},
}
validationLedger = request.StartLedger
}

if start.Ledger < ledgerRange.FirstLedger.Sequence || start.Ledger > ledgerRange.LastLedger.Sequence {
if validationLedger < ledgerRange.FirstLedger.Sequence || validationLedger > ledgerRange.LastLedger.Sequence {
return protocol.GetEventsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidRequest,
Message: fmt.Sprintf(
Expand Down Expand Up @@ -179,7 +224,13 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
return uint(len(found)) < limit
}

err = h.dbReader.GetEvents(ctx, cursorRange, contractIDs, topics, eventTypes, eventScanFunction)
// Convert order to db.EventOrder
dbOrder := db.EventOrderAsc
if isDescending {
dbOrder = db.EventOrderDesc
}

err = h.dbReader.GetEvents(ctx, cursorRange, contractIDs, topics, eventTypes, dbOrder, eventScanFunction)
if err != nil {
return protocol.GetEventsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidRequest, Message: err.Error(),
Expand Down Expand Up @@ -207,11 +258,15 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
cursor = lastEvent.ID
} else {
// cursor represents end of the search window if events does not reach limit
// here endLedger is always exclusive when fetching events
// so search window is max Cursor value with endLedger - 1
maxCursor := protocol.MaxCursor
maxCursor.Ledger = endLedger - 1
cursor = maxCursor.String()
if isDescending {
// For descending order, the cursor represents the lower bound of the search window
cursor = cursorRange.Start.String()
} else {
// For ascending order, the cursor represents the upper bound of the search window
maxCursor := protocol.MaxCursor
maxCursor.Ledger = cursorRange.End.Ledger - 1
cursor = maxCursor.String()
}
}

return protocol.GetEventsResponse{
Expand All @@ -225,6 +280,23 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
}, nil
}

// decrementCursor decrements the cursor to the previous position
func decrementCursor(c protocol.Cursor) protocol.Cursor {
// If we're at the minimum cursor for this ledger, we can't go further back
// The cursor will remain at position 0,0,0 for the ledger
if c.Event == 0 && c.Op == 0 && c.Tx == 0 {
return c
}
// Set to the maximum possible cursor value to capture all earlier events
// This effectively means "everything before this cursor in this ledger"
return protocol.Cursor{
Ledger: c.Ledger,
Tx: c.Tx,
Op: c.Op,
Event: 0, // The DB query will handle the rest with DESC ordering
}
}

func eventInfoForEvent(
event xdr.DiagnosticEvent,
cursor protocol.Cursor,
Expand Down
Loading
Loading