Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions cmd/aardappel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,12 @@ func EstimateReplicationLagSec(pos types.Position) float32 {
}

func DoReplication(ctx context.Context, prc *processor.Processor, dstTables []*dst_table.DstTable,
lockExecutor func(fn func(context.Context, table.Session, table.Transaction) error) error, mon pmon.Metrics) {
lockExecutor func(fn func(context.Context, table.Session, table.Transaction) error) error, mon pmon.Metrics) error {
Copy link
Collaborator Author

@VPolka VPolka Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если ошибка в топике случилась в main горутине (в которой процессор работает), то ошибка отправляется в канал, который мониторится и пытается завершить приложение (как при получении сигнала), и постепенно начинаю выходить из функций возвращая эту ошибку

В случае read topic горутины - она просто завершается и ничего не делается, пока ошибка в канале не обработается и не начнется завершаться приложение. При этом другие горутины не трогаю, они могут продолжать работать, завершаться после обработки ошибки из канала

passed := time.Now().UnixMilli()
stats, err := prc.DoReplication(ctx, dstTables, lockExecutor)
if err != nil {
if ctx.Err() != nil {
xlog.Error(ctx, "Context cancelled or expired during replication step", zap.Error(err))
return
}
xlog.Fatal(ctx, "Unable to perform replication without error", zap.Error(err))
errMsg := fmt.Sprintf("Unable to perform replication without error")
return types.ReturnError(ctx, err, errMsg)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Специальная функция для возвращения ошибки - если ошибка gracefull, то она возврашается, если нет, что делается xlog.Fatal
То есть до конца будет возвращаться graceful ошбика и когда завершится функция doMain, то там просто отеняется контекст, отпустится лок и приложение заверщается как будто graceful
До конца доходит, если из канала не успела ошибка обработаться по каким то причинам

}
passed = time.Now().UnixMilli() - passed
perSecond := float32(stats.ModificationsCount) / (float32(passed) / 1000.0)
Expand All @@ -103,6 +100,8 @@ func DoReplication(ctx context.Context, prc *processor.Processor, dstTables []*d
zap.Int("request size", stats.RequestSize),
zap.Float32("quorum waiting duration", float32(stats.QuorumWaitingDurationMs)/1000),
zap.Float32("replication lag estimation:", lag))

return nil
}

func createReplicaStateTable(ctx context.Context, client *client.TableClient, stateTable string) error {
Expand Down Expand Up @@ -155,7 +154,7 @@ func doDescribeTopics(ctx context.Context, config configInit.Config, srcDb *clie
}

func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicClient, dstDb *client.YdbClient,
locker *ydb_locker.Locker, mon pmon.Metrics) {
locker *ydb_locker.Locker, mon pmon.Metrics, errChannel chan error) error {
topics := doDescribeTopics(ctx, config, srcDb)

xlog.Debug(ctx, "All topics described",
Expand Down Expand Up @@ -200,7 +199,7 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl

for i := 0; i < len(config.Streams); i++ {
cfgStream := &config.Streams[i]
startCb, updateCb := topicReader.MakeTopicReaderGuard()
startCb, updateCb := topicReader.MakeTopicReaderGuard(errChannel)
reader, err := srcDb.StartReader(config.Streams[i].Consumer, cfgStream.SrcTopic,
topicoptions.WithReaderGetPartitionStartOffset(startCb))

Expand All @@ -224,7 +223,7 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
PartCount: topics.TopicPartsCountMap[i].PartitionsCount,
ProblemStrategy: cfgStream.ProblemStrategy}
xlog.Debug(ctx, "Start reading")
go topicReader.ReadTopic(ctx, streamInfo, reader, prc, conflictHandler, updateCb, dlQueue)
go topicReader.ReadTopic(ctx, streamInfo, reader, prc, conflictHandler, updateCb, dlQueue, errChannel)
}

lockExecutor := func(fn func(context.Context, table.Session, table.Transaction) error) error {
Expand All @@ -235,8 +234,14 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
}

for ctx.Err() == nil {
DoReplication(ctx, prc, dstTables, lockExecutor, mon)
err := DoReplication(ctx, prc, dstTables, lockExecutor, mon)
if err != nil {
errMsg := fmt.Sprintf("doMain")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Такая штука добавляется в начале ошибки, то есть в итоге будет текст ошибки такой
"doMain: text of error"

return types.ReturnError(ctx, err, errMsg)
}
}

return nil
}

func trySrcConnect(ctx context.Context, config configInit.Config, srcOpts []ydb.Option) bool {
Expand Down Expand Up @@ -291,11 +296,31 @@ func main() {
logger := xlog.SetupLogging(config.LogLevel)
xlog.SetInternalLogger(logger)

errChannel := make(chan error)

go func() {
select {
case sig := <-signalChannel:
xlog.Info(ctx, "Got OS signal, stopping aardappel....", zap.String("signal name", sig.String()))
cancel()
for {
select {
case sig := <-signalChannel:
xlog.Info(ctx, "Got OS signal, stopping aardappel....", zap.String("signal name", sig.String()))
cancel()
case err := <-errChannel:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

вот тут как раз и пытается обработаться ошибка из канала

if err == nil {
continue
}
var appErr *types.Error
if errors.As(err, &appErr) {
switch appErr.Kind() {
case types.Graceful:
xlog.Info(ctx, "Got error, stopping aardappel....", zap.String("error", appErr.Error()))
cancel()
case types.Fatal:
xlog.Fatal(ctx, "Got error, stopping aardappel....", zap.String("error", appErr.Error()))
}
} else {
xlog.Error(ctx, "Unexpected error", zap.Error(err))
}
}
}
}()

Expand Down Expand Up @@ -389,7 +414,19 @@ func main() {
xlog.Fatal(ctx, "Unable to connect to src cluster", zap.Error(err))
}
xlog.Debug(ctx, "YDB src opened")
doMain(lockCtx, config, srcDb.TopicClient, dstDb, locker, mon)
err = doMain(lockCtx, config, srcDb.TopicClient, dstDb, locker, mon, errChannel)
var appErr *types.Error
if errors.As(err, &appErr) {
switch appErr.Kind() {
case types.Graceful:
xlog.Info(ctx, "Got error, stopping aardappel....", zap.String("error", appErr.Error()))
cancel()
case types.Fatal:
xlog.Fatal(ctx, "Got error, stopping aardappel....", zap.String("error", appErr.Error()))
}
} else {
xlog.Error(ctx, "Unexpected error", zap.Error(err))
}
cont = false
select {
case _, ok := <-lockChannel:
Expand Down
11 changes: 7 additions & 4 deletions internal/hb_tracker/hb_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"aardappel/internal/util/misc"
"aardappel/internal/util/xlog"
"fmt"
"go.uber.org/zap"
"golang.org/x/net/context"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
"golang.org/x/net/context"
)

type HeartBeatTracker struct {
Expand Down Expand Up @@ -124,17 +125,19 @@ func (ht *HeartBeatTracker) StartHbGuard(ctx context.Context, timeout uint32, me
go ht.guardLoop(ctx, timeout, metrics)
}

func (ht *HeartBeatTracker) AddHb(data types.HbData) error {
func (ht *HeartBeatTracker) AddHb(ctx context.Context, data types.HbData) error {
ht.lock.Lock()
defer ht.lock.Unlock()
hb, ok := ht.streams[data.StreamId]
if ok {
if types.NewPosition(hb).LessThan(*types.NewPosition(data)) {
// Got new heartbeat for stream - we can commit previous one
err := hb.CommitTopic()

if err != nil {
return fmt.Errorf("unable to commit topic during update HB %w, stepId: %d, txId: %d", err,
errMsg := fmt.Sprintf("AddHb: unable to commit topic during update HB %v, stepId: %d, txId: %d", err,
hb.Step, hb.TxId)
return types.ReturnError(ctx, err, errMsg)
}
hb = data
}
Expand Down
Loading
Loading