Skip to content

Get topic data handler for UI #17730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/http_proxy/ut/internal_counters.json
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@
"folder_id": "folder4",
"database": "/Root"
},
"value":238,
"value":243,
"kind":"RATE"
},
{
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
}

TReadAnswer answer(info.FormAnswer(
ctx, *ev->Get(), EndOffset, Partition, userInfo,
ctx, *ev->Get(), StartOffset, EndOffset, Partition, userInfo,
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
));
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ ui64 GetFirstHeaderOffset(const TKey& key, const TString& blob)
TReadAnswer TReadInfo::FormAnswer(
const TActorContext& ctx,
const TEvPQ::TEvBlobResponse& blobResponse,
const ui64 startOffset,
const ui64 endOffset,
const TPartitionId& partition,
TUserInfo* userInfo,
Expand All @@ -410,7 +411,9 @@ TReadAnswer TReadInfo::FormAnswer(
res.SetErrorCode(NPersQueue::NErrorCode::OK);
auto readResult = res.MutablePartitionResponse()->MutableCmdReadResult();
readResult->SetWaitQuotaTimeMs(WaitQuotaTime.MilliSeconds());
readResult->SetStartOffset(startOffset);
readResult->SetMaxOffset(endOffset);
readResult->SetEndOffset(endOffset);
readResult->SetRealReadOffset(Offset);
ui64 realReadOffset = Offset;
readResult->SetReadFromTimestampMs(ReadTimestampMs);
Expand Down Expand Up @@ -474,6 +477,7 @@ TReadAnswer TReadInfo::FormAnswer(
SizeEstimate = answerSize;
readResult->SetSizeEstimate(SizeEstimate);
readResult->SetLastOffset(LastOffset);
readResult->SetStartOffset(startOffset);
readResult->SetEndOffset(endOffset);
return {answerSize, std::move(answer)};
}
Expand Down Expand Up @@ -507,7 +511,7 @@ TReadAnswer TReadInfo::FormAnswer(
continue;


PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
<< " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());

for (size_t i = pos; i < batch.Blobs.size(); ++i) {
Expand Down Expand Up @@ -580,6 +584,7 @@ TReadAnswer TReadInfo::FormAnswer(
SizeEstimate = answerSize;
readResult->SetSizeEstimate(SizeEstimate);
readResult->SetLastOffset(LastOffset);
readResult->SetStartOffset(startOffset);
readResult->SetEndOffset(endOffset);

return {answerSize, std::move(answer)};
Expand All @@ -590,7 +595,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
if (!res)
return;
TReadAnswer answer(res->FormAnswer(
ctx, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
ctx, StartOffset, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
));
ctx.Send(Tablet, answer.Event.Release());
PQ_LOG_D(" waiting read cookie " << ev->Get()->Cookie
Expand Down Expand Up @@ -1040,7 +1045,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
PQ_LOG_D("Reading cookie " << cookie << ". All data is from uncompacted head.");

TReadAnswer answer = info.FormAnswer(
ctx, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
ctx, StartOffset, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
);
const auto* ev = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get());
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
auto partResp = responseRecord.MutablePartitionResponse()->MutableCmdReadResult();

partResp->SetMaxOffset(readResult.GetMaxOffset());
partResp->SetStartOffset(readResult.GetStartOffset());
partResp->SetEndOffset(readResult.GetEndOffset());
partResp->SetSizeLag(readResult.GetSizeLag());
partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + readResult.GetWaitQuotaTimeMs());

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/persqueue/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ struct TReadInfo {
TReadAnswer FormAnswer(
const TActorContext& ctx,
const TEvPQ::TEvBlobResponse& response,
const ui64 startOffset,
const ui64 endOffset,
const TPartitionId& partition,
TUserInfo* ui,
Expand All @@ -93,6 +94,7 @@ struct TReadInfo {

TReadAnswer FormAnswer(
const TActorContext& ctx,
const ui64 startOffset,
const ui64 endOffset,
const TPartitionId& partition,
TUserInfo* ui,
Expand All @@ -103,7 +105,7 @@ struct TReadInfo {
const bool isActive
) {
TEvPQ::TEvBlobResponse response(0, TVector<TRequestedBlob>());
return FormAnswer(ctx, response, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
return FormAnswer(ctx, response, startOffset, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
}
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/msgbus_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ message TCmdReadResult {
optional uint64 ReadFromTimestampMs = 12;
optional uint64 SizeEstimate = 13;
optional int64 LastOffset = 14;
optional uint64 StartOffset = 17;
optional uint64 EndOffset = 15;
optional bool ReadingFinished = 16;
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/viewer/json_handlers_viewer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "viewer_describe.h"
#include "viewer_describe_topic.h"
#include "viewer_feature_flags.h"
#include "viewer_topic_data.h"
#include "viewer_graph.h"
#include "viewer_healthcheck.h"
#include "viewer_hiveinfo.h"
Expand Down Expand Up @@ -189,6 +190,10 @@ void InitViewerTopicInfoJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/viewer/topicinfo", new TJsonHandler<TJsonTopicInfo>(TJsonTopicInfo::GetSwagger()));
}

void InitViewerTopicDataJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/viewer/topic_data", new TJsonHandler<TTopicData>(TTopicData::GetSwagger()));
}

void InitViewerPQConsumerInfoJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/viewer/pqconsumerinfo", new TJsonHandler<TJsonPQConsumerInfo>(TJsonPQConsumerInfo::GetSwagger()));
}
Expand Down Expand Up @@ -303,6 +308,7 @@ void InitViewerJsonHandlers(TJsonHandlers& jsonHandlers) {
InitViewerConfigJsonHandler(jsonHandlers);
InitViewerCountersJsonHandler(jsonHandlers);
InitViewerTopicInfoJsonHandler(jsonHandlers);
InitViewerTopicDataJsonHandler(jsonHandlers);
InitViewerPQConsumerInfoJsonHandler(jsonHandlers);
InitViewerTabletCountersJsonHandler(jsonHandlers);
InitViewerStorageJsonHandler(jsonHandlers);
Expand Down
61 changes: 53 additions & 8 deletions ydb/core/viewer/json_pipe_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,24 +544,29 @@ void TViewerPipeClient::RequestBSControllerPDiskUpdateStatus(const NKikimrBlobSt
SendRequestToPipe(pipeClient, request.Release());
}

void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) {
THolder<NSchemeCache::TSchemeCacheNavigate> TViewerPipeClient::SchemeCacheNavigateRequestBuilder (
NSchemeCache::TSchemeCacheNavigate::TEntry&& entry
) {
THolder<NSchemeCache::TSchemeCacheNavigate> request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = SplitPath(path);
entry.RedirectRequired = false;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath;
request->ResultSet.emplace_back(entry);
request->ResultSet.emplace_back(std::move(entry));
return request;
}

void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = SplitPath(path);

auto request = SchemeCacheNavigateRequestBuilder(std::move(entry));
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
}

void TViewerPipeClient::RequestSchemeCacheNavigate(const TPathId& pathId) {
THolder<NSchemeCache::TSchemeCacheNavigate> request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.TableId.PathId = pathId;
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
entry.RedirectRequired = false;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath;
request->ResultSet.emplace_back(entry);
auto request = SchemeCacheNavigateRequestBuilder(std::move(entry));
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
}

Expand Down Expand Up @@ -594,6 +599,42 @@ TViewerPipeClient::TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResu
return response;
}

TViewerPipeClient::TRequestResponse<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>
TViewerPipeClient::MakeRequestSchemeShardDescribe(TTabletId schemeShardId, const TString& path, const NKikimrSchemeOp::TDescribeOptions& options, ui64 cookie) {
auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvDescribeScheme>();
request->Record.SetSchemeshardId(schemeShardId);
request->Record.SetPath(path);
request->Record.MutableOptions()->CopyFrom(options);
auto pipe = ConnectTabletPipe(schemeShardId);
auto response = MakeRequest<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(pipe, request.release(), cookie);
if (response.Span) {
response.Span.Attribute("path", path);
}
return response;
}

TViewerPipeClient::TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> TViewerPipeClient::MakeRequestSchemeCacheNavigateWithToken(
const TString& path, bool showPrivate, ui32 access, ui64 cookie
) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = SplitPath(path);
entry.ShowPrivatePath = showPrivate;
entry.Access = access;
auto request = SchemeCacheNavigateRequestBuilder(std::move(entry));

if (!Event->Get()->UserToken.empty())
request->UserToken = new NACLib::TUserToken(Event->Get()->UserToken);

auto response = MakeRequest<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(
MakeSchemeCacheID(),
new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), 0 /*flags*/, cookie
);
if (response.Span) {
response.Span.Attribute("path", path);
}
return response;
}

void TViewerPipeClient::RequestTxProxyDescribe(const TString& path) {
THolder<TEvTxUserProxy::TEvNavigate> request(new TEvTxUserProxy::TEvNavigate());
request->Record.MutableDescribePath()->SetPath(path);
Expand Down Expand Up @@ -776,6 +817,10 @@ TString TViewerPipeClient::GetHTTPBADREQUEST(TString contentType, TString respon
return Viewer->GetHTTPBADREQUEST(GetRequest(), std::move(contentType), std::move(response));
}

TString TViewerPipeClient::GetHTTPNOTFOUND(TString, TString) {
return Viewer->GetHTTPNOTFOUND(GetRequest());
}

TString TViewerPipeClient::GetHTTPINTERNALERROR(TString contentType, TString response) {
return Viewer->GetHTTPINTERNALERROR(GetRequest(), std::move(contentType), std::move(response));
}
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/viewer/json_pipe_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,19 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
TRequestResponse<NSysView::TEvSysView::TEvGetPDisksResponse> RequestBSControllerPDisks();
TRequestResponse<NSysView::TEvSysView::TEvGetStorageStatsResponse> RequestBSControllerStorageStats();
void RequestBSControllerPDiskUpdateStatus(const NKikimrBlobStorage::TUpdateDriveStatus& driveStatus, bool force = false);

THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheNavigateRequestBuilder(NSchemeCache::TSchemeCacheNavigate::TEntry&& entry);

void RequestSchemeCacheNavigate(const TString& path);
void RequestSchemeCacheNavigate(const TPathId& pathId);

TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(const TString& path, ui64 cookie = 0);
TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(TPathId pathId, ui64 cookie = 0);

TRequestResponse<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult> MakeRequestSchemeShardDescribe(TTabletId schemeShardId, const TString& path, const NKikimrSchemeOp::TDescribeOptions& options = {}, ui64 cookie = 0);
TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigateWithToken(
const TString& path, bool showPrivate, ui32 access, ui64 cookie = 0);

TRequestResponse<TEvViewer::TEvViewerResponse> MakeRequestViewer(TNodeId nodeId, TEvViewer::TEvViewerRequest* request, ui32 flags = 0);
void RequestTxProxyDescribe(const TString& path);
void RequestStateStorageEndpointsLookup(const TString& path);
Expand Down Expand Up @@ -316,6 +325,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
TString GetHTTPOKJSON(const google::protobuf::Message& response, TInstant lastModified = {});
TString GetHTTPGATEWAYTIMEOUT(TString contentType = {}, TString response = {});
TString GetHTTPBADREQUEST(TString contentType = {}, TString response = {});
TString GetHTTPNOTFOUND(TString contentType = {}, TString response = {});
TString GetHTTPINTERNALERROR(TString contentType = {}, TString response = {});
TString GetHTTPFORBIDDEN(TString contentType = {}, TString response = {});
TString MakeForward(const std::vector<ui32>& nodes);
Expand Down
26 changes: 26 additions & 0 deletions ydb/core/viewer/protos/viewer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,29 @@ message TFeatureFlagsConfig {
repeated TDatabase Databases = 2;
}

message TTopicDataResponse {
message TMessage {
uint64 Offset = 1;
uint64 CreateTimestamp = 2;
uint64 WriteTimestamp = 3;
uint64 TimestampDiff = 4;
string Message = 5;
uint32 StorageSize = 6;
uint32 OriginalSize = 7;
uint32 Codec = 8;
string ProducerId = 9;
uint64 SeqNo = 10;

message TMetadataItem {
string Key = 1;
string Value = 2;
}
repeated TMetadataItem MessageMetadata = 11;
}

uint64 StartOffset = 1;
uint64 EndOffset = 2;
repeated TMessage Messages = 3;
bool Truncated = 4;
}

Loading
Loading