Skip to content

Commit

Permalink
optimize version
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Mar 7, 2024
1 parent 2558ba3 commit ef146b7
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class RpcMessage implements Serializable {
private Map<String, String> headMap = new HashMap<>();
private Object body;

private String version;
private String sdkVersion;

/**
* Gets id.
Expand Down Expand Up @@ -171,12 +171,12 @@ public void setMessageType(byte messageType) {
this.messageType = messageType;
}

public String getVersion() {
return version;
public String getSdkVersion() {
return sdkVersion;
}

public void setVersion(String version) {
this.version = version;
public void setSdkVersion(String sdkVersion) {
this.sdkVersion = sdkVersion;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ protected RpcMessage buildRequestMessage(Object msg, byte messageType, String ve
rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);
rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
rpcMessage.setBody(msg);
rpcMessage.setVersion(version);
rpcMessage.setSdkVersion(version);
return rpcMessage;
}

Expand All @@ -258,7 +258,7 @@ protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byt
rpcMsg.setCompressor(rpcMessage.getCompressor());
rpcMsg.setBody(msg);
rpcMsg.setId(rpcMessage.getId());
rpcMsg.setVersion(version);
rpcMsg.setSdkVersion(version);
return rpcMsg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
try {
if (msg instanceof RpcMessage) {
RpcMessage rpcMessage = (RpcMessage) msg;
byte version = Version.calcProtocolVersion(rpcMessage.getVersion());
ProtocolEncoder encoder = protocolEncoderMap.get(version);
String sdkVersion = rpcMessage.getSdkVersion();
//todo null?
byte protocolVersion = Version.calcProtocolVersion(sdkVersion);
ProtocolEncoder encoder = protocolEncoderMap.get(protocolVersion);
if (encoder == null) {
throw new UnsupportedOperationException("Unsupported version: " + version);
throw new UnsupportedOperationException("Unsupported protocolVersion: " + protocolVersion);
}

encoder.encode(rpcMessage, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ public interface ProtocolRpcMessage {
*/
void rpcMsg2ProtocolMsg(RpcMessage rpcMessage);

static String getVersion(Object body) {
static String getSdkVersion(Object body) {
if (body instanceof AbstractIdentifyRequest) {
return ((AbstractIdentifyRequest) body).getVersion();
} else {
return null;
}
}

static void setVersion(Object body, String version) {
static void setSdkVersion(Object body, String version) {
if (body instanceof AbstractIdentifyRequest) {
((AbstractIdentifyRequest) body).setVersion(version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ public RpcMessage protocolMsg2RpcMsg() {
}
rpcMessage.setBody(this.body);
rpcMessage.setId((int) this.id);
rpcMessage.setVersion(ProtocolRpcMessage.getVersion(this.body));
rpcMessage.setSdkVersion(ProtocolRpcMessage.getSdkVersion(this.body));
return rpcMessage;
}

@Override
public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
this.body = rpcMessage.getBody();
ProtocolRpcMessage.setVersion(this.body, rpcMessage.getVersion());
ProtocolRpcMessage.setSdkVersion(this.body, rpcMessage.getSdkVersion());
this.id = rpcMessage.getId();
this.isRequest = isRequest(rpcMessage.getMessageType());
this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ public RpcMessage protocolMsg2RpcMsg() {
rpcMessage.setCompressor(this.compressor);
rpcMessage.setHeadMap(this.headMap);
rpcMessage.setBody(this.body);
rpcMessage.setVersion(ProtocolRpcMessage.getVersion(this.body));
rpcMessage.setSdkVersion(ProtocolRpcMessage.getSdkVersion(this.body));
return rpcMessage;
}


@Override
public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
this.body = rpcMessage.getBody();
ProtocolRpcMessage.setVersion(this.body, rpcMessage.getVersion());
ProtocolRpcMessage.setSdkVersion(this.body, rpcMessage.getSdkVersion());
this.headMap = rpcMessage.getHeadMap();
this.id = rpcMessage.getId();
this.messageType = rpcMessage.getMessageType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
if (isSuccess && LOGGER.isInfoEnabled()) {
LOGGER.info("TM register success,message:{},channel:{},client version:{},client protocol-version:{}"
, message, ctx.channel(), message.getVersion(), rpcMessage.getVersion());
, message, ctx.channel(), message.getVersion(), rpcMessage.getSdkVersion());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,17 @@ public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
private static final String BATCH_RESPONSE_THREAD_PREFIX = "rpcBatchResponse";
private static final boolean PARALLEL_REQUEST_HANDLE =
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_PARALLEL_REQUEST_HANDLE_KEY, true);
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_PARALLEL_REQUEST_HANDLE_KEY, true);

public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {
this.remotingServer = remotingServer;
this.transactionMessageHandler = transactionMessageHandler;
if (NettyServerConfig.isEnableTcServerBatchSendResponse()) {
batchResponseExecutorService = new ThreadPoolExecutor(MAX_BATCH_RESPONSE_THREAD,
MAX_BATCH_RESPONSE_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(BATCH_RESPONSE_THREAD_PREFIX, MAX_BATCH_RESPONSE_THREAD));
MAX_BATCH_RESPONSE_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(BATCH_RESPONSE_THREAD_PREFIX, MAX_BATCH_RESPONSE_THREAD));
batchResponseExecutorService.submit(new BatchResponseRunnable());
}
}
Expand Down Expand Up @@ -153,33 +153,33 @@ private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)
// the batch send request message
if (message instanceof MergedWarpMessage) {
if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
List<AbstractMessage> msgs = ((MergedWarpMessage) message).msgs;
List<Integer> msgIds = ((MergedWarpMessage) message).msgIds;
for (int i = 0; i < msgs.size(); i++) {
AbstractMessage msg = msgs.get(i);
int msgId = msgIds.get(i);
if (PARALLEL_REQUEST_HANDLE) {
CompletableFuture.runAsync(
() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
} else {
handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
}
}
} else {
List<AbstractResultMessage> results = new ArrayList<>();
List<CompletableFuture<AbstractResultMessage>> completableFutures = null;
for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
for (int i = 0; i < ((MergedWarpMessage) message).msgs.size(); i++) {
if (PARALLEL_REQUEST_HANDLE) {
if (completableFutures == null) {
completableFutures = new ArrayList<>();
}
int finalI = i;
completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(
((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));
((MergedWarpMessage) message).msgs.get(finalI), rpcContext)));
} else {
results.add(i,
handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
handleRequestsByMergedWarpMessage(((MergedWarpMessage) message).msgs.get(i), rpcContext));
}
}
if (CollectionUtils.isNotEmpty(completableFutures)) {
Expand All @@ -200,14 +200,14 @@ private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)
final AbstractMessage msg = (AbstractMessage) message;
if (LOGGER.isInfoEnabled()) {
String receiveMsgLog = String.format("receive msg[single]: %s, clientIp: %s, vgroup: %s", message,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
}
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
if (LOGGER.isInfoEnabled()) {
String resultMsgLog = String.format("result msg[single]: %s, clientIp: %s, vgroup: %s", result,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
}
}
Expand All @@ -229,7 +229,7 @@ private void offerMsg(BlockingQueue<QueueItem> msgQueue, RpcMessage rpcMessage,
AbstractResultMessage resultMessage, int msgId, Channel channel) {
if (!msgQueue.offer(new QueueItem(resultMessage, msgId, rpcMessage))) {
LOGGER.error("put message into basketMap offer failed, channel:{},rpcMessage:{},resultMessage:{}",
channel, rpcMessage, resultMessage);
channel, rpcMessage, resultMessage);
}
}

Expand Down Expand Up @@ -261,14 +261,15 @@ public void run() {
while (!msgQueue.isEmpty()) {
QueueItem item = msgQueue.poll();
BatchResultMessage batchResultMessage = CollectionUtils.computeIfAbsent(batchResultMessageMap,
new ClientRequestRpcInfo(item.getRpcMessage()),
key -> new BatchResultMessage());
new ClientRequestRpcInfo(item.getRpcMessage()),
key -> new BatchResultMessage());
batchResultMessage.getResultMessages().add(item.getResultMessage());
batchResultMessage.getMsgIds().add(item.getMsgId());
}
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) ->
remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo),
channel, batchResultMessage));
remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo, rpcContext.getVersion()),
channel, batchResultMessage));
});
isResponding = false;
}
Expand All @@ -277,36 +278,38 @@ public void run() {

/**
* handle rpc request message
*
* @param rpcContext rpcContext
*/
private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {
if (LOGGER.isInfoEnabled()) {
String receiveMsgLog = String.format("receive msg[merged]: %s, clientIp: %s, vgroup: %s", subMessage,
NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()), rpcContext.getTransactionServiceGroup());
NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
}
AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(subMessage, rpcContext);
if (LOGGER.isInfoEnabled()) {
String resultMsgLog = String.format("result msg[merged]: %s, clientIp: %s, vgroup: %s", resultMessage,
NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()), rpcContext.getTransactionServiceGroup());
NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
}
return resultMessage;
}

/**
* handle rpc request message
* @param msg msg
* @param msgId msgId
*
* @param msg msg
* @param msgId msgId
* @param rpcMessage rpcMessage
* @param ctx ctx
* @param ctx ctx
* @param rpcContext rpcContext
*/
private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msgId, RpcMessage rpcMessage,
ChannelHandlerContext ctx, RpcContext rpcContext) {
ChannelHandlerContext ctx, RpcContext rpcContext) {
if (LOGGER.isInfoEnabled()) {
String receiveMsgLog = String.format("receive msg[merged]: %s, clientIp: %s, vgroup: %s", msg,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
}
AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(msg, rpcContext);
Expand All @@ -315,7 +318,7 @@ private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msg
notifyBatchRespondingThread();
if (LOGGER.isInfoEnabled()) {
String resultMsgLog = String.format("result msg[merged]: %s, clientIp: %s, vgroup: %s", resultMessage,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
}
}
Expand All @@ -324,14 +327,16 @@ private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msg
* build RpcMessage
*
* @param clientRequestRpcInfo For saving client request rpc info
* @param version version
* @return rpcMessage
*/
private RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo) {
private RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo, String version) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setId(clientRequestRpcInfo.getRpcMessageId());
rpcMessage.setCodec(clientRequestRpcInfo.getCodec());
rpcMessage.setCompressor(clientRequestRpcInfo.getCompressor());
rpcMessage.setHeadMap(clientRequestRpcInfo.getHeadMap());
rpcMessage.setSdkVersion(version);
return rpcMessage;
}

Expand Down Expand Up @@ -413,7 +418,7 @@ public boolean equals(Object o) {
}
ClientRequestRpcInfo that = (ClientRequestRpcInfo) o;
return rpcMessageId == that.rpcMessageId && codec == that.codec
&& compressor == that.compressor && headMap.equals(that.headMap);
&& compressor == that.compressor && headMap.equals(that.headMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.seata.core.rpc.netty.mockserver;

import io.netty.channel.Channel;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
Expand All @@ -27,13 +25,9 @@
import org.apache.seata.core.rpc.netty.ChannelManagerTestHelper;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.integration.tx.api.interceptor.parser.DefaultResourceRegisterParser;
import org.apache.seata.mockserver.MockServer;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.RMClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@
package org.apache.seata.core.rpc.netty.mockserver;

import io.netty.channel.Channel;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.TransactionManager;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.rpc.netty.ChannelManagerTestHelper;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
import org.apache.seata.mockserver.MockCoordinator;
import org.apache.seata.mockserver.MockServer;
import org.apache.seata.tm.DefaultTransactionManager;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public Future sendRpc(Map<String, String> head, Object body) {
rpcMessage.setBody(body);
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_SYNC);


if (channel != null) {
DefaultPromise promise = new DefaultPromise(defaultEventExecutor);
futureMap.put(msgId, promise);
Expand Down

0 comments on commit ef146b7

Please sign in to comment.