Skip to content

Commit e7f90cf

Browse files
committed
fix flushing on stream completion
1 parent 90756a2 commit e7f90cf

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

sinker/sinker.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,18 @@ func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges,
211211
return nil
212212
}
213213

214+
215+
func (s *SQLSinker) HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error {
216+
217+
s.logger.Info("stream completed, flushing to database", zap.Stringer("block", cursor.Block()))
218+
_, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, cursor.Block().Num())
219+
if err != nil {
220+
return fmt.Errorf("failed to flush %s block on completion: %w", cursor.Block(), err)
221+
}
222+
223+
return nil
224+
}
225+
214226
func (s *SQLSinker) HandleBlockUndoSignal(ctx context.Context, data *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
215227
return s.loader.Revert(ctx, s.OutputModuleHash(), cursor, data.LastValidBlock.Number)
216228
}

0 commit comments

Comments
 (0)