From eacdaa50a93c7b7f4b60e10bf9f322e4a1dbd141 Mon Sep 17 00:00:00 2001 From: FloatingCrowbar Date: Mon, 17 Mar 2025 16:32:19 +0300 Subject: [PATCH 1/6] Topic data handler in viewer (#15250) --- ydb/core/http_proxy/ut/internal_counters.json | 2 +- ydb/core/persqueue/partition.cpp | 2 +- ydb/core/persqueue/partition_read.cpp | 11 +- ydb/core/persqueue/pq_impl.cpp | 2 + ydb/core/persqueue/subscriber.h | 4 +- ydb/core/protos/msgbus_pq.proto | 1 + ydb/core/viewer/json_handlers_viewer.cpp | 6 + ydb/core/viewer/json_pipe_req.cpp | 60 +++- ydb/core/viewer/json_pipe_req.h | 10 + ydb/core/viewer/protos/viewer.proto | 26 ++ ydb/core/viewer/tests/canondata/result.json | 70 +++++ ydb/core/viewer/tests/test.py | 102 +++++++ ydb/core/viewer/tests/ya.make | 2 +- ydb/core/viewer/topic_data_ut.cpp | 227 ++++++++++++++ ydb/core/viewer/ut/ut_utils.cpp | 18 ++ ydb/core/viewer/ut/ut_utils.h | 57 ++++ ydb/core/viewer/ut/ya.make | 8 +- ydb/core/viewer/viewer_topic_data.cpp | 281 ++++++++++++++++++ ydb/core/viewer/viewer_topic_data.h | 135 +++++++++ ydb/core/viewer/viewer_ut.cpp | 63 +--- ydb/core/viewer/ya.make | 2 + ydb/services/lib/auth/auth_helpers.cpp | 27 ++ ydb/services/lib/auth/auth_helpers.h | 15 + ydb/services/lib/auth/ya.make | 13 + 24 files changed, 1068 insertions(+), 76 deletions(-) create mode 100644 ydb/core/viewer/topic_data_ut.cpp create mode 100644 ydb/core/viewer/ut/ut_utils.cpp create mode 100644 ydb/core/viewer/ut/ut_utils.h create mode 100644 ydb/core/viewer/viewer_topic_data.cpp create mode 100644 ydb/core/viewer/viewer_topic_data.h create mode 100644 ydb/services/lib/auth/auth_helpers.cpp create mode 100644 ydb/services/lib/auth/auth_helpers.h create mode 100644 ydb/services/lib/auth/ya.make diff --git a/ydb/core/http_proxy/ut/internal_counters.json b/ydb/core/http_proxy/ut/internal_counters.json index 0810450fe319..094924b507c9 100644 --- a/ydb/core/http_proxy/ut/internal_counters.json +++ b/ydb/core/http_proxy/ut/internal_counters.json @@ -611,7 +611,7 @@ "folder_id": "folder4", "database": "/Root" }, - "value":238, + "value":243, "kind":"RATE" }, { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index c21f6c8fc572..ba168924c7b1 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1418,7 +1418,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c } TReadAnswer answer(info.FormAnswer( - ctx, *ev->Get(), EndOffset, Partition, userInfo, + ctx, *ev->Get(), StartOffset, EndOffset, Partition, userInfo, info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive() )); const auto& resp = dynamic_cast(answer.Event.Get())->Response; diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index a3236e13fbf7..4161ea3fd99d 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -384,6 +384,7 @@ ui64 GetFirstHeaderOffset(const TKey& key, const TString& blob) TReadAnswer TReadInfo::FormAnswer( const TActorContext& ctx, const TEvPQ::TEvBlobResponse& blobResponse, + const ui64 startOffset, const ui64 endOffset, const TPartitionId& partition, TUserInfo* userInfo, @@ -410,7 +411,9 @@ TReadAnswer TReadInfo::FormAnswer( res.SetErrorCode(NPersQueue::NErrorCode::OK); auto readResult = res.MutablePartitionResponse()->MutableCmdReadResult(); readResult->SetWaitQuotaTimeMs(WaitQuotaTime.MilliSeconds()); + readResult->SetStartOffset(startOffset); readResult->SetMaxOffset(endOffset); + readResult->SetEndOffset(endOffset); readResult->SetRealReadOffset(Offset); ui64 realReadOffset = Offset; readResult->SetReadFromTimestampMs(ReadTimestampMs); @@ -474,6 +477,7 @@ TReadAnswer TReadInfo::FormAnswer( SizeEstimate = answerSize; readResult->SetSizeEstimate(SizeEstimate); readResult->SetLastOffset(LastOffset); + readResult->SetStartOffset(startOffset); readResult->SetEndOffset(endOffset); return {answerSize, std::move(answer)}; } @@ -507,7 +511,7 @@ TReadAnswer TReadInfo::FormAnswer( continue; - PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount() + PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount() << " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size()); for (size_t i = pos; i < batch.Blobs.size(); ++i) { @@ -580,6 +584,7 @@ TReadAnswer TReadInfo::FormAnswer( SizeEstimate = answerSize; readResult->SetSizeEstimate(SizeEstimate); readResult->SetLastOffset(LastOffset); + readResult->SetStartOffset(startOffset); readResult->SetEndOffset(endOffset); return {answerSize, std::move(answer)}; @@ -590,7 +595,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct if (!res) return; TReadAnswer answer(res->FormAnswer( - ctx, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive() + ctx, StartOffset, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive() )); ctx.Send(Tablet, answer.Event.Release()); PQ_LOG_D(" waiting read cookie " << ev->Get()->Cookie @@ -1040,7 +1045,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u PQ_LOG_D("Reading cookie " << cookie << ". All data is from uncompacted head."); TReadAnswer answer = info.FormAnswer( - ctx, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx), + ctx, StartOffset, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx), info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive() ); const auto* ev = dynamic_cast(answer.Event.Get()); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index e88042ea8a03..2918c297ba7c 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -183,6 +183,8 @@ class TReadProxy : public TActorBootstrapped { auto partResp = responseRecord.MutablePartitionResponse()->MutableCmdReadResult(); partResp->SetMaxOffset(readResult.GetMaxOffset()); + partResp->SetStartOffset(readResult.GetStartOffset()); + partResp->SetEndOffset(readResult.GetEndOffset()); partResp->SetSizeLag(readResult.GetSizeLag()); partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + readResult.GetWaitQuotaTimeMs()); diff --git a/ydb/core/persqueue/subscriber.h b/ydb/core/persqueue/subscriber.h index 528c268fdcfe..3831babb81e2 100644 --- a/ydb/core/persqueue/subscriber.h +++ b/ydb/core/persqueue/subscriber.h @@ -81,6 +81,7 @@ struct TReadInfo { TReadAnswer FormAnswer( const TActorContext& ctx, const TEvPQ::TEvBlobResponse& response, + const ui64 startOffset, const ui64 endOffset, const TPartitionId& partition, TUserInfo* ui, @@ -93,6 +94,7 @@ struct TReadInfo { TReadAnswer FormAnswer( const TActorContext& ctx, + const ui64 startOffset, const ui64 endOffset, const TPartitionId& partition, TUserInfo* ui, @@ -103,7 +105,7 @@ struct TReadInfo { const bool isActive ) { TEvPQ::TEvBlobResponse response(0, TVector()); - return FormAnswer(ctx, response, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive); + return FormAnswer(ctx, response, startOffset, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive); } }; diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 8c57210de2a7..9e16ce29a372 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -417,6 +417,7 @@ message TCmdReadResult { optional uint64 ReadFromTimestampMs = 12; optional uint64 SizeEstimate = 13; optional int64 LastOffset = 14; + optional uint64 StartOffset = 17; optional uint64 EndOffset = 15; optional bool ReadingFinished = 16; } diff --git a/ydb/core/viewer/json_handlers_viewer.cpp b/ydb/core/viewer/json_handlers_viewer.cpp index aa39185ef068..906bb11687d5 100644 --- a/ydb/core/viewer/json_handlers_viewer.cpp +++ b/ydb/core/viewer/json_handlers_viewer.cpp @@ -12,6 +12,7 @@ #include "viewer_describe.h" #include "viewer_describe_topic.h" #include "viewer_feature_flags.h" +#include "viewer_topic_data.h" #include "viewer_graph.h" #include "viewer_healthcheck.h" #include "viewer_hiveinfo.h" @@ -189,6 +190,10 @@ void InitViewerTopicInfoJsonHandler(TJsonHandlers& handlers) { handlers.AddHandler("/viewer/topicinfo", new TJsonHandler(TJsonTopicInfo::GetSwagger())); } +void InitViewerTopicDataJsonHandler(TJsonHandlers& handlers) { + handlers.AddHandler("/viewer/topic_data", new TJsonHandler(TTopicData::GetSwagger())); +} + void InitViewerPQConsumerInfoJsonHandler(TJsonHandlers& handlers) { handlers.AddHandler("/viewer/pqconsumerinfo", new TJsonHandler(TJsonPQConsumerInfo::GetSwagger())); } @@ -303,6 +308,7 @@ void InitViewerJsonHandlers(TJsonHandlers& jsonHandlers) { InitViewerConfigJsonHandler(jsonHandlers); InitViewerCountersJsonHandler(jsonHandlers); InitViewerTopicInfoJsonHandler(jsonHandlers); + InitViewerTopicDataJsonHandler(jsonHandlers); InitViewerPQConsumerInfoJsonHandler(jsonHandlers); InitViewerTabletCountersJsonHandler(jsonHandlers); InitViewerStorageJsonHandler(jsonHandlers); diff --git a/ydb/core/viewer/json_pipe_req.cpp b/ydb/core/viewer/json_pipe_req.cpp index a6baa1f48e0b..24297420466d 100644 --- a/ydb/core/viewer/json_pipe_req.cpp +++ b/ydb/core/viewer/json_pipe_req.cpp @@ -544,24 +544,29 @@ void TViewerPipeClient::RequestBSControllerPDiskUpdateStatus(const NKikimrBlobSt SendRequestToPipe(pipeClient, request.Release()); } -void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) { +THolder TViewerPipeClient::SchemeCacheNavigateRequestBuilder ( + NSchemeCache::TSchemeCacheNavigate::TEntry&& entry +) { THolder request = MakeHolder(); - NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.Path = SplitPath(path); entry.RedirectRequired = false; entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath; - request->ResultSet.emplace_back(entry); + request->ResultSet.emplace_back(std::move(entry)); + return request; +} + +void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) { + NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.Path = SplitPath(path); + + auto request = SchemeCacheNavigateRequestBuilder(std::move(entry)); SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); } void TViewerPipeClient::RequestSchemeCacheNavigate(const TPathId& pathId) { - THolder request = MakeHolder(); NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.TableId.PathId = pathId; entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; - entry.RedirectRequired = false; - entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath; - request->ResultSet.emplace_back(entry); + auto request = SchemeCacheNavigateRequestBuilder(std::move(entry)); SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); } @@ -594,6 +599,41 @@ TViewerPipeClient::TRequestResponse + TViewerPipeClient::MakeRequestSchemeShardDescribe(TTabletId schemeShardId, const TString& path, const NKikimrSchemeOp::TDescribeOptions& options, ui64 cookie) { + auto request = std::make_unique(); + request->Record.SetSchemeshardId(schemeShardId); + request->Record.SetPath(path); + request->Record.MutableOptions()->CopyFrom(options); + auto response = MakeRequestToTablet(schemeShardId, request.release(), cookie); + if (response.Span) { + response.Span.Attribute("path", path); + } + return response; +} + +TViewerPipeClient::TRequestResponse TViewerPipeClient::MakeRequestSchemeCacheNavigateWithToken( + const TString& path, bool showPrivate, ui32 access, ui64 cookie +) { + NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.Path = SplitPath(path); + entry.ShowPrivatePath = showPrivate; + entry.Access = access; + auto request = SchemeCacheNavigateRequestBuilder(std::move(entry)); + + if (!Event->Get()->UserToken.empty()) + request->UserToken = new NACLib::TUserToken(Event->Get()->UserToken); + + auto response = MakeRequest( + MakeSchemeCacheID(), + new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), 0 /*flags*/, cookie + ); + if (response.Span) { + response.Span.Attribute("path", path); + } + return response; +} + void TViewerPipeClient::RequestTxProxyDescribe(const TString& path) { THolder request(new TEvTxUserProxy::TEvNavigate()); request->Record.MutableDescribePath()->SetPath(path); @@ -776,6 +816,10 @@ TString TViewerPipeClient::GetHTTPBADREQUEST(TString contentType, TString respon return Viewer->GetHTTPBADREQUEST(GetRequest(), std::move(contentType), std::move(response)); } +TString TViewerPipeClient::GetHTTPNOTFOUND(TString, TString) { + return Viewer->GetHTTPNOTFOUND(GetRequest()); +} + TString TViewerPipeClient::GetHTTPINTERNALERROR(TString contentType, TString response) { return Viewer->GetHTTPINTERNALERROR(GetRequest(), std::move(contentType), std::move(response)); } diff --git a/ydb/core/viewer/json_pipe_req.h b/ydb/core/viewer/json_pipe_req.h index c4d29c553f25..779ba794ad47 100644 --- a/ydb/core/viewer/json_pipe_req.h +++ b/ydb/core/viewer/json_pipe_req.h @@ -266,10 +266,19 @@ class TViewerPipeClient : public TActorBootstrapped { TRequestResponse RequestBSControllerPDisks(); TRequestResponse RequestBSControllerStorageStats(); void RequestBSControllerPDiskUpdateStatus(const NKikimrBlobStorage::TUpdateDriveStatus& driveStatus, bool force = false); + + THolder SchemeCacheNavigateRequestBuilder(NSchemeCache::TSchemeCacheNavigate::TEntry&& entry); + void RequestSchemeCacheNavigate(const TString& path); void RequestSchemeCacheNavigate(const TPathId& pathId); + TRequestResponse MakeRequestSchemeCacheNavigate(const TString& path, ui64 cookie = 0); TRequestResponse MakeRequestSchemeCacheNavigate(TPathId pathId, ui64 cookie = 0); + + TRequestResponse MakeRequestSchemeShardDescribe(TTabletId schemeShardId, const TString& path, const NKikimrSchemeOp::TDescribeOptions& options = {}, ui64 cookie = 0); + TRequestResponse MakeRequestSchemeCacheNavigateWithToken( + const TString& path, bool showPrivate, ui32 access, ui64 cookie = 0); + TRequestResponse MakeRequestViewer(TNodeId nodeId, TEvViewer::TEvViewerRequest* request, ui32 flags = 0); void RequestTxProxyDescribe(const TString& path); void RequestStateStorageEndpointsLookup(const TString& path); @@ -316,6 +325,7 @@ class TViewerPipeClient : public TActorBootstrapped { TString GetHTTPOKJSON(const google::protobuf::Message& response, TInstant lastModified = {}); TString GetHTTPGATEWAYTIMEOUT(TString contentType = {}, TString response = {}); TString GetHTTPBADREQUEST(TString contentType = {}, TString response = {}); + TString GetHTTPNOTFOUND(TString contentType = {}, TString response = {}); TString GetHTTPINTERNALERROR(TString contentType = {}, TString response = {}); TString GetHTTPFORBIDDEN(TString contentType = {}, TString response = {}); TString MakeForward(const std::vector& nodes); diff --git a/ydb/core/viewer/protos/viewer.proto b/ydb/core/viewer/protos/viewer.proto index dff4b2d4cbc2..ff36362e34e9 100644 --- a/ydb/core/viewer/protos/viewer.proto +++ b/ydb/core/viewer/protos/viewer.proto @@ -830,3 +830,29 @@ message TFeatureFlagsConfig { repeated TDatabase Databases = 2; } +message TTopicDataResponse { + message TMessage { + uint64 Offset = 1; + uint64 CreateTimestamp = 2; + uint64 WriteTimestamp = 3; + uint64 TimestampDiff = 4; + string Message = 5; + uint32 StorageSize = 6; + uint32 OriginalSize = 7; + uint32 Codec = 8; + string ProducerId = 9; + uint64 SeqNo = 10; + + message TMetadataItem { + string Key = 1; + string Value = 2; + } + repeated TMetadataItem MessageMetadata = 11; + } + + uint64 StartOffset = 1; + uint64 EndOffset = 2; + repeated TMessage Messages = 3; + bool Truncated = 4; +} + diff --git a/ydb/core/viewer/tests/canondata/result.json b/ydb/core/viewer/tests/canondata/result.json index af422fd482df..3f8c0ade58bf 100644 --- a/ydb/core/viewer/tests/canondata/result.json +++ b/ydb/core/viewer/tests/canondata/result.json @@ -4713,5 +4713,75 @@ }, "test.test_wait_for_cluster_ready": { "wait_good": true + }, + "test.test_topic_data": { + "response_read": { + "Messages": [ + { + "Codec": 0, "StorageSize": 9, "SeqNo": 1, "Message": "bWVzc2FnZS0w", "OriginalSize": 9, "Offset": 0 + }, + { + "Codec": 0, "StorageSize": 9, "SeqNo": 2, "Message": "bWVzc2FnZS0x", "OriginalSize": 9, "Offset": 1 + }, + { + "Codec": 0, "StorageSize": 9, "SeqNo": 3, "Message": "bWVzc2FnZS0y", "OriginalSize": 9, "Offset": 2 + }, + { + "Codec": 0, "StorageSize": 9, "SeqNo": 4, "Message": "bWVzc2FnZS0z", "OriginalSize": 9, "Offset": 3 + }, + { + "Codec": 0, "StorageSize": 9, "SeqNo": 5, "Message": "bWVzc2FnZS00", "OriginalSize": 9, "Offset": 4 + } + ], + "StartOffset": 0, + "EndOffset": 21, + "Truncated": true + }, + "response_metadata": { + "Messages": [ + { + "Codec": 1, "StorageSize": 37, "SeqNo": 11, + "MessageMetadata": [ + {"Value": "value1", "Key": "key1"}, + {"Value": "value2", "Key": "key2"}], + "Message": "bWVzc2FnZV93aXRoX21ldGE=", "OriginalSize": 17, "Offset": 10 + } + ], + "StartOffset": 0, + "EndOffset": 21, + "Truncated": true + }, + "response_compressed": { + "Messages": [ + { + "Codec": 1, "StorageSize": 38, "SeqNo": 12, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTA=", "OriginalSize": 20, "Offset": 11 + }, + { + "Codec": 1, "StorageSize": 38, "SeqNo": 13, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTE=", "OriginalSize": 20, "Offset": 12 + }, + { + "Codec": 1, "StorageSize": 38, "SeqNo": 14, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTI=", "OriginalSize": 20, "Offset": 13 + }, + { + "Codec": 1, "StorageSize": 38, "SeqNo": 15, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTM=", "OriginalSize": 20, "Offset": 14 + }, + { + "Codec": 1, "StorageSize": 38, "SeqNo": 16, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTQ=", "OriginalSize": 20, "Offset": 15 + } + ], + "StartOffset": 0, + "EndOffset": 21, + "Truncated": true + }, + "response_not_truncated": { + "Messages": [ + { + "Codec": 1, "StorageSize": 38, "SeqNo": 21, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTk=", "OriginalSize": 20, "Offset": 20 + } + ], + "StartOffset": 0, + "EndOffset": 21, + "Truncated": false + } } } diff --git a/ydb/core/viewer/tests/test.py b/ydb/core/viewer/tests/test.py index 8107183f21eb..c79175651b67 100644 --- a/ydb/core/viewer/tests/test.py +++ b/ydb/core/viewer/tests/test.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- + +import ydb +from ydb._topic_writer.topic_writer import PublicMessage from ydb.tests.library.harness.kikimr_runner import KiKiMR + from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator import requests from urllib.parse import urlencode @@ -560,3 +564,101 @@ def test_pqrb_tablet(): 'PathId', 'SchemeShard' ]) + + +def test_viewer_nodes_issue_14992(): + response_group_by = get_viewer_normalized("/viewer/nodes", { + 'group': 'Uptime' + }) + response_group = get_viewer_normalized("/viewer/nodes", { + 'filter_group_by': 'Uptime', + 'filter_group' : response_group_by['NodeGroups'][0]['GroupName'], + }) + result = { + 'response_group_by': response_group_by, + 'response_group': response_group, + } + return result + + +def test_topic_data(): + grpc_port = cluster.nodes[1].grpc_port + + call_viewer("/viewer/query", { + 'database': dedicated_db, + 'query': 'CREATE TOPIC topic1', + 'schema': 'multi' + }) + + endpoint = "localhost:{}".format(grpc_port) + driver = ydb.Driver(endpoint=endpoint, database=dedicated_db, oauth=None) + driver.wait(10, fail_fast=True) + driver.topic_client.create_topic('topic2', min_active_partitions=1, max_active_partitions=1) + + def write(writer, message_pattern, close=True): + writer.write(["{}-{}".format(message_pattern, i) for i in range(10)]) + writer.flush() + if close: + writer.close() + + writer = driver.topic_client.writer('topic2', producer_id="12345") + write(writer, "message", False) + + # Also write one messagewith metadata + message_w_meta = PublicMessage(data="message_with_meta", metadata_items={"key1": "value1", "key2": "value2"}) + writer.write(message_w_meta) + writer.close() + + writer_compressed = driver.topic_client.writer('topic2', producer_id="12345", codec=2) + write(writer_compressed, "compressed-message") + + response = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic2'.format(dedicated_db), + 'partition': '0', + 'offset': '0', + 'limit': '5' + }) + + response_w_meta = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic2'.format(dedicated_db), + 'partition': '0', + 'offset': '10', + 'limit': '1' + }) + response_compressed = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic2'.format(dedicated_db), + 'partition': '0', + 'offset': '11', + 'limit': '5' + }) + + response_last = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic2'.format(dedicated_db), + 'partition': '0', + 'offset': '20', + 'limit': '5' + }) + + def strip_non_canonized(resp): + for message in resp["Messages"]: + assert int(message.get("CreateTimestamp", "0")) != 0 + assert int(message.get("WriteTimestamp", "0")) != 0 + assert int(message.get("TimestampDiff", None)) >= 0 + assert message.get("ProducerId", None) is not None + del message["CreateTimestamp"] + del message["WriteTimestamp"] + del message["TimestampDiff"] + del message["ProducerId"] + return resp + + result = { + 'response_read': strip_non_canonized(response), + 'response_metadata': strip_non_canonized(response_w_meta), + 'response_compressed': strip_non_canonized(response_compressed), + 'response_not_truncated': strip_non_canonized(response_last) + } + return result diff --git a/ydb/core/viewer/tests/ya.make b/ydb/core/viewer/tests/ya.make index 820daa387969..c768a5a87f27 100644 --- a/ydb/core/viewer/tests/ya.make +++ b/ydb/core/viewer/tests/ya.make @@ -4,7 +4,6 @@ ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") TEST_SRCS(test.py) SIZE(MEDIUM) - DEPENDS( ydb/apps/ydbd ) @@ -13,6 +12,7 @@ PEERDIR( contrib/python/requests contrib/python/urllib3 ydb/tests/library + ydb/public/sdk/python/enable_v3_new_behavior ) END() diff --git a/ydb/core/viewer/topic_data_ut.cpp b/ydb/core/viewer/topic_data_ut.cpp new file mode 100644 index 000000000000..834ecfdefe2e --- /dev/null +++ b/ydb/core/viewer/topic_data_ut.cpp @@ -0,0 +1,227 @@ +#include "ut/ut_utils.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace NKikimr; +using namespace Tests; +using namespace NYdb::NPersQueue; +using namespace NHttp; +using namespace NJson; + + +Y_UNIT_TEST_SUITE(ViewerTopicDataTests) { + template + void CheckMapValue(const NJson::TJsonValue::TMapType& map, const TString& key, const T& value) { + auto iter = map.find(key); + UNIT_ASSERT(iter != map.end()); + UNIT_ASSERT_VALUES_EQUAL_C(iter->second, value, key); + } + TString GetRequestUrl(TString topic, ui32 partition, ui64 offset = 0, ui32 limit = 10) { + TStringBuilder url; + CGIUnescape(topic); + url << "/viewer/topic_data" << "?path=" << topic << "&partition=" << partition << "&offset=" << offset + << "&limit=" << limit << "&encode_data=false"; + return url; + } + + TKeepAliveHttpClient::THttpCode MakeRequest(TKeepAliveHttpClient& httpClient, const TString& url, TJsonValue& json) { + json = TJsonValue{}; + TString response; + TStringStream responseStream; + + TKeepAliveHttpClient::THeaders headers; + headers["Accept"] = "application/json"; + + NKikimr::NViewerTests::THttpRequest httpReq(HTTP_METHOD_GET); + httpReq.HttpHeaders.AddHeader("Accept", "application/json"); + + auto statusCode = httpClient.DoGet(url, &responseStream, headers); + response = responseStream.ReadAll(); + if (statusCode != HTTP_OK) { + Cerr << "Got response:" << statusCode << ": " << response << Endl; + return statusCode; + } + + NJson::TJsonReaderConfig jsonCfg; + NJson::ReadJsonTree(response, &jsonCfg, &json, /* throwOnError = */ true); + UNIT_ASSERT(json.GetType() == EJsonValueType::JSON_MAP); + const auto& map_ = json.GetMap(); + UNIT_ASSERT(map_.find("Messages") != map_.end()); + UNIT_ASSERT(map_.find("Messages")->second.GetType() == EJsonValueType::JSON_ARRAY); + return statusCode; + } + + Y_UNIT_TEST(TopicDataTest) { + TPortManager tp; + ui16 port = tp.GetPort(2134); + ui16 grpcPort = tp.GetPort(2135); + ui16 monPort = tp.GetPort(8765); + + auto settings = NKikimr::NPersQueueTests::PQSettings(port, 1); + settings.PQConfig.MutableQuotingConfig()->SetEnableQuoting(false); + settings.PQConfig.SetTopicsAreFirstClassCitizen(true); + + settings.InitKikimrRunConfig() + .SetNodeCount(1) + .SetUseRealThreads(true) + .SetDomainName("Root") + .SetMonitoringPortOffset(monPort, true); + + auto grpcSettings = NYdbGrpc::TServerOptions().SetHost("[::1]").SetPort(grpcPort); + TServer server{settings}; + server.EnableGRpc(grpcSettings); + + auto client = MakeHolder(settings, grpcPort); + client->InitRoot(); + client->InitSourceIds(); + NYdb::TDriverConfig driverCfg; + TString topicPath = "/Root/topic1"; + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << grpcPort) + .SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); + + NYdb::TDriver ydbDriver{driverCfg}; + auto topicClient = NYdb::NTopic::TTopicClient(ydbDriver); + + auto res = topicClient.CreateTopic(topicPath).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + + auto writeData = [&](NYdb::NPersQueue::ECodec codec, ui64 count, const TString& producerId, ui64 size = 100u) { + NYdb::NPersQueue::TWriteSessionSettings wsSettings; + wsSettings.Path(topicPath); + wsSettings.MessageGroupId(producerId); + wsSettings.Codec(codec); + + auto writer = TPersQueueClient(ydbDriver).CreateSimpleBlockingWriteSession(TWriteSessionSettings(wsSettings).ClusterDiscoveryMode(EClusterDiscoveryMode::Off)); + TString dataFiller{size, 'a'}; + + for (auto i = 0u; i < count; ++i) { + writer->Write(TStringBuilder() << "Message " << i << " : " << dataFiller); + } + writer->Close(); + }; + + writeData(ECodec::GZIP, 20, "producer1"); + writeData(ECodec::RAW, 20, "producer2"); + + TKeepAliveHttpClient httpClient("localhost", monPort); + NKikimr::NViewerTests::WaitForHttpReady(httpClient); + + TJsonValue json; + TString producer1, producer2, producer3; + + // Test 1 - compressed data with limit + { + auto statusCode = MakeRequest(httpClient, GetRequestUrl(topicPath, 0, 0, 10), json); + UNIT_ASSERT_EQUAL(statusCode, HTTP_OK); + const auto& overalResponse = json.GetMap(); + CheckMapValue(overalResponse, "StartOffset", 0); + CheckMapValue(overalResponse, "EndOffset", 40); + + const auto& messages = overalResponse.find("Messages")->second.GetArray(); + + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 10); + for (auto i = 0u; i < 10; ++i) { + const auto& item = messages[i]; + UNIT_ASSERT(item.GetType() == EJsonValueType::JSON_MAP); + const auto& jsonMap = item.GetMap(); + CheckMapValue(jsonMap, "Offset", i); + CheckMapValue(jsonMap, "SeqNo", i + 1); + CheckMapValue(jsonMap, "StorageSize", 35); + CheckMapValue(jsonMap, "OriginalSize", 112); + CheckMapValue(jsonMap, "Codec", 1); + if (producer1.empty()) { + UNIT_ASSERT(jsonMap.find("ProducerId") != jsonMap.end()); + producer1 = jsonMap.find("ProducerId")->second.GetString(); + } else { + CheckMapValue(jsonMap, "ProducerId", producer1); + } + UNIT_ASSERT(jsonMap.find("CreateTimestamp") != jsonMap.end()); + UNIT_ASSERT(jsonMap.find("WriteTimestamp") != jsonMap.end()); + } + } + // Test 2 - uncompressed data with limit and start offset + { + auto statusCode = MakeRequest(httpClient, GetRequestUrl(topicPath, 0, 20, 10), json); + UNIT_ASSERT_EQUAL(statusCode, HTTP_OK); + const auto& overalResponse = json.GetMap(); + CheckMapValue(overalResponse, "StartOffset", 0); + CheckMapValue(overalResponse, "EndOffset", 40); + const auto& messages = overalResponse.find("Messages")->second.GetArray(); + + + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 10); + for (auto i = 0u; i < 10; ++i) { + const auto& item = messages[i]; + UNIT_ASSERT(item.GetType() == EJsonValueType::JSON_MAP); + const auto& jsonMap = item.GetMap(); + CheckMapValue(jsonMap, "Offset", 20 + i); + CheckMapValue(jsonMap, "SeqNo", i + 1); + CheckMapValue(jsonMap, "StorageSize", 112); + CheckMapValue(jsonMap, "OriginalSize", 112); + CheckMapValue(jsonMap, "Codec", 0); + if (producer2.empty()) { + UNIT_ASSERT(jsonMap.find("ProducerId") != jsonMap.end()); + producer2 = jsonMap.find("ProducerId")->second.GetString(); + UNIT_ASSERT_VALUES_UNEQUAL(producer2, producer1); + } else { + CheckMapValue(jsonMap, "ProducerId", producer2); + } + UNIT_ASSERT(jsonMap.find("CreateTimestamp") != jsonMap.end()); + UNIT_ASSERT(jsonMap.find("WriteTimestamp") != jsonMap.end()); + } + } + // Test 3 - large messages + + { + writeData(ECodec::GZIP, 20, "producer3", 1_MB); + + auto statusCode = MakeRequest(httpClient, GetRequestUrl(topicPath, 0, 40, 20), json); + UNIT_ASSERT_EQUAL(statusCode, HTTP_OK); + const auto& overalResponse = json.GetMap(); + CheckMapValue(overalResponse, "EndOffset", 60); + const auto& messages = overalResponse.find("Messages")->second.GetArray(); + + + UNIT_ASSERT_C(messages.size() <= 10, messages.size()); + for (auto i = 0u; i < 10; ++i) { + const auto& item = messages[i]; + UNIT_ASSERT(item.GetType() == EJsonValueType::JSON_MAP); + const auto& jsonMap = item.GetMap(); + CheckMapValue(jsonMap, "Offset", 40 + i); + CheckMapValue(jsonMap, "OriginalSize", 1_MB + 12); + CheckMapValue(jsonMap, "Codec", 1); + UNIT_ASSERT(jsonMap.find("Message") != jsonMap.end()); + Cerr << "Size: " << jsonMap.find("Message")->second.GetString().size() << Endl; + UNIT_ASSERT(jsonMap.find("Message")->second.GetString().size() < 1500_KB); + UNIT_ASSERT(jsonMap.find("Message")->second.GetString().size() > 500_KB); + CheckMapValue(jsonMap, "SeqNo", i + 1); + + if (producer3.empty()) { + UNIT_ASSERT(jsonMap.find("ProducerId") != jsonMap.end()); + producer3 = jsonMap.find("ProducerId")->second.GetString(); + UNIT_ASSERT_VALUES_UNEQUAL(producer2, producer3); + } else { + CheckMapValue(jsonMap, "ProducerId", producer3); + } + UNIT_ASSERT(jsonMap.find("CreateTimestamp") != jsonMap.end()); + UNIT_ASSERT(jsonMap.find("WriteTimestamp") != jsonMap.end()); + } + } + // Test 4 - bad topic, partition, offset + { + auto statusCode = MakeRequest(httpClient, GetRequestUrl("/Root/bad-topic", 0, 20, 10), json); + UNIT_ASSERT_EQUAL(statusCode, HTTP_BAD_REQUEST); + statusCode = MakeRequest(httpClient, GetRequestUrl(topicPath, 10, 20, 10), json); + UNIT_ASSERT_EQUAL(statusCode, HTTP_BAD_REQUEST); + statusCode = MakeRequest(httpClient, GetRequestUrl(topicPath, 0, 10000, 10), json); + UNIT_ASSERT_EQUAL(statusCode, HTTP_BAD_REQUEST); + } + } +}; diff --git a/ydb/core/viewer/ut/ut_utils.cpp b/ydb/core/viewer/ut/ut_utils.cpp new file mode 100644 index 000000000000..8f21938308bf --- /dev/null +++ b/ydb/core/viewer/ut/ut_utils.cpp @@ -0,0 +1,18 @@ +#include "ut_utils.h" + +namespace NKikimr::NViewerTests { + +void WaitForHttpReady(TKeepAliveHttpClient& client) { + for (int retries = 0;; ++retries) { + UNIT_ASSERT(retries < 100); + TStringStream responseStream; + const TKeepAliveHttpClient::THttpCode statusCode = client.DoGet("/viewer/simple_counter?max_counter=1&period=100", &responseStream); + const TString response = responseStream.ReadAll(); + if (statusCode == HTTP_OK) { + break; + } + } +} + +} // namespace NKikimr::NViewerTests + diff --git a/ydb/core/viewer/ut/ut_utils.h b/ydb/core/viewer/ut/ut_utils.h new file mode 100644 index 000000000000..9f930ec353f7 --- /dev/null +++ b/ydb/core/viewer/ut/ut_utils.h @@ -0,0 +1,57 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NViewerTests { + +struct THttpRequest : NMonitoring::IHttpRequest { + HTTP_METHOD Method; + TCgiParameters CgiParameters; + THttpHeaders HttpHeaders; + TString PostContent; + + THttpRequest(HTTP_METHOD method) + : Method(method) + {} + + ~THttpRequest() {} + + const char* GetURI() const override { + return ""; + } + + const char* GetPath() const override { + return ""; + } + + const TCgiParameters& GetParams() const override { + return CgiParameters; + } + + const TCgiParameters& GetPostParams() const override { + return CgiParameters; + } + + TStringBuf GetPostContent() const override { + return PostContent; + } + + HTTP_METHOD GetMethod() const override { + return Method; + } + + const THttpHeaders& GetHeaders() const override { + return HttpHeaders; + } + + TString GetRemoteAddr() const override { + return TString(); + } +}; + +void WaitForHttpReady(TKeepAliveHttpClient& client); + +} // namespace NKikimr::NViewerTests + diff --git a/ydb/core/viewer/ut/ya.make b/ydb/core/viewer/ut/ya.make index ee2ee65ee7f8..66c01bab212d 100644 --- a/ydb/core/viewer/ut/ya.make +++ b/ydb/core/viewer/ut/ya.make @@ -1,19 +1,25 @@ UNITTEST_FOR(ydb/core/viewer) +ADDINCL( + ydb/public/sdk/cpp +) + FORK_SUBTESTS() SIZE(MEDIUM) - YQL_LAST_ABI_VERSION() SRCS( viewer_ut.cpp + topic_data_ut.cpp + ut/ut_utils.cpp ) PEERDIR( library/cpp/http/misc library/cpp/http/simple ydb/core/testlib/default + ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils ) END() diff --git a/ydb/core/viewer/viewer_topic_data.cpp b/ydb/core/viewer/viewer_topic_data.cpp new file mode 100644 index 000000000000..5f1e875f2d55 --- /dev/null +++ b/ydb/core/viewer/viewer_topic_data.cpp @@ -0,0 +1,281 @@ +#include "viewer_topic_data.h" +#include +#include +#include + +namespace NKikimr::NViewer { + +void TTopicData::HandleDescribe(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + if (ev->Cookie != 1) { + return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "Internal actor state got corrupted while trying to describe topic")); + } + NavigateResponse->Set(std::move(ev)); + if (NavigateResponse->IsError()) { + return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", NavigateResponse->GetError())); + } + Y_ABORT_UNLESS(NavigateResponse->Get()); + Y_ABORT_UNLESS(NavigateResponse->Get()->Request); + const auto& request = *NavigateResponse->Get()->Request; + if (!NavigateResponse->IsOk()) { + TStringBuilder error; + + if (request.ResultSet.size() != 0) { + switch (request.ResultSet[0].Status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: + break; // Unexpected but just in case + case NSchemeCache::TSchemeCacheNavigate::EStatus::Unknown: + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable: + case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: + case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError: + case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError: + error << "Got internal schema error while trying to describe topic: '" << TopicPath << "'"; + return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", error)); + + case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath: + error << "Topic not found: '" << TopicPath << "'"; + return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", error)); + + case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied: + error << "Access denied to topuc: '" << TopicPath << "'"; + return ReplyAndPassAway(GetHTTPFORBIDDEN("text/plain", error)); + + default: + return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "Got unknown error type trying to describe topic")); + + } + } + error << "While trying to find topic: '" << TopicPath << "' got error '" << NavigateResponse->GetError() << "'"; + return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", error)); + } + const auto& response = request.ResultSet.front(); + if (response.Self->Info.GetPathType() != NKikimrSchemeOp::EPathTypePersQueueGroup) { + auto error = TStringBuilder() << "No such topic '" << TopicPath << ""; + return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", error)); + } + + { + TString authError; + auto pathWithName = TStringBuilder() << "topic " << TopicPath; + auto authResult = NKikimr::NTopicHelpers::CheckAccess(*AppData(ActorContext()), response, Event->Get()->UserToken, pathWithName, authError); + switch (authResult) { + case NKikimr::NTopicHelpers::EAuthResult::AuthOk: + break; + case NKikimr::NTopicHelpers::EAuthResult::AccessDenied: + case NKikimr::NTopicHelpers::EAuthResult::TokenRequired: + return ReplyAndPassAway(GetHTTPFORBIDDEN("text/plain", authError)); + } + } + const auto& partitions = response.PQGroupInfo->Description.GetPartitions(); + for (auto& partition : partitions) { + auto partitionId = partition.GetPartitionId(); + if (partitionId == PartitionId) { + TabletId = partition.GetTabletId(); + SendPQReadRequest(); + RequestDone(); + return; + } + } + ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "No such partition in topic")); +} + +void TTopicData::SendPQReadRequest() { + auto pipeClient = ConnectTabletPipe(TabletId); + + NKikimrClient::TPersQueueRequest request; + request.MutablePartitionRequest()->SetTopic(TopicPath); + request.MutablePartitionRequest()->SetPartition(PartitionId); + ActorIdToProto(pipeClient, request.MutablePartitionRequest()->MutablePipeClient()); + + auto cmdRead = request.MutablePartitionRequest()->MutableCmdRead(); + cmdRead->SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER); + cmdRead->SetCount(Limit); + cmdRead->SetOffset(Offset); + cmdRead->SetTimeoutMs(READ_TIMEOUT_MS); + cmdRead->SetExternalOperation(true); + + auto req = MakeHolder(); + req->Record.Swap(&request); + SendRequestToPipe(pipeClient, req.Release()); +} + +void TTopicData::HandlePQResponse(TEvPersQueue::TEvResponse::TPtr& ev) { + ReadResponse = ev->Release(); + const auto& record = ReadResponse->Record; + if (record.GetStatus() == NMsgBusProxy::MSTATUS_ERROR) { + switch (record.GetErrorCode()) { + case ::NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET: + case ::NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET: + return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Bad offset"), record.GetErrorReason()); + break; + default: + return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "Error trying to read messages"), record.GetErrorReason()); + } + return; + } + const auto& response = record.GetPartitionResponse(); + + if (response.HasCmdReadResult()) { + const auto& readResult = response.GetCmdReadResult(); + if (readResult.GetReadingFinished()) { + ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Bad partition-id")); + return; + } + } else { + return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "No data received from topic")); + } + FillProtoResponse(); + RequestDone(); +} + +void TTopicData::FillProtoResponse(ui64 maxSingleMessageSize, ui64 maxTotalSize) { + ui64 totalSize = 0; + const auto& response = ReadResponse->Record.GetPartitionResponse(); + if(!response.HasCmdReadResult()) { + return; + } + const auto& cmdRead = response.GetCmdReadResult(); + + auto setData = [&](NKikimrViewer::TTopicDataResponse::TMessage& protoMessage, TString&& data) { + protoMessage.SetOriginalSize(data.size()); + if (data.size() > maxSingleMessageSize) { + data.resize(maxSingleMessageSize); + } + totalSize += data.size(); + protoMessage.SetMessage(std::move(Base64Encode(data))); + }; + ProtoResponse.SetStartOffset(cmdRead.GetStartOffset()); + bool truncated = true; + ProtoResponse.SetEndOffset(cmdRead.GetEndOffset()); + + for (auto& r : cmdRead.GetResult()) { + if (totalSize >= maxTotalSize) { + break; + } + auto dataChunk = (NKikimr::GetDeserializedData(r.GetData())); + auto* messageProto = ProtoResponse.AddMessages(); + messageProto->SetOffset(r.GetOffset()); + + if (r.GetOffset() == cmdRead.GetEndOffset() - 1) + truncated = false; + + messageProto->SetCreateTimestamp(r.GetCreateTimestampMS()); + messageProto->SetWriteTimestamp(r.GetWriteTimestampMS()); + i64 diff = r.GetWriteTimestampMS() - r.GetCreateTimestampMS(); + if (diff < 0) { + diff = 0; + } + messageProto->SetTimestampDiff(diff); + messageProto->SetStorageSize(dataChunk.GetData().size()); + + if (dataChunk.HasCodec() && dataChunk.GetCodec() != NPersQueueCommon::RAW) { + const NYdb::NTopic::ICodec* codec = GetCodec(static_cast(dataChunk.GetCodec())); + if (codec == nullptr) { + return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "Message decompression failed")); + } + setData(*messageProto, std::move(codec->Decompress(dataChunk.GetData()))); + } else { + setData(*messageProto, std::move(*dataChunk.MutableData())); + } + messageProto->SetCodec(dataChunk.GetCodec()); + messageProto->SetProducerId(r.GetSourceId()); + messageProto->SetSeqNo(r.GetSeqNo()); + + if (dataChunk.MessageMetaSize() > 0) { + for (const auto& metadata : dataChunk.GetMessageMeta()) { + auto* metadataProto = messageProto->AddMessageMetadata(); + auto jsonMetadataItem = NJson::TJsonValue(NJson::EJsonValueType::JSON_MAP); + metadataProto->SetKey(metadata.key()); + metadataProto->SetValue(metadata.value()); + } + } + } + ProtoResponse.SetTruncated(truncated); +} + +void TTopicData::ReplyAndPassAway() { + NProtobufJson::TProto2JsonConfig config; + //config.SetAddMissingFields(true); + config.SetMissingSingleKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault); + TStringStream json; + NProtobufJson::Proto2Json(ProtoResponse, json, config); + ReplyAndPassAway(GetHTTPOKJSON(json.Str())); +} + +NYdb::NTopic::ICodec* TTopicData::GetCodec(NPersQueueCommon::ECodec codec) { + ui32 codecId = static_cast(codec); + auto iter = Codecs.find(codecId); + if (iter != Codecs.end()) { + return iter->second.Get(); + } + switch (codec) { + case NPersQueueCommon::GZIP: { + auto [iterator, ins] = Codecs.emplace(codecId, MakeHolder()); + return iterator->second.Get(); + break; + } + case NPersQueueCommon::ZSTD: { + auto [iterator, ins] = Codecs.emplace(codecId, MakeHolder()); + return iterator->second.Get(); + } + default: + return nullptr; + } +} + +void TTopicData::StateRequestedDescribe(TAutoPtr<::NActors::IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleDescribe); + hFunc(TEvPersQueue::TEvResponse, HandlePQResponse); + cFunc(TEvents::TSystem::Wakeup, HandleTimeout); + } +} + +bool TTopicData::GetIntegerParam(const TString& name, i64& value) { + const auto& params(Event->Get()->Request.GetParams()); + if (params.Has(name)) { + value = FromStringWithDefault(params.Get(name), -1); + if (value == -1) { + auto error = TStringBuilder() << "field ' "<< name << "' has invalid value, an interger >= 0 is expected"; + ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", error)); + return false; + } + return true; + } else { + auto error = TStringBuilder() << "field ' "<< name << "' is required"; + ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", error)); + return false; + } +} + +void TTopicData::Bootstrap() { + if (!Database.empty() && TBase::NeedToRedirect()) { + return; + } + const auto& params(Event->Get()->Request.GetParams()); + Timeout = TDuration::Seconds(std::min((ui32)Timeout.Seconds(), 30u)); + + + if (!GetIntegerParam("partition", PartitionId)) + return; + if (!GetIntegerParam("offset", Offset)) + return; + + Limit = FromStringWithDefault(params.Get("limit"), 10); + if (Limit > MAX_MESSAGES_LIMIT) { + return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Too many messages requested")); + } + + TopicPath = params.Get("path"); + if (!TopicPath.empty()) { + NavigateResponse = MakeRequestSchemeCacheNavigateWithToken(TopicPath, true, NACLib::DescribeSchema, 1); + } else { + return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'path' is required and should not be empty")); + } + Become(&TThis::StateRequestedDescribe, Timeout, new TEvents::TEvWakeup()); +} + + +} // namespace NKikimr::NViewer + diff --git a/ydb/core/viewer/viewer_topic_data.h b/ydb/core/viewer/viewer_topic_data.h new file mode 100644 index 000000000000..28cd5e39f474 --- /dev/null +++ b/ydb/core/viewer/viewer_topic_data.h @@ -0,0 +1,135 @@ +#pragma once +#include "json_pipe_req.h" +#include "viewer.h" +#include +#include +#include + +namespace NKikimr::NViewer { + +struct TEvViewerTopicData { + enum EEv { + EvTopicDataUnpacked = EventSpaceBegin(TKikimrEvents::ES_VIEWER), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_VIEWER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_VIEWER)"); + + struct TEvTopicDataUnpacked : TEventLocal { + explicit TEvTopicDataUnpacked() = delete; + explicit TEvTopicDataUnpacked(bool status, NJson::TJsonValue&& data) + : Status(status) + , Data(std::move(data)) + { + } + + bool Status = true; + NJson::TJsonValue Data; + }; +}; // TEvViewerTopicData + + +class TTopicData : public TViewerPipeClient { + using TBase = TViewerPipeClient; + using TThis = TTopicData; + using TBase::ReplyAndPassAway; + using TBase::GetHTTPBADREQUEST; + +private: + void HandleDescribe(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); + void SendPQReadRequest(); + void HandlePQResponse(TEvPersQueue::TEvResponse::TPtr& ev); + void FillProtoResponse(ui64 maxSingleMessageSize = 1_MB, ui64 maxTotalSize = 10_MB); + NYdb::NTopic::ICodec* GetCodec(NPersQueueCommon::ECodec codec); + bool GetIntegerParam(const TString& name, i64& value); + + STATEFN(StateRequestedDescribe); + + +public: + TTopicData(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) + : TViewerPipeClient(viewer, ev) + {} + + void Bootstrap() override; + void ReplyAndPassAway() override; + +private: + ui64 TabletId; + TString TopicPath; + i64 PartitionId; + i64 Offset; + i64 Limit; + TMap> Codecs; + std::optional> NavigateResponse; + + TAutoPtr ReadResponse; + NKikimrViewer::TTopicDataResponse ProtoResponse; + + static constexpr ui32 READ_TIMEOUT_MS = 1000; + static constexpr ui32 MAX_MESSAGES_LIMIT = 1000; + +public: + static YAML::Node GetSwagger() { + YAML::Node node = YAML::Load(R"___( + get: + tags: + - viewer + summary: Read topic data + description: Reads and returns data from topic (if any) + parameters: + - name: database + in: query + description: database name + type: string + required: false + - name: path + in: query + description: path of topic + required: true + type: string + - name: partition + in: query + description: partition to read from + required: true + type: integer + - name: offset + in: query + description: start offset to read from + required: true + type: integer + - name: limit + in: query + description: max number of messages to read (default = 10) + required: false + type: integer + - name: timeout + in: query + description: timeout in ms + required: false + type: integer + responses: + 200: + description: OK + content: + application/json: + schema: + {} + 400: + description: Bad Request + 403: + description: Forbidden + 500: + description: Internal Server Error + 504: + description: Gateway Timeout + )___"); + + node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema(); + + return node; + } +}; + +} // namespace NKikimr::NViewer + diff --git a/ydb/core/viewer/viewer_ut.cpp b/ydb/core/viewer/viewer_ut.cpp index e146120eba12..9a27c1c6d3d4 100644 --- a/ydb/core/viewer/viewer_ut.cpp +++ b/ydb/core/viewer/viewer_ut.cpp @@ -1,9 +1,9 @@ +#include "ut/ut_utils.h" #include #include #include #include #include -#include #include #include #include @@ -20,11 +20,11 @@ #include #include #include + #include #include #include - #include #include @@ -35,6 +35,7 @@ using namespace NKikimrWhiteboard; using namespace NSchemeShard; using namespace Tests; using namespace NMonitoring; +using namespace NKikimr::NViewerTests; using TNavigate = NSchemeCache::TSchemeCacheNavigate; #ifdef NDEBUG @@ -194,51 +195,6 @@ Y_UNIT_TEST_SUITE(Viewer) { Ctest << "Data has merged" << Endl; } - struct THttpRequest : NMonitoring::IHttpRequest { - HTTP_METHOD Method; - TCgiParameters CgiParameters; - THttpHeaders HttpHeaders; - TString PostContent; - - THttpRequest(HTTP_METHOD method) - : Method(method) - {} - - ~THttpRequest() {} - - const char* GetURI() const override { - return ""; - } - - const char* GetPath() const override { - return ""; - } - - const TCgiParameters& GetParams() const override { - return CgiParameters; - } - - const TCgiParameters& GetPostParams() const override { - return CgiParameters; - } - - TStringBuf GetPostContent() const override { - return PostContent; - } - - HTTP_METHOD GetMethod() const override { - return Method; - } - - const THttpHeaders& GetHeaders() const override { - return HttpHeaders; - } - - TString GetRemoteAddr() const override { - return TString(); - } - }; - class TMonPage: public IMonPage { public: TMonPage(const TString &path, const TString &title) @@ -587,18 +543,6 @@ Y_UNIT_TEST_SUITE(Viewer) { return NJson::ReadJsonTree(&responseStream, /* throwOnError = */ true); } - void WaitForHttpReady(TKeepAliveHttpClient& client) { - for (int retries = 0;; ++retries) { - UNIT_ASSERT(retries < 100); - TStringStream responseStream; - const TKeepAliveHttpClient::THttpCode statusCode = client.DoGet("/viewer/simple_counter?max_counter=1&period=100", &responseStream); - const TString response = responseStream.ReadAll(); - if (statusCode == HTTP_OK) { - break; - } - } - } - void GrantConnect(TClient& client) { client.CreateUser("/Root", "username", "password"); client.GrantConnect("username"); @@ -2117,5 +2061,4 @@ Y_UNIT_TEST_SUITE(Viewer) { UNIT_ASSERT_EQUAL_C(statusCode, HTTP_BAD_REQUEST, statusCode << ": " << response); UNIT_ASSERT_C(response.StartsWith("Conversion error"), response); } - } diff --git a/ydb/core/viewer/ya.make b/ydb/core/viewer/ya.make index 2f6a23713c39..4cef17095544 100644 --- a/ydb/core/viewer/ya.make +++ b/ydb/core/viewer/ya.make @@ -72,6 +72,7 @@ SRCS( viewer_describe.h viewer_describe_topic.h viewer_feature_flags.h + viewer_topic_data.cpp viewer_graph.h viewer_healthcheck.h viewer_helper.h @@ -595,6 +596,7 @@ PEERDIR( ydb/public/api/grpc ydb/public/sdk/cpp/adapters/issue ydb/public/sdk/cpp/src/client/types + ydb/services/lib/auth contrib/libs/yaml-cpp ) diff --git a/ydb/services/lib/auth/auth_helpers.cpp b/ydb/services/lib/auth/auth_helpers.cpp new file mode 100644 index 000000000000..f7dff0d61447 --- /dev/null +++ b/ydb/services/lib/auth/auth_helpers.cpp @@ -0,0 +1,27 @@ +#include "auth_helpers.h" +#include + +namespace NKikimr::NTopicHelpers { + +EAuthResult CheckAccess( + const NKikimr::TAppData& appData, + const NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry& describeEntry, + const TString& serializedToken, + const TString& entityName, TString& error +) { + if (serializedToken.empty()) { + if (appData.EnforceUserTokenRequirement || appData.PQConfig.GetRequireCredentialsInNewProtocol()) { + error = "Unauthenticated access is forbidden, please provide credentials"; + return EAuthResult::TokenRequired; + } + } else { + NACLib::TUserToken token(serializedToken); + if (!describeEntry.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, token)) { + error = "Access to " + entityName + " is denied for subject '" + token.GetUserSID() + "'"; + return EAuthResult::AccessDenied; + } + } + return EAuthResult::AuthOk; +} + +} //namespace diff --git a/ydb/services/lib/auth/auth_helpers.h b/ydb/services/lib/auth/auth_helpers.h new file mode 100644 index 000000000000..a471abb4dfbe --- /dev/null +++ b/ydb/services/lib/auth/auth_helpers.h @@ -0,0 +1,15 @@ +#include +#include +#include + +namespace NKikimr::NTopicHelpers { +enum class EAuthResult { + AuthOk, + AccessDenied, + TokenRequired +}; + +EAuthResult CheckAccess(const NKikimr::TAppData& appData, const NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry& describeEntry, + const TString& serializedToken, const TString& entityName, TString& error); + +} // namespace diff --git a/ydb/services/lib/auth/ya.make b/ydb/services/lib/auth/ya.make new file mode 100644 index 000000000000..909afe626c48 --- /dev/null +++ b/ydb/services/lib/auth/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + auth_helpers.cpp +) + +PEERDIR( + ydb/library/aclib + ydb/core/base + ydb/core/tx/scheme_cache +) + +END() From 4f9c447795a752e18185686ce707a9b1221802d4 Mon Sep 17 00:00:00 2001 From: FloatingCrowbar Date: Thu, 27 Mar 2025 13:39:23 +0300 Subject: [PATCH 2/6] Improve topic data handler with more funtions (#16140) Co-authored-by: Alexey Efimov --- ydb/core/viewer/tests/canondata/result.json | 44 +++++++++++----- ydb/core/viewer/tests/test.py | 56 +++++++++++++-------- ydb/core/viewer/topic_data_ut.cpp | 25 +++++++-- ydb/core/viewer/viewer_topic_data.cpp | 50 +++++++++--------- ydb/core/viewer/viewer_topic_data.h | 10 ++-- 5 files changed, 120 insertions(+), 65 deletions(-) diff --git a/ydb/core/viewer/tests/canondata/result.json b/ydb/core/viewer/tests/canondata/result.json index 3f8c0ade58bf..7de503cf9249 100644 --- a/ydb/core/viewer/tests/canondata/result.json +++ b/ydb/core/viewer/tests/canondata/result.json @@ -4718,19 +4718,24 @@ "response_read": { "Messages": [ { - "Codec": 0, "StorageSize": 9, "SeqNo": 1, "Message": "bWVzc2FnZS0w", "OriginalSize": 9, "Offset": 0 + "Codec": 0, "StorageSize": 9, "SeqNo": 1, "Message": "bWVzc2FnZS0w", "OriginalSize": 9, "Offset": 0, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 0, "StorageSize": 9, "SeqNo": 2, "Message": "bWVzc2FnZS0x", "OriginalSize": 9, "Offset": 1 + "Codec": 0, "StorageSize": 9, "SeqNo": 2, "Message": "bWVzc2FnZS0x", "OriginalSize": 9, "Offset": 1, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 0, "StorageSize": 9, "SeqNo": 3, "Message": "bWVzc2FnZS0y", "OriginalSize": 9, "Offset": 2 + "Codec": 0, "StorageSize": 9, "SeqNo": 3, "Message": "bWVzc2FnZS0y", "OriginalSize": 9, "Offset": 2, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 0, "StorageSize": 9, "SeqNo": 4, "Message": "bWVzc2FnZS0z", "OriginalSize": 9, "Offset": 3 + "Codec": 0, "StorageSize": 9, "SeqNo": 4, "Message": "bWVzc2FnZS0z", "OriginalSize": 9, "Offset": 3, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 0, "StorageSize": 9, "SeqNo": 5, "Message": "bWVzc2FnZS00", "OriginalSize": 9, "Offset": 4 + "Codec": 0, "StorageSize": 9, "SeqNo": 5, "Message": "bWVzc2FnZS00", "OriginalSize": 9, "Offset": 4, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" } ], "StartOffset": 0, @@ -4744,7 +4749,8 @@ "MessageMetadata": [ {"Value": "value1", "Key": "key1"}, {"Value": "value2", "Key": "key2"}], - "Message": "bWVzc2FnZV93aXRoX21ldGE=", "OriginalSize": 17, "Offset": 10 + "Message": "bWVzc2FnZV93aXRoX21ldGE=", "OriginalSize": 17, "Offset": 10, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" } ], "StartOffset": 0, @@ -4754,19 +4760,24 @@ "response_compressed": { "Messages": [ { - "Codec": 1, "StorageSize": 38, "SeqNo": 12, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTA=", "OriginalSize": 20, "Offset": 11 + "Codec": 1, "StorageSize": 38, "SeqNo": 12, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTA=", "OriginalSize": 20, "Offset": 11, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 1, "StorageSize": 38, "SeqNo": 13, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTE=", "OriginalSize": 20, "Offset": 12 + "Codec": 1, "StorageSize": 38, "SeqNo": 13, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTE=", "OriginalSize": 20, "Offset": 12, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 1, "StorageSize": 38, "SeqNo": 14, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTI=", "OriginalSize": 20, "Offset": 13 + "Codec": 1, "StorageSize": 38, "SeqNo": 14, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTI=", "OriginalSize": 20, "Offset": 13, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 1, "StorageSize": 38, "SeqNo": 15, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTM=", "OriginalSize": 20, "Offset": 14 + "Codec": 1, "StorageSize": 38, "SeqNo": 15, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTM=", "OriginalSize": 20, "Offset": 14, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" }, { - "Codec": 1, "StorageSize": 38, "SeqNo": 16, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTQ=", "OriginalSize": 20, "Offset": 15 + "Codec": 1, "StorageSize": 38, "SeqNo": 16, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTQ=", "OriginalSize": 20, "Offset": 15, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" } ], "StartOffset": 0, @@ -4776,12 +4787,21 @@ "response_not_truncated": { "Messages": [ { - "Codec": 1, "StorageSize": 38, "SeqNo": 21, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTk=", "OriginalSize": 20, "Offset": 20 + "Codec": 1, "StorageSize": 38, "SeqNo": 21, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTk=", "OriginalSize": 20, "Offset": 20, + "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" } ], "StartOffset": 0, "EndOffset": 21, "Truncated": false + }, + "no_partition": { + "status_code": 400, + "text": "Parameter 'partition' is necessary" + }, + "both_offset_and_ts": { + "status_code": 400, + "text": "Only read_timestamp or offset parameter may be specified, not both" } } } diff --git a/ydb/core/viewer/tests/test.py b/ydb/core/viewer/tests/test.py index c79175651b67..0b9d4e8b8470 100644 --- a/ydb/core/viewer/tests/test.py +++ b/ydb/core/viewer/tests/test.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import logging import ydb from ydb._topic_writer.topic_writer import PublicMessage from ydb.tests.library.harness.kikimr_runner import KiKiMR @@ -593,7 +594,6 @@ def test_topic_data(): endpoint = "localhost:{}".format(grpc_port) driver = ydb.Driver(endpoint=endpoint, database=dedicated_db, oauth=None) driver.wait(10, fail_fast=True) - driver.topic_client.create_topic('topic2', min_active_partitions=1, max_active_partitions=1) def write(writer, message_pattern, close=True): writer.write(["{}-{}".format(message_pattern, i) for i in range(10)]) @@ -601,7 +601,7 @@ def write(writer, message_pattern, close=True): if close: writer.close() - writer = driver.topic_client.writer('topic2', producer_id="12345") + writer = driver.topic_client.writer('topic1', producer_id="12345") write(writer, "message", False) # Also write one messagewith metadata @@ -609,12 +609,12 @@ def write(writer, message_pattern, close=True): writer.write(message_w_meta) writer.close() - writer_compressed = driver.topic_client.writer('topic2', producer_id="12345", codec=2) + writer_compressed = driver.topic_client.writer('topic1', producer_id="12345", codec=2) write(writer_compressed, "compressed-message") response = call_viewer("/viewer/topic_data", { 'database': dedicated_db, - 'path': '{}/topic2'.format(dedicated_db), + 'path': '{}/topic1'.format(dedicated_db), 'partition': '0', 'offset': '0', 'limit': '5' @@ -622,14 +622,14 @@ def write(writer, message_pattern, close=True): response_w_meta = call_viewer("/viewer/topic_data", { 'database': dedicated_db, - 'path': '{}/topic2'.format(dedicated_db), + 'path': '{}/topic1'.format(dedicated_db), 'partition': '0', 'offset': '10', 'limit': '1' }) response_compressed = call_viewer("/viewer/topic_data", { 'database': dedicated_db, - 'path': '{}/topic2'.format(dedicated_db), + 'path': '{}/topic1'.format(dedicated_db), 'partition': '0', 'offset': '11', 'limit': '5' @@ -637,28 +637,40 @@ def write(writer, message_pattern, close=True): response_last = call_viewer("/viewer/topic_data", { 'database': dedicated_db, - 'path': '{}/topic2'.format(dedicated_db), + 'path': '{}/topic1'.format(dedicated_db), 'partition': '0', 'offset': '20', 'limit': '5' }) - def strip_non_canonized(resp): - for message in resp["Messages"]: - assert int(message.get("CreateTimestamp", "0")) != 0 - assert int(message.get("WriteTimestamp", "0")) != 0 - assert int(message.get("TimestampDiff", None)) >= 0 - assert message.get("ProducerId", None) is not None - del message["CreateTimestamp"] - del message["WriteTimestamp"] - del message["TimestampDiff"] - del message["ProducerId"] - return resp + response_no_part = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic1'.format(dedicated_db), + 'offset': '20' + }) + response_both_offset_and_ts = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic1'.format(dedicated_db), + 'partition': '0', + 'offset': '20', + 'read_timestamp': '20' + }) + + def replace_values(resp): + res = replace_values_by_key(resp, ['CreateTimestamp', + 'WriteTimestamp', + 'TimestampDiff', + 'ProducerId', + ]) + logging.info(res) + return res result = { - 'response_read': strip_non_canonized(response), - 'response_metadata': strip_non_canonized(response_w_meta), - 'response_compressed': strip_non_canonized(response_compressed), - 'response_not_truncated': strip_non_canonized(response_last) + 'response_read': replace_values(response), + 'response_metadata': replace_values(response_w_meta), + 'response_compressed': replace_values(response_compressed), + 'response_not_truncated': replace_values(response_last), + 'no_partition': response_no_part, + 'both_offset_and_ts': response_both_offset_and_ts } return result diff --git a/ydb/core/viewer/topic_data_ut.cpp b/ydb/core/viewer/topic_data_ut.cpp index 834ecfdefe2e..df0470295ac1 100644 --- a/ydb/core/viewer/topic_data_ut.cpp +++ b/ydb/core/viewer/topic_data_ut.cpp @@ -23,11 +23,14 @@ Y_UNIT_TEST_SUITE(ViewerTopicDataTests) { UNIT_ASSERT(iter != map.end()); UNIT_ASSERT_VALUES_EQUAL_C(iter->second, value, key); } - TString GetRequestUrl(TString topic, ui32 partition, ui64 offset = 0, ui32 limit = 10) { + TString GetRequestUrl(TString topic, ui32 partition, ui64 offset = 0, ui32 limit = 10, bool noTruncate = false) { TStringBuilder url; CGIUnescape(topic); url << "/viewer/topic_data" << "?path=" << topic << "&partition=" << partition << "&offset=" << offset - << "&limit=" << limit << "&encode_data=false"; + << "&limit=" << limit; + if (noTruncate) { + url << "&truncate=false"; + } return url; } @@ -178,7 +181,6 @@ Y_UNIT_TEST_SUITE(ViewerTopicDataTests) { } } // Test 3 - large messages - { writeData(ECodec::GZIP, 20, "producer3", 1_MB); @@ -214,6 +216,23 @@ Y_UNIT_TEST_SUITE(ViewerTopicDataTests) { UNIT_ASSERT(jsonMap.find("WriteTimestamp") != jsonMap.end()); } } + // large message with no truncate + { + writeData(ECodec::GZIP, 1, "producer4", 3_MB); + + auto statusCode = MakeRequest(httpClient, GetRequestUrl(topicPath, 0, 60, 1, true), json); + UNIT_ASSERT_EQUAL(statusCode, HTTP_OK); + const auto& overalResponse = json.GetMap(); + const auto& messages = overalResponse.find("Messages")->second.GetArray(); + + + UNIT_ASSERT_C(messages.size() == 1, messages.size()); + const auto& item = messages[0]; + const auto& jsonMap = item.GetMap(); + UNIT_ASSERT(jsonMap.find("Message") != jsonMap.end()); + Cerr << "Size: " << jsonMap.find("Message")->second.GetString().size() << Endl; + UNIT_ASSERT(jsonMap.find("Message")->second.GetString().size() > 1500_KB); + } // Test 4 - bad topic, partition, offset { auto statusCode = MakeRequest(httpClient, GetRequestUrl("/Root/bad-topic", 0, 20, 10), json); diff --git a/ydb/core/viewer/viewer_topic_data.cpp b/ydb/core/viewer/viewer_topic_data.cpp index 5f1e875f2d55..16371fe8efa8 100644 --- a/ydb/core/viewer/viewer_topic_data.cpp +++ b/ydb/core/viewer/viewer_topic_data.cpp @@ -90,8 +90,10 @@ void TTopicData::SendPQReadRequest() { auto cmdRead = request.MutablePartitionRequest()->MutableCmdRead(); cmdRead->SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER); - cmdRead->SetCount(Limit); + cmdRead->SetCount(TruncateLongMessages ? Limit : 1); cmdRead->SetOffset(Offset); + cmdRead->SetReadTimestampMs(Timestamp); + cmdRead->SetTimeoutMs(READ_TIMEOUT_MS); cmdRead->SetExternalOperation(true); @@ -139,7 +141,7 @@ void TTopicData::FillProtoResponse(ui64 maxSingleMessageSize, ui64 maxTotalSize) auto setData = [&](NKikimrViewer::TTopicDataResponse::TMessage& protoMessage, TString&& data) { protoMessage.SetOriginalSize(data.size()); - if (data.size() > maxSingleMessageSize) { + if (data.size() > maxSingleMessageSize && TruncateLongMessages) { data.resize(maxSingleMessageSize); } totalSize += data.size(); @@ -232,23 +234,6 @@ void TTopicData::StateRequestedDescribe(TAutoPtr<::NActors::IEventHandle>& ev) { } } -bool TTopicData::GetIntegerParam(const TString& name, i64& value) { - const auto& params(Event->Get()->Request.GetParams()); - if (params.Has(name)) { - value = FromStringWithDefault(params.Get(name), -1); - if (value == -1) { - auto error = TStringBuilder() << "field ' "<< name << "' has invalid value, an interger >= 0 is expected"; - ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", error)); - return false; - } - return true; - } else { - auto error = TStringBuilder() << "field ' "<< name << "' is required"; - ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", error)); - return false; - } -} - void TTopicData::Bootstrap() { if (!Database.empty() && TBase::NeedToRedirect()) { return; @@ -256,13 +241,30 @@ void TTopicData::Bootstrap() { const auto& params(Event->Get()->Request.GetParams()); Timeout = TDuration::Seconds(std::min((ui32)Timeout.Seconds(), 30u)); + TruncateLongMessages = FromStringWithDefault(params.Get("truncate"), true); - if (!GetIntegerParam("partition", PartitionId)) - return; - if (!GetIntegerParam("offset", Offset)) - return; + if (!params.Has("partition")) { + return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Parameter 'partition' is necessary")); + } + PartitionId = FromStringWithDefault(params.Get("partition"), PartitionId); + + Offset = FromStringWithDefault(params.Get("offset"), Offset); + Timestamp = FromStringWithDefault(params.Get("read_timestamp"), Timestamp); + + Limit = FromStringWithDefault(params.Get("limit"), Limit); - Limit = FromStringWithDefault(params.Get("limit"), 10); + // Only allow timestamp XOR offset to be defined + if (Offset > 0 && Timestamp > 0) { + return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Only read_timestamp or offset parameter may be specified, not both")); + } + + // No truncate is available with on offset specified and limit = 1 or undefined + if (!TruncateLongMessages && Limit > 1) { + return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "truncate=false can only be specified with limit = 1")); + } + if (!TruncateLongMessages && Timestamp > 0) { + return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "truncate=false can only be specified with an offset, not a timestamp")); + } if (Limit > MAX_MESSAGES_LIMIT) { return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Too many messages requested")); } diff --git a/ydb/core/viewer/viewer_topic_data.h b/ydb/core/viewer/viewer_topic_data.h index 28cd5e39f474..df1578669e63 100644 --- a/ydb/core/viewer/viewer_topic_data.h +++ b/ydb/core/viewer/viewer_topic_data.h @@ -41,7 +41,6 @@ class TTopicData : public TViewerPipeClient { void HandlePQResponse(TEvPersQueue::TEvResponse::TPtr& ev); void FillProtoResponse(ui64 maxSingleMessageSize = 1_MB, ui64 maxTotalSize = 10_MB); NYdb::NTopic::ICodec* GetCodec(NPersQueueCommon::ECodec codec); - bool GetIntegerParam(const TString& name, i64& value); STATEFN(StateRequestedDescribe); @@ -57,9 +56,12 @@ class TTopicData : public TViewerPipeClient { private: ui64 TabletId; TString TopicPath; - i64 PartitionId; - i64 Offset; - i64 Limit; + ui32 PartitionId; + ui64 Offset = 0; + ui64 Timestamp = 0; + + ui32 Limit = 10; + bool TruncateLongMessages = true; TMap> Codecs; std::optional> NavigateResponse; From b4ad4815ee09b97311d0aa6f3103e567256afac9 Mon Sep 17 00:00:00 2001 From: FloatingCrowbar Date: Wed, 2 Apr 2025 17:45:04 +0300 Subject: [PATCH 3/6] Fix the swaager (#16669) --- ydb/core/viewer/viewer_topic_data.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ydb/core/viewer/viewer_topic_data.h b/ydb/core/viewer/viewer_topic_data.h index df1578669e63..5bc4a66179a2 100644 --- a/ydb/core/viewer/viewer_topic_data.h +++ b/ydb/core/viewer/viewer_topic_data.h @@ -95,10 +95,14 @@ class TTopicData : public TViewerPipeClient { description: partition to read from required: true type: integer + - name: read_timestamp + in: query + description: min message timestamp to read from + required: false - name: offset in: query description: start offset to read from - required: true + required: false type: integer - name: limit in: query From 137e87e6c9151dec1d1ed0e50fb60dd3cd413e0d Mon Sep 17 00:00:00 2001 From: FloatingCrowbar Date: Thu, 24 Apr 2025 12:37:25 +0300 Subject: [PATCH 4/6] Decode source id in topic data handler (#17554) --- ydb/core/viewer/tests/canondata/result.json | 267 ++++++++++++++++++++ ydb/core/viewer/viewer_topic_data.cpp | 6 +- 2 files changed, 272 insertions(+), 1 deletion(-) diff --git a/ydb/core/viewer/tests/canondata/result.json b/ydb/core/viewer/tests/canondata/result.json index 7de503cf9249..3df7673ac4e1 100644 --- a/ydb/core/viewer/tests/canondata/result.json +++ b/ydb/core/viewer/tests/canondata/result.json @@ -506,6 +506,273 @@ ], "TotalGroups": 5 }, + "test.test_topic_data": { + "both_offset_and_ts": { + "status_code": 400, + "text": "Only read_timestamp or offset parameter may be specified, not both" + }, + "no_partition": { + "status_code": 400, + "text": "Parameter 'partition' is necessary" + }, + "response_compressed": { + "EndOffset": 21, + "Messages": [ + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTA=", + "Offset": 11, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 12, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTE=", + "Offset": 12, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 13, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTI=", + "Offset": 13, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 14, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTM=", + "Offset": 14, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 15, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTQ=", + "Offset": 15, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 16, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + } + ], + "StartOffset": 0, + "Truncated": true + }, + "response_metadata": { + "EndOffset": 21, + "Messages": [ + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZV93aXRoX21ldGE=", + "MessageMetadata": [ + { + "Key": "key1", + "Value": "value1" + }, + { + "Key": "key2", + "Value": "value2" + } + ], + "Offset": 10, + "OriginalSize": 17, + "ProducerId": "not-zero-number-text", + "SeqNo": 11, + "StorageSize": 37, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + } + ], + "StartOffset": 0, + "Truncated": true + }, + "response_not_truncated": { + "EndOffset": 21, + "Messages": [ + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTk=", + "Offset": 20, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 21, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + } + ], + "StartOffset": 0, + "Truncated": false + }, + "response_read": { + "EndOffset": 21, + "Messages": [ + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0w", + "Offset": 0, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 1, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0x", + "Offset": 1, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 2, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0y", + "Offset": 2, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 3, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0z", + "Offset": 3, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 4, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS00", + "Offset": 4, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 5, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + } + ], + "StartOffset": 0, + "Truncated": true + } + }, + "test.test_transfer_describe": { + "connection_params": { + "connection_string": "text", + "database": "/Root/dedicated_db", + "endpoint": "text", + "oauth": {} + }, + "error": { + "issues": [ + { + "message": "Discovery error: /Root/dedicated_db/TableNotExists: SCHEME_ERROR ({
: Error: Path not found })", + "severity": 1 + } + ] + }, + "row_consistency": {}, + "self": { + "created_at": { + "plan_step": "not-zero-number-text", + "tx_id": "not-zero-number-text" + }, + "effective_permissions": [ + { + "permission_names": [ + "ydb.database.connect" + ], + "subject": "USERS" + }, + { + "permission_names": [ + "ydb.generic.list" + ], + "subject": "METADATA-READERS" + }, + { + "permission_names": [ + "ydb.granular.select_row" + ], + "subject": "DATA-READERS" + }, + { + "permission_names": [ + "ydb.tables.modify" + ], + "subject": "DATA-WRITERS" + }, + { + "permission_names": [ + "ydb.granular.create_directory", + "ydb.granular.write_attributes", + "ydb.granular.create_table", + "ydb.granular.remove_schema", + "ydb.granular.create_queue", + "ydb.granular.alter_schema" + ], + "subject": "DDL-ADMINS" + }, + { + "permission_names": [ + "ydb.access.grant" + ], + "subject": "ACCESS-ADMINS" + }, + { + "permission_names": [ + "ydb.generic.manage" + ], + "subject": "DATABASE-ADMINS" + } + ], + "name": "TestAsyncReplication", + "owner": "root@builtin", + "type": "REPLICATION" + } + }, "test.test_viewer_acl": { "/Root": { "Common": { diff --git a/ydb/core/viewer/viewer_topic_data.cpp b/ydb/core/viewer/viewer_topic_data.cpp index 16371fe8efa8..cb67bb2681e3 100644 --- a/ydb/core/viewer/viewer_topic_data.cpp +++ b/ydb/core/viewer/viewer_topic_data.cpp @@ -181,7 +181,11 @@ void TTopicData::FillProtoResponse(ui64 maxSingleMessageSize, ui64 maxTotalSize) setData(*messageProto, std::move(*dataChunk.MutableData())); } messageProto->SetCodec(dataChunk.GetCodec()); - messageProto->SetProducerId(r.GetSourceId()); + TString decodedSrcId; + if (!r.GetSourceId().empty()) { + decodedSrcId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId()); + } + messageProto->SetProducerId(decodedSrcId); messageProto->SetSeqNo(r.GetSeqNo()); if (dataChunk.MessageMetaSize() > 0) { From 6bcd3291d81aa4a7945ade6a76aec1f0813aa62b Mon Sep 17 00:00:00 2001 From: FloatingCrowbar Date: Fri, 25 Apr 2025 14:47:23 +0300 Subject: [PATCH 5/6] New topic data handler features (#17682) --- ydb/core/viewer/tests/canondata/result.json | 68 ++++++++++++++++++++- ydb/core/viewer/tests/test.py | 23 ++++++- ydb/core/viewer/viewer_topic_data.cpp | 21 ++++--- ydb/core/viewer/viewer_topic_data.h | 19 +++++- 4 files changed, 117 insertions(+), 14 deletions(-) diff --git a/ydb/core/viewer/tests/canondata/result.json b/ydb/core/viewer/tests/canondata/result.json index 3df7673ac4e1..be33b653bff8 100644 --- a/ydb/core/viewer/tests/canondata/result.json +++ b/ydb/core/viewer/tests/canondata/result.json @@ -580,7 +580,7 @@ } ], "StartOffset": 0, - "Truncated": true + "Truncated": false }, "response_metadata": { "EndOffset": 21, @@ -609,7 +609,7 @@ } ], "StartOffset": 0, - "Truncated": true + "Truncated": false }, "response_not_truncated": { "EndOffset": 21, @@ -630,6 +630,68 @@ "StartOffset": 0, "Truncated": false }, + "response_truncated": { + "EndOffset": 21, + "Messages": [ + { + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHI=", + "Offset": 20, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 21, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + } + ], + "StartOffset": 0, + "Truncated": true + }, + "response_last_offset": { + "EndOffset": 21, + "Messages": [ + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0w", + "Offset": 0, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 1, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0x", + "Offset": 1, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 2, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0y", + "Offset": 2, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 3, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + } + ], + "StartOffset": 0, + "Truncated": false + }, "response_read": { "EndOffset": 21, "Messages": [ @@ -695,7 +757,7 @@ } ], "StartOffset": 0, - "Truncated": true + "Truncated": false } }, "test.test_transfer_describe": { diff --git a/ydb/core/viewer/tests/test.py b/ydb/core/viewer/tests/test.py index 0b9d4e8b8470..efbc13a43fbd 100644 --- a/ydb/core/viewer/tests/test.py +++ b/ydb/core/viewer/tests/test.py @@ -620,6 +620,15 @@ def write(writer, message_pattern, close=True): 'limit': '5' }) + response_cut_by_last_offset = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic1'.format(dedicated_db), + 'partition': '0', + 'offset': '0', + 'last_offset': '2', + 'limit': '5' + }) + response_w_meta = call_viewer("/viewer/topic_data", { 'database': dedicated_db, 'path': '{}/topic1'.format(dedicated_db), @@ -643,11 +652,21 @@ def write(writer, message_pattern, close=True): 'limit': '5' }) + response_short_msg = call_viewer("/viewer/topic_data", { + 'database': dedicated_db, + 'path': '{}/topic1'.format(dedicated_db), + 'partition': '0', + 'offset': '20', + 'limit': '1', + 'message_size_limit': '5' + }) + response_no_part = call_viewer("/viewer/topic_data", { 'database': dedicated_db, 'path': '{}/topic1'.format(dedicated_db), 'offset': '20' }) + response_both_offset_and_ts = call_viewer("/viewer/topic_data", { 'database': dedicated_db, 'path': '{}/topic1'.format(dedicated_db), @@ -671,6 +690,8 @@ def replace_values(resp): 'response_compressed': replace_values(response_compressed), 'response_not_truncated': replace_values(response_last), 'no_partition': response_no_part, - 'both_offset_and_ts': response_both_offset_and_ts + 'both_offset_and_ts': response_both_offset_and_ts, + 'response_truncated': replace_values(response_short_msg), + 'response_last_offset': replace_values(response_cut_by_last_offset), } return result diff --git a/ydb/core/viewer/viewer_topic_data.cpp b/ydb/core/viewer/viewer_topic_data.cpp index cb67bb2681e3..8ce9f19c09ba 100644 --- a/ydb/core/viewer/viewer_topic_data.cpp +++ b/ydb/core/viewer/viewer_topic_data.cpp @@ -92,6 +92,9 @@ void TTopicData::SendPQReadRequest() { cmdRead->SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER); cmdRead->SetCount(TruncateLongMessages ? Limit : 1); cmdRead->SetOffset(Offset); + if (LastOffset) { + cmdRead->SetLastOffset(LastOffset); + } cmdRead->SetReadTimestampMs(Timestamp); cmdRead->SetTimeoutMs(READ_TIMEOUT_MS); @@ -131,37 +134,35 @@ void TTopicData::HandlePQResponse(TEvPersQueue::TEvResponse::TPtr& ev) { RequestDone(); } -void TTopicData::FillProtoResponse(ui64 maxSingleMessageSize, ui64 maxTotalSize) { +void TTopicData::FillProtoResponse(ui64 maxTotalSize) { ui64 totalSize = 0; const auto& response = ReadResponse->Record.GetPartitionResponse(); if(!response.HasCmdReadResult()) { return; } const auto& cmdRead = response.GetCmdReadResult(); - + bool isTruncated = false; auto setData = [&](NKikimrViewer::TTopicDataResponse::TMessage& protoMessage, TString&& data) { protoMessage.SetOriginalSize(data.size()); - if (data.size() > maxSingleMessageSize && TruncateLongMessages) { - data.resize(maxSingleMessageSize); + if (data.size() > MaxSingleMessageSize && TruncateLongMessages) { + isTruncated = true; + data.resize(MaxSingleMessageSize); } totalSize += data.size(); protoMessage.SetMessage(std::move(Base64Encode(data))); }; ProtoResponse.SetStartOffset(cmdRead.GetStartOffset()); - bool truncated = true; ProtoResponse.SetEndOffset(cmdRead.GetEndOffset()); for (auto& r : cmdRead.GetResult()) { if (totalSize >= maxTotalSize) { + isTruncated = true; break; } auto dataChunk = (NKikimr::GetDeserializedData(r.GetData())); auto* messageProto = ProtoResponse.AddMessages(); messageProto->SetOffset(r.GetOffset()); - if (r.GetOffset() == cmdRead.GetEndOffset() - 1) - truncated = false; - messageProto->SetCreateTimestamp(r.GetCreateTimestampMS()); messageProto->SetWriteTimestamp(r.GetWriteTimestampMS()); i64 diff = r.GetWriteTimestampMS() - r.GetCreateTimestampMS(); @@ -197,7 +198,7 @@ void TTopicData::FillProtoResponse(ui64 maxSingleMessageSize, ui64 maxTotalSize) } } } - ProtoResponse.SetTruncated(truncated); + ProtoResponse.SetTruncated(isTruncated); } void TTopicData::ReplyAndPassAway() { @@ -246,6 +247,7 @@ void TTopicData::Bootstrap() { Timeout = TDuration::Seconds(std::min((ui32)Timeout.Seconds(), 30u)); TruncateLongMessages = FromStringWithDefault(params.Get("truncate"), true); + MaxSingleMessageSize = FromStringWithDefault(params.Get("message_size_limit"), MaxSingleMessageSize); if (!params.Has("partition")) { return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Parameter 'partition' is necessary")); @@ -253,6 +255,7 @@ void TTopicData::Bootstrap() { PartitionId = FromStringWithDefault(params.Get("partition"), PartitionId); Offset = FromStringWithDefault(params.Get("offset"), Offset); + LastOffset = FromStringWithDefault(params.Get("last_offset"), LastOffset); Timestamp = FromStringWithDefault(params.Get("read_timestamp"), Timestamp); Limit = FromStringWithDefault(params.Get("limit"), Limit); diff --git a/ydb/core/viewer/viewer_topic_data.h b/ydb/core/viewer/viewer_topic_data.h index 5bc4a66179a2..37527fc0fd3f 100644 --- a/ydb/core/viewer/viewer_topic_data.h +++ b/ydb/core/viewer/viewer_topic_data.h @@ -39,7 +39,7 @@ class TTopicData : public TViewerPipeClient { void HandleDescribe(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void SendPQReadRequest(); void HandlePQResponse(TEvPersQueue::TEvResponse::TPtr& ev); - void FillProtoResponse(ui64 maxSingleMessageSize = 1_MB, ui64 maxTotalSize = 10_MB); + void FillProtoResponse(ui64 maxTotalSize = 10_MB); NYdb::NTopic::ICodec* GetCodec(NPersQueueCommon::ECodec codec); STATEFN(StateRequestedDescribe); @@ -58,10 +58,12 @@ class TTopicData : public TViewerPipeClient { TString TopicPath; ui32 PartitionId; ui64 Offset = 0; + ui64 LastOffset = 0; ui64 Timestamp = 0; ui32 Limit = 10; bool TruncateLongMessages = true; + ui64 MaxSingleMessageSize = 1024 * 1024; TMap> Codecs; std::optional> NavigateResponse; @@ -104,11 +106,26 @@ class TTopicData : public TViewerPipeClient { description: start offset to read from required: false type: integer + - name: last_offset + in: query + description: last offset that can possibly be read + required: false + type: integer - name: limit in: query description: max number of messages to read (default = 10) required: false type: integer + - name: message_size_limit + in: query + description: max size of single message (default = 1_MB) + required: false + type: integer + - name: truncate + in: query + description: dont truncate large messages + required: false + type: bool - name: timeout in: query description: timeout in ms From f5ab855b0229ab933565cdfb7b9812a9448497bf Mon Sep 17 00:00:00 2001 From: Konstantin Melekhov Date: Fri, 25 Apr 2025 13:09:42 +0000 Subject: [PATCH 6/6] Fix merge issues --- ydb/core/viewer/json_pipe_req.cpp | 3 +- ydb/core/viewer/tests/canondata/result.json | 281 ++++---------------- ydb/core/viewer/tests/test.py | 15 -- 3 files changed, 60 insertions(+), 239 deletions(-) diff --git a/ydb/core/viewer/json_pipe_req.cpp b/ydb/core/viewer/json_pipe_req.cpp index 24297420466d..4061d93f3f4b 100644 --- a/ydb/core/viewer/json_pipe_req.cpp +++ b/ydb/core/viewer/json_pipe_req.cpp @@ -605,7 +605,8 @@ TViewerPipeClient::TRequestResponseRecord.SetSchemeshardId(schemeShardId); request->Record.SetPath(path); request->Record.MutableOptions()->CopyFrom(options); - auto response = MakeRequestToTablet(schemeShardId, request.release(), cookie); + auto pipe = ConnectTabletPipe(schemeShardId); + auto response = MakeRequest(pipe, request.release(), cookie); if (response.Span) { response.Span.Attribute("path", path); } diff --git a/ydb/core/viewer/tests/canondata/result.json b/ydb/core/viewer/tests/canondata/result.json index be33b653bff8..e1a09f02ea2a 100644 --- a/ydb/core/viewer/tests/canondata/result.json +++ b/ydb/core/viewer/tests/canondata/result.json @@ -582,6 +582,49 @@ "StartOffset": 0, "Truncated": false }, + "response_last_offset": { + "EndOffset": 21, + "Messages": [ + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0w", + "Offset": 0, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 1, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0x", + "Offset": 1, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 2, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + }, + { + "Codec": 0, + "CreateTimestamp": "not-zero-number", + "Message": "bWVzc2FnZS0y", + "Offset": 2, + "OriginalSize": 9, + "ProducerId": "not-zero-number-text", + "SeqNo": 3, + "StorageSize": 9, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" + } + ], + "StartOffset": 0, + "Truncated": false + }, "response_metadata": { "EndOffset": 21, "Messages": [ @@ -630,68 +673,6 @@ "StartOffset": 0, "Truncated": false }, - "response_truncated": { - "EndOffset": 21, - "Messages": [ - { - "Codec": 1, - "CreateTimestamp": "not-zero-number", - "Message": "Y29tcHI=", - "Offset": 20, - "OriginalSize": 20, - "ProducerId": "not-zero-number-text", - "SeqNo": 21, - "StorageSize": 38, - "TimestampDiff": "not-zero-number", - "WriteTimestamp": "not-zero-number" - } - ], - "StartOffset": 0, - "Truncated": true - }, - "response_last_offset": { - "EndOffset": 21, - "Messages": [ - { - "Codec": 0, - "CreateTimestamp": "not-zero-number", - "Message": "bWVzc2FnZS0w", - "Offset": 0, - "OriginalSize": 9, - "ProducerId": "not-zero-number-text", - "SeqNo": 1, - "StorageSize": 9, - "TimestampDiff": "not-zero-number", - "WriteTimestamp": "not-zero-number" - }, - { - "Codec": 0, - "CreateTimestamp": "not-zero-number", - "Message": "bWVzc2FnZS0x", - "Offset": 1, - "OriginalSize": 9, - "ProducerId": "not-zero-number-text", - "SeqNo": 2, - "StorageSize": 9, - "TimestampDiff": "not-zero-number", - "WriteTimestamp": "not-zero-number" - }, - { - "Codec": 0, - "CreateTimestamp": "not-zero-number", - "Message": "bWVzc2FnZS0y", - "Offset": 2, - "OriginalSize": 9, - "ProducerId": "not-zero-number-text", - "SeqNo": 3, - "StorageSize": 9, - "TimestampDiff": "not-zero-number", - "WriteTimestamp": "not-zero-number" - } - ], - "StartOffset": 0, - "Truncated": false - }, "response_read": { "EndOffset": 21, "Messages": [ @@ -758,81 +739,25 @@ ], "StartOffset": 0, "Truncated": false - } - }, - "test.test_transfer_describe": { - "connection_params": { - "connection_string": "text", - "database": "/Root/dedicated_db", - "endpoint": "text", - "oauth": {} }, - "error": { - "issues": [ - { - "message": "Discovery error: /Root/dedicated_db/TableNotExists: SCHEME_ERROR ({
: Error: Path not found })", - "severity": 1 - } - ] - }, - "row_consistency": {}, - "self": { - "created_at": { - "plan_step": "not-zero-number-text", - "tx_id": "not-zero-number-text" - }, - "effective_permissions": [ - { - "permission_names": [ - "ydb.database.connect" - ], - "subject": "USERS" - }, - { - "permission_names": [ - "ydb.generic.list" - ], - "subject": "METADATA-READERS" - }, - { - "permission_names": [ - "ydb.granular.select_row" - ], - "subject": "DATA-READERS" - }, - { - "permission_names": [ - "ydb.tables.modify" - ], - "subject": "DATA-WRITERS" - }, - { - "permission_names": [ - "ydb.granular.create_directory", - "ydb.granular.write_attributes", - "ydb.granular.create_table", - "ydb.granular.remove_schema", - "ydb.granular.create_queue", - "ydb.granular.alter_schema" - ], - "subject": "DDL-ADMINS" - }, - { - "permission_names": [ - "ydb.access.grant" - ], - "subject": "ACCESS-ADMINS" - }, + "response_truncated": { + "EndOffset": 21, + "Messages": [ { - "permission_names": [ - "ydb.generic.manage" - ], - "subject": "DATABASE-ADMINS" + "Codec": 1, + "CreateTimestamp": "not-zero-number", + "Message": "Y29tcHI=", + "Offset": 20, + "OriginalSize": 20, + "ProducerId": "not-zero-number-text", + "SeqNo": 21, + "StorageSize": 38, + "TimestampDiff": "not-zero-number", + "WriteTimestamp": "not-zero-number" } ], - "name": "TestAsyncReplication", - "owner": "root@builtin", - "type": "REPLICATION" + "StartOffset": 0, + "Truncated": true } }, "test.test_viewer_acl": { @@ -5042,95 +4967,5 @@ }, "test.test_wait_for_cluster_ready": { "wait_good": true - }, - "test.test_topic_data": { - "response_read": { - "Messages": [ - { - "Codec": 0, "StorageSize": 9, "SeqNo": 1, "Message": "bWVzc2FnZS0w", "OriginalSize": 9, "Offset": 0, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 0, "StorageSize": 9, "SeqNo": 2, "Message": "bWVzc2FnZS0x", "OriginalSize": 9, "Offset": 1, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 0, "StorageSize": 9, "SeqNo": 3, "Message": "bWVzc2FnZS0y", "OriginalSize": 9, "Offset": 2, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 0, "StorageSize": 9, "SeqNo": 4, "Message": "bWVzc2FnZS0z", "OriginalSize": 9, "Offset": 3, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 0, "StorageSize": 9, "SeqNo": 5, "Message": "bWVzc2FnZS00", "OriginalSize": 9, "Offset": 4, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - } - ], - "StartOffset": 0, - "EndOffset": 21, - "Truncated": true - }, - "response_metadata": { - "Messages": [ - { - "Codec": 1, "StorageSize": 37, "SeqNo": 11, - "MessageMetadata": [ - {"Value": "value1", "Key": "key1"}, - {"Value": "value2", "Key": "key2"}], - "Message": "bWVzc2FnZV93aXRoX21ldGE=", "OriginalSize": 17, "Offset": 10, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - } - ], - "StartOffset": 0, - "EndOffset": 21, - "Truncated": true - }, - "response_compressed": { - "Messages": [ - { - "Codec": 1, "StorageSize": 38, "SeqNo": 12, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTA=", "OriginalSize": 20, "Offset": 11, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 1, "StorageSize": 38, "SeqNo": 13, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTE=", "OriginalSize": 20, "Offset": 12, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 1, "StorageSize": 38, "SeqNo": 14, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTI=", "OriginalSize": 20, "Offset": 13, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 1, "StorageSize": 38, "SeqNo": 15, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTM=", "OriginalSize": 20, "Offset": 14, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - }, - { - "Codec": 1, "StorageSize": 38, "SeqNo": 16, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTQ=", "OriginalSize": 20, "Offset": 15, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - } - ], - "StartOffset": 0, - "EndOffset": 21, - "Truncated": true - }, - "response_not_truncated": { - "Messages": [ - { - "Codec": 1, "StorageSize": 38, "SeqNo": 21, "Message": "Y29tcHJlc3NlZC1tZXNzYWdlLTk=", "OriginalSize": 20, "Offset": 20, - "CreateTimestamp": "not-zero-number", "WriteTimestamp": "not-zero-number", "TimestampDiff": "not-zero-number", "ProducerId": "text" - } - ], - "StartOffset": 0, - "EndOffset": 21, - "Truncated": false - }, - "no_partition": { - "status_code": 400, - "text": "Parameter 'partition' is necessary" - }, - "both_offset_and_ts": { - "status_code": 400, - "text": "Only read_timestamp or offset parameter may be specified, not both" - } } } diff --git a/ydb/core/viewer/tests/test.py b/ydb/core/viewer/tests/test.py index efbc13a43fbd..713be03c59fb 100644 --- a/ydb/core/viewer/tests/test.py +++ b/ydb/core/viewer/tests/test.py @@ -567,21 +567,6 @@ def test_pqrb_tablet(): ]) -def test_viewer_nodes_issue_14992(): - response_group_by = get_viewer_normalized("/viewer/nodes", { - 'group': 'Uptime' - }) - response_group = get_viewer_normalized("/viewer/nodes", { - 'filter_group_by': 'Uptime', - 'filter_group' : response_group_by['NodeGroups'][0]['GroupName'], - }) - result = { - 'response_group_by': response_group_by, - 'response_group': response_group, - } - return result - - def test_topic_data(): grpc_port = cluster.nodes[1].grpc_port