Skip to content

Commit 0db08f5

Browse files
authored
optimize: optimization apm-skwalking operation method to generate rules (apache#3840)
1 parent 21e0264 commit 0db08f5

File tree

11 files changed

+85
-40
lines changed

11 files changed

+85
-40
lines changed

changes/1.5.0.md

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
3939
- [[#3740](https://github.com/seata/seata/pull/3740)] 修复在某些情况下,当`Saga`事务结束时`LocalThread`未被清除的问题
4040
- [[#3792](https://github.com/seata/seata/pull/3792)] 修复Server 无法获取 redis host的问题
4141
- [[#3828](https://github.com/seata/seata/pull/3828)] 修复StringUtils抛出StackOverflowError的问题
42+
- [[#3817](https://github.com/seata/seata/pull/3817)] 修复TC在SkyWalking拓扑图节点不汇聚的问题
4243

4344

4445
### optimize:
@@ -61,6 +62,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
6162
- [[#3738](https://github.com/seata/seata/pull/3738)] `JacksonUndoLogParser`支持解析`LocalDateTime`(支持微秒时间)
6263
- [[#3794](https://github.com/seata/seata/pull/3794)] 优化`seata-server`的打包配置,修正Dockerfile的错误配置,并将Dockerfile也打包进去。
6364
- [[#3795](https://github.com/seata/seata/pull/3795)] 优化`zkRegistry`lookup方法性能
65+
- [[#3840](https://github.com/seata/seata/pull/3840)] 优化`apm-skwalking`操作方法生成规则
6466

6567

6668
### test:
@@ -87,6 +89,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
8789
- [kaka2code](https://github.com/kaka2code)
8890
- [objcoding](https://github.com/objcoding)
8991
- [iqinning](https://github.com/iqinning)
92+
- [zhaoyuguang](https://github.com/zhaoyuguang)
9093

9194
同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
9295

changes/en-us/1.5.0.md

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
- [[#3740](https://github.com/seata/seata/pull/3740)] fix that `LocalThread` is not cleared when the `Saga` transaction ends
4141
- [[#3792](https://github.com/seata/seata/pull/3792)] fix the Server can't find redis-host property
4242
- [[#3828](https://github.com/seata/seata/pull/3828)] fix StringUtils StackOverflowError
43+
- [[#3817](https://github.com/seata/seata/pull/3817)] fix TC SkyWalking topo calling node not gather
4344

4445

4546
### optimize:
@@ -62,6 +63,7 @@
6263
- [[#3738](https://github.com/seata/seata/pull/3738)] `JacksonUndoLogParser` supports to parsing `LocalDateTime`.
6364
- [[#3794](https://github.com/seata/seata/pull/3794)] optimize the packaging of `seata-server`
6465
- [[#3795](https://github.com/seata/seata/pull/3795)] optimize zk registry lookup performance
66+
- [[#3840](https://github.com/seata/seata/pull/3840)] optimization `apm-skwalking` operation method to generate rules
6567

6668

6769
### test:
@@ -88,6 +90,7 @@
8890
- [kaka2code](https://github.com/kaka2code)
8991
- [objcoding](https://github.com/objcoding)
9092
- [iqinning](https://github.com/iqinning)
93+
- [zhaoyuguang](https://github.com/zhaoyuguang)
9194

9295

9396
Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

ext/apm-seata-skywalking-plugin/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
<dependency>
3333
<groupId>org.apache.skywalking</groupId>
3434
<artifactId>apm-agent-core</artifactId>
35-
<version>8.4.0</version>
35+
<version>8.6.0</version>
3636
<scope>provided</scope>
3737
</dependency>
3838
<dependency>

ext/apm-seata-skywalking-plugin/src/main/java/io/seata/apm/skywalking/plugin/DefaultCoreDoGlobalCommitInterceptor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
2121
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
2222
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
23-
import io.seata.apm.skywalking.plugin.common.SWSeataConstants;
23+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
2424

2525
import java.lang.reflect.Method;
2626

@@ -32,8 +32,8 @@ public class DefaultCoreDoGlobalCommitInterceptor implements InstanceMethodsArou
3232
@Override
3333
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
3434
MethodInterceptResult result) throws Throwable {
35-
AbstractSpan activeSpan = ContextManager.createLocalSpan(SWSeataConstants.SEATA_NAME + "/TC/doGlobalCommit");
36-
// activeSpan.setComponent(ComponentsDefine.SEATA);
35+
AbstractSpan activeSpan = ContextManager.createLocalSpan(ComponentsDefine.SEATA.getName() + "/TC/doGlobalCommit");
36+
activeSpan.setComponent(ComponentsDefine.SEATA);
3737
}
3838

3939
@Override

ext/apm-seata-skywalking-plugin/src/main/java/io/seata/apm/skywalking/plugin/NettyRemotingClientSendSyncInterceptor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
2727
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
2828
import io.seata.apm.skywalking.plugin.common.SWSeataUtils;
29+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
2930

3031
import java.lang.reflect.Method;
3132

@@ -46,7 +47,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
4647
String operationName = SWSeataUtils.convertOperationName(rpcMessage);
4748
ContextCarrier contextCarrier = new ContextCarrier();
4849
AbstractSpan activeSpan = ContextManager.createExitSpan(operationName, contextCarrier, peer);
49-
// activeSpan.setComponent(ComponentsDefine.SEATA);
50+
activeSpan.setComponent(ComponentsDefine.SEATA);
5051
activeSpan.setPeer(peer);
5152
SpanLayer.asRPCFramework(activeSpan);
5253
CarrierItem next = contextCarrier.items();

ext/apm-seata-skywalking-plugin/src/main/java/io/seata/apm/skywalking/plugin/RemotingProcessorProcessInterceptor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
2626
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
2727
import io.seata.apm.skywalking.plugin.common.SWSeataUtils;
28+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
2829

2930
import java.lang.reflect.Method;
3031

@@ -46,7 +47,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
4647
}
4748
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
4849
SpanLayer.asRPCFramework(activeSpan);
49-
// activeSpan.setComponent(ComponentsDefine.SEATA);
50+
activeSpan.setComponent(ComponentsDefine.SEATA);
5051
}
5152

5253
@Override

ext/apm-seata-skywalking-plugin/src/main/java/io/seata/apm/skywalking/plugin/common/SWSeataConstants.java

+13-26
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,26 @@
1515
*/
1616
package io.seata.apm.skywalking.plugin.common;
1717

18-
import java.util.HashMap;
19-
import java.util.Map;
18+
import java.util.HashSet;
19+
import java.util.Set;
2020

2121
/**
2222
* @author zhaoyuguang
2323
*/
2424
public class SWSeataConstants {
2525

26-
public static final Map<String, String> OPERATION_NAME_MAPPING = new HashMap<>();
27-
public static final String SEATA_NAME = "SEATA";
26+
private static final Set<String> TRANSACTION_MANAGER_OPERATION_NAME_MAPPING = new HashSet<>();
2827

2928
static {
30-
OPERATION_NAME_MAPPING.put("GlobalBeginRequest", SEATA_NAME + "/TM/" + "GlobalBeginRequest");
31-
OPERATION_NAME_MAPPING.put("GlobalBeginResponse", SEATA_NAME + "/TM/" + "GlobalBeginResponse");
32-
OPERATION_NAME_MAPPING.put("GlobalRollbackRequest", SEATA_NAME + "/TM/" + "GlobalRollbackRequest");
33-
OPERATION_NAME_MAPPING.put("GlobalRollbackResponse", SEATA_NAME + "/TM/" + "GlobalRollbackResponse");
34-
OPERATION_NAME_MAPPING.put("GlobalCommitRequest", SEATA_NAME + "/TM/" + "GlobalCommitRequest");
35-
OPERATION_NAME_MAPPING.put("GlobalCommitResponse", SEATA_NAME + "/TM/" + "GlobalCommitResponse");
36-
OPERATION_NAME_MAPPING.put("BranchRegisterRequest", SEATA_NAME + "/RM/" + "BranchRegisterRequest");
37-
OPERATION_NAME_MAPPING.put("BranchRegisterResponse", SEATA_NAME + "/RM/" + "BranchRegisterResponse");
38-
OPERATION_NAME_MAPPING.put("BranchRollbackRequest", SEATA_NAME + "/RM/" + "BranchRollbackRequest");
39-
OPERATION_NAME_MAPPING.put("BranchRollbackResponse", SEATA_NAME + "/RM/" + "BranchRollbackResponse");
40-
OPERATION_NAME_MAPPING.put("BranchCommitRequest", SEATA_NAME + "/RM/" + "BranchCommitRequest");
41-
OPERATION_NAME_MAPPING.put("BranchCommitResponse", SEATA_NAME + "/RM/" + "BranchCommitResponse");
42-
OPERATION_NAME_MAPPING.put("BranchReportRequest", SEATA_NAME + "/RM/" + "BranchReportRequest");
43-
OPERATION_NAME_MAPPING.put("BranchReportResponse", SEATA_NAME + "/RM/" + "BranchReportResponse");
44-
OPERATION_NAME_MAPPING.put("GlobalLockQueryRequest", SEATA_NAME + "/RM/" + "GlobalLockQueryRequest");
45-
OPERATION_NAME_MAPPING.put("GlobalLockQueryResponse", SEATA_NAME + "/RM/" + "GlobalLockQueryResponse");
46-
OPERATION_NAME_MAPPING.put("UndoLogDeleteRequest", SEATA_NAME + "/RM/" + "UndoLogDeleteRequest");
47-
OPERATION_NAME_MAPPING.put("UndoLogDeleteResponse", SEATA_NAME + "/RM/" + "UndoLogDeleteResponse");
48-
OPERATION_NAME_MAPPING.put("RegisterRMRequest", SEATA_NAME + "/RM/" + "RegisterRMRequest");
49-
OPERATION_NAME_MAPPING.put("RegisterRMResponse", SEATA_NAME + "/RM/" + "RegisterRMResponse");
50-
OPERATION_NAME_MAPPING.put("RegisterTMRequest", SEATA_NAME + "/RM/" + "RegisterTMRequest");
51-
OPERATION_NAME_MAPPING.put("RegisterTMResponse", SEATA_NAME + "/RM/" + "RegisterTMResponse");
29+
TRANSACTION_MANAGER_OPERATION_NAME_MAPPING.add("GlobalBeginRequest");
30+
TRANSACTION_MANAGER_OPERATION_NAME_MAPPING.add("GlobalBeginResponse");
31+
TRANSACTION_MANAGER_OPERATION_NAME_MAPPING.add("GlobalRollbackRequest");
32+
TRANSACTION_MANAGER_OPERATION_NAME_MAPPING.add("GlobalRollbackResponse");
33+
TRANSACTION_MANAGER_OPERATION_NAME_MAPPING.add("GlobalCommitRequest");
34+
TRANSACTION_MANAGER_OPERATION_NAME_MAPPING.add("GlobalCommitResponse");
35+
}
36+
37+
public static boolean isTransactionManagerOperationName(String operationName) {
38+
return TRANSACTION_MANAGER_OPERATION_NAME_MAPPING.contains(operationName);
5239
}
5340
}

ext/apm-seata-skywalking-plugin/src/main/java/io/seata/apm/skywalking/plugin/common/SWSeataUtils.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.netty.channel.Channel;
1919
import io.seata.core.protocol.RpcMessage;
20+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
2021

2122
/**
2223
* @author zhaoyuguang
@@ -34,13 +35,11 @@ public static String convertPeer(Channel channel) {
3435
public static String convertOperationName(RpcMessage rpcMessage) {
3536
String requestSimpleName = rpcMessage.getBody().getClass().getSimpleName();
3637
if (SeataPluginConfig.Plugin.SEATA.SERVER) {
37-
return SWSeataConstants.SEATA_NAME + "/TC/" + requestSimpleName;
38+
return ComponentsDefine.SEATA.getName() + "/TC/" + requestSimpleName;
3839
}
39-
String operationName = SWSeataConstants.OPERATION_NAME_MAPPING.get(requestSimpleName);
40-
if (operationName == null) {
41-
operationName = SWSeataConstants.SEATA_NAME + "/" + requestSimpleName;
40+
if (SWSeataConstants.isTransactionManagerOperationName(requestSimpleName)) {
41+
return ComponentsDefine.SEATA.getName() + "/TM/" + requestSimpleName;
4242
}
43-
return operationName;
44-
43+
return ComponentsDefine.SEATA.getName() + "/RM/" + requestSimpleName;
4544
}
4645
}

ext/apm-seata-skywalking-plugin/src/main/java/io/seata/apm/skywalking/plugin/define/DefaultCoreInstrumentation.java

-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
/**
2929
* @author zhaoyuguang
3030
*/
31-
3231
public class DefaultCoreInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
3332

3433
private static final String ENHANCE_CLASS_TM = "io.seata.server.coordinator.DefaultCore";

ext/apm-seata-skywalking-plugin/src/main/java/io/seata/apm/skywalking/plugin/define/RemotingProcessorInstrumentation.java

-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
/**
2929
* @author zhaoyuguang
3030
*/
31-
3231
public class RemotingProcessorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
3332

3433
private static final String INTERCEPTOR_CLASS = "io.seata.apm.skywalking.plugin.RemotingProcessorProcessInterceptor";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 1999-2019 Seata.io Group.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.seata.apm.skywalking.plugin.common;
17+
18+
import io.seata.core.protocol.AbstractMessage;
19+
import io.seata.core.protocol.RegisterRMRequest;
20+
import io.seata.core.protocol.RegisterRMResponse;
21+
import io.seata.core.protocol.RpcMessage;
22+
import io.seata.core.protocol.transaction.GlobalBeginRequest;
23+
import org.junit.jupiter.api.Assertions;
24+
import org.junit.jupiter.api.Test;
25+
26+
/**
27+
* @author zhaoyuguang
28+
*/
29+
public class SWSeataUtilsTest {
30+
31+
@Test
32+
public void testConvertOperationName() {
33+
{
34+
RpcMessage rpcMessage = new RpcMessage();
35+
AbstractMessage abstractMessage = new GlobalBeginRequest();
36+
rpcMessage.setBody(abstractMessage);
37+
Assertions.assertEquals(SWSeataUtils.convertOperationName(rpcMessage), "Seata/TM/GlobalBeginRequest");
38+
}
39+
{
40+
RpcMessage rpcMessage = new RpcMessage();
41+
AbstractMessage abstractMessage = new RegisterRMRequest();
42+
rpcMessage.setBody(abstractMessage);
43+
Assertions.assertEquals(SWSeataUtils.convertOperationName(rpcMessage), "Seata/RM/RegisterRMRequest");
44+
}
45+
{
46+
SeataPluginConfig.Plugin.SEATA.SERVER = true;
47+
RpcMessage rpcMessage = new RpcMessage();
48+
AbstractMessage abstractMessage = new RegisterRMResponse();
49+
rpcMessage.setBody(abstractMessage);
50+
Assertions.assertEquals(SWSeataUtils.convertOperationName(rpcMessage), "Seata/TC/RegisterRMResponse");
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)