Skip to content

Commit

Permalink
Add ch async sync contextual logging
Browse files: browse the repository at this point in the history
commit_hash:2a7ef2c7c2d23ba7e3fdd7f2b5e428fe4c707fba
  • Loading branch information
boooec committed Feb 28, 2025
1 parent c1054f3 commit c9247c0
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 32 deletions.
7 changes: 3 additions & 4 deletions pkg/providers/clickhouse/async/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/cenkalti/backoff/v4"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/core/xerrors/multierr"
"github.com/doublecloud/transfer/library/go/ptr"
Expand Down Expand Up @@ -218,7 +217,7 @@ func (s *shardClient) AliveHost() (DDLStreamingClient, error) {
opts := *s.opts
for i := 0; i < len(s.opts.Addr); i++ {
opts.Addr = []string{s.nextHostAddr()}
cl, err := NewHostClient(&opts, s.lgr)
cl, err := NewHostClient(&opts, log.With(s.lgr, log.String("shardHost", opts.Addr[0])))
if err != nil {
s.lgr.Warn("Error getting host client", log.String("host", opts.Addr[0]), log.Error(err))
} else {
Expand Down Expand Up @@ -279,7 +278,7 @@ func (c *clusterClient) ExecDDL(fn db_model.DDLFactory) error {
func (c *clusterClient) Close() error {
var errs error
for shardID, shard := range c.ShardMap {
logger.Log.Debugf("clusterClient: closing shard %d", shardID)
c.lgr.Debugf("clusterClient: closing shard %d", shardID)
errs = multierr.Append(errs, shard.Close())
}
return errs
Expand All @@ -294,7 +293,7 @@ func (c *clusterClient) randomShard() ShardClient {
func NewClusterClient(conn conn.ConnParams, topology *topology2.Topology, shards sharding.ShardMap[[]string], lgr log.Logger) (ClusterClient, error) {
clients := make(sharding.ShardMap[ShardClient])
for shard, hosts := range shards {
cl, err := NewShardClient(hosts, conn, topology, lgr)
cl, err := NewShardClient(hosts, conn, topology, log.With(lgr, log.String("shardID", fmt.Sprint(shard))))
if err != nil {
return nil, xerrors.Errorf("error making shard client for shard %v: %w", shard, err)
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/providers/clickhouse/async/dao/parts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/providers/clickhouse/async/model/db"
"go.ytsaurus.tech/library/go/core/log"
Expand All @@ -20,28 +19,31 @@ type PartsDAO struct {
func (d *PartsDAO) AttachTablePartsTo(dstDB, dstTable, srcDB, srcTable string) error {
d.lgr.Infof("Attaching partitions from %s.%s to %s.%s", srcDB, srcTable, dstDB, dstTable)
partitions, err := d.getPartitionList(srcDB, srcTable)
d.lgr.Info("Got partitions for table", log.String("table", srcTable), log.Strings("partitions", partitions))
if err != nil {
return xerrors.Errorf("error getting table partitions: %w", err)
return xerrors.Errorf("error getting table %s partitions: %w", srcTable, err)
}
d.lgr.Info(fmt.Sprintf("Got %d partitions for table", len(partitions)),
log.String("table", srcTable), log.Strings("partitions", partitions))

for _, p := range partitions {
q := fmt.Sprintf(`ALTER TABLE "%s"."%s" ATTACH PARTITION ID '%s' FROM "%s"."%s"`,
dstDB, dstTable, p, srcDB, srcTable)
d.lgr.Info("Attaching partition", log.String("sql", q))
d.lgr.Info(fmt.Sprintf("Attaching partition %s", p), log.String("sql", q))

err := backoff.RetryNotify(
func() error {
_, err := d.db.ExecContext(context.Background(), q)
return err
},
backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(10*time.Minute)),
func(err error, d time.Duration) {
logger.Log.Error(fmt.Sprintf("Got Attach Partition error, retrying after %v", d), log.Error(err))
func(err error, dur time.Duration) {
d.lgr.Error(fmt.Sprintf("Got Attach Partition error, retrying after %v", dur), log.Error(err))
},
)
if err != nil {
return xerrors.Errorf("error attaching table partition: %w", err)
}
d.lgr.Info(fmt.Sprintf("Attached partition %s", p), log.String("sql", q))
}
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/providers/clickhouse/async/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/cenkalti/backoff/v4"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/core/xerrors/multierr"
"github.com/doublecloud/transfer/pkg/abstract"
Expand Down Expand Up @@ -109,7 +108,7 @@ func (p *part) Commit() error {
func (p *part) Close() error {
var err error
for _, shard := range p.shards {
logger.Log.Debug("part: closing shard")
p.lgr.Debug("part: closing shard")
err = multierr.Append(err, shard.Close())
}
p.shards = nil
Expand All @@ -124,7 +123,7 @@ func (p *part) getOrCreateShardPart(shardID sharding.ShardID) (*shardPart, error
return shard, nil
}

logger.Log.Infof("Starting stream inserting for part %s of table %s.%s shard %d", p.id.PartID, p.dbName, p.id.Name, shardID)
p.lgr.Infof("Starting stream inserting for part %s of table %s.%s shard %d", p.id.PartID, p.dbName, p.id.Name, shardID)
if err := backoff.Retry(func() error {
s, err := p.createShardPart(shardID)
if err != nil {
Expand Down Expand Up @@ -198,7 +197,8 @@ func (p *part) createShardPart(shardID sharding.ShardID) (*shardPart, error) {
return nil, xerrors.Errorf("error getting table cols for table '%s': %w", p.id.Name, err)
}

shardPart, err := newShardPart(p.dbName, p.id.Name, p.dbName, p.tmpTableName(), p.query, hostDB, cols)
shardLgr := log.With(p.lgr, log.String("shardID", fmt.Sprint(shardID)))
shardPart, err := newShardPart(shardLgr, p.dbName, p.id.Name, p.dbName, p.tmpTableName(), p.query, hostDB, cols)
if err != nil {
return nil, xerrors.Errorf("error making new part for shard %d: %w", shardID, err)
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func NewPart(
shardsMu: sync.RWMutex{},
dbName: dbName,
id: partID,
lgr: lgr,
lgr: log.With(lgr, log.String("partID", partID.PartID), log.String("table", partID.TableID.Fqtn())),
query: "",
transferID: transferID,
tableCols: make(map[abstract.TableID]columntypes.TypeMapping),
Expand Down
17 changes: 9 additions & 8 deletions pkg/providers/clickhouse/async/shard_part.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/cenkalti/backoff/v4"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/core/xerrors/multierr"
"github.com/doublecloud/transfer/library/go/slices"
Expand All @@ -19,6 +18,7 @@ import (
)

type shardPart struct {
lgr log.Logger
db DDLStreamingClient
streamer db.Streamer
dao *dao.DDLDAO
Expand Down Expand Up @@ -56,7 +56,7 @@ func (s *shardPart) Append(row abstract.ChangeItem) error {
s.streamer = strm
return nil
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3),
util.BackoffLoggerWarn(logger.Log, "begin StreamInsert failed, retrying"))
util.BackoffLoggerWarn(s.lgr, "begin StreamInsert failed, retrying"))
if err != nil {
return xerrors.Errorf("error starting insert query: %w", err)
}
Expand All @@ -72,10 +72,10 @@ func (s *shardPart) Finish() error {
// Merge merges temporary table to destination table.
func (s *shardPart) Merge() error {
defer func() {
logger.Log.Debug("shardPart closing itself after Commit")
s.lgr.Debug("shardPart closing itself after Commit")
err := s.Close()
if err != nil {
logger.Log.Error("error closing shardPart", log.Error(err))
s.lgr.Error("error closing shardPart", log.Error(err))
}
}()
if err := s.partsDao.AttachTablePartsTo(s.baseDB, s.baseTable, s.tmpDB, s.tmpTable); err != nil {
Expand All @@ -89,7 +89,7 @@ func (s *shardPart) Close() error {
var res error
s.closeOnce.Do(func() {
if s.streamer != nil {
logger.Log.Debug("shardPart closing streamer")
s.lgr.Debug("shardPart closing streamer")
if err := s.streamer.Close(); err != nil {
res = multierr.Append(res, xerrors.Errorf("error closing streamer: %w", err))
}
Expand All @@ -104,20 +104,21 @@ func (s *shardPart) Close() error {
}

func newShardPart(
baseDB, baseTable, tmpDB, tmpTable, query string, hostDB DDLStreamingClient, cols columntypes.TypeMapping,
lgr log.Logger, baseDB, baseTable, tmpDB, tmpTable, query string, hostDB DDLStreamingClient, cols columntypes.TypeMapping,
) (*shardPart, error) {
ddldao := dao.NewDDLDAO(hostDB, logger.Log)
ddldao := dao.NewDDLDAO(hostDB, lgr)
if err := ddldao.DropTable(tmpDB, tmpTable); err != nil {
return nil, xerrors.Errorf("error dropping tmp table for part %s.%s: %w", tmpDB, tmpTable, err)
}
if err := ddldao.CreateTableAs(baseDB, baseTable, tmpDB, tmpTable); err != nil {
return nil, xerrors.Errorf("error creating tmp table for part %s.%s: %w", tmpDB, tmpTable, err)
}
return &shardPart{
lgr: lgr,
db: hostDB,
streamer: nil,
dao: ddldao,
partsDao: dao.NewPartsDAO(hostDB, logger.Log),
partsDao: dao.NewPartsDAO(hostDB, lgr),
baseDB: baseDB,
baseTable: baseTable,
tmpDB: tmpDB,
Expand Down
5 changes: 2 additions & 3 deletions pkg/providers/clickhouse/async/sink.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package async

import (
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/metrics"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/core/xerrors/multierr"
Expand Down Expand Up @@ -36,7 +35,7 @@ var sinkClosedErr = xerrors.New("sink is closed")
func (s *sink) Close() error {
var errs error
for partID, part := range s.parts {
logger.Log.Debugf("Sink: closing part %s", partID.PartID)
s.lgr.Debugf("Sink: closing part %s", partID.PartID)
if err := part.Close(); err != nil {
errs = multierr.Append(errs, xerrors.Errorf("error closing part %s: %w", partID.PartID, err))
}
Expand All @@ -45,7 +44,7 @@ func (s *sink) Close() error {
if err := s.middleware.Close(); err != nil {
errs = multierr.Append(errs, xerrors.Errorf("error closing middleware pipeline: %w", err))
}
logger.Log.Debug("Sink: closing clusterClient")
s.lgr.Debug("Sink: closing clusterClient")
if err := s.cl.Close(); err != nil {
errs = multierr.Append(errs, xerrors.Errorf("error closing CH cluster client: %w", err))
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/providers/clickhouse/async/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/core/xerrors/multierr"
"github.com/doublecloud/transfer/pkg/abstract"
Expand Down Expand Up @@ -106,7 +105,7 @@ func (c *chV2Streamer) Close() error {
c.lgr.Info("Closing streaming batch", log.Error(c.err))
c.isClosed = true
if !c.batch.IsSent() {
logger.Log.Debug("Batch is not sent yet, aborting")
c.lgr.Debug("Batch is not sent yet, aborting")
if err := c.batch.Abort(); err != nil {
errs = multierr.Append(errs, xerrors.Errorf("error aborting CH streaming batch: %w", err))
}
Expand All @@ -128,7 +127,7 @@ func (c *chV2Streamer) Finish() error {
if err := c.closeIfErr(c.batch.Send); err != nil {
return xerrors.Errorf("error sending CH streaming batch: %w", err)
}
logger.Log.Debug("chV2Streamer closing itself after commit")
c.lgr.Debug("chV2Streamer closing itself after commit")
if err := c.Close(); err != nil {
c.lgr.Warn("error closing streamer", log.Error(err))
}
Expand All @@ -152,22 +151,22 @@ func (c *chV2Streamer) closeIfErr(fn func() error) error {
return nil
}
c.err = err
logger.Log.Debugf("chV2Streamer closing itself because of error %v", err)
c.lgr.Debugf("chV2Streamer closing itself because of error %v", err)
if closeErr := c.Close(); closeErr != nil {
c.lgr.Warn("error closing streamer", log.Error(closeErr))
}
return err
}

func (c *chV2Streamer) flush() error {
logger.Log.Debug("Flushing streamer")
c.lgr.Debug("Flushing streamer")
err := c.closeIfErr(c.batch.Flush)
c.memSize = 0
return err
}

func (c *chV2Streamer) restart() error {
logger.Log.Debug("Restarting streamer")
c.lgr.Debug("Restarting streamer")
return c.closeIfErr(func() error {
if err := c.batch.Send(); err != nil {
return xerrors.Errorf("error sending streaming batch: %w", err)
Expand Down

0 comments on commit c9247c0

Please sign in to comment.