Skip to content

Commit f88b112

Browse files
Merge pull request #3 from YaroShkvorets/yaro/clickhouse-cursor
fix cursor loading on clickhouse
2 parents 837e3de + 1120748 commit f88b112

File tree

5 files changed

+12
-1
lines changed

5 files changed

+12
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
* Fixed cursor loading on Clickhouse
1011
* Improved batch block flush logic to flush after a certain number of blocks instead of taking by modulo
1112

1213
## v4.5.0

db/cursor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ type cursorRow struct {
2424
// GetAllCursors returns an unordered map given for each module's hash recorded
2525
// the active cursor for it.
2626
func (l *Loader) GetAllCursors(ctx context.Context) (out map[string]*sink.Cursor, err error) {
27-
rows, err := l.DB.QueryContext(ctx, fmt.Sprintf("SELECT id, cursor, block_num, block_id from %s", l.cursorTable.identifier))
27+
query := l.getDialect().GetAllCursorsQuery(l.cursorTable.identifier)
28+
rows, err := l.DB.QueryContext(ctx, query)
2829
if err != nil {
2930
return nil, fmt.Errorf("query all cursors: %w", err)
3031
}

db/dialect.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type dialect interface {
2222
ExecuteSetupScript(ctx context.Context, l *Loader, schemaSql string) error
2323
DriverSupportRowsAffected() bool
2424
GetUpdateCursorQuery(table, moduleHash string, cursor *sink.Cursor, block_num uint64, block_id string) string
25+
GetAllCursorsQuery(table string) string
2526
ParseDatetimeNormalization(value string) string
2627
Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error)
2728
Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error

db/dialect_clickhouse.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ func (d clickhouseDialect) GetUpdateCursorQuery(table, moduleHash string, cursor
120120
`, table, moduleHash, cursor, block_num, block_id)
121121
}
122122

123+
func (d clickhouseDialect) GetAllCursorsQuery(table string) string {
124+
return fmt.Sprintf("SELECT id, cursor, block_num, block_id FROM %s FINAL", table)
125+
}
126+
123127
func (d clickhouseDialect) ParseDatetimeNormalization(value string) string {
124128
return fmt.Sprintf("parseDateTimeBestEffort(%s)", escapeStringValue(value))
125129
}

db/dialect_postgres.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ func (d postgresDialect) GetUpdateCursorQuery(table, moduleHash string, cursor *
234234
`, table, cursor, block_num, block_id, moduleHash)
235235
}
236236

237+
func (d postgresDialect) GetAllCursorsQuery(table string) string {
238+
return fmt.Sprintf("SELECT id, cursor, block_num, block_id FROM %s", table)
239+
}
240+
237241
func (d postgresDialect) ParseDatetimeNormalization(value string) string {
238242
return escapeStringValue(value)
239243
}

0 commit comments

Comments
 (0)