diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp index c13409d4f95b..57ae44760d31 100644 --- a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp +++ b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp @@ -82,7 +82,9 @@ TMessagePtr TKafkaTestClient::InitProducerId(const return WriteAndRead(header, request); } -TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions) { + + +TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map> topicsToConsumerOffsets) { Cerr << ">>>>> TOffsetCommitRequestData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1); @@ -90,14 +92,15 @@ TMessagePtr TKafkaTestClient::OffsetCommit(TString gr TOffsetCommitRequestData request; request.GroupId = groupId; - for (const auto& topicToPartitions : topicsToPartions) { + for (const auto& topicToConsumerOffsets : topicsToConsumerOffsets) { NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic; - topic.Name = topicToPartitions.first; + topic.Name = topicToConsumerOffsets.first; - for (auto partitionAndOffset : topicToPartitions.second) { + for (auto consumerOffset : topicToConsumerOffsets.second) { NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition; - partition.PartitionIndex = partitionAndOffset.first; - partition.CommittedOffset = partitionAndOffset.second; + partition.PartitionIndex = consumerOffset.PartitionIndex; + partition.CommittedOffset = consumerOffset.Offset; + partition.CommittedMetadata = consumerOffset.Metadata; topic.Partitions.push_back(partition); } request.Topics.push_back(topic); @@ -663,4 +666,4 @@ void TKafkaTestClient::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, TH topics.emplace(topic.value()); } } -} \ No newline at end of file +} diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h index 531c86b4b968..8516dbd3766f 100644 --- a/ydb/core/kafka_proxy/ut/kafka_test_client.h +++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h @@ -37,6 +37,13 @@ struct TReadInfo { i32 GenerationId; }; +struct TConsumerOffset { + ui64 PartitionIndex; + ui64 Offset; + TString Metadata; +}; + + class TKafkaTestClient { public: TKafkaTestClient(ui16 port, const TString clientName = "TestClient"); @@ -61,7 +68,7 @@ class TKafkaTestClient { TMessagePtr InitProducerId(const TString& transactionalId = ""); - TMessagePtr OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions); + TMessagePtr OffsetCommit(TString groupId, std::unordered_map> topicsToConsumerOffsets); TMessagePtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch); @@ -128,4 +135,4 @@ class TKafkaTestClient { ui32 Correlation; TString ClientName; - }; \ No newline at end of file + }; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 23d144090569..c8837c76133e 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1,3 +1,5 @@ + + #include #include "kafka_test_client.h" @@ -279,6 +281,8 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32 } + + Y_UNIT_TEST_SUITE(KafkaProtocol) { // this test imitates kafka producer behaviour: // 1. get api version, @@ -996,6 +1000,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString headerKey = "header-key"; TString headerValue = "header-value"; + TString commitedMetaData = "additional-info"; + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); CreateTopic(pqClient, secondTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); @@ -1043,10 +1049,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Check commit - std::unordered_map>> offsets; - std::vector> partitionsAndOffsets; + std::unordered_map> offsets; + std::vector partitionsAndOffsets; for (ui64 i = 0; i < minActivePartitions; ++i) { - partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount)); + partitionsAndOffsets.emplace_back(TConsumerOffset{i, static_cast(recordsCount), commitedMetaData}); } offsets[firstTopicName] = partitionsAndOffsets; offsets[shortTopicName] = partitionsAndOffsets; @@ -1074,12 +1080,18 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { topicsToPartions[firstTopicName] = std::vector{0, 1, 2 , 3 }; auto msg = client.OffsetFetch(firstConsumerName, topicsToPartions); UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); const auto& partitions = msg->Groups[0].Topics[0].Partitions; UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4); auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; }); UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end()); UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 5); + // ToDo: return here to change ASSERT to check that metadata is transfered correctly + // after realization of metadata saving is completed. + for (auto p = partitions.begin(); p != partitions.end(); p++) { + UNIT_ASSERT_VALUES_EQUAL(p->Metadata, ""); + } } { @@ -1097,10 +1109,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Check commit with nonexistent topic - std::unordered_map>> offsets; - std::vector> partitionsAndOffsets; + std::unordered_map> offsets; + std::vector partitionsAndOffsets; for (ui64 i = 0; i < minActivePartitions; ++i) { - partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount)); + partitionsAndOffsets.emplace_back(TConsumerOffset{i, static_cast(recordsCount), commitedMetaData}); } offsets[firstTopicName] = partitionsAndOffsets; offsets[notExistsTopicName] = partitionsAndOffsets; @@ -1130,10 +1142,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Check commit with nonexistent consumer - std::unordered_map>> offsets; - std::vector> partitionsAndOffsets; + std::unordered_map> offsets; + std::vector partitionsAndOffsets; for (ui64 i = 0; i < minActivePartitions; ++i) { - partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount)); + partitionsAndOffsets.emplace_back(TConsumerOffset{i, static_cast(recordsCount), commitedMetaData}); } offsets[firstTopicName] = partitionsAndOffsets;