Skip to content

Conversation

@VPolka
Copy link
Collaborator

@VPolka VPolka commented Oct 9, 2025

No description provided.

@VPolka VPolka changed the title Add graceful shutdown for topic EXT-1396: Add graceful shutdown for topic Oct 16, 2025
@VPolka VPolka force-pushed the add-graceful-shutdown-for-topic branch from c78f445 to 64d5bde Compare October 16, 2025 17:25
@VPolka VPolka force-pushed the add-graceful-shutdown-for-topic branch from 64d5bde to f60a06b Compare October 16, 2025 17:30

func DoReplication(ctx context.Context, prc *processor.Processor, dstTables []*dst_table.DstTable,
lockExecutor func(fn func(context.Context, table.Session, table.Transaction) error) error, mon pmon.Metrics) {
lockExecutor func(fn func(context.Context, table.Session, table.Transaction) error) error, mon pmon.Metrics) error {
Copy link
Collaborator Author

@VPolka VPolka Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если ошибка в топике случилась в main горутине (в которой процессор работает), то ошибка отправляется в канал, который мониторится и пытается завершить приложение (как при получении сигнала), и постепенно начинаю выходить из функций возвращая эту ошибку

В случае read topic горутины - она просто завершается и ничего не делается, пока ошибка в канале не обработается и не начнется завершаться приложение. При этом другие горутины не трогаю, они могут продолжать работать, завершаться после обработки ошибки из канала

}
xlog.Fatal(ctx, "Unable to perform replication without error", zap.Error(err))
errMsg := fmt.Sprintf("Unable to perform replication without error")
return types.ReturnError(ctx, err, errMsg)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Специальная функция для возвращения ошибки - если ошибка gracefull, то она возврашается, если нет, что делается xlog.Fatal
То есть до конца будет возвращаться graceful ошбика и когда завершится функция doMain, то там просто отеняется контекст, отпустится лок и приложение заверщается как будто graceful
До конца доходит, если из канала не успела ошибка обработаться по каким то причинам

DoReplication(ctx, prc, dstTables, lockExecutor, mon)
err := DoReplication(ctx, prc, dstTables, lockExecutor, mon)
if err != nil {
errMsg := fmt.Sprintf("doMain")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Такая штука добавляется в начале ошибки, то есть в итоге будет текст ошибки такой
"doMain: text of error"

case sig := <-signalChannel:
xlog.Info(ctx, "Got OS signal, stopping aardappel....", zap.String("signal name", sig.String()))
cancel()
case err := <-errChannel:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

вот тут как раз и пытается обработаться ошибка из канала

xlog.Debug(ctx, "YDB src opened")
doMain(lockCtx, config, srcDb.TopicClient, dstDb, locker, mon)
err = doMain(lockCtx, config, srcDb.TopicClient, dstDb, locker, mon, errChannel)
xlog.Info(ctx, "Finished with error, stopping aardappel....", zap.String("error", err.Error()))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сюда попадаем, если do main завершился, он авершился, если случилась проблема с топиками и мы начали возвращаться graceful ошибки
а так doMain в бесконечном цикле

zap.Int64("offset", offset))

resp.StartFrom(offset)
err := types.NewGraceful(fmt.Sprintf("Read session has been closed: topic# %s, partitionId# %d", req.Topic, req.PartitionID))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если эта функция вызывается в принципе - значит приложение стартануло или рестартанула сессия чтения
Если при этом у нас в памяти есть какой-то lastPosition - то значит, что приложение уже работало, значит сессия рестартанула, значит завершаемся graceful

}
rv := verifyStream(msg.PartitionID(), data)
data.CommitTopic = func() error {
if msg.Context().Err() != nil {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Значит что был рестарт сессии чтения, больше сособщение закомитить не можем, отсюда тоже кидаем ошибку на graceful shutdown, далее начинаем выходить аккуратненько из функций с такой же ошибкой

@VPolka VPolka merged commit aba2ed9 into main Oct 28, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants