From 39a35edde2c895cc49aa2887dfa6fb6281db1e6e Mon Sep 17 00:00:00 2001 From: Andrey Serebryanskiy Date: Thu, 24 Apr 2025 10:10:35 +0000 Subject: [PATCH 1/2] remove kafka from actors names --- .../actors/kafka_transaction_actor.cpp | 52 +++++++++---------- .../actors/kafka_transaction_actor.h | 8 +-- .../kafka_transactions_coordinator.cpp | 36 ++++++------- .../kafka_transactions_coordinator.h | 12 ++--- .../kafka_proxy/ut/ut_transaction_actor.cpp | 4 +- .../ut/ut_transaction_coordinator.cpp | 4 +- 6 files changed, 58 insertions(+), 58 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp index 758874c63d97..de6fad37a249 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp @@ -21,7 +21,7 @@ if (!ProducerInRequestIsValid(ev->Get()->Request)) { \ namespace NKafka { - void TKafkaTransactionActor::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext&){ + void TTransactionActor::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext&){ KAFKA_LOG_D("Receieved ADD_PARTITIONS_TO_TXN request"); VALIDATE_PRODUCER_IN_REQUEST(TAddPartitionsToTxnResponseData); @@ -36,13 +36,13 @@ namespace NKafka { // Method does nothing. In Kafka it will add __consumer_offsets topic to transaction, but // in YDB Topics we store offsets in table and do not need this extra action. // Thus we can just ignore this request. - void TKafkaTransactionActor::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext&) { + void TTransactionActor::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext&) { KAFKA_LOG_D("Receieved ADD_OFFSETS_TO_TXN request"); VALIDATE_PRODUCER_IN_REQUEST(TAddOffsetsToTxnResponseData); SendOkResponse(ev); } - void TKafkaTransactionActor::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext&) { + void TTransactionActor::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext&) { KAFKA_LOG_D("Receieved TXN_OFFSET_COMMIT request"); VALIDATE_PRODUCER_IN_REQUEST(TTxnOffsetCommitResponseData); @@ -78,7 +78,7 @@ namespace NKafka { 6. Handle(NKqp::TEvKqp::TEvQueryResponse): If everything committed successfully, return OK to the client 7. Close KQP session, send to coordinator TEvTransactionActorDied, die. */ - void TKafkaTransactionActor::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) { + void TTransactionActor::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) { KAFKA_LOG_D("Receieved END_TXN request"); VALIDATE_PRODUCER_IN_REQUEST(TEndTxnResponseData); @@ -94,11 +94,11 @@ namespace NKafka { } } - void TKafkaTransactionActor::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) { + void TTransactionActor::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) { Die(ctx); } - void TKafkaTransactionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { + void TTransactionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { KAFKA_LOG_D(TStringBuilder() << "KQP session created"); if (!Kqp->HandleCreateSessionResponse(ev, ctx)) { SendFailResponse(EndTxnRequestPtr, EKafkaErrors::BROKER_NOT_AVAILABLE, "Failed to create KQP session"); @@ -109,7 +109,7 @@ namespace NKafka { SendToKqpValidationRequests(ctx); } - void TKafkaTransactionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + void TTransactionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { KAFKA_LOG_D(TStringBuilder() << "Receieved query response from KQP for " << GetAsStr(LastSentToKqpRequest) << " request"); if (auto error = GetErrorFromYdbResponse(ev)) { KAFKA_LOG_W(error); @@ -130,13 +130,13 @@ namespace NKafka { } } - void TKafkaTransactionActor::StartKqpSession(const TActorContext& ctx) { + void TTransactionActor::StartKqpSession(const TActorContext& ctx) { Kqp = std::make_unique(DatabasePath); KAFKA_LOG_D(TStringBuilder() << "Sending create session request to KQP for database " << DatabasePath); Kqp->SendCreateSessionRequest(ctx, KqpActorId); } - void TKafkaTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) { + void TTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) { KAFKA_LOG_D(TStringBuilder() << "Sending select request to KQP for database " << DatabasePath); Kqp->SendYqlRequest( GetYqlWithTablesNames(NKafkaTransactionSql::SELECT_FOR_VALIDATION), @@ -150,7 +150,7 @@ namespace NKafka { LastSentToKqpRequest = EKafkaTxnKqpRequests::SELECT; } - void TKafkaTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) { + void TTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) { auto request = BuildCommitTxnRequestToKqp(kqpTransactionId); Send(KqpActorId, request.Release(), 0, ++KqpCookie); @@ -160,7 +160,7 @@ namespace NKafka { // Response senders template - void TKafkaTransactionActor::SendFailResponse(TAutoPtr>& evHandle, EKafkaErrors errorCode, const TString& errorMessage) { + void TTransactionActor::SendFailResponse(TAutoPtr>& evHandle, EKafkaErrors errorCode, const TString& errorMessage) { if (errorMessage) { KAFKA_LOG_W(TStringBuilder() << "Sending fail response with error code: " << errorCode << ". Reason: " << errorMessage); } else { @@ -172,7 +172,7 @@ namespace NKafka { } template - void TKafkaTransactionActor::SendOkResponse(TAutoPtr>& evHandle) { + void TTransactionActor::SendOkResponse(TAutoPtr>& evHandle) { auto& kafkaRequest = evHandle->Get()->Request; KAFKA_LOG_D(TStringBuilder() << "Sending OK response to " << evHandle->Get()->ConnectionId << " with correlationId " << evHandle->Get()->CorrelationId << " and transactionalId " << TransactionalId); std::shared_ptr response = NKafkaTransactions::BuildResponse(kafkaRequest, EKafkaErrors::NONE_ERROR); @@ -180,7 +180,7 @@ namespace NKafka { } // helper methods - void TKafkaTransactionActor::Die(const TActorContext &ctx) { + void TTransactionActor::Die(const TActorContext &ctx) { KAFKA_LOG_D("Dying."); if (Kqp) { Kqp->CloseKqpSession(ctx); @@ -190,17 +190,17 @@ namespace NKafka { } template - bool TKafkaTransactionActor::ProducerInRequestIsValid(TMessagePtr kafkaRequest) { + bool TTransactionActor::ProducerInRequestIsValid(TMessagePtr kafkaRequest) { return kafkaRequest->TransactionalId->c_str() == TransactionalId && kafkaRequest->ProducerId == ProducerInstanceId.Id && kafkaRequest->ProducerEpoch == ProducerInstanceId.Epoch; } - TString TKafkaTransactionActor::GetFullTopicPath(const TString& topicName) { + TString TTransactionActor::GetFullTopicPath(const TString& topicName) { return NPersQueue::GetFullTopicPath(DatabasePath, topicName); } - TString TKafkaTransactionActor::GetYqlWithTablesNames(const TString& templateStr) { + TString TTransactionActor::GetYqlWithTablesNames(const TString& templateStr) { TString templateWithProducerStateTable = std::regex_replace( templateStr.c_str(), std::regex(""), @@ -215,7 +215,7 @@ namespace NKafka { return templateWithConsumerStateTable; } - NYdb::TParams TKafkaTransactionActor::BuildSelectParams() { + NYdb::TParams TTransactionActor::BuildSelectParams() { NYdb::TParamsBuilder params; params.AddParam("$Database").Utf8(DatabasePath).Build(); params.AddParam("$TransactionalId").Utf8(TransactionalId).Build(); @@ -236,7 +236,7 @@ namespace NKafka { return params.Build(); } - THolder TKafkaTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) { + THolder TTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) { auto ev = MakeHolder(); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED); @@ -272,7 +272,7 @@ namespace NKafka { return ev; } - void TKafkaTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) { + void TTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) { // YDB should return exactly two result sets for two queries: to producer and consumer state tables if (response.Record.GetResponse().GetYdbResults().size() != 2) { TString error = TStringBuilder() << "KQP returned wrong number of result sets on SELECT query. Expected 2, got " << response.Record.GetResponse().GetYdbResults().size() << "."; @@ -315,13 +315,13 @@ namespace NKafka { SendCommitTxnRequest(kqpTxnId); } - void TKafkaTransactionActor::HandleCommitResponse(const TActorContext& ctx) { + void TTransactionActor::HandleCommitResponse(const TActorContext& ctx) { KAFKA_LOG_D("Successfully committed transaction. Sending ok and dying."); SendOkResponse(EndTxnRequestPtr); Die(ctx); } - TMaybe TKafkaTransactionActor::GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + TMaybe TTransactionActor::GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { TStringBuilder builder = TStringBuilder() << "Recieved error on request to KQP. Last sent request: " << GetAsStr(LastSentToKqpRequest) << ". Reason: "; if (ev->Cookie != KqpCookie) { return builder << "Unexpected cookie in TEvQueryResponse. Expected KQP Cookie: " << KqpCookie << ", Actual: " << ev->Cookie << "."; @@ -332,7 +332,7 @@ namespace NKafka { } } - TMaybe TKafkaTransactionActor::ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response) { + TMaybe TTransactionActor::ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response) { auto& resp = response.Record.GetResponse(); NYdb::TResultSetParser parser(resp.GetYdbResults(NKafkaTransactionSql::PRODUCER_STATE_REQUEST_INDEX)); @@ -357,7 +357,7 @@ namespace NKafka { } } - TMaybe TKafkaTransactionActor::GetErrorInProducerState(const TMaybe& producerState) { + TMaybe TTransactionActor::GetErrorInProducerState(const TMaybe& producerState) { if (!producerState) { return "No producer state found. May be it has expired"; } else if (producerState->TransactionalId != TransactionalId || producerState->ProducerId != ProducerInstanceId.Id || producerState->ProducerEpoch != ProducerInstanceId.Epoch) { @@ -376,7 +376,7 @@ namespace NKafka { * @param response The response object containing the result set from the YDB query. * @return A map where keys are consumer group names and values are their corresponding generations. */ - std::unordered_map TKafkaTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) { + std::unordered_map TTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) { std::unordered_map generationByConsumerName; NYdb::TResultSetParser parser(response.Record.GetResponse().GetYdbResults(NKafkaTransactionSql::CONSUMER_STATES_REQUEST_INDEX)); @@ -389,7 +389,7 @@ namespace NKafka { return generationByConsumerName; } - TMaybe TKafkaTransactionActor::GetErrorInConsumersStates(const std::unordered_map& consumerGenerationByName) { + TMaybe TTransactionActor::GetErrorInConsumersStates(const std::unordered_map& consumerGenerationByName) { TStringBuilder builder; bool foundError = false; for (auto& [topicPartition, offsetCommit] : OffsetsToCommit) { @@ -421,7 +421,7 @@ namespace NKafka { } } - TString TKafkaTransactionActor::GetAsStr(EKafkaTxnKqpRequests request) { + TString TTransactionActor::GetAsStr(EKafkaTxnKqpRequests request) { switch (request) { case SELECT: return "SELECT"; diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h index 1a53d83c1410..9b91be3c139a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h @@ -11,9 +11,9 @@ namespace NKafka { It accumulates transaction state (partitions in tx, offsets) and on commit submits transaction to KQP */ - class TKafkaTransactionActor : public NActors::TActor { + class TTransactionActor : public NActors::TActor { - using TBase = NActors::TActor; + using TBase = NActors::TActor; public: struct TTopicPartition { @@ -48,8 +48,8 @@ namespace NKafka { }; // we need to exlplicitly specify kqpActorId and txnCoordinatorActorId for unit tests - TKafkaTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) : - TActor(&TKafkaTransactionActor::StateFunc), + TTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) : + TActor(&TTransactionActor::StateFunc), TransactionalId(transactionalId), ProducerInstanceId({producerId, producerEpoch}), DatabasePath(DatabasePath), diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp index 03783e6f5dad..9e8a65d2dd0b 100644 --- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp +++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp @@ -7,7 +7,7 @@ namespace NKafka { // Handles new transactional_id+producer_id+producer_epoch: // 1. validates that producer is not a zombie (in case of parallel init_producer_requests) // 2. saves transactional_id+producer_id+producer_epoch for validation of future transactional requests - void TKafkaTransactionsCoordinator::Handle(TEvKafka::TEvSaveTxnProducerRequest::TPtr& ev, const TActorContext& ctx){ + void TTransactionsCoordinator::Handle(TEvKafka::TEvSaveTxnProducerRequest::TPtr& ev, const TActorContext& ctx){ TEvKafka::TEvSaveTxnProducerRequest* request = ev->Get(); auto it = ProducersByTransactionalId.find(request->TransactionalId); @@ -29,23 +29,23 @@ namespace NKafka { ctx.Send(ev->Sender, new TEvKafka::TEvSaveTxnProducerResponse(TEvKafka::TEvSaveTxnProducerResponse::EStatus::OK, "")); }; - void TKafkaTransactionsCoordinator::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext& ctx){ + void TTransactionsCoordinator::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext& ctx){ HandleTransactionalRequest(ev, ctx); }; - void TKafkaTransactionsCoordinator::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext& ctx){ + void TTransactionsCoordinator::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext& ctx){ HandleTransactionalRequest(ev, ctx); }; - void TKafkaTransactionsCoordinator::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext& ctx) { + void TTransactionsCoordinator::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext& ctx) { HandleTransactionalRequest(ev, ctx); }; - void TKafkaTransactionsCoordinator::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) { + void TTransactionsCoordinator::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) { HandleTransactionalRequest(ev, ctx); }; - void TKafkaTransactionsCoordinator::Handle(TEvKafka::TEvTransactionActorDied::TPtr& ev, const TActorContext&) { + void TTransactionsCoordinator::Handle(TEvKafka::TEvTransactionActorDied::TPtr& ev, const TActorContext&) { auto it = ProducersByTransactionalId.find(ev->Get()->TransactionalId); const TEvKafka::TProducerInstanceId& deadActorProducerState = ev->Get()->ProducerState; @@ -62,7 +62,7 @@ namespace NKafka { } }; - void TKafkaTransactionsCoordinator::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) { + void TTransactionsCoordinator::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) { KAFKA_LOG_D("Got poison pill, killing all transaction actors"); for (auto& [transactionalId, txnActorId]: TxnActorByTransactionalId) { ctx.Send(txnActorId, new TEvents::TEvPoison()); @@ -71,16 +71,16 @@ namespace NKafka { PassAway(); }; - void TKafkaTransactionsCoordinator::PassAway() { + void TTransactionsCoordinator::PassAway() { KAFKA_LOG_D("Killing myself"); TBase::PassAway(); }; // Validates producer's id and epoch - // If valid: proxies requests to the relevant TKafkaTransactionActor + // If valid: proxies requests to the relevant TTransactionActor // If outdated or not initialized: returns PRODUCER_FENCED error template - void TKafkaTransactionsCoordinator::HandleTransactionalRequest(TAutoPtr>& evHandle, const TActorContext& ctx) { + void TTransactionsCoordinator::HandleTransactionalRequest(TAutoPtr>& evHandle, const TActorContext& ctx) { EventType* ev = evHandle->Get(); KAFKA_LOG_D(TStringBuilder() << "Receieved message for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); @@ -99,34 +99,34 @@ namespace NKafka { }; template - void TKafkaTransactionsCoordinator::SendProducerFencedResponse(TMessagePtr kafkaRequest, const TString& error, const TTransactionalRequest& txnRequestDetails) { + void TTransactionsCoordinator::SendProducerFencedResponse(TMessagePtr kafkaRequest, const TString& error, const TTransactionalRequest& txnRequestDetails) { KAFKA_LOG_W(error); std::shared_ptr response = NKafkaTransactions::BuildResponse(kafkaRequest, EKafkaErrors::PRODUCER_FENCED); Send(txnRequestDetails.ConnectionId, new TEvKafka::TEvResponse(txnRequestDetails.CorrelationId, response, EKafkaErrors::PRODUCER_FENCED)); }; template - void TKafkaTransactionsCoordinator::ForwardToTransactionActor(TAutoPtr>& evHandle, const TActorContext& ctx) { + void TTransactionsCoordinator::ForwardToTransactionActor(TAutoPtr>& evHandle, const TActorContext& ctx) { EventType* ev = evHandle->Get(); TActorId txnActorId; if (TxnActorByTransactionalId.contains(ev->Request->TransactionalId->c_str())) { txnActorId = TxnActorByTransactionalId[ev->Request->TransactionalId->c_str()]; } else { - txnActorId = ctx.Register(new TKafkaTransactionActor(ev->Request->TransactionalId->c_str(), ev->Request->ProducerId, ev->Request->ProducerEpoch, ev->DatabasePath, NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ctx.SelfID)); + txnActorId = ctx.Register(new TTransactionActor(ev->Request->TransactionalId->c_str(), ev->Request->ProducerId, ev->Request->ProducerEpoch, ev->DatabasePath, NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ctx.SelfID)); TxnActorByTransactionalId[ev->Request->TransactionalId->c_str()] = txnActorId; - KAFKA_LOG_D(TStringBuilder() << "Registered TKafkaTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); + KAFKA_LOG_D(TStringBuilder() << "Registered TTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); } TAutoPtr tmpPtr = evHandle.Release(); ctx.Forward(tmpPtr, txnActorId); - KAFKA_LOG_D(TStringBuilder() << "Forwarded message to TKafkaTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); + KAFKA_LOG_D(TStringBuilder() << "Forwarded message to TTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); }; - bool TKafkaTransactionsCoordinator::NewProducerStateIsOutdated(const TEvKafka::TProducerInstanceId& currentProducerState, const TEvKafka::TProducerInstanceId& newProducerState) { + bool TTransactionsCoordinator::NewProducerStateIsOutdated(const TEvKafka::TProducerInstanceId& currentProducerState, const TEvKafka::TProducerInstanceId& newProducerState) { return currentProducerState > newProducerState; }; - TMaybe TKafkaTransactionsCoordinator::GetTxnRequestError(const TTransactionalRequest& request) { + TMaybe TTransactionsCoordinator::GetTxnRequestError(const TTransactionalRequest& request) { auto it = ProducersByTransactionalId.find(request.TransactionalId); if (it == ProducersByTransactionalId.end()) { @@ -138,7 +138,7 @@ namespace NKafka { } }; - TString TKafkaTransactionsCoordinator::GetProducerIsOutdatedError(const TString& transactionalId, const TEvKafka::TProducerInstanceId& currentProducerState, const TEvKafka::TProducerInstanceId& newProducerState) { + TString TTransactionsCoordinator::GetProducerIsOutdatedError(const TString& transactionalId, const TEvKafka::TProducerInstanceId& currentProducerState, const TEvKafka::TProducerInstanceId& newProducerState) { return TStringBuilder() << "Producer with transactional id " << transactionalId << "is outdated. Current producer id is " << currentProducerState.Id << " and producer epoch is " << currentProducerState.Epoch << ". Requested producer id is " << newProducerState.Id << diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h index 532a123556e1..55ddc13b8659 100644 --- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h +++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h @@ -7,15 +7,15 @@ namespace NKafka { /* - This class serves as a proxy between Kafka SDK and TKafkaTransactionActor + This class serves as a proxy between Kafka SDK and TTransactionActor It validates that requester is not a zombie (by checking request's tranasactional_id+producer_id+producer_epoch) It does so by maintaining a set of the most relevant for this node tranasactional_id+producer_id+producer_epoch. Recieves updates from init_producer_id_actors. */ - class TKafkaTransactionsCoordinator : public NActors::TActorBootstrapped { + class TTransactionsCoordinator : public NActors::TActorBootstrapped { - using TBase = NActors::TActorBootstrapped; + using TBase = NActors::TActorBootstrapped; struct TTransactionalRequest { TString TransactionalId; @@ -26,7 +26,7 @@ namespace NKafka { public: void Bootstrap(const TActorContext&) { - TBase::Become(&TKafkaTransactionsCoordinator::StateWork); + TBase::Become(&TTransactionsCoordinator::StateWork); } TStringBuilder LogPrefix() const { @@ -50,7 +50,7 @@ namespace NKafka { // Handles new transactional_id+producer_id+producer_epoch: saves for validation of future requests void Handle(TEvKafka::TEvSaveTxnProducerRequest::TPtr& ev, const TActorContext& ctx); - // Proxies requests to the relevant TKafkaTransactionActor + // Proxies requests to the relevant TTransactionActor void Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext& ctx); @@ -77,7 +77,7 @@ namespace NKafka { }; inline NActors::IActor* CreateKafkaTransactionsCoordinator() { - return new TKafkaTransactionsCoordinator(); + return new TTransactionsCoordinator(); }; inline TActorId MakeKafkaTransactionsServiceID() { diff --git a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp index 64666a3af73a..4ee7760e9bf4 100644 --- a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp +++ b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp @@ -210,7 +210,7 @@ namespace { Ctx->Runtime->SetLogPriority(NKikimrServices::KAFKA_PROXY, NLog::PRI_DEBUG); DummyKqpActor = new TDummyKqpActor(); KqpActorId = Ctx->Runtime->Register(DummyKqpActor); - ActorId = Ctx->Runtime->Register(new TKafkaTransactionActor( + ActorId = Ctx->Runtime->Register(new TTransactionActor( TransactionalId, ProducerId, ProducerEpoch, @@ -325,7 +325,7 @@ namespace { private: void MatchPartitionsInTxn(const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request, const TQueryRequestMatcher& matcher) { auto& partitionsInRequest = request->Record.GetRequest().GetKafkaApiOperations().GetPartitionsInTxn(); - std::unordered_set paritionsInRequestSet; + std::unordered_set paritionsInRequestSet; paritionsInRequestSet.reserve(partitionsInRequest.size()); for (auto& partition : partitionsInRequest) { paritionsInRequestSet.emplace(partition.GetTopicPath(), partition.GetPartitionId()); diff --git a/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp b/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp index 5262312e3c15..ef497d881760 100644 --- a/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp +++ b/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp @@ -23,7 +23,7 @@ namespace { Ctx->Prepare(); Ctx->Runtime->SetScheduledLimit(5'000); Ctx->Runtime->SetLogPriority(NKikimrServices::KAFKA_PROXY, NLog::PRI_DEBUG); - ActorId = Ctx->Runtime->Register(new NKafka::TKafkaTransactionsCoordinator()); + ActorId = Ctx->Runtime->Register(new NKafka::TTransactionsCoordinator()); } void TearDown(NUnitTest::TTestContext&) override { @@ -211,7 +211,7 @@ namespace { if (auto* event = input->CastAsLocal()) { // There will be four events TEvEndTxnRequest. We need only two of them // with recipient not equal to our TKafkaTransactionCoordinatorActor id. - // Those are event sent from TKafkaTransactionCoordinatorActor to TKafkaTransactionActor + // Those are event sent from TKafkaTransactionCoordinatorActor to TTransactionActor if (input->Recipient != ActorId) { if (eventCounter == 0) { txnActorId = input->Recipient; From 261dc888a06fbec5cf70702719c84fa751993674 Mon Sep 17 00:00:00 2001 From: Andrey Serebryanskiy Date: Sat, 26 Apr 2025 15:44:21 +0000 Subject: [PATCH 2/2] add poison pill for txn actor --- .../kafka_transactions_coordinator.cpp | 10 ++++++ .../kafka_transactions_coordinator.h | 1 + .../ut/ut_transaction_coordinator.cpp | 33 +++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp index 9e8a65d2dd0b..268d74ff62b7 100644 --- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp +++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp @@ -22,6 +22,7 @@ namespace NKafka { } currentProducerState = std::move(newProducerState); + DeleteTransactionActor(request->TransactionalId); } else { ProducersByTransactionalId.emplace(request->TransactionalId, request->ProducerState); } @@ -122,6 +123,15 @@ namespace NKafka { KAFKA_LOG_D(TStringBuilder() << "Forwarded message to TTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); }; + void TTransactionsCoordinator::DeleteTransactionActor(const TString& transactionalId) { + auto it = TxnActorByTransactionalId.find(transactionalId); + if (it != TxnActorByTransactionalId.end()) { + Send(it->second, new TEvents::TEvPoison()); + TxnActorByTransactionalId.erase(it); + } + // we ignore case when there is no actor, cause it means that no actor was ever created for this transactionalId + } + bool TTransactionsCoordinator::NewProducerStateIsOutdated(const TEvKafka::TProducerInstanceId& currentProducerState, const TEvKafka::TProducerInstanceId& newProducerState) { return currentProducerState > newProducerState; }; diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h index 55ddc13b8659..9695a3f15c01 100644 --- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h +++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h @@ -68,6 +68,7 @@ namespace NKafka { template void ForwardToTransactionActor(TAutoPtr>& evHandle, const TActorContext& ctx); + void DeleteTransactionActor(const TString& transactionalId); bool NewProducerStateIsOutdated(const TEvKafka::TProducerInstanceId& currentProducerState, const TEvKafka::TProducerInstanceId& newProducerState); TMaybe GetTxnRequestError(const TTransactionalRequest& request); TString GetProducerIsOutdatedError(const TString& transactionalId, const TEvKafka::TProducerInstanceId& currentProducerState, const TEvKafka::TProducerInstanceId& newProducerState); diff --git a/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp b/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp index ef497d881760..1463cfcc62b2 100644 --- a/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp +++ b/ydb/core/kafka_proxy/ut/ut_transaction_coordinator.cpp @@ -237,6 +237,39 @@ namespace { UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); } + Y_UNIT_TEST(OnSecondInitProducerId_ShouldSendPoisonPillToTxnActor) { + // send valid message + ui64 correlationId = 123; + TString txnId = "my-tx-id"; + i64 producerId = 1; + i16 producerEpoch = 0; + SaveTxnProducer(txnId, producerId, producerEpoch); + bool seenEvent = false; + TActorId txnActorId; + auto observer = [&](TAutoPtr& input) { + if (auto* event = input->CastAsLocal()) { + txnActorId = input->Recipient; + } else if (auto* event = input->CastAsLocal()) { + UNIT_ASSERT_VALUES_EQUAL(txnActorId, input->Recipient); + seenEvent = true; + } + + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + Ctx->Runtime->SetObserverFunc(observer); + + // first request registers actor + SendEndTxnRequest(correlationId, txnId, producerId, producerEpoch); + // request to save producer with newer epoch should trigger poison pill to current txn actor + SaveTxnProducer(txnId, producerId, producerEpoch + 1); + + TDispatchOptions options; + options.CustomFinalCondition = [&seenEvent]() { + return seenEvent; + }; + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); + } + Y_UNIT_TEST(OnAddPartitions_ShouldSendBack_PRODUCER_FENCED_ErrorIfProducerIsNotInitialized) { ui64 correlationId = 123; SendAddPartitionsToTxnRequest(correlationId, "my-tx-id", 1, 0, {{"topic1", {0, 1}}, {"topic2", {0}}});