Skip to content

[Kafka API] Add poison pill from transaction coordinator to transaction actor #17764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ if (!ProducerInRequestIsValid(ev->Get()->Request)) { \

namespace NKafka {

void TKafkaTransactionActor::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext&){
void TTransactionActor::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext&){
KAFKA_LOG_D("Receieved ADD_PARTITIONS_TO_TXN request");
VALIDATE_PRODUCER_IN_REQUEST(TAddPartitionsToTxnResponseData);

@@ -36,13 +36,13 @@ namespace NKafka {
// Method does nothing. In Kafka it will add __consumer_offsets topic to transaction, but
// in YDB Topics we store offsets in table and do not need this extra action.
// Thus we can just ignore this request.
void TKafkaTransactionActor::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext&) {
void TTransactionActor::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext&) {
KAFKA_LOG_D("Receieved ADD_OFFSETS_TO_TXN request");
VALIDATE_PRODUCER_IN_REQUEST(TAddOffsetsToTxnResponseData);
SendOkResponse<TAddOffsetsToTxnResponseData>(ev);
}

void TKafkaTransactionActor::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext&) {
void TTransactionActor::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext&) {
KAFKA_LOG_D("Receieved TXN_OFFSET_COMMIT request");
VALIDATE_PRODUCER_IN_REQUEST(TTxnOffsetCommitResponseData);

@@ -78,7 +78,7 @@ namespace NKafka {
6. Handle(NKqp::TEvKqp::TEvQueryResponse): If everything committed successfully, return OK to the client
7. Close KQP session, send to coordinator TEvTransactionActorDied, die.
*/
void TKafkaTransactionActor::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) {
void TTransactionActor::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_D("Receieved END_TXN request");
VALIDATE_PRODUCER_IN_REQUEST(TEndTxnResponseData);

@@ -94,11 +94,11 @@ namespace NKafka {
}
}

void TKafkaTransactionActor::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
void TTransactionActor::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
Die(ctx);
}

void TKafkaTransactionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
void TTransactionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_D(TStringBuilder() << "KQP session created");
if (!Kqp->HandleCreateSessionResponse(ev, ctx)) {
SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::BROKER_NOT_AVAILABLE, "Failed to create KQP session");
@@ -109,7 +109,7 @@ namespace NKafka {
SendToKqpValidationRequests(ctx);
}

void TKafkaTransactionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
void TTransactionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_D(TStringBuilder() << "Receieved query response from KQP for " << GetAsStr(LastSentToKqpRequest) << " request");
if (auto error = GetErrorFromYdbResponse(ev)) {
KAFKA_LOG_W(error);
@@ -130,13 +130,13 @@ namespace NKafka {
}
}

void TKafkaTransactionActor::StartKqpSession(const TActorContext& ctx) {
void TTransactionActor::StartKqpSession(const TActorContext& ctx) {
Kqp = std::make_unique<TKqpTxHelper>(DatabasePath);
KAFKA_LOG_D(TStringBuilder() << "Sending create session request to KQP for database " << DatabasePath);
Kqp->SendCreateSessionRequest(ctx, KqpActorId);
}

void TKafkaTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) {
void TTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) {
KAFKA_LOG_D(TStringBuilder() << "Sending select request to KQP for database " << DatabasePath);
Kqp->SendYqlRequest(
GetYqlWithTablesNames(NKafkaTransactionSql::SELECT_FOR_VALIDATION),
@@ -150,7 +150,7 @@ namespace NKafka {
LastSentToKqpRequest = EKafkaTxnKqpRequests::SELECT;
}

void TKafkaTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) {
void TTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) {
auto request = BuildCommitTxnRequestToKqp(kqpTransactionId);

Send(KqpActorId, request.Release(), 0, ++KqpCookie);
@@ -160,7 +160,7 @@ namespace NKafka {

// Response senders
template<class ErrorResponseType, class EventType>
void TKafkaTransactionActor::SendFailResponse(TAutoPtr<TEventHandle<EventType>>& evHandle, EKafkaErrors errorCode, const TString& errorMessage) {
void TTransactionActor::SendFailResponse(TAutoPtr<TEventHandle<EventType>>& evHandle, EKafkaErrors errorCode, const TString& errorMessage) {
if (errorMessage) {
KAFKA_LOG_W(TStringBuilder() << "Sending fail response with error code: " << errorCode << ". Reason: " << errorMessage);
} else {
@@ -172,15 +172,15 @@ namespace NKafka {
}

template<class ResponseType, class EventType>
void TKafkaTransactionActor::SendOkResponse(TAutoPtr<TEventHandle<EventType>>& evHandle) {
void TTransactionActor::SendOkResponse(TAutoPtr<TEventHandle<EventType>>& evHandle) {
auto& kafkaRequest = evHandle->Get()->Request;
KAFKA_LOG_D(TStringBuilder() << "Sending OK response to " << evHandle->Get()->ConnectionId << " with correlationId " << evHandle->Get()->CorrelationId << " and transactionalId " << TransactionalId);
std::shared_ptr<ResponseType> response = NKafkaTransactions::BuildResponse<ResponseType>(kafkaRequest, EKafkaErrors::NONE_ERROR);
Send(evHandle->Get()->ConnectionId, new TEvKafka::TEvResponse(evHandle->Get()->CorrelationId, response, EKafkaErrors::NONE_ERROR));
}

// helper methods
void TKafkaTransactionActor::Die(const TActorContext &ctx) {
void TTransactionActor::Die(const TActorContext &ctx) {
KAFKA_LOG_D("Dying.");
if (Kqp) {
Kqp->CloseKqpSession(ctx);
@@ -190,17 +190,17 @@ namespace NKafka {
}

template<class EventType>
bool TKafkaTransactionActor::ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest) {
bool TTransactionActor::ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest) {
return kafkaRequest->TransactionalId->c_str() == TransactionalId
&& kafkaRequest->ProducerId == ProducerInstanceId.Id
&& kafkaRequest->ProducerEpoch == ProducerInstanceId.Epoch;
}

TString TKafkaTransactionActor::GetFullTopicPath(const TString& topicName) {
TString TTransactionActor::GetFullTopicPath(const TString& topicName) {
return NPersQueue::GetFullTopicPath(DatabasePath, topicName);
}

TString TKafkaTransactionActor::GetYqlWithTablesNames(const TString& templateStr) {
TString TTransactionActor::GetYqlWithTablesNames(const TString& templateStr) {
TString templateWithProducerStateTable = std::regex_replace(
templateStr.c_str(),
std::regex("<producer_state_table_name>"),
@@ -215,7 +215,7 @@ namespace NKafka {
return templateWithConsumerStateTable;
}

NYdb::TParams TKafkaTransactionActor::BuildSelectParams() {
NYdb::TParams TTransactionActor::BuildSelectParams() {
NYdb::TParamsBuilder params;
params.AddParam("$Database").Utf8(DatabasePath).Build();
params.AddParam("$TransactionalId").Utf8(TransactionalId).Build();
@@ -236,7 +236,7 @@ namespace NKafka {
return params.Build();
}

THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TKafkaTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) {
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) {
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();

ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED);
@@ -272,7 +272,7 @@ namespace NKafka {
return ev;
}

void TKafkaTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) {
void TTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) {
// YDB should return exactly two result sets for two queries: to producer and consumer state tables
if (response.Record.GetResponse().GetYdbResults().size() != 2) {
TString error = TStringBuilder() << "KQP returned wrong number of result sets on SELECT query. Expected 2, got " << response.Record.GetResponse().GetYdbResults().size() << ".";
@@ -315,13 +315,13 @@ namespace NKafka {
SendCommitTxnRequest(kqpTxnId);
}

void TKafkaTransactionActor::HandleCommitResponse(const TActorContext& ctx) {
void TTransactionActor::HandleCommitResponse(const TActorContext& ctx) {
KAFKA_LOG_D("Successfully committed transaction. Sending ok and dying.");
SendOkResponse<TEndTxnResponseData>(EndTxnRequestPtr);
Die(ctx);
}

TMaybe<TString> TKafkaTransactionActor::GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
TMaybe<TString> TTransactionActor::GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
TStringBuilder builder = TStringBuilder() << "Recieved error on request to KQP. Last sent request: " << GetAsStr(LastSentToKqpRequest) << ". Reason: ";
if (ev->Cookie != KqpCookie) {
return builder << "Unexpected cookie in TEvQueryResponse. Expected KQP Cookie: " << KqpCookie << ", Actual: " << ev->Cookie << ".";
@@ -332,7 +332,7 @@ namespace NKafka {
}
}

TMaybe<TProducerState> TKafkaTransactionActor::ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response) {
TMaybe<TProducerState> TTransactionActor::ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response) {
auto& resp = response.Record.GetResponse();

NYdb::TResultSetParser parser(resp.GetYdbResults(NKafkaTransactionSql::PRODUCER_STATE_REQUEST_INDEX));
@@ -357,7 +357,7 @@ namespace NKafka {
}
}

TMaybe<TString> TKafkaTransactionActor::GetErrorInProducerState(const TMaybe<TProducerState>& producerState) {
TMaybe<TString> TTransactionActor::GetErrorInProducerState(const TMaybe<TProducerState>& producerState) {
if (!producerState) {
return "No producer state found. May be it has expired";
} else if (producerState->TransactionalId != TransactionalId || producerState->ProducerId != ProducerInstanceId.Id || producerState->ProducerEpoch != ProducerInstanceId.Epoch) {
@@ -376,7 +376,7 @@ namespace NKafka {
* @param response The response object containing the result set from the YDB query.
* @return A map where keys are consumer group names and values are their corresponding generations.
*/
std::unordered_map<TString, i32> TKafkaTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) {
std::unordered_map<TString, i32> TTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) {
std::unordered_map<TString, i32> generationByConsumerName;

NYdb::TResultSetParser parser(response.Record.GetResponse().GetYdbResults(NKafkaTransactionSql::CONSUMER_STATES_REQUEST_INDEX));
@@ -389,7 +389,7 @@ namespace NKafka {
return generationByConsumerName;
}

TMaybe<TString> TKafkaTransactionActor::GetErrorInConsumersStates(const std::unordered_map<TString, i32>& consumerGenerationByName) {
TMaybe<TString> TTransactionActor::GetErrorInConsumersStates(const std::unordered_map<TString, i32>& consumerGenerationByName) {
TStringBuilder builder;
bool foundError = false;
for (auto& [topicPartition, offsetCommit] : OffsetsToCommit) {
@@ -421,7 +421,7 @@ namespace NKafka {
}
}

TString TKafkaTransactionActor::GetAsStr(EKafkaTxnKqpRequests request) {
TString TTransactionActor::GetAsStr(EKafkaTxnKqpRequests request) {
switch (request) {
case SELECT:
return "SELECT";
8 changes: 4 additions & 4 deletions ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
Original file line number Diff line number Diff line change
@@ -11,9 +11,9 @@ namespace NKafka {

It accumulates transaction state (partitions in tx, offsets) and on commit submits transaction to KQP
*/
class TKafkaTransactionActor : public NActors::TActor<TKafkaTransactionActor> {
class TTransactionActor : public NActors::TActor<TTransactionActor> {

using TBase = NActors::TActor<TKafkaTransactionActor>;
using TBase = NActors::TActor<TTransactionActor>;

public:
struct TTopicPartition {
@@ -48,8 +48,8 @@ namespace NKafka {
};

// we need to exlplicitly specify kqpActorId and txnCoordinatorActorId for unit tests
TKafkaTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) :
TActor<TKafkaTransactionActor>(&TKafkaTransactionActor::StateFunc),
TTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) :
TActor<TTransactionActor>(&TTransactionActor::StateFunc),
TransactionalId(transactionalId),
ProducerInstanceId({producerId, producerEpoch}),
DatabasePath(DatabasePath),
Loading