Skip to content

Commit 161c948

Browse files
authored
Add remaining state changes (#313)
1 parent 752f1a4 commit 161c948

File tree

18 files changed

+2341
-99
lines changed

18 files changed

+2341
-99
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ require (
2222
github.com/sirupsen/logrus v1.9.3
2323
github.com/spf13/cobra v1.9.1
2424
github.com/spf13/viper v1.20.1
25-
github.com/stellar/go v0.0.0-20250822224526-9397ce4b6da2
25+
github.com/stellar/go v0.0.0-20250903085211-00c0b06cd7cc
2626
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2
2727
github.com/stellar/stellar-rpc v0.9.6-0.20250523162628-6bb9d7a387d5
2828
github.com/stretchr/testify v1.10.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,8 @@ github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
358358
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
359359
github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4=
360360
github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4=
361-
github.com/stellar/go v0.0.0-20250822224526-9397ce4b6da2 h1:nZEjpYUZ+0cYqZXhYO+QiJcwPQnwTrEs2bWsbRv/Tz8=
362-
github.com/stellar/go v0.0.0-20250822224526-9397ce4b6da2/go.mod h1:ac8hwpljbFXC3Sf9nGfqBXXEvAEdnNRqQHGqP7QN8oY=
361+
github.com/stellar/go v0.0.0-20250903085211-00c0b06cd7cc h1:iRveajcdqBdOlp5U6ZCts9dr1J4VM3LJ8sVnpHYkoVE=
362+
github.com/stellar/go v0.0.0-20250903085211-00c0b06cd7cc/go.mod h1:ac8hwpljbFXC3Sf9nGfqBXXEvAEdnNRqQHGqP7QN8oY=
363363
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE=
364364
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps=
365365
github.com/stellar/stellar-rpc v0.9.6-0.20250523162628-6bb9d7a387d5 h1:V+XezRLVHuk6c1nMkXkWjCwtoHN7F+DK86dK2kkNSZo=

internal/db/migrations/2025-06-10.4-create_indexer_table_state_changes.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ CREATE TABLE state_changes (
2525
deployer_account_id TEXT,
2626
funder_account_id TEXT,
2727
thresholds JSONB,
28+
trustline_limit JSONB,
2829
PRIMARY KEY (to_id, state_change_order)
2930
);
3031

internal/indexer/indexer.go

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
operation_processor "github.com/stellar/go/processors/operation"
1111

1212
"github.com/stellar/wallet-backend/internal/indexer/processors"
13+
contract_processors "github.com/stellar/wallet-backend/internal/indexer/processors/contracts"
1314
"github.com/stellar/wallet-backend/internal/indexer/types"
1415
)
1516

@@ -37,23 +38,26 @@ type ParticipantsProcessorInterface interface {
3738

3839
type OperationProcessorInterface interface {
3940
ProcessOperation(ctx context.Context, opWrapper *operation_processor.TransactionOperationWrapper) ([]types.StateChange, error)
41+
Name() string
4042
}
4143

4244
type Indexer struct {
43-
Buffer IndexerBufferInterface
44-
participantsProcessor ParticipantsProcessorInterface
45-
tokenTransferProcessor TokenTransferProcessorInterface
46-
effectsProcessor OperationProcessorInterface
47-
contractDeployProcessor OperationProcessorInterface
45+
Buffer IndexerBufferInterface
46+
participantsProcessor ParticipantsProcessorInterface
47+
tokenTransferProcessor TokenTransferProcessorInterface
48+
processors []OperationProcessorInterface
4849
}
4950

50-
func NewIndexer(networkPassphrase string) *Indexer {
51+
func NewIndexer(networkPassphrase string, ledgerEntryProvider processors.LedgerEntryProvider) *Indexer {
5152
return &Indexer{
52-
Buffer: NewIndexerBuffer(),
53-
participantsProcessor: processors.NewParticipantsProcessor(networkPassphrase),
54-
tokenTransferProcessor: processors.NewTokenTransferProcessor(networkPassphrase),
55-
effectsProcessor: processors.NewEffectsProcessor(networkPassphrase),
56-
contractDeployProcessor: processors.NewContractDeployProcessor(networkPassphrase),
53+
Buffer: NewIndexerBuffer(),
54+
participantsProcessor: processors.NewParticipantsProcessor(networkPassphrase),
55+
tokenTransferProcessor: processors.NewTokenTransferProcessor(networkPassphrase),
56+
processors: []OperationProcessorInterface{
57+
processors.NewEffectsProcessor(networkPassphrase, ledgerEntryProvider),
58+
processors.NewContractDeployProcessor(networkPassphrase),
59+
contract_processors.NewSACEventsProcessor(networkPassphrase),
60+
},
5761
}
5862
}
5963

@@ -80,7 +84,6 @@ func (i *Indexer) ProcessTransaction(ctx context.Context, transaction ingest.Led
8084
return fmt.Errorf("getting operations participants: %w", err)
8185
}
8286
var dataOp *types.Operation
83-
var effectsStateChanges, contractDeployStateChanges []types.StateChange
8487
for opID, opParticipants := range opsParticipants {
8588
dataOp, err = processors.ConvertOperation(&transaction, &opParticipants.OpWrapper.Operation, opID)
8689
if err != nil {
@@ -91,19 +94,13 @@ func (i *Indexer) ProcessTransaction(ctx context.Context, transaction ingest.Led
9194
i.Buffer.PushParticipantOperation(participant, *dataOp, *dataTx)
9295
}
9396

94-
// 2.1. Index effects state changes
95-
effectsStateChanges, err = i.effectsProcessor.ProcessOperation(ctx, opParticipants.OpWrapper)
96-
if err != nil {
97-
return fmt.Errorf("processing effects state changes: %w", err)
98-
}
99-
i.Buffer.PushStateChanges(effectsStateChanges)
100-
101-
// 2.2. Index contract deploy state changes
102-
contractDeployStateChanges, err = i.contractDeployProcessor.ProcessOperation(ctx, opParticipants.OpWrapper)
103-
if err != nil && !errors.Is(err, processors.ErrInvalidOpType) {
104-
return fmt.Errorf("processing contract deploy state changes: %w", err)
97+
for _, processor := range i.processors {
98+
stateChanges, processorErr := processor.ProcessOperation(ctx, opParticipants.OpWrapper)
99+
if processorErr != nil && !errors.Is(processorErr, processors.ErrInvalidOpType) {
100+
return fmt.Errorf("processing %s state changes: %w", processor.Name(), processorErr)
101+
}
102+
i.Buffer.PushStateChanges(stateChanges)
105103
}
106-
i.Buffer.PushStateChanges(contractDeployStateChanges)
107104
}
108105

109106
// 3. Index token transfer state changes

internal/indexer/indexer_test.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,14 @@ var (
8888
func TestIndexer_ProcessTransaction(t *testing.T) {
8989
tests := []struct {
9090
name string
91-
setupMocks func(*MockParticipantsProcessor, *MockTokenTransferProcessor, *MockOperationProcessor, *MockOperationProcessor, *MockIndexerBuffer)
91+
setupMocks func(*MockParticipantsProcessor, *MockTokenTransferProcessor, *MockOperationProcessor, *MockOperationProcessor, *MockOperationProcessor, *MockIndexerBuffer)
9292
wantError string
9393
txParticipants set.Set[string]
9494
opsParticipants map[int64]processors.OperationParticipants
9595
}{
9696
{
9797
name: "🟢 successful processing with participants",
98-
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
98+
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockSACEventsProcessor *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
9999
participants := set.NewSet("alice", "bob")
100100
mockParticipants.On("GetTransactionParticipants", mock.Anything).Return(participants, nil)
101101

@@ -116,11 +116,14 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
116116
mockTokenTransfer.On("ProcessTransaction", mock.Anything, mock.Anything).Return(tokenStateChanges, nil)
117117

118118
effectsStateChanges := []types.StateChange{{ToID: 2, StateChangeOrder: 1}}
119-
mockEffects.On("ProcessOperation", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(effectsStateChanges, nil)
119+
mockEffects.On("ProcessOperation", mock.Anything, mock.Anything).Return(effectsStateChanges, nil)
120120

121121
contractDeployStateChanges := []types.StateChange{{ToID: 3, StateChangeOrder: 1}}
122122
mockContractDeploy.On("ProcessOperation", mock.Anything, mock.Anything).Return(contractDeployStateChanges, nil)
123123

124+
contractSACEventsStateChanges := []types.StateChange{{ToID: 4, StateChangeOrder: 1}}
125+
mockSACEventsProcessor.On("ProcessOperation", mock.Anything, mock.Anything).Return(contractSACEventsStateChanges, nil)
126+
124127
// Verify transaction was pushed to buffer with correct participants
125128
// PushParticipantTransaction is called once for each participant
126129
mockBuffer.On("PushParticipantTransaction", "alice",
@@ -157,6 +160,10 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
157160
mock.MatchedBy(func(stateChanges []types.StateChange) bool {
158161
return len(stateChanges) == 1 && stateChanges[0].ToID == 3 && stateChanges[0].StateChangeOrder == 1
159162
})).Return()
163+
mockBuffer.On("PushStateChanges",
164+
mock.MatchedBy(func(stateChanges []types.StateChange) bool {
165+
return len(stateChanges) == 1 && stateChanges[0].ToID == 4 && stateChanges[0].StateChangeOrder == 1
166+
})).Return()
160167
},
161168
txParticipants: set.NewSet("alice", "bob"),
162169
opsParticipants: map[int64]processors.OperationParticipants{
@@ -173,7 +180,7 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
173180
},
174181
{
175182
name: "🟢 successful processing without participants",
176-
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
183+
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockSACEventsProcessor *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
177184
participants := set.NewSet[string]()
178185
mockParticipants.On("GetTransactionParticipants", mock.Anything).Return(participants, nil)
179186

@@ -196,14 +203,14 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
196203
},
197204
{
198205
name: "🔴 error getting transaction participants",
199-
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
206+
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockSACEventsProcessor *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
200207
mockParticipants.On("GetTransactionParticipants", mock.Anything).Return(set.NewSet[string](), errors.New("participant error"))
201208
},
202209
wantError: "getting transaction participants: participant error",
203210
},
204211
{
205212
name: "🔴 error getting operations participants",
206-
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
213+
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockSACEventsProcessor *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
207214
participants := set.NewSet[string]()
208215
mockParticipants.On("GetTransactionParticipants", mock.Anything).Return(participants, nil)
209216
mockParticipants.On("GetOperationsParticipants", mock.Anything).Return(map[int64]processors.OperationParticipants{}, errors.New("operations error"))
@@ -212,7 +219,7 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
212219
},
213220
{
214221
name: "🔴 error processing effects state changes",
215-
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
222+
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockSACEventsProcessor *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
216223
participants := set.NewSet[string]()
217224
mockParticipants.On("GetTransactionParticipants", mock.Anything).Return(participants, nil)
218225

@@ -231,12 +238,13 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
231238

232239
mockBuffer.On("PushParticipantOperation", mock.Anything, mock.Anything, mock.Anything).Return()
233240
mockEffects.On("ProcessOperation", mock.Anything, mock.Anything).Return([]types.StateChange{}, errors.New("effects error"))
241+
mockEffects.On("Name").Return("effects")
234242
},
235243
wantError: "processing effects state changes: effects error",
236244
},
237245
{
238246
name: "🔴 error processing token transfer state changes",
239-
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
247+
setupMocks: func(mockParticipants *MockParticipantsProcessor, mockTokenTransfer *MockTokenTransferProcessor, mockEffects *MockOperationProcessor, mockContractDeploy *MockOperationProcessor, mockSACEventsProcessor *MockOperationProcessor, mockBuffer *MockIndexerBuffer) {
240248
participants := set.NewSet[string]()
241249
mockParticipants.On("GetTransactionParticipants", mock.Anything).Return(participants, nil)
242250

@@ -256,18 +264,18 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
256264
mockTokenTransfer := &MockTokenTransferProcessor{}
257265
mockEffects := &MockOperationProcessor{}
258266
mockContractDeploy := &MockOperationProcessor{}
267+
mockSACEventsProcessor := &MockOperationProcessor{}
259268
mockBuffer := &MockIndexerBuffer{}
260269

261270
// Setup mock expectations
262-
tt.setupMocks(mockParticipants, mockTokenTransfer, mockEffects, mockContractDeploy, mockBuffer)
271+
tt.setupMocks(mockParticipants, mockTokenTransfer, mockEffects, mockContractDeploy, mockSACEventsProcessor, mockBuffer)
263272

264273
// Create testable indexer with mocked dependencies
265274
indexer := &Indexer{
266-
Buffer: mockBuffer,
267-
participantsProcessor: mockParticipants,
268-
tokenTransferProcessor: mockTokenTransfer,
269-
effectsProcessor: mockEffects,
270-
contractDeployProcessor: mockContractDeploy,
275+
Buffer: mockBuffer,
276+
participantsProcessor: mockParticipants,
277+
tokenTransferProcessor: mockTokenTransfer,
278+
processors: []OperationProcessorInterface{mockEffects, mockContractDeploy, mockSACEventsProcessor},
271279
}
272280

273281
err := indexer.ProcessTransaction(context.Background(), testTx)
@@ -305,6 +313,8 @@ func TestIndexer_ProcessTransaction(t *testing.T) {
305313
mockParticipants.AssertExpectations(t)
306314
mockTokenTransfer.AssertExpectations(t)
307315
mockEffects.AssertExpectations(t)
316+
mockContractDeploy.AssertExpectations(t)
317+
mockSACEventsProcessor.AssertExpectations(t)
308318
mockBuffer.AssertExpectations(t)
309319
})
310320
}

internal/indexer/mocks.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ func (m *MockOperationProcessor) ProcessOperation(ctx context.Context, opWrapper
4545
return args.Get(0).([]types.StateChange), args.Error(1)
4646
}
4747

48+
func (m *MockOperationProcessor) Name() string {
49+
args := m.Called()
50+
return args.String(0)
51+
}
52+
4853
type MockIndexerBuffer struct {
4954
mock.Mock
5055
}

internal/indexer/processors/contract_deploy.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ func NewContractDeployProcessor(networkPassphrase string) *ContractDeployProcess
1919
return &ContractDeployProcessor{networkPassphrase: networkPassphrase}
2020
}
2121

22+
func (p *ContractDeployProcessor) Name() string {
23+
return "contract_deploy"
24+
}
25+
2226
// ProcessOperation emits a state change for each contract deployment (including subinvocations).
2327
func (p *ContractDeployProcessor) ProcessOperation(_ context.Context, op *operation_processor.TransactionOperationWrapper) ([]types.StateChange, error) {
2428
if op.OperationType() != xdr.OperationTypeInvokeHostFunction {

0 commit comments

Comments
 (0)