Skip to content

Commit 1120748

Browse files
Merge branch 'develop' into yaro/clickhouse-cursor
2 parents 9866c96 + 837e3de commit 1120748

File tree

3 files changed

+14
-5
lines changed

3 files changed

+14
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88
## Unreleased
99

1010
* Fixed cursor loading on Clickhouse
11+
* Improved batch block flush logic to flush after a certain number of blocks instead of taking by modulo
1112

1213
## v4.5.0
1314

sinker/sinker.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,12 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
118118
}
119119
}
120120

121-
if (s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0) || s.loader.FlushNeeded() {
121+
blockFlushNeeded := s.batchBlockModulo(isLive) > 0 && data.Clock.Number-s.stats.GetLastBlockNum() >= s.batchBlockModulo(isLive)
122+
if blockFlushNeeded || s.loader.FlushNeeded() {
122123
s.logger.Debug("flushing to database",
123124
zap.Stringer("block", cursor.Block()),
124125
zap.Bool("is_live", *isLive),
125-
zap.Bool("block_flush_interval_reached", s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0),
126+
zap.Bool("block_flush_interval_reached", blockFlushNeeded),
126127
zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()),
127128
)
128129

sinker/stats.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ type Stats struct {
1515

1616
dbFlushRate *dmetrics.AvgRatePromCounter
1717
dbFlushAvgDuration *dmetrics.AvgDurationCounter
18-
flusehdRows *dmetrics.ValueFromMetric
18+
flushedRows *dmetrics.ValueFromMetric
19+
dbFlushedRowsRate *dmetrics.AvgRatePromCounter
1920
lastBlock bstream.BlockRef
2021
logger *zap.Logger
2122
}
@@ -26,7 +27,8 @@ func NewStats(logger *zap.Logger) *Stats {
2627

2728
dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Second, 30*time.Second, "flush"),
2829
dbFlushAvgDuration: dmetrics.NewAvgDurationCounter(30*time.Second, dmetrics.InferUnit, "per flush"),
29-
flusehdRows: dmetrics.NewValueFromMetric(FlushedRowsCount, "rows"),
30+
flushedRows: dmetrics.NewValueFromMetric(FlushedRowsCount, "rows"),
31+
dbFlushedRowsRate: dmetrics.MustNewAvgRateFromPromCounter(FlushedRowsCount, 1*time.Second, 30*time.Second, "flushed rows"),
3032
logger: logger,
3133

3234
lastBlock: unsetBlockRef{},
@@ -41,6 +43,10 @@ func (s *Stats) RecordFlushDuration(duration time.Duration) {
4143
s.dbFlushAvgDuration.AddDuration(duration)
4244
}
4345

46+
func (s *Stats) GetLastBlockNum() uint64 {
47+
return s.lastBlock.Num()
48+
}
49+
4450
func (s *Stats) Start(each time.Duration, cursor *sink.Cursor) {
4551
if !cursor.IsBlank() {
4652
s.lastBlock = cursor.Block()
@@ -71,7 +77,8 @@ func (s *Stats) LogNow() {
7177
s.logger.Info("postgres sink stats",
7278
zap.Stringer("db_flush_rate", s.dbFlushRate),
7379
zap.Stringer("db_flush_duration_rate", s.dbFlushAvgDuration),
74-
zap.Uint64("flushed_rows", s.flusehdRows.ValueUint()),
80+
zap.Uint64("flushed_rows", s.flushedRows.ValueUint()),
81+
zap.Stringer("db_flushed_rows_rate", s.dbFlushedRowsRate),
7582
zap.Stringer("last_block", s.lastBlock),
7683
)
7784
}

0 commit comments

Comments
 (0)