Skip to content

Commit

Permalink
add rocketmq-client-java-5.x plugin (#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
xyyz150 authored Aug 28, 2023
1 parent afa2578 commit 738201c
Show file tree
Hide file tree
Showing 25 changed files with 1,168 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/plugins-test.2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ jobs:
- jersey-2.26.x-2.39.x-scenario
- websphere-liberty-23.x-scenario
- nacos-client-2.x-scenario
- rocketmq-5-grpc-scenario
steps:
- uses: actions/checkout@v2
with:
Expand Down
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ Callable {
* Merge two instrumentation classes to avoid duplicate enhancements in MySQL plugins.
* Support asynchronous invocation in jetty client 9.0 and 9.x plugin
* Add nacos-client 2.x plugin
* Staticize the tags for preventing synchronization in JDK 8
* Staticize the tags for preventing synchronization in JDK 8
* Add RocketMQ-Client-Java 5.x plugin
* Fix NullPointerException in lettuce-5.x-plugin.

#### Documentation
Expand Down
1 change: 1 addition & 0 deletions apm-sniffer/apm-sdk-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
<module>rocketMQ-5.x-plugin</module>
<module>websphere-liberty-23.x-plugin</module>
<module>aerospike-plugin</module>
<module>rocketMQ-client-java-5.x-plugin</module>
</modules>
<packaging>pom</packaging>

Expand Down
54 changes: 54 additions & 0 deletions apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>apm-rocketmq-client-java-5.x-plugin</artifactId>
<name>rocketMQ-client-java-5.x-plugin</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<rocketmq-client-java.version>5.0.5</rocketmq-client-java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq-client-java.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;

import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;

import java.lang.reflect.Method;

public class MessageListenerInterceptor implements InstanceMethodsAroundInterceptor {

public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
MessageView messageView = (MessageView) allArguments[0];

ContextCarrier contextCarrier = getContextCarrierFromMessage(messageView);

AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + messageView.getTopic()
+ "/Consumer", contextCarrier);
Tags.MQ_TOPIC.set(span, messageView.getTopic());

Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
if (skyWalkingDynamicField != null) {
ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
Tags.MQ_BROKER.set(span, consumerEnhanceInfos.getNamesrvAddr());
span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
}

span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
SpanLayer.asMQ(span);
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ConsumeResult status = (ConsumeResult) ret;
if (ConsumeResult.FAILURE.equals(status)) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
Tags.MQ_STATUS.set(activeSpan, status.name());
}
ContextManager.stopSpan();
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}

private ContextCarrier getContextCarrierFromMessage(MessageView message) {
ContextCarrier contextCarrier = new ContextCarrier();

CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(message.getProperties().get(next.getHeadKey()));
}

return contextCarrier;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;

import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.util.StringUtil;

import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* {@link MessageSendInterceptor} create exit span when the method {@link org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message)}
* and {@link org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message, Transaction)} execute.
*/
public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {

public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Message message = (Message) allArguments[0];
ClientImpl producerImpl = (ClientImpl) objInst;

ContextCarrier contextCarrier = new ContextCarrier();
String namingServiceAddress = producerImpl.getClientConfiguration().getEndpoints();
AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, namingServiceAddress);
Tags.MQ_TOPIC.set(span, message.getTopic());
Collection<String> keys = message.getKeys();
if (!CollectionUtil.isEmpty(keys)) {
span.tag(Tags.ofKey("mq.message.keys"), keys.stream().collect(Collectors.joining(",")));
}
Optional<String> tag = message.getTag();
if (tag.isPresent()) {
span.tag(Tags.ofKey("mq.message.tags"), tag.get());
}

contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(span);

Map<String, String> properties = message.getProperties();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
if (!StringUtil.isEmpty(next.getHeadValue())) {
properties.put(next.getHeadKey(), next.getHeadValue());
}
}

MessageBuilder messageBuilder = new MessageBuilderImpl();
messageBuilder.setTopic(message.getTopic());
if (message.getTag().isPresent()) {
messageBuilder.setTag(message.getTag().get());
}
messageBuilder.setKeys(message.getKeys().toArray(new String[0]));
if (message.getMessageGroup().isPresent()) {
messageBuilder.setMessageGroup(message.getMessageGroup().get());
}

byte[] body = new byte[message.getBody().limit()];
message.getBody().get(body);
messageBuilder.setBody(body);
if (message.getDeliveryTimestamp().isPresent()) {
messageBuilder.setDeliveryTimestamp(message.getDeliveryTimestamp().get());
}
properties.entrySet().forEach(item -> messageBuilder.addProperty(item.getKey(), item.getValue()));
allArguments[0] = messageBuilder.build();
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
SendReceipt sendReceipt = (SendReceipt) ret;
if (sendReceipt != null && sendReceipt.getMessageId() != null) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.tag(Tags.ofKey("mq.message.id"), sendReceipt.getMessageId().toString());
}
ContextManager.stopSpan();
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}

private String buildOperationName(String topicName) {
return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;

import java.util.Map;

/**
* {@link PushConsumerImplInterceptor} create exit span when the method {@link org.apache.rocketmq.client.java.impl.consumer.PushConsumerImpl#PushConsumerImpl(ClientConfiguration, String, Map, MessageListener, int, int, int)} execute.
*/
public class PushConsumerImplInterceptor implements InstanceConstructorInterceptor {

@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ClientConfiguration clientConfiguration = (ClientConfiguration) allArguments[0];
String namesrvAddr = clientConfiguration.getEndpoints();
ConsumerEnhanceInfos consumerEnhanceInfos = new ConsumerEnhanceInfos(namesrvAddr);

if (allArguments[3] instanceof EnhancedInstance) {
EnhancedInstance enhancedMessageListener = (EnhancedInstance) allArguments[3];
enhancedMessageListener.setSkyWalkingDynamicField(consumerEnhanceInfos);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;

public class ConsumerEnhanceInfos {

private String namesrvAddr;

public ConsumerEnhanceInfos(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}

public String getNamesrvAddr() {
return namesrvAddr;
}
}
Loading

0 comments on commit 738201c

Please sign in to comment.