Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

* Fixed: remaining rows are now flushed when the sinker terminates. If you previously loaded historical data and the stop block did not align with the flush interval, you might be missing data.

## v4.5.0

* Added more flags to configure flushing intervals.
Expand Down
81 changes: 57 additions & 24 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/streamingfast/logging"
Expand All @@ -27,6 +28,10 @@ type SQLSinker struct {
logger *zap.Logger
tracer logging.Tracer

lastSeenCursor *sink.Cursor
lastSeenFinalBlock uint64
flushMutex *sync.Mutex

stats *Stats
}

Expand All @@ -39,6 +44,8 @@ func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer loggin
logger: logger,
tracer: tracer,

flushMutex: &sync.Mutex{},

stats: NewStats(logger),
}, nil
}
Expand All @@ -65,14 +72,19 @@ func (s *SQLSinker) Run(ctx context.Context) {
}
}

s.Sinker.OnTerminating(s.Shutdown)
s.OnTerminating(func(err error) {
s.Sinker.OnTerminating(func(err error) {
if err == nil {
s.logger.Info("sql sinker terminating, flushing remaining rows")
err = s.flush(ctx)
}
s.stats.LogNow()
s.logger.Info("sql sinker terminating", zap.Stringer("last_block_written", s.stats.lastBlock))
s.logger.Info("sql sinker terminated", zap.Stringer("last_block_written", s.stats.lastBlock), zap.Error(err))
s.Shutdown(err)
})
s.OnTerminating(func(err error) {
s.Sinker.Shutdown(err)
s.stats.Close()
})

s.OnTerminating(func(_ error) { s.stats.Close() })
s.stats.OnTerminated(func(err error) { s.Shutdown(err) })

logEach := 15 * time.Second
Expand Down Expand Up @@ -113,43 +125,64 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
return fmt.Errorf("unmarshal database changes: %w", err)
}

if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
// We lock here to ensure that blocks are always fully processed and state is updated before any flush can happen.
s.flushMutex.Lock()
if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
s.flushMutex.Unlock()
return fmt.Errorf("apply database changes: %w", err)
}
}

s.lastSeenCursor = cursor
s.lastSeenFinalBlock = data.FinalBlockHeight
Comment on lines +136 to +137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we simply add those to flush directly instead? That would avoid maintaining some state before calling the function.

It also makes me wonder what happens if flush is called through OnTerminating while the last block cursor/final block still belongs to the previous block. It seems that this is a possibility here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we simply add those to flush directly instead? That would avoid maintaining some state before calling the function.

That doesn't work because we don't know the last seen cursor and final block when flush is executed from OnTerminating.

It also makes me wonder what happens if flush is called through OnTerminating while the last block cursor/final block still belongs to the previous block. It seems that this is a possibility here.

True, I guess we can just lock a mutex in HandleBlockScopedData to ensure that we always have a block fully processed before anything is flushed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a flushMutex that ensures each block is fully processed and the lastSeen cursor and final block are saved before any flush can happen.

s.flushMutex.Unlock()

if (s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0) || s.loader.FlushNeeded() {
s.logger.Debug("flushing to database",
zap.Stringer("block", cursor.Block()),
zap.Bool("is_live", *isLive),
zap.Bool("block_flush_interval_reached", s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0),
zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()),
)
return s.flush(ctx)
}

flushStart := time.Now()
rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight)
if err != nil {
return fmt.Errorf("failed to flush at block %s: %w", cursor.Block(), err)
}
return nil
}

flushDuration := time.Since(flushStart)
if flushDuration > 5*time.Second {
level := zap.InfoLevel
if flushDuration > 30*time.Second {
level = zap.WarnLevel
}
func (s *SQLSinker) flush(ctx context.Context) error {

s.logger.Check(level, "flush to database took a long time to complete, could cause long sync time along the road").Write(zap.Duration("took", flushDuration))
}
// we haven't received any data yet, so nothing to do here
if s.lastSeenCursor == nil {
return nil
}

s.flushMutex.Lock()
defer s.flushMutex.Unlock()

FlushCount.Inc()
FlushedRowsCount.AddInt(rowFlushedCount)
FlushDuration.AddInt64(flushDuration.Nanoseconds())
flushStart := time.Now()
rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), s.lastSeenCursor, s.lastSeenFinalBlock)
if err != nil {
return fmt.Errorf("failed to flush at block %s: %w", s.lastSeenCursor.Block(), err)
}

flushDuration := time.Since(flushStart)
if flushDuration > 5*time.Second {
level := zap.InfoLevel
if flushDuration > 30*time.Second {
level = zap.WarnLevel
}

s.stats.RecordBlock(cursor.Block())
s.stats.RecordFlushDuration(flushDuration)
s.logger.Check(level, "flush to database took a long time to complete, could cause long sync time along the road").Write(zap.Duration("took", flushDuration))
}

FlushCount.Inc()
FlushedRowsCount.AddInt(rowFlushedCount)
FlushDuration.AddInt64(flushDuration.Nanoseconds())

s.stats.RecordBlock(s.lastSeenCursor.Block())
s.stats.RecordFlushDuration(flushDuration)

return nil
}

Expand Down