Skip to content

Commit da79a83

Browse files
Merge branch 'develop' into yaro/fix-flush
2 parents 19ce975 + f88b112 commit da79a83

File tree

7 files changed

+26
-6
lines changed

7 files changed

+26
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88
## Unreleased
99

1010
* Fixed a bug with not flushing rows on stream completion
11+
* Fixed cursor loading on Clickhouse
12+
* Improved batch block flush logic to flush after a certain number of blocks instead of taking by modulo
1113

1214
## v4.5.0
1315

db/cursor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ type cursorRow struct {
2424
// GetAllCursors returns an unordered map given for each module's hash recorded
2525
// the active cursor for it.
2626
func (l *Loader) GetAllCursors(ctx context.Context) (out map[string]*sink.Cursor, err error) {
27-
rows, err := l.DB.QueryContext(ctx, fmt.Sprintf("SELECT id, cursor, block_num, block_id from %s", l.cursorTable.identifier))
27+
query := l.getDialect().GetAllCursorsQuery(l.cursorTable.identifier)
28+
rows, err := l.DB.QueryContext(ctx, query)
2829
if err != nil {
2930
return nil, fmt.Errorf("query all cursors: %w", err)
3031
}

db/dialect.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type dialect interface {
2222
ExecuteSetupScript(ctx context.Context, l *Loader, schemaSql string) error
2323
DriverSupportRowsAffected() bool
2424
GetUpdateCursorQuery(table, moduleHash string, cursor *sink.Cursor, block_num uint64, block_id string) string
25+
GetAllCursorsQuery(table string) string
2526
ParseDatetimeNormalization(value string) string
2627
Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error)
2728
Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error

db/dialect_clickhouse.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ func (d clickhouseDialect) GetUpdateCursorQuery(table, moduleHash string, cursor
120120
`, table, moduleHash, cursor, block_num, block_id)
121121
}
122122

123+
func (d clickhouseDialect) GetAllCursorsQuery(table string) string {
124+
return fmt.Sprintf("SELECT id, cursor, block_num, block_id FROM %s FINAL", table)
125+
}
126+
123127
func (d clickhouseDialect) ParseDatetimeNormalization(value string) string {
124128
return fmt.Sprintf("parseDateTimeBestEffort(%s)", escapeStringValue(value))
125129
}

db/dialect_postgres.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ func (d postgresDialect) GetUpdateCursorQuery(table, moduleHash string, cursor *
234234
`, table, cursor, block_num, block_id, moduleHash)
235235
}
236236

237+
func (d postgresDialect) GetAllCursorsQuery(table string) string {
238+
return fmt.Sprintf("SELECT id, cursor, block_num, block_id FROM %s", table)
239+
}
240+
237241
func (d postgresDialect) ParseDatetimeNormalization(value string) string {
238242
return escapeStringValue(value)
239243
}

sinker/sinker.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,12 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
118118
}
119119
}
120120

121-
if (s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0) || s.loader.FlushNeeded() {
121+
blockFlushNeeded := s.batchBlockModulo(isLive) > 0 && data.Clock.Number-s.stats.GetLastBlockNum() >= s.batchBlockModulo(isLive)
122+
if blockFlushNeeded || s.loader.FlushNeeded() {
122123
s.logger.Debug("flushing to database",
123124
zap.Stringer("block", cursor.Block()),
124125
zap.Bool("is_live", *isLive),
125-
zap.Bool("block_flush_interval_reached", s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0),
126+
zap.Bool("block_flush_interval_reached", blockFlushNeeded),
126127
zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()),
127128
)
128129

sinker/stats.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ type Stats struct {
1515

1616
dbFlushRate *dmetrics.AvgRatePromCounter
1717
dbFlushAvgDuration *dmetrics.AvgDurationCounter
18-
flusehdRows *dmetrics.ValueFromMetric
18+
flushedRows *dmetrics.ValueFromMetric
19+
dbFlushedRowsRate *dmetrics.AvgRatePromCounter
1920
lastBlock bstream.BlockRef
2021
logger *zap.Logger
2122
}
@@ -26,7 +27,8 @@ func NewStats(logger *zap.Logger) *Stats {
2627

2728
dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Second, 30*time.Second, "flush"),
2829
dbFlushAvgDuration: dmetrics.NewAvgDurationCounter(30*time.Second, dmetrics.InferUnit, "per flush"),
29-
flusehdRows: dmetrics.NewValueFromMetric(FlushedRowsCount, "rows"),
30+
flushedRows: dmetrics.NewValueFromMetric(FlushedRowsCount, "rows"),
31+
dbFlushedRowsRate: dmetrics.MustNewAvgRateFromPromCounter(FlushedRowsCount, 1*time.Second, 30*time.Second, "flushed rows"),
3032
logger: logger,
3133

3234
lastBlock: unsetBlockRef{},
@@ -41,6 +43,10 @@ func (s *Stats) RecordFlushDuration(duration time.Duration) {
4143
s.dbFlushAvgDuration.AddDuration(duration)
4244
}
4345

46+
func (s *Stats) GetLastBlockNum() uint64 {
47+
return s.lastBlock.Num()
48+
}
49+
4450
func (s *Stats) Start(each time.Duration, cursor *sink.Cursor) {
4551
if !cursor.IsBlank() {
4652
s.lastBlock = cursor.Block()
@@ -71,7 +77,8 @@ func (s *Stats) LogNow() {
7177
s.logger.Info("postgres sink stats",
7278
zap.Stringer("db_flush_rate", s.dbFlushRate),
7379
zap.Stringer("db_flush_duration_rate", s.dbFlushAvgDuration),
74-
zap.Uint64("flushed_rows", s.flusehdRows.ValueUint()),
80+
zap.Uint64("flushed_rows", s.flushedRows.ValueUint()),
81+
zap.Stringer("db_flushed_rows_rate", s.dbFlushedRowsRate),
7582
zap.Stringer("last_block", s.lastBlock),
7683
)
7784
}

0 commit comments

Comments
 (0)