-
Notifications
You must be signed in to change notification settings - Fork 8.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
optimize : skip sending some request if client-version is v0 #6998
base: 2.x
Are you sure you want to change the base?
Changes from 13 commits
bf81cf0
cbd0825
6cfdf72
a2cad95
ecc66c6
16f09c3
f1fe15c
382aa1b
b1af528
c57ffb1
9ce40a3
f75043f
628c965
3e3f5da
cfde859
1cde9d3
206b3f1
3e616e5
331afa7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seata.core.protocol; | ||
|
||
/** | ||
* The type Version not support message. | ||
* | ||
*/ | ||
public class VersionNotSupportMessage extends AbstractMessage { | ||
@Override | ||
public short getTypeCode() { | ||
return MessageType.VERSION_NOT_SUPPORT; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seata.core.rpc; | ||
|
||
import io.netty.channel.Channel; | ||
import org.apache.seata.common.util.StringUtils; | ||
import org.apache.seata.core.protocol.MessageType; | ||
import org.apache.seata.core.protocol.MessageTypeAware; | ||
import org.apache.seata.core.protocol.RpcMessage; | ||
import org.apache.seata.core.protocol.Version; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
/** | ||
* the type ServerSkipMsgHelper | ||
**/ | ||
public class MsgVersionHelper { | ||
|
||
private static final List<Short> SKIP_MSG_CODE_V0 = Arrays.asList(MessageType.TYPE_RM_DELETE_UNDOLOG); | ||
|
||
public static boolean versionNotSupport(Channel channel, RpcMessage rpcMessage) { | ||
if (rpcMessage == null || rpcMessage.getBody() == null || channel == null) { | ||
return false; | ||
} | ||
Object msg = rpcMessage.getBody(); | ||
String version = Version.getChannelVersion(channel); | ||
if (StringUtils.isBlank(version) || msg == null) { | ||
return false; | ||
} | ||
boolean aboveV0 = Version.isAboveOrEqualVersion071(version); | ||
if (aboveV0 || !(msg instanceof MessageTypeAware)) { | ||
return false; | ||
} | ||
short typeCode = ((MessageTypeAware) msg).getTypeCode(); | ||
return SKIP_MSG_CODE_V0.contains(typeCode); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pls confirm that the zero value is a message TypeCode with no business significance. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SKIP_MSG_CODE_V0 is not a zero value, it is a list of types to skip.
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,9 @@ | |
import org.apache.seata.core.protocol.MessageTypeAware; | ||
import org.apache.seata.core.protocol.ProtocolConstants; | ||
import org.apache.seata.core.protocol.RpcMessage; | ||
import org.apache.seata.core.protocol.VersionNotSupportMessage; | ||
import org.apache.seata.core.rpc.Disposable; | ||
import org.apache.seata.core.rpc.MsgVersionHelper; | ||
import org.apache.seata.core.rpc.hook.RpcHook; | ||
import org.apache.seata.core.rpc.processor.Pair; | ||
import org.apache.seata.core.rpc.processor.RemotingProcessor; | ||
|
@@ -173,6 +175,12 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi | |
LOGGER.warn("sendSync nothing, caused by null channel."); | ||
return null; | ||
} | ||
if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) { | ||
if (LOGGER.isDebugEnabled()) { | ||
LOGGER.debug("Message sending will be skipped as the client version does not support it,{}", rpcMessage); | ||
} | ||
return new VersionNotSupportMessage(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 看起来没有使用到它的地方,后续将有何作用? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
比如以后的版本里如果要sync发送某个全新的request类,旧版本无法识别,在remote通信这层会返回VersionNotSupportMessage,发送者判断到返回类型是这个后做忽略处理或者抛出异常 For example, if you want to send a sync to a new request class in a future version that is not recognized by the old version, the versionNotSupportMessage will be returned at the remote communication level, and the sender will either ignore the return type or throw an exception. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not throw an exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If an exception is thrown, the server may abort the current execution. In that case, the message-sending step should be skipped, yet still considered successfully completed. |
||
} | ||
|
||
MessageFuture messageFuture = new MessageFuture(); | ||
messageFuture.setRequestMessage(rpcMessage); | ||
|
@@ -216,6 +224,12 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi | |
* @param rpcMessage rpc message | ||
*/ | ||
protected void sendAsync(Channel channel, RpcMessage rpcMessage) { | ||
if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) { | ||
if (LOGGER.isDebugEnabled()) { | ||
LOGGER.debug("Message sending will be skipped as the client version does not support it,{}", rpcMessage); | ||
} | ||
return; | ||
} | ||
channelWritableCheck(channel, rpcMessage.getBody()); | ||
if (LOGGER.isDebugEnabled()) { | ||
LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seata.core.rpc.netty; | ||
|
||
import io.netty.channel.Channel; | ||
import org.apache.seata.common.ConfigurationKeys; | ||
import org.apache.seata.common.ConfigurationTestHelper; | ||
import org.apache.seata.common.XID; | ||
import org.apache.seata.common.util.NetUtil; | ||
import org.apache.seata.common.util.UUIDGenerator; | ||
import org.apache.seata.core.protocol.ProtocolConstants; | ||
import org.apache.seata.core.protocol.RpcMessage; | ||
import org.apache.seata.core.protocol.Version; | ||
import org.apache.seata.core.protocol.VersionNotSupportMessage; | ||
import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest; | ||
import org.apache.seata.core.rpc.MsgVersionHelper; | ||
import org.apache.seata.server.coordinator.DefaultCoordinator; | ||
import org.apache.seata.server.session.SessionHolder; | ||
import org.junit.jupiter.api.AfterAll; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.BeforeAll; | ||
import org.junit.jupiter.api.Test; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* MsgVersionHelper Test | ||
**/ | ||
public class MsgVersionHelperTest { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(MsgVersionHelperTest.class); | ||
|
||
@BeforeAll | ||
public static void init(){ | ||
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091"); | ||
} | ||
@AfterAll | ||
public static void after() { | ||
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); | ||
} | ||
|
||
public static ThreadPoolExecutor initMessageExecutor() { | ||
return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS, | ||
new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy()); | ||
} | ||
@Test | ||
public void testSendMsgWithResponse() throws Exception { | ||
ThreadPoolExecutor workingThreads = initMessageExecutor(); | ||
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); | ||
new Thread(() -> { | ||
SessionHolder.init(null); | ||
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer)); | ||
// set registry | ||
XID.setIpAddress(NetUtil.getLocalIp()); | ||
XID.setPort(8091); | ||
// init snowflake for transactionId, branchId | ||
UUIDGenerator.init(1L); | ||
nettyRemotingServer.init(); | ||
}).start(); | ||
Thread.sleep(3000); | ||
|
||
String applicationId = "app 1"; | ||
String transactionServiceGroup = "default_tx_group"; | ||
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); | ||
tmNettyRemotingClient.init(); | ||
|
||
String serverAddress = "0.0.0.0:8091"; | ||
Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); | ||
|
||
RpcMessage rpcMessage = buildUndoLogDeleteMsg(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); | ||
Assertions.assertFalse(MsgVersionHelper.versionNotSupport(channel, rpcMessage)); | ||
TmNettyRemotingClient.getInstance().sendAsync(channel,rpcMessage); | ||
|
||
|
||
Version.putChannelVersion(channel,"0.7.0"); | ||
Assertions.assertTrue(MsgVersionHelper.versionNotSupport(channel, rpcMessage)); | ||
TmNettyRemotingClient.getInstance().sendAsync(channel,rpcMessage); | ||
Object response = TmNettyRemotingClient.getInstance().sendSync(channel, rpcMessage, 100); | ||
Assertions.assertTrue(response instanceof VersionNotSupportMessage); | ||
|
||
nettyRemotingServer.destroy(); | ||
tmNettyRemotingClient.destroy(); | ||
} | ||
|
||
private RpcMessage buildUndoLogDeleteMsg(byte messageType) { | ||
RpcMessage rpcMessage = new RpcMessage(); | ||
rpcMessage.setId(100); | ||
rpcMessage.setMessageType(messageType); | ||
rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC); | ||
rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR); | ||
rpcMessage.setBody(new UndoLogDeleteRequest()); | ||
return rpcMessage; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not force cast.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rpcMessage.getBody() is of type Object, how do I getTypeCode without forced cast?