Skip to content

Commit

Permalink
Add support for deletion tracking in ClickHouse sink tables
Browse files Browse the repository at this point in the history
- Introduce `__data_transfer_is_deleted` generated column for tracking deletions
- Update ClickHouse destination model to support deletable flag
- Infer `is_deleted` usage based on the ClickHouse server version
- Include the `is_deleted` flag in the ReplacingMergeTree engine arguments

Closes: <#219>

---

Pull Request resolved: <#231>

Co-authored-by: tserakhau <[email protected]>
Co-authored-by: tserakhau <[email protected]>
Co-authored-by: tserakhau <[email protected]>
Co-authored-by: tserakhau <[email protected]>
Co-authored-by: tserakhau <[email protected]>
Co-authored-by: tserakhau <[email protected]>
commit_hash:9e5d0898877ded9792fa13e1f24b60e9f216e875
  • Loading branch information
laskoviymishka authored and robot-piglet committed Feb 28, 2025
1 parent 833e3c9 commit c1054f3
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/providers/clickhouse/schema/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func DescribeTable(db *sql.DB, database, table string, knownPrimaryKeys []string

//add other fields
for _, colname := range colNames {
if colname == "__data_transfer_commit_time" || colname == "__data_transfer_delete_time" {
if colname == "__data_transfer_commit_time" || colname == "__data_transfer_delete_time" || colname == "__data_transfer_is_deleted" {
continue
}
if colPrimary[colname] {
Expand Down
25 changes: 25 additions & 0 deletions pkg/providers/clickhouse/sink_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/blang/semver/v4"
"github.com/cenkalti/backoff/v4"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/doublecloud/transfer/pkg/providers/clickhouse/errors"
"github.com/doublecloud/transfer/pkg/providers/clickhouse/model"
"github.com/doublecloud/transfer/pkg/stats"
"github.com/doublecloud/transfer/pkg/util"
"github.com/jmoiron/sqlx"
"go.ytsaurus.tech/library/go/core/log"
)
Expand All @@ -40,6 +42,7 @@ type SinkServer struct {
lastFail time.Time
callbacks *SinkServerCallbacks // special callback, used only in test
cluster *sinkCluster
version semver.Version
}

type SinkServerCallbacks struct {
Expand Down Expand Up @@ -255,6 +258,7 @@ func (s *SinkServer) GetTable(table string, schema *abstract.TableSchema) (*sink
cluster: s.cluster,
timezoneFetched: false,
timezone: nil,
version: s.version,
}

if err := tbl.resolveTimezone(); err != nil {
Expand Down Expand Up @@ -327,6 +331,26 @@ func NewSinkServerImpl(cfg model.ChSinkServerParams, lgr log.Logger, metrics *st
return nil, xerrors.Errorf("native connection error: %w", err)
}

version, err := backoff.RetryNotifyWithData(func() (string, error) {
var version string

if err := db.QueryRow("select version();").Scan(&version); err != nil {
if errors.IsFatalClickhouseError(err) {
return "", backoff.Permanent(xerrors.Errorf("unable to select clickhouse version: %w", err))
}
return "", xerrors.Errorf("unable to select clickhouse version: %w", err)
}
return version, nil
}, backoff.NewExponentialBackOff(), util.BackoffLoggerWarn(lgr, "version resolver"))
if err != nil {
return nil, xerrors.Errorf("unable to extract version: %w", err)
}

parsedVersion, err := parseSemver(version)
if err != nil {
return nil, xerrors.Errorf("unable to parse semver: %w", err)
}

s := &SinkServer{
db: db,
logger: log.With(lgr, log.String("ch_host", host)),
Expand All @@ -341,6 +365,7 @@ func NewSinkServerImpl(cfg model.ChSinkServerParams, lgr log.Logger, metrics *st
lastFail: time.Time{},
callbacks: nil,
cluster: cluster,
version: *parsedVersion,
}

ctx, cancel := context.WithTimeout(context.Background(), errors.ClickhouseReadTimeout)
Expand Down
13 changes: 13 additions & 0 deletions pkg/providers/clickhouse/sink_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/blang/semver/v4"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/slices"
Expand Down Expand Up @@ -38,8 +39,14 @@ type sinkTable struct {
cluster *sinkCluster
timezoneFetched bool
timezone *time.Location
version semver.Version
}

var (
	// deleteableVersion is the minimum ClickHouse server version (23.2.0) that
	// supports the `is_deleted` argument of the ReplacingMergeTree engine; on
	// older servers the `__data_transfer_is_deleted` column and engine argument
	// must be omitted from generated DDL.
	// see: https://github.com/Altinity/clickhouse-sink-connector/issues/206#issuecomment-1529968850
	deleteableVersion = semver.MustParse("23.2.0")
)

func normalizeTableName(table string) string {
res := strings.ReplaceAll(table, "-", "_")
return strings.ReplaceAll(res, ".", "_")
Expand Down Expand Up @@ -133,6 +140,9 @@ func (t *sinkTable) generateDDL(cols []abstract.ColSchema, distributed bool) str
if t.config.IsUpdateable() {
columnDefinitions = append(columnDefinitions, "`__data_transfer_commit_time` UInt64")
columnDefinitions = append(columnDefinitions, "`__data_transfer_delete_time` UInt64")
if t.version.GTE(deleteableVersion) {
columnDefinitions = append(columnDefinitions, "`__data_transfer_is_deleted` UInt8 MATERIALIZED (if(__data_transfer_delete_time != 0, 1, 0))")
}
}
_, _ = result.WriteString(fmt.Sprintf(" (%s)", strings.Join(columnDefinitions, ", ")))

Expand All @@ -141,6 +151,9 @@ func (t *sinkTable) generateDDL(cols []abstract.ColSchema, distributed bool) str
if t.config.IsUpdateable() {
engine = fmt.Sprintf("Replacing%s", engine)
engineArgs = append(engineArgs, "__data_transfer_commit_time")
if t.version.GTE(deleteableVersion) {
engineArgs = append(engineArgs, "__data_transfer_is_deleted")
}
}
if distributed {
engine = fmt.Sprintf("Replicated%s", engine)
Expand Down
66 changes: 66 additions & 0 deletions tests/e2e/pg2ch/replication/check_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/pkg/abstract"
cpclient "github.com/doublecloud/transfer/pkg/abstract/coordinator"
"github.com/doublecloud/transfer/pkg/providers/clickhouse"
chrecipe "github.com/doublecloud/transfer/pkg/providers/clickhouse/recipe"
pgcommon "github.com/doublecloud/transfer/pkg/providers/postgres"
"github.com/doublecloud/transfer/pkg/providers/postgres/pgrecipe"
Expand Down Expand Up @@ -94,3 +95,68 @@ func TestSnapshotAndIncrement(t *testing.T) {
require.NoError(t, helpers.WaitEqualRowsCount(t, databaseName, "__test", helpers.GetSampleableStorageByModel(t, Source), helpers.GetSampleableStorageByModel(t, Target), 60*time.Second))
require.NoError(t, helpers.CompareStorages(t, Source, Target, helpers.NewCompareStorageParams().WithEqualDataTypes(pg2ch.PG2CHDataTypesComparator)))
}

// TestOptimizeCleanup verifies the deletion-tracking pipeline end to end:
// a row deleted on the PostgreSQL source is marked via the materialized
// `__data_transfer_is_deleted` column on the ClickHouse target, and
// `OPTIMIZE ... FINAL CLEANUP` physically removes it so it never reappears.
func TestOptimizeCleanup(t *testing.T) {
	// Setup same as in TestSnapshotAndIncrement.
	connConfig, err := pgcommon.MakeConnConfigFromSrc(logger.Log, &Source)
	require.NoError(t, err)
	conn, err := pgcommon.NewPgConnPool(connConfig, logger.Log)
	require.NoError(t, err)

	// Start transfer: activate the snapshot delivery, then run the local
	// replication worker for incremental changes.
	transfer := helpers.MakeTransfer(helpers.TransferID, &Source, &Target, TransferType)
	err = tasks.ActivateDelivery(context.Background(), nil, cpclient.NewFakeClient(), *transfer, helpers.EmptyRegistry())
	require.NoError(t, err)

	localWorker := local.NewLocalWorker(cpclient.NewFakeClient(), transfer, helpers.EmptyRegistry(), logger.Log)
	localWorker.Start()
	defer localWorker.Stop()

	// Insert test data. Use Exec, not Query: for a statement that returns no
	// rows, Query defers execution errors until the rows are iterated/closed,
	// so a failed INSERT could slip past the NoError check.
	_, err = conn.Exec(context.Background(), "INSERT INTO __test (id, val1, val2) VALUES (100, 100, 'test_cleanup')")
	require.NoError(t, err)

	// Wait until the inserted row is replicated to ClickHouse.
	require.NoError(
		t,
		helpers.WaitEqualRowsCount(
			t,
			databaseName,
			"__test",
			helpers.GetSampleableStorageByModel(t, Source),
			helpers.GetSampleableStorageByModel(t, Target),
			60*time.Second,
		),
	)

	// Delete the data on the source (Exec for the same reason as above).
	_, err = conn.Exec(context.Background(), "DELETE FROM __test WHERE id=100")
	require.NoError(t, err)

	// Wait until the deletion is reflected in ClickHouse.
	require.NoError(t, helpers.WaitEqualRowsCount(t, databaseName, "__test",
		helpers.GetSampleableStorageByModel(t, Source),
		helpers.GetSampleableStorageByModel(t, Target),
		60*time.Second))

	// Get a ClickHouse connection for direct verification.
	chConn, err := clickhouse.MakeConnection(Target.ToStorageParams())
	require.NoError(t, err)

	// Force physical removal of rows flagged by __data_transfer_is_deleted.
	_, err = chConn.Exec("OPTIMIZE TABLE public.__test FINAL CLEANUP")
	require.NoError(t, err)

	// The deleted row must be gone from the raw parts...
	var count int
	err = chConn.QueryRow("SELECT count() FROM public.__test WHERE id = 100").Scan(&count)
	require.NoError(t, err)
	require.Equal(t, 0, count)

	// ...and must not resurface through FINAL merging either.
	err = chConn.QueryRow("SELECT count() FROM public.__test FINAL WHERE id = 100").Scan(&count)
	require.NoError(t, err)
	require.Equal(t, 0, count)
}

0 comments on commit c1054f3

Please sign in to comment.