Skip to content

[Kafka API] Add forwarding of txn requests to coordinator #17765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ if (!ProducerInRequestIsValid(ev->Get()->Request)) { \

namespace NKafka {

void TKafkaTransactionActor::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext&){
void TTransactionActor::Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext&){
KAFKA_LOG_D("Receieved ADD_PARTITIONS_TO_TXN request");
VALIDATE_PRODUCER_IN_REQUEST(TAddPartitionsToTxnResponseData);

Expand All @@ -36,13 +36,13 @@ namespace NKafka {
// Method does nothing. In Kafka it will add __consumer_offsets topic to transaction, but
// in YDB Topics we store offsets in table and do not need this extra action.
// Thus we can just ignore this request.
void TKafkaTransactionActor::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext&) {
void TTransactionActor::Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext&) {
KAFKA_LOG_D("Receieved ADD_OFFSETS_TO_TXN request");
VALIDATE_PRODUCER_IN_REQUEST(TAddOffsetsToTxnResponseData);
SendOkResponse<TAddOffsetsToTxnResponseData>(ev);
}

void TKafkaTransactionActor::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext&) {
void TTransactionActor::Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext&) {
KAFKA_LOG_D("Receieved TXN_OFFSET_COMMIT request");
VALIDATE_PRODUCER_IN_REQUEST(TTxnOffsetCommitResponseData);

Expand Down Expand Up @@ -78,7 +78,7 @@ namespace NKafka {
6. Handle(NKqp::TEvKqp::TEvQueryResponse): If everything committed successfully, return OK to the client
7. Close KQP session, send to coordinator TEvTransactionActorDied, die.
*/
void TKafkaTransactionActor::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) {
void TTransactionActor::Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_D("Receieved END_TXN request");
VALIDATE_PRODUCER_IN_REQUEST(TEndTxnResponseData);

Expand All @@ -94,11 +94,11 @@ namespace NKafka {
}
}

void TKafkaTransactionActor::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
void TTransactionActor::Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
Die(ctx);
}

void TKafkaTransactionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
void TTransactionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_D(TStringBuilder() << "KQP session created");
if (!Kqp->HandleCreateSessionResponse(ev, ctx)) {
SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::BROKER_NOT_AVAILABLE, "Failed to create KQP session");
Expand All @@ -109,7 +109,7 @@ namespace NKafka {
SendToKqpValidationRequests(ctx);
}

void TKafkaTransactionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
void TTransactionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_D(TStringBuilder() << "Receieved query response from KQP for " << GetAsStr(LastSentToKqpRequest) << " request");
if (auto error = GetErrorFromYdbResponse(ev)) {
KAFKA_LOG_W(error);
Expand All @@ -130,13 +130,13 @@ namespace NKafka {
}
}

void TKafkaTransactionActor::StartKqpSession(const TActorContext& ctx) {
void TTransactionActor::StartKqpSession(const TActorContext& ctx) {
Kqp = std::make_unique<TKqpTxHelper>(DatabasePath);
KAFKA_LOG_D(TStringBuilder() << "Sending create session request to KQP for database " << DatabasePath);
Kqp->SendCreateSessionRequest(ctx, KqpActorId);
}

void TKafkaTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) {
void TTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) {
KAFKA_LOG_D(TStringBuilder() << "Sending select request to KQP for database " << DatabasePath);
Kqp->SendYqlRequest(
GetYqlWithTablesNames(NKafkaTransactionSql::SELECT_FOR_VALIDATION),
Expand All @@ -150,7 +150,7 @@ namespace NKafka {
LastSentToKqpRequest = EKafkaTxnKqpRequests::SELECT;
}

void TKafkaTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) {
void TTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) {
auto request = BuildCommitTxnRequestToKqp(kqpTransactionId);

Send(KqpActorId, request.Release(), 0, ++KqpCookie);
Expand All @@ -160,7 +160,7 @@ namespace NKafka {

// Response senders
template<class ErrorResponseType, class EventType>
void TKafkaTransactionActor::SendFailResponse(TAutoPtr<TEventHandle<EventType>>& evHandle, EKafkaErrors errorCode, const TString& errorMessage) {
void TTransactionActor::SendFailResponse(TAutoPtr<TEventHandle<EventType>>& evHandle, EKafkaErrors errorCode, const TString& errorMessage) {
if (errorMessage) {
KAFKA_LOG_W(TStringBuilder() << "Sending fail response with error code: " << errorCode << ". Reason: " << errorMessage);
} else {
Expand All @@ -172,15 +172,15 @@ namespace NKafka {
}

template<class ResponseType, class EventType>
void TKafkaTransactionActor::SendOkResponse(TAutoPtr<TEventHandle<EventType>>& evHandle) {
void TTransactionActor::SendOkResponse(TAutoPtr<TEventHandle<EventType>>& evHandle) {
auto& kafkaRequest = evHandle->Get()->Request;
KAFKA_LOG_D(TStringBuilder() << "Sending OK response to " << evHandle->Get()->ConnectionId << " with correlationId " << evHandle->Get()->CorrelationId << " and transactionalId " << TransactionalId);
std::shared_ptr<ResponseType> response = NKafkaTransactions::BuildResponse<ResponseType>(kafkaRequest, EKafkaErrors::NONE_ERROR);
Send(evHandle->Get()->ConnectionId, new TEvKafka::TEvResponse(evHandle->Get()->CorrelationId, response, EKafkaErrors::NONE_ERROR));
}

// helper methods
void TKafkaTransactionActor::Die(const TActorContext &ctx) {
void TTransactionActor::Die(const TActorContext &ctx) {
KAFKA_LOG_D("Dying.");
if (Kqp) {
Kqp->CloseKqpSession(ctx);
Expand All @@ -190,17 +190,17 @@ namespace NKafka {
}

template<class EventType>
bool TKafkaTransactionActor::ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest) {
bool TTransactionActor::ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest) {
return kafkaRequest->TransactionalId->c_str() == TransactionalId
&& kafkaRequest->ProducerId == ProducerInstanceId.Id
&& kafkaRequest->ProducerEpoch == ProducerInstanceId.Epoch;
}

TString TKafkaTransactionActor::GetFullTopicPath(const TString& topicName) {
TString TTransactionActor::GetFullTopicPath(const TString& topicName) {
return NPersQueue::GetFullTopicPath(DatabasePath, topicName);
}

TString TKafkaTransactionActor::GetYqlWithTablesNames(const TString& templateStr) {
TString TTransactionActor::GetYqlWithTablesNames(const TString& templateStr) {
TString templateWithProducerStateTable = std::regex_replace(
templateStr.c_str(),
std::regex("<producer_state_table_name>"),
Expand All @@ -215,7 +215,7 @@ namespace NKafka {
return templateWithConsumerStateTable;
}

NYdb::TParams TKafkaTransactionActor::BuildSelectParams() {
NYdb::TParams TTransactionActor::BuildSelectParams() {
NYdb::TParamsBuilder params;
params.AddParam("$Database").Utf8(DatabasePath).Build();
params.AddParam("$TransactionalId").Utf8(TransactionalId).Build();
Expand All @@ -236,7 +236,7 @@ namespace NKafka {
return params.Build();
}

THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TKafkaTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) {
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) {
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();

ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED);
Expand Down Expand Up @@ -272,7 +272,7 @@ namespace NKafka {
return ev;
}

void TKafkaTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) {
void TTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) {
// YDB should return exactly two result sets for two queries: to producer and consumer state tables
if (response.Record.GetResponse().GetYdbResults().size() != 2) {
TString error = TStringBuilder() << "KQP returned wrong number of result sets on SELECT query. Expected 2, got " << response.Record.GetResponse().GetYdbResults().size() << ".";
Expand Down Expand Up @@ -315,13 +315,13 @@ namespace NKafka {
SendCommitTxnRequest(kqpTxnId);
}

void TKafkaTransactionActor::HandleCommitResponse(const TActorContext& ctx) {
void TTransactionActor::HandleCommitResponse(const TActorContext& ctx) {
KAFKA_LOG_D("Successfully committed transaction. Sending ok and dying.");
SendOkResponse<TEndTxnResponseData>(EndTxnRequestPtr);
Die(ctx);
}

TMaybe<TString> TKafkaTransactionActor::GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
TMaybe<TString> TTransactionActor::GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
TStringBuilder builder = TStringBuilder() << "Recieved error on request to KQP. Last sent request: " << GetAsStr(LastSentToKqpRequest) << ". Reason: ";
if (ev->Cookie != KqpCookie) {
return builder << "Unexpected cookie in TEvQueryResponse. Expected KQP Cookie: " << KqpCookie << ", Actual: " << ev->Cookie << ".";
Expand All @@ -332,7 +332,7 @@ namespace NKafka {
}
}

TMaybe<TProducerState> TKafkaTransactionActor::ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response) {
TMaybe<TProducerState> TTransactionActor::ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response) {
auto& resp = response.Record.GetResponse();

NYdb::TResultSetParser parser(resp.GetYdbResults(NKafkaTransactionSql::PRODUCER_STATE_REQUEST_INDEX));
Expand All @@ -357,7 +357,7 @@ namespace NKafka {
}
}

TMaybe<TString> TKafkaTransactionActor::GetErrorInProducerState(const TMaybe<TProducerState>& producerState) {
TMaybe<TString> TTransactionActor::GetErrorInProducerState(const TMaybe<TProducerState>& producerState) {
if (!producerState) {
return "No producer state found. May be it has expired";
} else if (producerState->TransactionalId != TransactionalId || producerState->ProducerId != ProducerInstanceId.Id || producerState->ProducerEpoch != ProducerInstanceId.Epoch) {
Expand All @@ -376,7 +376,7 @@ namespace NKafka {
* @param response The response object containing the result set from the YDB query.
* @return A map where keys are consumer group names and values are their corresponding generations.
*/
std::unordered_map<TString, i32> TKafkaTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) {
std::unordered_map<TString, i32> TTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) {
std::unordered_map<TString, i32> generationByConsumerName;

NYdb::TResultSetParser parser(response.Record.GetResponse().GetYdbResults(NKafkaTransactionSql::CONSUMER_STATES_REQUEST_INDEX));
Expand All @@ -389,7 +389,7 @@ namespace NKafka {
return generationByConsumerName;
}

TMaybe<TString> TKafkaTransactionActor::GetErrorInConsumersStates(const std::unordered_map<TString, i32>& consumerGenerationByName) {
TMaybe<TString> TTransactionActor::GetErrorInConsumersStates(const std::unordered_map<TString, i32>& consumerGenerationByName) {
TStringBuilder builder;
bool foundError = false;
for (auto& [topicPartition, offsetCommit] : OffsetsToCommit) {
Expand Down Expand Up @@ -421,7 +421,7 @@ namespace NKafka {
}
}

TString TKafkaTransactionActor::GetAsStr(EKafkaTxnKqpRequests request) {
TString TTransactionActor::GetAsStr(EKafkaTxnKqpRequests request) {
switch (request) {
case SELECT:
return "SELECT";
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ namespace NKafka {

It accumulates transaction state (partitions in tx, offsets) and on commit submits transaction to KQP
*/
class TKafkaTransactionActor : public NActors::TActor<TKafkaTransactionActor> {
class TTransactionActor : public NActors::TActor<TTransactionActor> {

using TBase = NActors::TActor<TKafkaTransactionActor>;
using TBase = NActors::TActor<TTransactionActor>;

public:
struct TTopicPartition {
Expand Down Expand Up @@ -48,8 +48,8 @@ namespace NKafka {
};

// we need to exlplicitly specify kqpActorId and txnCoordinatorActorId for unit tests
TKafkaTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) :
TActor<TKafkaTransactionActor>(&TKafkaTransactionActor::StateFunc),
TTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) :
TActor<TTransactionActor>(&TTransactionActor::StateFunc),
TransactionalId(transactionalId),
ProducerInstanceId({producerId, producerEpoch}),
DatabasePath(DatabasePath),
Expand Down
52 changes: 52 additions & 0 deletions ydb/core/kafka_proxy/kafka_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,42 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
Register(CreateKafkaAlterConfigsActor(Context, header->CorrelationId, message));
}

void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddPartitionsToTxnRequestData>& message) {
Send(MakeKafkaTransactionsServiceID(), new TEvKafka::TEvAddPartitionsToTxnRequest(
header->CorrelationId,
message,
Context->ConnectionId,
Context->DatabasePath
));
}

void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddOffsetsToTxnRequestData>& message) {
Send(MakeKafkaTransactionsServiceID(), new TEvKafka::TEvAddOffsetsToTxnRequest(
header->CorrelationId,
message,
Context->ConnectionId,
Context->DatabasePath
));
}

void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TTxnOffsetCommitRequestData>& message) {
Send(MakeKafkaTransactionsServiceID(), new TEvKafka::TEvTxnOffsetCommitRequest(
header->CorrelationId,
message,
Context->ConnectionId,
Context->DatabasePath
));
}

void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TEndTxnRequestData>& message) {
Send(MakeKafkaTransactionsServiceID(), new TEvKafka::TEvEndTxnRequest(
header->CorrelationId,
message,
Context->ConnectionId,
Context->DatabasePath
));
}

template<class T>
TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) {
return TMessagePtr<T>(request->Buffer, request->Message);
Expand Down Expand Up @@ -443,6 +479,22 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
HandleMessage(&Request->Header, Cast<TAlterConfigsRequestData>(Request));
break;

case ADD_PARTITIONS_TO_TXN:
HandleMessage(&Request->Header, Cast<TAddPartitionsToTxnRequestData>(Request));
break;

case ADD_OFFSETS_TO_TXN:
HandleMessage(&Request->Header, Cast<TAddOffsetsToTxnRequestData>(Request));
break;

case TXN_OFFSET_COMMIT:
HandleMessage(&Request->Header, Cast<TTxnOffsetCommitRequestData>(Request));
break;

case END_TXN:
HandleMessage(&Request->Header, Cast<TEndTxnRequestData>(Request));
break;

default:
KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
PassAway();
Expand Down
Loading
Loading