diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0312964..eb69ebc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+
+* Fixed: flush remaining rows on terminating. If you previously loaded historic data that didn't match the flush interval for the stop block, you might be missing data.
+
 ## v4.5.0
 
 * Added more flags to configure flushing intervals.
diff --git a/sinker/sinker.go b/sinker/sinker.go
index 89b0ca9..7f7f93b 100644
--- a/sinker/sinker.go
+++ b/sinker/sinker.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/streamingfast/logging"
@@ -27,6 +28,10 @@ type SQLSinker struct {
 	logger *zap.Logger
 	tracer logging.Tracer
 
+	lastSeenCursor     *sink.Cursor
+	lastSeenFinalBlock uint64
+	flushMutex         *sync.Mutex
+
 	stats *Stats
 }
 
@@ -39,6 +44,8 @@ func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer loggin
 		logger: logger,
 		tracer: tracer,
 
+		flushMutex: &sync.Mutex{},
+
 		stats: NewStats(logger),
 	}, nil
 }
@@ -65,14 +72,19 @@ func (s *SQLSinker) Run(ctx context.Context) {
 		}
 	}
 
-	s.Sinker.OnTerminating(s.Shutdown)
-	s.OnTerminating(func(err error) {
+	s.Sinker.OnTerminating(func(err error) {
+		if err == nil {
+			s.logger.Info("sql sinker terminating, flushing remaining rows")
+			err = s.flush(ctx)
+		}
 		s.stats.LogNow()
-		s.logger.Info("sql sinker terminating", zap.Stringer("last_block_written", s.stats.lastBlock))
+		s.logger.Info("sql sinker terminated", zap.Stringer("last_block_written", s.stats.lastBlock), zap.Error(err))
+		s.Shutdown(err)
+	})
+	s.OnTerminating(func(err error) {
 		s.Sinker.Shutdown(err)
+		s.stats.Close()
 	})
-
-	s.OnTerminating(func(_ error) { s.stats.Close() })
 	s.stats.OnTerminated(func(err error) { s.Shutdown(err) })
 
 	logEach := 15 * time.Second
@@ -113,11 +125,25 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
 			return fmt.Errorf("unmarshal database changes: %w", err)
 		}
 
-		if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
+		// We lock here to ensure that blocks are always fully processed and state is updated before any flush can happen.
+		// Lock and Unlock stay paired inside this block, so the mutex is never unlocked on a path that did not lock it.
+		s.flushMutex.Lock()
+		if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
+			s.flushMutex.Unlock()
 			return fmt.Errorf("apply database changes: %w", err)
 		}
+		s.lastSeenCursor = cursor
+		s.lastSeenFinalBlock = data.FinalBlockHeight
+		s.flushMutex.Unlock()
 	}
 
+	// Recorded again outside the block above so the cursor also advances when that block is not entered.
+	// NOTE(review): the block's opening line is outside this hunk; confirm it is conditional.
+	s.flushMutex.Lock()
+	s.lastSeenCursor = cursor
+	s.lastSeenFinalBlock = data.FinalBlockHeight
+	s.flushMutex.Unlock()
+
 	if (s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0) || s.loader.FlushNeeded() {
 		s.logger.Debug("flushing to database",
 			zap.Stringer("block", cursor.Block()),
@@ -125,31 +151,49 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
 			zap.Bool("block_flush_interval_reached", s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0),
 			zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()),
 		)
+		return s.flush(ctx)
+	}
 
-		flushStart := time.Now()
-		rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight)
-		if err != nil {
-			return fmt.Errorf("failed to flush at block %s: %w", cursor.Block(), err)
-		}
+	return nil
+}
 
-		flushDuration := time.Since(flushStart)
-		if flushDuration > 5*time.Second {
-			level := zap.InfoLevel
-			if flushDuration > 30*time.Second {
-				level = zap.WarnLevel
-			}
+// flush calls loader.Flush with the cursor and final block height recorded by the last
+// HandleBlockScopedData call, then updates the flush metrics and stats. It returns nil
+// without flushing when no block has been recorded yet.
+func (s *SQLSinker) flush(ctx context.Context) error {
+	// lastSeenCursor and lastSeenFinalBlock are written under flushMutex, so they are read under it
+	// too: flush is also called from the OnTerminating callback registered in Run.
+	s.flushMutex.Lock()
+	defer s.flushMutex.Unlock()
 
-			s.logger.Check(level, "flush to database took a long time to complete, could cause long sync time along the road").Write(zap.Duration("took", flushDuration))
-		}
+	// we haven't received any data yet, so nothing to do here
+	if s.lastSeenCursor == nil {
+		return nil
+	}
 
-		FlushCount.Inc()
-		FlushedRowsCount.AddInt(rowFlushedCount)
-		FlushDuration.AddInt64(flushDuration.Nanoseconds())
+	flushStart := time.Now()
+	rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), s.lastSeenCursor, s.lastSeenFinalBlock)
+	if err != nil {
+		return fmt.Errorf("failed to flush at block %s: %w", s.lastSeenCursor.Block(), err)
+	}
+
+	flushDuration := time.Since(flushStart)
+	if flushDuration > 5*time.Second {
+		level := zap.InfoLevel
+		if flushDuration > 30*time.Second {
+			level = zap.WarnLevel
+		}
 
-		s.stats.RecordBlock(cursor.Block())
-		s.stats.RecordFlushDuration(flushDuration)
+		s.logger.Check(level, "flush to database took a long time to complete, could cause long sync time along the road").Write(zap.Duration("took", flushDuration))
 	}
 
+	FlushCount.Inc()
+	FlushedRowsCount.AddInt(rowFlushedCount)
+	FlushDuration.AddInt64(flushDuration.Nanoseconds())
+
+	s.stats.RecordBlock(s.lastSeenCursor.Block())
+	s.stats.RecordFlushDuration(flushDuration)
+
 	return nil
 }
 