Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions contracts/test/sources/offramp.move
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,12 @@ module test::offramp {
};
event::emit(config_set);

let config_set2 = SourceChainConfigSet {
source_chain_selector: 14767482510784806043,
source_chain_config: config
};
event::emit(config_set2);

let mut off_ramp_object = OffRampObject {
id: object::new(ctx)
};
Expand Down
22 changes: 21 additions & 1 deletion relayer/chainreader/indexer/events_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type EventsIndexerApi interface {
Start(ctx context.Context) error
SyncAllEvents(ctx context.Context) error
SyncEvent(ctx context.Context, selector *client.EventSelector) error
AddEventSelector(ctx context.Context, selector *client.EventSelector) error
Ready() error
Close() error
}
Expand All @@ -57,11 +58,12 @@ func NewEventIndexer(
syncTimeout time.Duration,
) EventsIndexerApi {
dataStore := database.NewDBStore(db, log)
namedLogger := logger.Named(log, "EventsIndexer")

return &EventsIndexer{
db: dataStore,
client: ptbClient,
logger: log,
logger: namedLogger,
pollingInterval: pollingInterval,
syncTimeout: syncTimeout,
eventConfigurations: eventConfigurations,
Expand Down Expand Up @@ -414,6 +416,24 @@ eventLoop:
return nil
}

func (eIndexer *EventsIndexer) AddEventSelector(ctx context.Context, selector *client.EventSelector) error {
if selector == nil {
return fmt.Errorf("unspecified selector for AddEventSelector call")
}

// check if the event selector is already tracked, if not add it to the list
if !eIndexer.isEventSelectorAdded(*selector) {
eIndexer.configMutex.Lock()
// Double-check after acquiring write lock (avoid race with concurrent adds)
if !eIndexer.isEventSelectorAddedLocked(*selector) {
eIndexer.eventConfigurations = append(eIndexer.eventConfigurations, selector)
}
eIndexer.configMutex.Unlock()
}

return nil
}

// IsEventSelectorAdded checks if a specific event selector has already been included in the list of events to sync
func (eIndexer *EventsIndexer) isEventSelectorAdded(eConfig client.EventSelector) bool {
eIndexer.configMutex.RLock()
Expand Down
92 changes: 57 additions & 35 deletions relayer/chainreader/indexer/transactions_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type TransactionsIndexer struct {
transmitters map[models.SuiAddress]string

// event selectors
eventPackageId string
offrampPackageId string
latestOfframpPackageId string
executionEventModuleKey string
executionEventKey string
configEventModuleKey string
Expand All @@ -45,14 +46,14 @@ type TransactionsIndexer struct {
// configs
eventConfigs map[string]*config.ChainReaderEvent

mu sync.RWMutex
eventPkgReady chan struct{}
eventPkgOnce sync.Once
mu sync.RWMutex
offrampPackageIdReady chan struct{}
offrampPackageOnce sync.Once
}

type TransactionsIndexerApi interface {
Start(ctx context.Context) error
SetOffRampPackage(pkg string)
SetOffRampPackage(pkg string, latestPkg string)
Ready() error
Close() error
}
Expand All @@ -78,9 +79,9 @@ func NewTransactionsIndexer(
executionEventKey: "ExecutionStateChanged",
configEventModuleKey: "ocr3_base",
configEventKey: "ConfigSet",
executeFunctions: []string{"finish_execute"},
executeFunctions: []string{"init_execute", "ccip_receive", "release_or_mint", "finish_execute"},
eventConfigs: eventConfigs,
eventPkgReady: make(chan struct{}),
offrampPackageIdReady: make(chan struct{}),
}
}

Expand Down Expand Up @@ -123,28 +124,27 @@ func (tIndexer *TransactionsIndexer) Start(ctx context.Context) error {
}

// SetOffRampPackage sets offramp called by chainreader Bind.
func (t *TransactionsIndexer) SetOffRampPackage(pkg string) {
func (t *TransactionsIndexer) SetOffRampPackage(pkg string, latestPkg string) {
if pkg == "" {
t.logger.Warn("SetOffRampPackage called with empty package id")
return
}
t.mu.Lock()
old := t.eventPackageId
t.eventPackageId = pkg
t.offrampPackageId = pkg
t.latestOfframpPackageId = latestPkg
t.mu.Unlock()

if old != pkg {
t.logger.Infow("OffRamp package set", "old", old, "new", pkg)
}
t.eventPkgOnce.Do(func() { close(t.eventPkgReady) })
t.logger.Infow("OffRamp package set", "offrampPackageId", pkg, "latestOfframpPackageId", latestPkg)

t.offrampPackageOnce.Do(func() { close(t.offrampPackageIdReady) })
}

// waitForOffRampPackage blocks until the OffRamp package ID is available or the
// provided context is canceled.
func (t *TransactionsIndexer) waitForOffRampPackage(ctx context.Context) (string, error) {
t.mu.RLock()
pkg := t.eventPackageId
ch := t.eventPkgReady
pkg := t.offrampPackageId
ch := t.offrampPackageIdReady
t.mu.RUnlock()
if pkg != "" {
return pkg, nil
Expand All @@ -153,7 +153,7 @@ func (t *TransactionsIndexer) waitForOffRampPackage(ctx context.Context) (string
select {
case <-ch:
t.mu.RLock()
pkg = t.eventPackageId
pkg = t.offrampPackageId
t.mu.RUnlock()
if pkg == "" {
return "", fmt.Errorf("package ready signaled but empty")
Expand Down Expand Up @@ -270,7 +270,7 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
cursor := tIndexer.transmitters[transmitter]
totalProcessed := 0

eventAccountAddress, err := tIndexer.getEventPackageIdFromConfig()
eventAccountAddress, latestOfframpPackageId, err := tIndexer.getEventPackageIdFromConfig()
if err != nil {
return 0, fmt.Errorf("failed to get ExecutionStateChanged event config: %w", err)
}
Expand Down Expand Up @@ -329,28 +329,49 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
continue
}

if moveAbort.Location.Module.Address != tIndexer.eventPackageId {
tIndexer.logger.Debugw("Skipping transaction with different package address",
"transmitter", transmitter, "packageAddress", moveAbort.Location.Module.Address)
tIndexer.logger.Debugw("Extracted move abort from failed transaction", "moveAbort", moveAbort, "digest", transactionRecord.Digest)

continue
}
includesValidPackage := false
includesValidModule := false

if moveAbort.Location.Module.Name != moduleKey {
tIndexer.logger.Debugw("Skipping transaction with different module",
"transmitter", transmitter, "module", moveAbort.Location.Module.Name)
// Check if any of the transaction's commands match with the expected (offramp) package and module
for _, raw := range transactionRecord.Transaction.Data.Transaction.Transactions {
if moveCall := models.MoveCall(raw); moveCall != nil {
packageID := moveCall.Package
moduleName := moveCall.Module

if (packageID == eventAccountAddress || packageID == latestOfframpPackageId) && moduleName == tIndexer.executionEventModuleKey {
includesValidPackage = true
includesValidModule = true
break
}
}
}

// NOTE: The check below does not guarantee that a malicious (known) transmitter is not sending a failed PTB
// with the expected package and module. However, it is considered as the worst case scenario simply involves
// creating an event record with a failure state against an digest that is not checked.
if !(includesValidPackage && includesValidModule) {
tIndexer.logger.Warnw(
"Expected package and module not found in commands of failed PTB originating from known transmitter",
"transmitter", transmitter,
"digest", transactionRecord.Digest,
)
continue
}

if moveAbort.Location.FunctionName == nil || !slices.Contains(tIndexer.executeFunctions, *moveAbort.Location.FunctionName) {
tIndexer.logger.Debugw("Skipping transaction for non-execute function",
"transmitter", transmitter, "location", moveAbort.Location)
tIndexer.logger.Debugw("Skipping transaction for failed function against a non-configured function name",
"transmitter", transmitter,
"location", moveAbort.Location,
"functionName", *moveAbort.Location.FunctionName,
"digest", transactionRecord.Digest,
)

continue
}

// we always get the report from the init_execute function call (index 0), the "finish_execute" function call
// We always get the report from the init_execute function call (index 0), the "finish_execute" function call
// does not contain an argument which contains the report
// NOTE: we assume that init_execute (which contains the report) is always the first command in the PTB
commandIndex := uint64(0)
Expand Down Expand Up @@ -504,7 +525,7 @@ func (tIndexer *TransactionsIndexer) getTransmitters(ctx context.Context) ([]mod
eventKey = tIndexer.configEventKey
)

eventAccountAddress, err := tIndexer.getEventPackageIdFromConfig()
eventAccountAddress, _, err := tIndexer.getEventPackageIdFromConfig()
if err != nil {
tIndexer.logger.Errorw("Failed to get OCRConfigSet event config", "error", err)
return nil, err
Expand Down Expand Up @@ -564,7 +585,7 @@ func (tIndexer *TransactionsIndexer) getSourceChainConfig(ctx context.Context, s
selector = "sourceChainSelector"
)

eventAccountAddress, err := tIndexer.getEventPackageIdFromConfig()
eventAccountAddress, _, err := tIndexer.getEventPackageIdFromConfig()
if err != nil {
return nil, fmt.Errorf("failed to get SourceChainConfigSet event config: %w", err)
}
Expand Down Expand Up @@ -607,15 +628,16 @@ func (tIndexer *TransactionsIndexer) getSourceChainConfig(ctx context.Context, s
}

// Prefer the cached OffRamp package
func (t *TransactionsIndexer) getEventPackageIdFromConfig() (string, error) {
func (t *TransactionsIndexer) getEventPackageIdFromConfig() (string, string, error) {
t.mu.RLock()
pkg := t.eventPackageId
pkg := t.offrampPackageId
latestPkg := t.latestOfframpPackageId
t.mu.RUnlock()

if pkg != "" {
return pkg, nil
return pkg, latestPkg, nil
}
return "", fmt.Errorf("offramp package not set yet")
return "", "", fmt.Errorf("offramp package not set yet")
}

// ModuleId represents Move’s ModuleId { address, name }
Expand Down
Loading
Loading