From 74afd8174bba69e76b0b95e6e5b77d2200c5661a Mon Sep 17 00:00:00 2001
From: Irina Skvortsova
Date: Fri, 25 Apr 2025 14:40:58 +0000
Subject: [PATCH 1/3] add test to check metadata

---
 ydb/core/kafka_proxy/ut/kafka_test_client.cpp | 29 ++++++++++++++++++
 ydb/core/kafka_proxy/ut/kafka_test_client.h   |  8 +++++
 ydb/core/kafka_proxy/ut/ut_protocol.cpp       | 30 ++++++++++++-------
 3 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
index c13409d4f95b..5b908c8dc882 100644
--- a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
@@ -82,6 +82,35 @@ TMessagePtr TKafkaTestClient::InitProducerId(const
     return WriteAndRead(header, request);
 }
 
+
+
+TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions) {
+    Cerr << ">>>>> TOffsetCommitRequestData\n";
+
+    TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1);
+
+    TOffsetCommitRequestData request;
+    request.GroupId = groupId;
+
+    for (const auto& topicToPartitions : topicsToPartions) {
+        NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic;
+        topic.Name = topicToPartitions.first;
+
+        for (auto partitionAndOffset : topicToPartitions.second) {
+            NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition;
+            partition.PartitionIndex = partitionAndOffset.first;
+
+            auto partitionDataObject = partitionAndOffset.second;
+            partition.CommittedOffset = partitionDataObject.Offset;
+            partition.CommittedMetadata = partitionDataObject.Metadata;
+            topic.Partitions.push_back(partition);
+        }
+        request.Topics.push_back(topic);
+    }
+
+    return WriteAndRead(header, request);
+}
+
 TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions) {
     Cerr << ">>>>> TOffsetCommitRequestData\n";
 
diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h
index 531c86b4b968..d7a676859dd8 100644
--- a/ydb/core/kafka_proxy/ut/kafka_test_client.h
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h
@@ -37,6 +37,12 @@ struct TReadInfo {
     i32 GenerationId;
 };
 
+struct TPartitionDataOffsetMeta {
+    ui64 Offset;
+    TString Metadata;
+};
+
+
 class TKafkaTestClient {
 public:
     TKafkaTestClient(ui16 port, const TString clientName = "TestClient");
@@ -63,6 +69,8 @@ class TKafkaTestClient {
 
     TMessagePtr OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions);
 
+    TMessagePtr OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions);
+
     TMessagePtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch);
 
     TMessagePtr Produce(const TString& topicName, const std::vector> msgs);
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 23d144090569..8bf03fb27278 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1,3 +1,5 @@
+
+
 #include 
 
 #include "kafka_test_client.h"
@@ -279,6 +281,8 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32
 }
 
 
+
+
 Y_UNIT_TEST_SUITE(KafkaProtocol) {
     // this test imitates kafka producer behaviour:
     // 1. get api version,
@@ -996,6 +1000,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
         TString headerKey = "header-key";
         TString headerValue = "header-value";
 
+        TString meta = "additional-info";
+
         NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
         CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName});
         CreateTopic(pqClient, secondTopicName, minActivePartitions, {firstConsumerName, secondConsumerName});
@@ -1043,10 +1049,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         {
             // Check commit
-            std::unordered_map>> offsets;
-            std::vector> partitionsAndOffsets;
+            std::unordered_map>> offsets;
+            std::vector> partitionsAndOffsets;
             for (ui64 i = 0; i < minActivePartitions; ++i) {
-                partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount));
+                partitionsAndOffsets.emplace_back(std::make_pair(i, TPartitionDataOffsetMeta{static_cast(recordsCount), meta}));
             }
             offsets[firstTopicName] = partitionsAndOffsets;
             offsets[shortTopicName] = partitionsAndOffsets;
@@ -1074,12 +1080,16 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
             topicsToPartions[firstTopicName] = std::vector{0, 1, 2 , 3 };
             auto msg = client.OffsetFetch(firstConsumerName, topicsToPartions);
             UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
             UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
             const auto& partitions = msg->Groups[0].Topics[0].Partitions;
             UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4);
             auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; });
             UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
             UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 5);
+            for (auto p = partitions.begin(); p != partitions.end(); p++) {
+                UNIT_ASSERT_VALUES_EQUAL(p->Metadata, "");
+            }
         }
 
         {
@@ -1097,10 +1107,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         {
             // Check commit with nonexistent topic
-            std::unordered_map>> offsets;
-            std::vector> partitionsAndOffsets;
+            std::unordered_map>> offsets;
+            std::vector> partitionsAndOffsets;
             for (ui64 i = 0; i < minActivePartitions; ++i) {
-                partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount));
+                partitionsAndOffsets.emplace_back(std::make_pair(i, TPartitionDataOffsetMeta{static_cast(recordsCount), meta}));
             }
             offsets[firstTopicName] = partitionsAndOffsets;
             offsets[notExistsTopicName] = partitionsAndOffsets;
@@ -1130,10 +1140,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         {
             // Check commit with nonexistent consumer
-            std::unordered_map>> offsets;
-            std::vector> partitionsAndOffsets;
+            std::unordered_map>> offsets;
+            std::vector> partitionsAndOffsets;
             for (ui64 i = 0; i < minActivePartitions; ++i) {
-                partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount));
+                partitionsAndOffsets.emplace_back(std::make_pair(i, TPartitionDataOffsetMeta{static_cast(recordsCount), meta}));
             }
             offsets[firstTopicName] = partitionsAndOffsets;
 
@@ -1422,7 +1432,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         TKafkaTestClient client(testServer.Port);
         client.AuthenticateToKafka();
-
+
         auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true);
 
         {
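
A rough usage sketch (not part of the patch series): this is how a test is expected to drive the metadata-carrying OffsetCommit helper added in PATCH 1. The template arguments of the offsets map are spelled out here as an assumption, since the angle-bracket contents are elided in the diff above; the topic name, consumer name, partition count, and offset are placeholders.

    // Rough usage sketch for the OffsetCommit overload added in PATCH 1.
    // Assumed template arguments and placeholder names throughout.
    #include "kafka_test_client.h"

    #include <unordered_map>
    #include <utility>
    #include <vector>

    void CommitOffsetsWithMetadataSketch(TKafkaTestClient& client) {
        const TString meta = "additional-info"; // same marker string the test commits
        const ui64 committedOffset = 5;         // assumed offset value

        // One (partition index, offset + metadata) entry per partition of the topic.
        std::vector<std::pair<ui64, TPartitionDataOffsetMeta>> partitionsAndOffsets;
        for (ui64 i = 0; i < 4; ++i) {
            partitionsAndOffsets.emplace_back(i, TPartitionDataOffsetMeta{committedOffset, meta});
        }

        std::unordered_map<TString, std::vector<std::pair<ui64, TPartitionDataOffsetMeta>>> offsets;
        offsets["my-topic"] = partitionsAndOffsets;

        // The helper serializes this into an OFFSET_COMMIT request, filling
        // CommittedOffset and CommittedMetadata for every partition, and
        // returns the parsed response for the test to inspect.
        client.OffsetCommit("my-consumer", offsets);
    }
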
From 3ac54a282fc9f77045e629bb1e88f6c3c3c8e4dc Mon Sep 17 00:00:00 2001
From: Irina Skvortsova
Date: Mon, 28 Apr 2025 14:39:23 +0000
Subject: [PATCH 2/3] fix codestyle

rename the TPartitionDataOffsetMeta struct to TConsumerOffset
add a PartitionIndex field to the TConsumerOffset struct to substitute the std::pair in the OffsetCommit function
---
 ydb/core/kafka_proxy/ut/kafka_test_client.cpp | 36 +++----------------
 ydb/core/kafka_proxy/ut/kafka_test_client.h   |  9 +++--
 ydb/core/kafka_proxy/ut/ut_protocol.cpp       | 22 ++++++------
 3 files changed, 20 insertions(+), 47 deletions(-)

diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
index 5b908c8dc882..924dd7e194e2 100644
--- a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
@@ -84,7 +84,7 @@ TMessagePtr TKafkaTestClient::InitProducerId(const
 
 
 
-TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions) {
+TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map> topicsToPartions) {
     Cerr << ">>>>> TOffsetCommitRequestData\n";
 
     TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1);
@@ -98,35 +98,9 @@ TMessagePtr TKafkaTestClient::OffsetCommit(TString gr
 
         for (auto partitionAndOffset : topicToPartitions.second) {
             NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition;
-            partition.PartitionIndex = partitionAndOffset.first;
-
-            auto partitionDataObject = partitionAndOffset.second;
-            partition.CommittedOffset = partitionDataObject.Offset;
-            partition.CommittedMetadata = partitionDataObject.Metadata;
-            topic.Partitions.push_back(partition);
-        }
-        request.Topics.push_back(topic);
-    }
-
-    return WriteAndRead(header, request);
-}
-
-TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions) {
-    Cerr << ">>>>> TOffsetCommitRequestData\n";
-
-    TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1);
-
-    TOffsetCommitRequestData request;
-    request.GroupId = groupId;
-
-    for (const auto& topicToPartitions : topicsToPartions) {
-        NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic;
-        topic.Name = topicToPartitions.first;
-
-        for (auto partitionAndOffset : topicToPartitions.second) {
-            NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition;
-            partition.PartitionIndex = partitionAndOffset.first;
-            partition.CommittedOffset = partitionAndOffset.second;
+            partition.PartitionIndex = partitionAndOffset.PartitionIndex;
+            partition.CommittedOffset = partitionAndOffset.Offset;
+            partition.CommittedMetadata = partitionAndOffset.Metadata;
             topic.Partitions.push_back(partition);
         }
         request.Topics.push_back(topic);
@@ -692,4 +666,4 @@ void TKafkaTestClient::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, TH
             topics.emplace(topic.value());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h
index d7a676859dd8..580abf49fc85 100644
--- a/ydb/core/kafka_proxy/ut/kafka_test_client.h
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h
@@ -37,7 +37,8 @@ struct TReadInfo {
     i32 GenerationId;
 };
 
-struct TPartitionDataOffsetMeta {
+struct TConsumerOffset {
+    ui64 PartitionIndex;
     ui64 Offset;
     TString Metadata;
 };
@@ -67,9 +68,7 @@ class TKafkaTestClient {
     TMessagePtr InitProducerId(const TString& transactionalId = "");
 
-    TMessagePtr OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions);
-
-    TMessagePtr OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions);
+    TMessagePtr OffsetCommit(TString groupId, std::unordered_map> topicsToPartions);
 
     TMessagePtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch);
 
     TMessagePtr Produce(const TString& topicName, const std::vector> msgs);
@@ -136,4 +135,4 @@ class TKafkaTestClient {
 
     ui32 Correlation;
     TString ClientName;
-};
\ No newline at end of file
+};
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 8bf03fb27278..17ee8e3ca12e 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1000,7 +1000,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
         TString headerKey = "header-key";
         TString headerValue = "header-value";
 
-        TString meta = "additional-info";
+        TString commitedMetaData = "additional-info";
 
         NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
         CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName});
@@ -1049,10 +1049,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         {
            // Check commit
-            std::unordered_map>> offsets;
-            std::vector> partitionsAndOffsets;
+            std::unordered_map> offsets;
+            std::vector partitionsAndOffsets;
             for (ui64 i = 0; i < minActivePartitions; ++i) {
-                partitionsAndOffsets.emplace_back(std::make_pair(i, TPartitionDataOffsetMeta{static_cast(recordsCount), meta}));
+                partitionsAndOffsets.emplace_back(TConsumerOffset{i, static_cast(recordsCount), commitedMetaData});
             }
             offsets[firstTopicName] = partitionsAndOffsets;
             offsets[shortTopicName] = partitionsAndOffsets;
@@ -1107,10 +1107,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         {
             // Check commit with nonexistent topic
-            std::unordered_map>> offsets;
-            std::vector> partitionsAndOffsets;
+            std::unordered_map> offsets;
+            std::vector partitionsAndOffsets;
             for (ui64 i = 0; i < minActivePartitions; ++i) {
-                partitionsAndOffsets.emplace_back(std::make_pair(i, TPartitionDataOffsetMeta{static_cast(recordsCount), meta}));
+                partitionsAndOffsets.emplace_back(TConsumerOffset{i, static_cast(recordsCount), commitedMetaData});
             }
             offsets[firstTopicName] = partitionsAndOffsets;
             offsets[notExistsTopicName] = partitionsAndOffsets;
@@ -1140,10 +1140,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         {
             // Check commit with nonexistent consumer
-            std::unordered_map>> offsets;
-            std::vector> partitionsAndOffsets;
+            std::unordered_map> offsets;
+            std::vector partitionsAndOffsets;
             for (ui64 i = 0; i < minActivePartitions; ++i) {
-                partitionsAndOffsets.emplace_back(std::make_pair(i, TPartitionDataOffsetMeta{static_cast(recordsCount), meta}));
+                partitionsAndOffsets.emplace_back(TConsumerOffset{i, static_cast(recordsCount), commitedMetaData});
             }
             offsets[firstTopicName] = partitionsAndOffsets;
 
@@ -1432,7 +1432,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
 
         TKafkaTestClient client(testServer.Port);
        client.AuthenticateToKafka();
-
+
        auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true);
 
         {
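
The PATCH 2 message above describes replacing the (partition, offset) std::pair with a TConsumerOffset struct that also carries the partition index and the committed metadata. A rough sketch of the resulting call-site shape (assumed map template arguments, placeholder topic and consumer names, not part of the patches):

    // Call-site shape after PATCH 2: one TConsumerOffset per partition instead
    // of a std::pair. Placeholder names; the map type is an assumption.
    #include "kafka_test_client.h"

    #include <unordered_map>
    #include <vector>

    void CommitOffsetsAfterRefactorSketch(TKafkaTestClient& client) {
        std::unordered_map<TString, std::vector<TConsumerOffset>> offsets;
        for (ui64 partition = 0; partition < 4; ++partition) {
            // PartitionIndex, Offset and Metadata now travel together in one struct.
            offsets["my-topic"].push_back(TConsumerOffset{partition, 5, "additional-info"});
        }
        client.OffsetCommit("my-consumer", offsets);
    }
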
From 049e152d2bf7f6516ee877f2ebe13ec5f95297b8 Mon Sep 17 00:00:00 2001
From: Irina Skvortsova
Date: Mon, 5 May 2025 13:33:30 +0000
Subject: [PATCH 3/3] rename variables, add ToDo comment for the future

---
 ydb/core/kafka_proxy/ut/kafka_test_client.cpp | 14 +++++++-------
 ydb/core/kafka_proxy/ut/kafka_test_client.h   |  2 +-
 ydb/core/kafka_proxy/ut/ut_protocol.cpp       |  2 ++
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
index 924dd7e194e2..57ae44760d31 100644
--- a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
@@ -84,7 +84,7 @@ TMessagePtr TKafkaTestClient::InitProducerId(const
 
 
 
-TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map> topicsToPartions) {
+TMessagePtr TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map> topicsToConsumerOffsets) {
     Cerr << ">>>>> TOffsetCommitRequestData\n";
 
     TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1);
@@ -92,15 +92,15 @@ TMessagePtr TKafkaTestClient::OffsetCommit(TString gr
     TOffsetCommitRequestData request;
     request.GroupId = groupId;
 
-    for (const auto& topicToPartitions : topicsToPartions) {
+    for (const auto& topicToConsumerOffsets : topicsToConsumerOffsets) {
         NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic;
-        topic.Name = topicToPartitions.first;
+        topic.Name = topicToConsumerOffsets.first;
 
-        for (auto partitionAndOffset : topicToPartitions.second) {
+        for (auto consumerOffset : topicToConsumerOffsets.second) {
             NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition;
-            partition.PartitionIndex = partitionAndOffset.PartitionIndex;
-            partition.CommittedOffset = partitionAndOffset.Offset;
-            partition.CommittedMetadata = partitionAndOffset.Metadata;
+            partition.PartitionIndex = consumerOffset.PartitionIndex;
+            partition.CommittedOffset = consumerOffset.Offset;
+            partition.CommittedMetadata = consumerOffset.Metadata;
             topic.Partitions.push_back(partition);
         }
         request.Topics.push_back(topic);
diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h
index 580abf49fc85..8516dbd3766f 100644
--- a/ydb/core/kafka_proxy/ut/kafka_test_client.h
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h
@@ -68,7 +68,7 @@ class TKafkaTestClient {
 
     TMessagePtr InitProducerId(const TString& transactionalId = "");
 
-    TMessagePtr OffsetCommit(TString groupId, std::unordered_map> topicsToPartions);
+    TMessagePtr OffsetCommit(TString groupId, std::unordered_map> topicsToConsumerOffsets);
 
     TMessagePtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch);
 
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 17ee8e3ca12e..c8837c76133e 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1087,6 +1087,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
             auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; });
             UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
             UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 5);
+            // ToDo: return here and change the ASSERT to check that the metadata is transferred correctly,
+            // once the implementation of metadata saving is completed.
             for (auto p = partitions.begin(); p != partitions.end(); p++) {
                 UNIT_ASSERT_VALUES_EQUAL(p->Metadata, "");
             }
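
The ToDo added in PATCH 3 marks the empty-metadata assertion as temporary. A hypothetical follow-up (not part of this series) would compare the fetched metadata against the committed string once metadata persistence is implemented, roughly:

    // Hypothetical follow-up for the ToDo above: once CommittedMetadata is
    // actually stored and returned by OffsetFetch, the loop can assert the
    // committed string instead of an empty one.
    for (auto p = partitions.begin(); p != partitions.end(); p++) {
        UNIT_ASSERT_VALUES_EQUAL(p->Metadata, commitedMetaData);
    }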