diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 1c1e5385a65..3b299aebdf5 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -27,6 +27,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7135](https://github.com/apache/incubator-seata/pull/7135)] treating a unique index conflict during rollback as a dirty write - [[#7150](https://github.com/apache/incubator-seata/pull/7150)] The time difference between the raft node and the follower node cannot synchronize data - [[#7102](https://github.com/apache/incubator-seata/pull/7150)] bugfix: modify XA mode pre commit transaction from commit phase to before close phase +- [[#7188](https://github.com/apache/incubator-seata/pull/7188)] bugfix: Fix missing branchType in BusinessActionContext ### optimize: @@ -46,7 +47,8 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7142](https://github.com/apache/incubator-seata/pull/7142)] upgrade commons-compress to 1.27.1 - [[#7149](https://github.com/apache/incubator-seata/pull/7149)] Fix abnormal character display issues in ./distribution/NOTICE.md - [[#7170](https://github.com/apache/incubator-seata/pull/7170)] Optimize seata client I/O processing by adjusting thread count - +- [[#7179](https://github.com/apache/incubator-seata/pull/7179)] Use shared EventLoop for TM and RM clients to reduce thread overhead and improve performance +- [[#7194](https://github.com/apache/incubator-seata/pull/7194)] automatically skipping proxy for datasource of type AbstractRoutingDataSource ### security: - [[#6069](https://github.com/apache/incubator-seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities @@ -87,5 +89,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [xingfudeshi](https://github.com/xingfudeshi) - [YongGoose](https://github.com/YongGoose) - [Monilnarang](https://github.com/Monilnarang) +- [iAmClever](https://github.com/iAmClever) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index f28cba82abe..c4ee117c4c1 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -27,6 +27,8 @@ - [[#7135](https://github.com/apache/incubator-seata/pull/7135)] 回滚时遇到唯一索引冲突视为脏写 - [[#7150](https://github.com/apache/incubator-seata/pull/7150)] raft节点之前时间差,follower节点无法同步数据 - [[#7102](https://github.com/apache/incubator-seata/pull/7150)] 将XA模式预提交事务从提交阶段修改为关闭前阶段 +- [[#7188](https://github.com/apache/incubator-seata/pull/7188)] 修复 BusinessActionContext 中缺少的 branchType + ### optimize: @@ -46,7 +48,8 @@ - [[#7142](https://github.com/apache/incubator-seata/pull/7142)] 升级 commons-compress 至 1.27.1 版本 - [[#7149](https://github.com/apache/incubator-seata/pull/7149)] 修复./distribution/NOTICE.md文件中的异常字符串显示问题 - [[#7170](https://github.com/apache/incubator-seata/pull/7170)] 通过调整线程数优化 Seata 客户端 I/O 处理 - +- [[#7179](https://github.com/apache/incubator-seata/pull/7179)] 使用共享的 EventLoop 来减少 TM 和 RM 客户端的线程开销并提高性能 +- [[#7194](https://github.com/apache/incubator-seata/pull/7194)] 自动跳过对AbstractRoutingDataSource类型数据源的代理 ### security: - [[#6069](https://github.com/apache/incubator-seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞 @@ -87,5 +90,6 @@ - [xingfudeshi](https://github.com/xingfudeshi) - [YongGoose](https://github.com/YongGoose) - [Monilnarang](https://github.com/Monilnarang) +- [iAmClever](https://github.com/iAmClever) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index 0bcb83cf3c3..73746cd9202 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -709,6 +709,11 @@ public interface ConfigurationKeys { */ String WORKER_THREAD_SIZE = THREAD_FACTORY_PREFIX + "workerThreadSize"; + /** + * The constant ENABLE_SHARED_EVENTLOOP + */ + String ENABLE_CLIENT_SHARED_EVENTLOOP = TRANSPORT_PREFIX + "enableClientSharedEventLoopGroup"; + /** * The constant SHUTDOWN_PREFIX */ diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 0c3480bab16..1f3cf436180 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -122,6 +122,10 @@ public interface DefaultValues { */ @Deprecated boolean DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST = true; + /** + * The constant DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP. + */ + boolean DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP = false; /** * The constant DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST. */ diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 1c1bb748d52..08a6af4e8b5 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -56,18 +56,21 @@ /** * Rpc client. - * */ public class NettyClientBootstrap implements RemotingBootstrap { private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class); + private static final String THREAD_PREFIX_SPLIT_CHAR = "_"; + + private static EventLoopGroup sharedEventLoopGroupWorker = null; + private final NettyClientConfig nettyClientConfig; private final Bootstrap bootstrap = new Bootstrap(); - private final EventLoopGroup eventLoopGroupWorker; - private EventExecutorGroup defaultEventExecutorGroup; private final AtomicBoolean initialized = new AtomicBoolean(false); - private static final String THREAD_PREFIX_SPLIT_CHAR = "_"; private final NettyPoolKey.TransactionRole transactionRole; + private final EventLoopGroup eventLoopGroupWorker; + + private EventExecutorGroup defaultEventExecutorGroup; private ChannelHandler[] channelHandlers; public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup, @@ -81,14 +84,15 @@ public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExec this.nettyClientConfig = nettyClientConfig; int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize(); this.transactionRole = transactionRole; - if (NettyServerConfig.enableEpoll()) { - this.eventLoopGroupWorker = new EpollEventLoopGroup(selectorThreadSizeThreadSize, - new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), - selectorThreadSizeThreadSize)); + + boolean enableClientSharedEventLoop = this.nettyClientConfig.getEnableClientSharedEventLoop(); + if (enableClientSharedEventLoop) { + if (sharedEventLoopGroupWorker == null) { + sharedEventLoopGroupWorker = getOrCreateEventLoopGroupWorker(selectorThreadSizeThreadSize); + } + eventLoopGroupWorker = sharedEventLoopGroupWorker; } else { - this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, - new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), - selectorThreadSizeThreadSize)); + eventLoopGroupWorker = createEventLoopGroupWorker(selectorThreadSizeThreadSize); } this.defaultEventExecutorGroup = eventExecutorGroup; } @@ -123,7 +127,7 @@ public void start() { new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads())); } - this.bootstrap.group(this.eventLoopGroupWorker).channel( + this.bootstrap.group(eventLoopGroupWorker).channel( nettyClientConfig.getClientChannelClazz()).option( ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option( ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option( @@ -170,7 +174,7 @@ public void initChannel(SocketChannel ch) { @Override public void shutdown() { try { - this.eventLoopGroupWorker.shutdownGracefully(); + eventLoopGroupWorker.shutdownGracefully(); if (this.defaultEventExecutorGroup != null) { this.defaultEventExecutorGroup.shutdownGracefully(); } @@ -233,4 +237,23 @@ public void handlerAdded(ChannelHandlerContext ctx) { private String getThreadPrefix(String threadPrefix) { return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name(); } + + private EventLoopGroup getOrCreateEventLoopGroupWorker(int selectorThreadSizeThreadSize) { + if (eventLoopGroupWorker == null) { + return createEventLoopGroupWorker(selectorThreadSizeThreadSize); + } + return eventLoopGroupWorker; + } + + private EventLoopGroup createEventLoopGroupWorker(int selectorThreadSizeThreadSize) { + if (NettyServerConfig.enableEpoll()) { + return new EpollEventLoopGroup(selectorThreadSizeThreadSize, + new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), + selectorThreadSizeThreadSize)); + } + + return new NioEventLoopGroup(selectorThreadSizeThreadSize, + new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), + selectorThreadSizeThreadSize)); + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java index d31794cbe48..87877653ebf 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java @@ -17,7 +17,7 @@ package org.apache.seata.core.rpc.netty; import io.netty.channel.Channel; -import org.apache.seata.core.constants.ConfigurationKeys; +import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.core.rpc.TransportServerType; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST; @@ -25,6 +25,7 @@ import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_SELECTOR_THREAD_PREFIX; +import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP; import static org.apache.seata.common.DefaultValues.DEFAULT_WORKER_THREAD_PREFIX; /** @@ -354,6 +355,10 @@ public int getClientSelectorThreadSize() { return threadSize > 0 ? threadSize : WorkThreadMode.Default.getValue(); } + public boolean getEnableClientSharedEventLoop() { + return CONFIG.getBoolean(ConfigurationKeys.ENABLE_CLIENT_SHARED_EVENTLOOP, DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP); + } + /** * Get max acquire conn mills long. * diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientBootstrapTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientBootstrapTest.java new file mode 100644 index 00000000000..5c896852cc9 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientBootstrapTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class NettyClientBootstrapTest { + + @Mock + private NettyClientConfig nettyClientConfig; + private DefaultEventExecutorGroup eventExecutorGroup; + + @BeforeEach + void init() { + eventExecutorGroup = new DefaultEventExecutorGroup(1); + } + + @Test + void testSharedEventLoopGroupEnabled() { + when(nettyClientConfig.getEnableClientSharedEventLoop()).thenReturn(true); + NettyClientBootstrap tmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.TMROLE); + EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmNettyClientBootstrap); + + NettyClientBootstrap rmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.RMROLE); + EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmNettyClientBootstrap); + + Assertions.assertEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker); + } + + @Test + void testSharedEventLoopGroupDisabled() { + when(nettyClientConfig.getEnableClientSharedEventLoop()).thenReturn(false); + NettyClientBootstrap tmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.TMROLE); + EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmNettyClientBootstrap); + + NettyClientBootstrap rmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.RMROLE); + EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmNettyClientBootstrap); + + Assertions.assertNotEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker); + } + + private EventLoopGroup getEventLoopGroupWorker(NettyClientBootstrap bootstrap) { + try { + java.lang.reflect.Field field = NettyClientBootstrap.class.getDeclaredField("eventLoopGroupWorker"); + field.setAccessible(true); + return (EventLoopGroup) field.get(bootstrap); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/script/client/conf/file.conf b/script/client/conf/file.conf index 80858d0c1f8..e664dc2ed10 100644 --- a/script/client/conf/file.conf +++ b/script/client/conf/file.conf @@ -34,6 +34,8 @@ transport { rpcTmRequestTimeout = 30000 # the rm client rpc request timeout rpcRmRequestTimeout = 15000 + # the shared event loop group enable + enableClientSharedEventLoopGroup = false #thread factory for netty threadFactory { bossThreadPrefix = "NettyBoss" diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index af8c19ae605..f3f14b02655 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -84,6 +84,7 @@ seata.transport.enable-tm-client-batch-send-request=false seata.transport.enable-rm-client-batch-send-request=true seata.transport.rpc-rm-request-timeout=15000 seata.transport.rpc-tm-request-timeout=30000 +seata.transport.enable-client-shared-event-loop-group=false seata.config.type=file diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index 5ad6816a860..a25879b7afe 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -95,6 +95,7 @@ seata: enable-rm-client-batch-send-request: true rpc-rm-request-timeout: 15000 rpc-tm-request-timeout: 30000 + enable-client-shared-event-loop-group: false config: type: file consul: diff --git a/script/config-center/config.txt b/script/config-center/config.txt index 92039c90db9..cf736689b68 100644 --- a/script/config-center/config.txt +++ b/script/config-center/config.txt @@ -27,6 +27,7 @@ transport.enableTcServerBatchSendResponse=false transport.rpcRmRequestTimeout=30000 transport.rpcTmRequestTimeout=30000 transport.rpcTcRequestTimeout=30000 +transport.enableClientSharedEventLoopGroup=false transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java index 64f38a385d0..57c5585e2fc 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java @@ -28,6 +28,7 @@ import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TC_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSPORT_HEARTBEAT; +import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.TRANSPORT_PREFIX; @@ -92,6 +93,11 @@ public class TransportProperties { */ private long rpcTcRequestTimeout = DEFAULT_RPC_TC_REQUEST_TIMEOUT; + /** + * use shared event loop group + */ + private boolean enableClientSharedEventLoop = DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP; + public String getType() { return type; @@ -193,10 +199,18 @@ public long getRpcTcRequestTimeout() { return rpcTcRequestTimeout; } + public boolean isEnableClientSharedEventLoop() { + return enableClientSharedEventLoop; + } + public void setRpcTcRequestTimeout(long rpcTcRequestTimeout) { this.rpcTcRequestTimeout = rpcTcRequestTimeout; } + public void setEnableClientSharedEventLoop(boolean useSharedEventLoop) { + this.enableClientSharedEventLoop = useSharedEventLoop; + } + public String getProtocol() { return protocol; } diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/CorePropertiesTest.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/CorePropertiesTest.java index decf64708d7..618fc6e8617 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/CorePropertiesTest.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/CorePropertiesTest.java @@ -70,6 +70,7 @@ public void testTransportProperties() { assertEquals("seata", context.getBean(TransportProperties.class).getSerialization()); assertEquals("none", context.getBean(TransportProperties.class).getCompressor()); assertTrue(context.getBean(TransportProperties.class).isEnableClientBatchSendRequest()); + assertFalse(context.getBean(TransportProperties.class).isEnableClientSharedEventLoop()); } @Test diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportPropertiesTest.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportPropertiesTest.java index 3bb56b1cc44..429950dd110 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportPropertiesTest.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportPropertiesTest.java @@ -36,6 +36,7 @@ public void testTransportProperties() { transportProperties.setRpcRmRequestTimeout(1); transportProperties.setRpcTmRequestTimeout(1); transportProperties.setRpcTcRequestTimeout(1); + transportProperties.setEnableClientSharedEventLoop(true); Assertions.assertEquals("server", transportProperties.getServer()); Assertions.assertEquals("type", transportProperties.getType()); @@ -49,5 +50,6 @@ public void testTransportProperties() { Assertions.assertEquals(1, transportProperties.getRpcRmRequestTimeout()); Assertions.assertEquals(1, transportProperties.getRpcTmRequestTimeout()); Assertions.assertEquals(1, transportProperties.getRpcTcRequestTimeout()); + Assertions.assertTrue(transportProperties.isEnableClientSharedEventLoop()); } } diff --git a/spring/src/main/java/org/apache/seata/spring/annotation/datasource/SeataAutoDataSourceProxyCreator.java b/spring/src/main/java/org/apache/seata/spring/annotation/datasource/SeataAutoDataSourceProxyCreator.java index 50aff3449dc..ef50a211c31 100644 --- a/spring/src/main/java/org/apache/seata/spring/annotation/datasource/SeataAutoDataSourceProxyCreator.java +++ b/spring/src/main/java/org/apache/seata/spring/annotation/datasource/SeataAutoDataSourceProxyCreator.java @@ -71,7 +71,7 @@ protected boolean shouldSkip(Class beanClass, String beanName) { @Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // we only care DataSource bean - if (!(bean instanceof DataSource)) { + if (!(bean instanceof DataSource) || isAbstractRoutingDataSource(bean)) { return bean; } @@ -108,6 +108,22 @@ protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) return originEnhancer; } + /** + * Checks if the given bean is an instance of AbstractRoutingDataSource. + * + * @param bean the object to check + * @return true if the bean is an instance of AbstractRoutingDataSource, false otherwise + */ + private boolean isAbstractRoutingDataSource(Object bean) { + try { + Class clazz = Class.forName("org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource"); + return clazz.isAssignableFrom(bean.getClass()); + } catch (ClassNotFoundException e) { + // AbstractRoutingDataSource not found + return false; + } + } + SeataDataSourceProxy buildProxy(DataSource origin, String proxyMode) { if (BranchType.AT.name().equalsIgnoreCase(proxyMode)) { return new DataSourceProxy(origin); diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java index ca49136ae1c..e252caf0ca4 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java @@ -121,6 +121,7 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI //BusinessActionContext businessActionContext = BusinessActionContextUtil.getBusinessActionContext(xid, branchId, resourceId, applicationData); + businessActionContext.setBranchType(branchType); Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext); //share actionContext implicitly @@ -188,6 +189,8 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc //BusinessActionContext businessActionContext = BusinessActionContextUtil.getBusinessActionContext(xid, branchId, resourceId, applicationData); + businessActionContext.setBranchType(branchType); + Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext); //share actionContext implicitly BusinessActionContextUtil.setContext(businessActionContext);