Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(groupStatus): Add "groupStatus" command to the CLI tools #764 #875

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,12 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {

// TODO: Split controller to a separate port
ControllerServiceImpl controllerService = MetadataStoreBuilder.build(metadataStore);
ProxyServiceImpl proxyService = new ProxyServiceImpl(messageStore, extendMessageService, producerManager, consumerManager);
ProxyServiceImpl proxyService = new ProxyServiceImpl(messageStore, extendMessageService
, producerManager, consumerManager,serviceManager);
grpcServer = new GrpcProtocolServer(brokerConfig.proxy(), messagingProcessor, controllerService, proxyService);
remotingServer = new RemotingProtocolServer(messagingProcessor);

((DefaultServiceManager)serviceManager).setRemotingProtocolServer(this.remotingServer);
}

@Override
Expand Down
4 changes: 3 additions & 1 deletion cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.rocketmq.cli.broker.TerminateNode;
import com.automq.rocketmq.cli.consumer.ConsumeMessage;
import com.automq.rocketmq.cli.consumer.ConsumerClientConnection;
import com.automq.rocketmq.cli.consumer.ConsumerStatus;
import com.automq.rocketmq.cli.consumer.CreateGroup;
import com.automq.rocketmq.cli.consumer.DeleteGroup;
import com.automq.rocketmq.cli.consumer.DescribeGroup;
Expand Down Expand Up @@ -64,7 +65,8 @@
TerminateNode.class,
ResetConsumeOffset.class,
ProducerClientConnection.class,
ConsumerClientConnection.class
ConsumerClientConnection.class,
ConsumerStatus.class
}
)
public class MQAdmin implements Runnable {
Expand Down
204 changes: 204 additions & 0 deletions cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumerStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.proxy.v1.ConsumerStatusReply;
import apache.rocketmq.proxy.v1.ConsumerStatusRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import de.vandermeer.asciitable.AT_Row;
import de.vandermeer.asciitable.AsciiTable;
import de.vandermeer.asciitable.CWC_LongestLine;
import de.vandermeer.skb.interfaces.transformers.textformat.TextAlignment;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.PopProcessQueueInfo;
import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import picocli.CommandLine;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.IntStream;

@CommandLine.Command(name = "consumerStatus", mixinStandardHelpOptions = true, showDefaultValues = true)
public class ConsumerStatus implements Callable<Void> {
@CommandLine.ParentCommand
MQAdmin mqAdmin;

@CommandLine.Option(names = {"-g", "--groupName"}, description = "Group name", required = true)
String groupName;
@CommandLine.Option(names = {"-i", "--clientId"}, description = "clientId", required = false)
String clientId;
@CommandLine.Option(names = {"-s", "--jstack"}, description = "jstack", required = false)
boolean jstackEnable;

@Override
public Void call() throws Exception {
GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig());

ConsumerStatusReply consumerStatusReply
= proxyClient.consumerStatus(mqAdmin.getEndpoint(),
ConsumerStatusRequest.newBuilder().setGroup(groupName)
.setClientId(clientId).setJstackEnable(jstackEnable).build()).get();

AsciiTable groupTable = new AsciiTable();
groupTable.addStrongRule();
groupTable.addRow("Consumer Group");
groupTable.addRow("consumeType", "messageModel", "consumeFromWhere");
groupTable.addRow(consumerStatusReply.getConsumeType(), consumerStatusReply.getMessageModel(), consumerStatusReply.getConsumeFromWhere());


groupTable.addStrongRule();
groupTable.addRow("Consumer Client");
groupTable.addRow("CLIENT ID", "PROTOCOL", "VERSION", "ADDRESS", "LANGUAGE");
groupTable.addRule();
for (apache.rocketmq.proxy.v1.ConsumerClientConnection connection : consumerStatusReply.getConnectionList()) {
groupTable.addRow(connection.getClientId(), connection.getProtocol(), connection.getVersion(), connection.getAddress(), connection.getLanguage());
groupTable.addRule();
}

//Consumer Subscription
final ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo
.decode(consumerStatusReply.getConsumerRunningInfo().toByteArray(), ConsumerRunningInfo.class);
addGroupInfo(consumerRunningInfo,groupTable);

// CWC_LongestLine cwc = new CWC_LongestLine();
// IntStream.range(0, row.getCells().size()).forEach((i) -> {
// cwc.add(10, 0);
// });
// groupTable.getRenderer().setCWC(cwc);

String render = groupTable.render();
System.out.println(render);
return null;
}

private void addGroupInfo(ConsumerRunningInfo consumerRunningInfo,AsciiTable groupTable) {
groupTable.addStrongRule();
groupTable.addRow("Consumer Subscription");
AT_Row row = groupTable.addRow("Topic", "PROTOCOL", "ClassFilter", "SubExpression");
if (consumerRunningInfo.getSubscriptionSet() != null
&& !consumerRunningInfo.getSubscriptionSet().isEmpty()) {
for (SubscriptionData subscriptionData : consumerRunningInfo.getSubscriptionSet()) {
groupTable.addRow(subscriptionData.getTopic(), subscriptionData.isClassFilterMode(),
subscriptionData.getSubString());
groupTable.addRule();
}
}

groupTable.addStrongRule();
groupTable.addRow("Consumer Offset");
groupTable.addRow("Topic", "Broker Name", "QID", "Consumer Offset");
if (consumerRunningInfo.getMqTable() != null
&& !consumerRunningInfo.getMqTable().entrySet().isEmpty()) {
for (Map.Entry<MessageQueue, ProcessQueueInfo> entry : consumerRunningInfo.getMqTable().entrySet()) {
groupTable.addRow(entry.getKey().getTopic(),
entry.getKey().getBrokerName(),
entry.getKey().getQueueId(),
entry.getValue().getCommitOffset());
groupTable.addRule();
}
}

groupTable.addStrongRule();
groupTable.addRow("Consumer MQ Detail");
groupTable.addRow("Topic", "Broker Name", "QID", "Consumer Offset");
if (consumerRunningInfo.getMqTable() != null
&& !consumerRunningInfo.getMqTable().entrySet().isEmpty()) {
for (Map.Entry<MessageQueue, ProcessQueueInfo> entry : consumerRunningInfo.getMqTable().entrySet()) {
groupTable.addRow(entry.getKey().getTopic(),
entry.getKey().getBrokerName(),
entry.getKey().getQueueId(),
entry.getValue().toString());
groupTable.addRule();
}
}

groupTable.addStrongRule();
groupTable.addRow("Consumer MQ Detail");
groupTable.addRow("Topic", "Broker Name", "QID", "ProcessQueueInfo");
if (consumerRunningInfo.getMqTable() != null
&& !consumerRunningInfo.getMqTable().entrySet().isEmpty()) {
for (Map.Entry<MessageQueue, ProcessQueueInfo> entry : consumerRunningInfo.getMqTable().entrySet()) {
groupTable.addRow(entry.getKey().getTopic(),
entry.getKey().getBrokerName(),
entry.getKey().getQueueId(),
entry.getValue().toString());
groupTable.addRule();
}
}

groupTable.addStrongRule();
groupTable.addRow("Consumer Pop Detail");
groupTable.addRow("Topic", "Broker Name", "QID", "ProcessQueueInfo");
if (consumerRunningInfo.getMqTable() != null
&& !consumerRunningInfo.getMqTable().entrySet().isEmpty()) {
for (Map.Entry<MessageQueue, PopProcessQueueInfo> entry : consumerRunningInfo.getMqPopTable().entrySet()) {
groupTable.addRow(entry.getKey().getTopic(),
entry.getKey().getBrokerName(),
entry.getKey().getQueueId(),
entry.getValue().toString());
groupTable.addRule();
}
}

groupTable.addStrongRule();
groupTable.addRow("Consumer RT&TPS");
groupTable.addRow("Topic", "Pull RT", "Pull TPS", "Consume RT", "ConsumeOK TPS", "ConsumeFailed TPS", "ConsumeFailedMsgsInHour");
if (consumerRunningInfo.getStatusTable() != null
&& !consumerRunningInfo.getMqTable().entrySet().isEmpty()) {
for (Map.Entry<String, ConsumeStatus> entry : consumerRunningInfo.getStatusTable().entrySet()) {
groupTable.addRow(entry.getKey(),
entry.getValue().getPullRT(),
entry.getValue().getPullTPS(),
entry.getValue().getConsumeRT(),
entry.getValue().getConsumeOKTPS(),
entry.getValue().getConsumeFailedTPS(),
entry.getValue().getConsumeFailedMsgs());
groupTable.addRule();
}
}

groupTable.addStrongRule();
groupTable.addRow("User Consume Info");
groupTable.addRow("Topic", "Pull RT", "Pull TPS", "Consume RT", "ConsumeOK TPS", "ConsumeFailed TPS", "ConsumeFailedMsgsInHour");
if (consumerRunningInfo.getUserConsumerInfo() != null
&& !consumerRunningInfo.getUserConsumerInfo().entrySet().isEmpty()) {
for (Map.Entry<String, String> entry : consumerRunningInfo.getUserConsumerInfo().entrySet()) {
groupTable.addRow(entry.getKey(), entry.getValue());
groupTable.addRule();
}
}

groupTable.addStrongRule();
groupTable.addRow("Consumer jstack");
if (consumerRunningInfo.getJstack() != null &&
!consumerRunningInfo.getJstack().isEmpty()) {
groupTable.addRow(consumerRunningInfo.getJstack());
}

}

private void centralize(AT_Row row) {
row.setTextAlignment(TextAlignment.CENTER);
// row.getCells().forEach((cell -> cell.getContext().setTextAlignment(TextAlignment.CENTER)));
}
}
41 changes: 41 additions & 0 deletions proto/src/main/proto/proxy/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,52 @@ message ConsumerClientConnection {
int64 last_update_time = 7;
}



message ConsumerClientConnectionReply {
Status status = 1;
// Producer client connection
repeated ConsumerClientConnection connection = 2;
}



message ConsumerSubInfo {
// topic
string topic = 1;
// subExpression
string sub_expression = 2;
}

message ConsumerStatusReply {
Status status = 1;
// consumer connection
repeated ConsumerClientConnection connection = 2;

repeated ConsumerSubInfo consumer_sub_info = 3;

string consume_type = 4;

string message_model = 5;

string consume_from_where = 6;

bytes consumer_running_info = 7;
}

message ConsumerStatusRequest {
ProxyRequestContext context = 1;
// Consumer group name
string group = 2;

string client_id = 3;

bool jstack_enable = 4;
}




message TraceContext {
// Trace id
string trace_id = 1;
Expand Down Expand Up @@ -172,4 +212,5 @@ service ProxyService {
rpc producerClientConnection(ProducerClientConnectionRequest) returns (ProducerClientConnectionReply) {}
rpc consumerClientConnection(ConsumerClientConnectionRequest) returns (ConsumerClientConnectionReply) {}
rpc relay(RelayRequest) returns (RelayReply) {}
rpc consumerStatus(ConsumerStatusRequest) returns (ConsumerStatusReply) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import apache.rocketmq.proxy.v1.ConsumerClientConnection;
import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest;
import apache.rocketmq.proxy.v1.ConsumerStatusReply;
import apache.rocketmq.proxy.v1.ConsumerStatusRequest;
import apache.rocketmq.proxy.v1.ProducerClientConnection;
import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest;
import apache.rocketmq.proxy.v1.QueueStats;
Expand All @@ -44,5 +46,8 @@ CompletableFuture<List<ProducerClientConnection>> producerClientConnection(Strin
CompletableFuture<List<ConsumerClientConnection>> consumerClientConnection(String target,
ConsumerClientConnectionRequest request);

CompletableFuture<ConsumerStatusReply> consumerStatus(String target, ConsumerStatusRequest request);


CompletableFuture<Status> relayMessage(String target, FlatMessage message);
}
Loading
Loading