diff --git a/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java b/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java index 4b2b7ef8700..f279c30aaad 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java +++ b/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java @@ -38,14 +38,19 @@ public interface ProtocolConstants { byte VERSION_0 = 0; /** - * Protocol version + * Protocol version 1 */ byte VERSION_1 = 1; + /** + * Protocol version 2 + */ + byte VERSION_2 = 2; + /** * Protocol version */ - byte VERSION = VERSION_1; + byte VERSION = VERSION_2; /** * Max frame length diff --git a/core/src/main/java/org/apache/seata/core/protocol/RegisterRMResponse.java b/core/src/main/java/org/apache/seata/core/protocol/RegisterRMResponse.java index 7dbce406cd3..c068c38835d 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RegisterRMResponse.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RegisterRMResponse.java @@ -39,6 +39,7 @@ public RegisterRMResponse() { public RegisterRMResponse(boolean result) { super(); setIdentified(result); + setResultCode(result ? ResultCode.Success : ResultCode.Failed); } @Override diff --git a/core/src/main/java/org/apache/seata/core/protocol/RegisterTMResponse.java b/core/src/main/java/org/apache/seata/core/protocol/RegisterTMResponse.java index 0158a9b83e2..c5ae0f10604 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RegisterTMResponse.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RegisterTMResponse.java @@ -39,6 +39,7 @@ public RegisterTMResponse() { public RegisterTMResponse(boolean result) { super(); setIdentified(result); + setResultCode(result ? ResultCode.Success : ResultCode.Failed); } @Override diff --git a/core/src/main/java/org/apache/seata/core/protocol/Version.java b/core/src/main/java/org/apache/seata/core/protocol/Version.java index f32bb163a69..abbc631453e 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/Version.java +++ b/core/src/main/java/org/apache/seata/core/protocol/Version.java @@ -40,6 +40,9 @@ public class Version { private static final String VERSION_0_7_1 = "0.7.1"; private static final String VERSION_1_5_0 = "1.5.0"; private static final String VERSION_2_3_0 = "2.3.0"; + + public static final String VERSION_0_7_0 = "0.7.0"; + private static final int MAX_VERSION_DOT = 3; /** @@ -94,6 +97,10 @@ public static boolean isAboveOrEqualVersion230(String version) { return isAboveOrEqualVersion(version, VERSION_2_3_0); } + public static boolean isAboveOrEqualVersion071(String version) { + return isAboveOrEqualVersion(version, VERSION_0_7_1); + } + public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) { boolean isAboveOrEqualVersion = false; try { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java index 9bd95503697..49db6b3db95 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java @@ -27,6 +27,8 @@ import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0; import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; +import org.apache.seata.core.rpc.netty.v2.ProtocolDecoderV2; +import org.apache.seata.core.rpc.netty.v2.ProtocolEncoderV2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,12 +85,16 @@ int lengthFieldLength, FullLength is int(4B). so values is 4 int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 */ super(maxFrameLength, 3, 4, -7, 0); - this.protocolDecoderMap = - ImmutableMap.builder().put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()).build(); - this.protocolEncoderMap = - ImmutableMap.builder().put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()).build(); + this.protocolDecoderMap = ImmutableMap.builder() + .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) + .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()) + .put(ProtocolConstants.VERSION_2, new ProtocolDecoderV2()) + .build(); + this.protocolEncoderMap =ImmutableMap.builder() + .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) + .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()) + .put(ProtocolConstants.VERSION_2, new ProtocolEncoderV2()) + .build(); this.channelHandlers = channelHandlers; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java index d28506fd841..9d2cb828574 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java @@ -27,4 +27,5 @@ public interface ProtocolDecoder { RpcMessage decodeFrame(ByteBuf in); + byte protocolVersion(); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java index 6c91164fff3..09363cc880b 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java @@ -25,4 +25,6 @@ **/ public interface ProtocolEncoder { void encode(RpcMessage rpcMessage, ByteBuf out); + + byte protocolVersion(); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java index 8c28ad96c2b..c9195f1793e 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java @@ -130,7 +130,7 @@ public RpcMessage decodeFrame(ByteBuf in) { bs2[1] = (byte) (0x00FF & typeCode); System.arraycopy(bs, 0, bs2, 2, length); byte codecType = isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode(); - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codecType), ProtocolConstants.VERSION_0); + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codecType), protocolVersion()); rpcMessage.setBody(serializer.deserialize(bs2)); } catch (Exception e) { LOGGER.error("decode error", e); @@ -151,4 +151,9 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception throw new DecodeException(exx); } } + + @Override + public byte protocolVersion(){ + return ProtocolConstants.VERSION_0; + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java index f217a843294..15a6f21c9ad 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java @@ -82,7 +82,7 @@ public void encode(RpcMessage message, ByteBuf out) { } byte[] bodyBytes = null; - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codec), ProtocolConstants.VERSION_0); + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codec), protocolVersion()); bodyBytes = serializer.serialize(msg.getBody()); if (msg.isSeataCodec()) { @@ -118,4 +118,9 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws LOGGER.error("Encode request error!", e); } } + + @Override + public byte protocolVersion(){ + return ProtocolConstants.VERSION_0; + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index ce48b3ce8ca..1eac5905b66 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -160,4 +160,8 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception return decoded; } + @Override + public byte protocolVersion(){ + return ProtocolConstants.VERSION_1; + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java index 39180f3bdc0..34185022866 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java @@ -74,7 +74,7 @@ public void encode(RpcMessage message, ByteBuf out) { byte messageType = rpcMessage.getMessageType(); out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES); - out.writeByte(ProtocolConstants.VERSION_1); + out.writeByte(protocolVersion()); // full Length(4B) and head length(2B) will fix in the end. out.writerIndex(out.writerIndex() + 6); out.writeByte(messageType); @@ -94,7 +94,7 @@ public void encode(RpcMessage message, ByteBuf out) { if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { // heartbeat has no body - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1); + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), protocolVersion()); bodyBytes = serializer.serialize(rpcMessage.getBody()); Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); bodyBytes = compressor.compress(bodyBytes); @@ -134,4 +134,8 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws } } + @Override + public byte protocolVersion(){ + return ProtocolConstants.VERSION_1; + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v2/ProtocolDecoderV2.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v2/ProtocolDecoderV2.java new file mode 100644 index 00000000000..fbfa5028cc4 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v2/ProtocolDecoderV2.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.v2; + +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; + +/** + * Decoder of protocol-v2 + **/ +public class ProtocolDecoderV2 extends ProtocolDecoderV1 { + + @Override + public byte protocolVersion(){ + return ProtocolConstants.VERSION_1; + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v2/ProtocolEncoderV2.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v2/ProtocolEncoderV2.java new file mode 100644 index 00000000000..7f091eb33e7 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v2/ProtocolEncoderV2.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.v2; + +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; + +/** + * Encoder of protocol-v2 + **/ +public class ProtocolEncoderV2 extends ProtocolEncoderV1 { + @Override + public byte protocolVersion(){ + return ProtocolConstants.VERSION_1; + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java index 622f30039fa..7a976dacf36 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java @@ -74,6 +74,7 @@ private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()); } + errorInfo = "RM checkAuth fail"; } } catch (Exception exx) { isSuccess = false; diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java index 6090232c6c4..a76445005b3 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java @@ -77,6 +77,7 @@ private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { LOGGER.warn("TM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()); } + errorInfo = "TM checkAuth fail"; } } catch (Exception exx) { isSuccess = false; diff --git a/mock-server/src/main/java/org/apache/seata/mockserver/processor/MockRegisterProcessor.java b/mock-server/src/main/java/org/apache/seata/mockserver/processor/MockRegisterProcessor.java index d9c4aaa29e3..0a3e8567695 100644 --- a/mock-server/src/main/java/org/apache/seata/mockserver/processor/MockRegisterProcessor.java +++ b/mock-server/src/main/java/org/apache/seata/mockserver/processor/MockRegisterProcessor.java @@ -17,6 +17,8 @@ package org.apache.seata.mockserver.processor; import io.netty.channel.ChannelHandlerContext; +import org.apache.commons.lang.StringUtils; +import org.apache.seata.core.protocol.AbstractResultMessage; import org.apache.seata.core.protocol.RegisterRMRequest; import org.apache.seata.core.protocol.RegisterRMResponse; import org.apache.seata.core.protocol.RegisterTMRequest; @@ -49,27 +51,31 @@ public MockRegisterProcessor(RemotingServer remotingServer, Role role) { @Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { - if (role == Role.TM) { - RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody(); - LOGGER.info("message = " + message); - - ChannelManager.registerTMChannel(message, ctx.channel()); - Version.putChannelVersion(ctx.channel(), message.getVersion()); - - RegisterTMResponse resp = new RegisterTMResponse(); - remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp); - LOGGER.info("sendAsyncResponse: {}", resp); - } else if (role == Role.RM) { - RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody(); - LOGGER.info("message = " + message); - - ChannelManager.registerRMChannel(message, ctx.channel()); - Version.putChannelVersion(ctx.channel(), message.getVersion()); - - RegisterRMResponse resp = new RegisterRMResponse(); - remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp); - LOGGER.info("sendAsyncResponse: {}", resp); + String errorInfo = StringUtils.EMPTY; + AbstractResultMessage response = null; + try{ + if (role == Role.TM) { + RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody(); + LOGGER.info("reg message = " + message); + ChannelManager.registerTMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + response = new RegisterTMResponse(); + } else if (role == Role.RM) { + RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody(); + LOGGER.info("reg message = " + message); + ChannelManager.registerRMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + response = new RegisterRMResponse(); + } + }catch (Exception e){ + errorInfo = e.getMessage(); + LOGGER.error(role +" register fail, error message:{}", errorInfo); + } + if (StringUtils.isNotEmpty(errorInfo)) { + response.setMsg(errorInfo); } + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response); + LOGGER.info("sendAsyncResponse: {}", response); } diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java index 6231c72bbde..b17a24effd1 100644 --- a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java @@ -19,6 +19,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.serializer.seata.protocol.BatchResultMessageCodec; import org.apache.seata.serializer.seata.protocol.MergeResultMessageCodec; import org.apache.seata.serializer.seata.protocol.MergedWarpMessageCodec; @@ -77,6 +78,8 @@ import org.apache.seata.core.protocol.transaction.GlobalStatusRequest; import org.apache.seata.core.protocol.transaction.GlobalStatusResponse; import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest; +import org.apache.seata.serializer.seata.protocol.v2.RegisterRMResponseCodecV2; +import org.apache.seata.serializer.seata.protocol.v2.RegisterTMResponseCodecV2; /** * The type Message codec factory. @@ -117,13 +120,21 @@ public static MessageSeataCodec getMessageCodec(short typeCode, byte version) { msgCodec = new RegisterTMRequestCodec(); break; case MessageType.TYPE_REG_CLT_RESULT: - msgCodec = new RegisterTMResponseCodec(); + if (version == ProtocolConstants.VERSION_2) { + msgCodec = new RegisterTMResponseCodecV2(); + } else { + msgCodec = new RegisterTMResponseCodec(); + } break; case MessageType.TYPE_REG_RM: msgCodec = new RegisterRMRequestCodec(); break; case MessageType.TYPE_REG_RM_RESULT: - msgCodec = new RegisterRMResponseCodec(); + if (version == ProtocolConstants.VERSION_2) { + msgCodec = new RegisterRMResponseCodecV2(); + } else { + msgCodec = new RegisterRMResponseCodec(); + } break; case MessageType.TYPE_BRANCH_COMMIT: msgCodec = new BranchCommitRequestCodec(); diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MultiVersionCodec.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MultiVersionCodec.java new file mode 100644 index 00000000000..2374f0d0ecf --- /dev/null +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MultiVersionCodec.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.seata; + + +import java.util.Map; + +/** + * interface MultiVersionCodec + */ +public interface MultiVersionCodec { + + Map oldVersionCodec(); + + /** + * version range (begin, end] + */ + class VersionRange { + private String begin; + private String end; + + public VersionRange(String begin, String end) { + this.begin = begin; + this.end = end; + } + + public VersionRange(String end) { + this.begin = "0"; + this.end = end; + } + + public String getBegin() { + return begin; + } + + public String getEnd() { + return end; + } + } +} diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MultiVersionCodecHelper.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MultiVersionCodecHelper.java new file mode 100644 index 00000000000..a560e265e66 --- /dev/null +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MultiVersionCodecHelper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.seata; + +import org.apache.seata.core.protocol.IncompatibleVersionException; +import org.apache.seata.core.protocol.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * the type MultiVersionCodecHelper + * + **/ +public class MultiVersionCodecHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(MultiVersionCodecHelper.class); + + public static MessageSeataCodec match(String v, MessageSeataCodec messageCodec) { + try { + if (!(messageCodec instanceof MultiVersionCodec)) { + return null; + } + Map map = ((MultiVersionCodec) messageCodec).oldVersionCodec(); + long version = Version.convertVersion(v); + for (MultiVersionCodec.VersionRange range : map.keySet()) { + if (version > Version.convertVersion(range.getBegin()) && + version <= Version.convertVersion(range.getEnd())) { + return map.get(version); + } + } + } catch (IncompatibleVersionException e) { + LOGGER.error("match version error", e); + } + return null; + } +} diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/SeataSerializer.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/SeataSerializer.java index 81730db3d24..e1fb09d5154 100644 --- a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/SeataSerializer.java +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/SeataSerializer.java @@ -24,6 +24,8 @@ import org.apache.seata.core.protocol.AbstractMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.serializer.seata.serializer.SeataSerializerV1; +import org.apache.seata.serializer.seata.serializer.SeataSerializerV2; import java.nio.ByteBuffer; @@ -39,6 +41,8 @@ public SeataSerializer(Byte version) { versionSeataSerializer = SeataSerializerV0.getInstance(); } else if (version == ProtocolConstants.VERSION_1) { versionSeataSerializer = SeataSerializerV1.getInstance(); + } else if (version == ProtocolConstants.VERSION_2) { + versionSeataSerializer = SeataSerializerV2.getInstance(); } if (versionSeataSerializer == null) { throw new UnsupportedOperationException("version is not supported"); @@ -56,59 +60,7 @@ public T deserialize(byte[] bytes) { } - static class SeataSerializerV1 implements Serializer { - private static volatile SeataSerializerV1 instance; - - private SeataSerializerV1() { - } - - public static SeataSerializerV1 getInstance() { - if (instance == null) { - synchronized (SeataSerializerV1.class) { - if (instance == null) { - instance = new SeataSerializerV1(); - } - } - } - return instance; - } - - @Override - public byte[] serialize(T t) { - if (!(t instanceof AbstractMessage)) { - throw new IllegalArgumentException("AbstractMessage isn't available."); - } - AbstractMessage abstractMessage = (AbstractMessage) t; - //type code - short typecode = abstractMessage.getTypeCode(); - //msg codec - MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode, ProtocolConstants.VERSION_1); - //get empty ByteBuffer - ByteBuf out = Unpooled.buffer(1024); - //msg encode - messageCodec.encode(t, out); - byte[] body = new byte[out.readableBytes()]; - out.readBytes(body); - - ByteBuffer byteBuffer; - - //typecode + body - byteBuffer = ByteBuffer.allocate(2 + body.length); - byteBuffer.putShort(typecode); - byteBuffer.put(body); - - BufferUtils.flip(byteBuffer); - byte[] content = new byte[byteBuffer.limit()]; - byteBuffer.get(content); - return content; - } - - @Override - public T deserialize(byte[] bytes) { - return deserializeByVersion(bytes, ProtocolConstants.VERSION_1); - } - } static class SeataSerializerV0 implements Serializer { private static volatile SeataSerializerV0 instance; @@ -162,7 +114,7 @@ public T deserialize(byte[] bytes) { } - private static T deserializeByVersion(byte[] bytes, byte version) { + public static T deserializeByVersion(byte[] bytes, byte version) { if (bytes == null || bytes.length == 0) { throw new IllegalArgumentException("Nothing to decode."); } diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/AbstractIdentifyResponseCodecV2.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/AbstractIdentifyResponseCodecV2.java new file mode 100644 index 00000000000..c74d9bea35f --- /dev/null +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/AbstractIdentifyResponseCodecV2.java @@ -0,0 +1,55 @@ +package org.apache.seata.serializer.seata.protocol.v2; + +import io.netty.buffer.ByteBuf; +import org.apache.seata.core.protocol.AbstractIdentifyResponse; +import org.apache.seata.serializer.seata.protocol.AbstractResultMessageCodec; + +import java.nio.ByteBuffer; + +/** + * The type Abstract identify request codec.(v2) + **/ +public class AbstractIdentifyResponseCodecV2 extends AbstractResultMessageCodec { + @Override + public Class getMessageClassType() { + return AbstractIdentifyResponse.class; + } + + @Override + public void encode(T t, ByteBuf out) { + super.encode(t, out); + AbstractIdentifyResponse abstractIdentifyResponse = (AbstractIdentifyResponse) t; + boolean identified = abstractIdentifyResponse.isIdentified(); + String version = abstractIdentifyResponse.getVersion(); + + out.writeByte(identified ? (byte) 1 : (byte) 0); + if (version != null) { + byte[] bs = version.getBytes(UTF8); + out.writeShort((short) bs.length); + if (bs.length > 0) { + out.writeBytes(bs); + } + } else { + out.writeShort((short) 0); + } + } + + @Override + public void decode(T t, ByteBuffer in) { + super.decode(t, in); + AbstractIdentifyResponse abstractIdentifyResponse = (AbstractIdentifyResponse) t; + + abstractIdentifyResponse.setIdentified(in.get() == 1); + short len = in.getShort(); + if (len <= 0) { + return; + } + if (in.remaining() < len) { + return; + } + byte[] bs = new byte[len]; + in.get(bs); + abstractIdentifyResponse.setVersion(new String(bs, UTF8)); + } + +} diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/RegisterRMResponseCodecV2.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/RegisterRMResponseCodecV2.java new file mode 100644 index 00000000000..6f9b1c2f1dd --- /dev/null +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/RegisterRMResponseCodecV2.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.seata.protocol.v2; + +import org.apache.seata.core.protocol.RegisterRMResponse; + +/** + * The type Register rm response codec.(v2) + */ +public class RegisterRMResponseCodecV2 extends AbstractIdentifyResponseCodecV2 { + + @Override + public Class getMessageClassType() { + return RegisterRMResponse.class; + } + + +} diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/RegisterTMResponseCodecV2.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/RegisterTMResponseCodecV2.java new file mode 100644 index 00000000000..17a238d2f30 --- /dev/null +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/v2/RegisterTMResponseCodecV2.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.seata.protocol.v2; + + +import org.apache.seata.core.protocol.RegisterTMResponse; + +/** + * The type Register tm response codec.(v2) + */ +public class RegisterTMResponseCodecV2 extends AbstractIdentifyResponseCodecV2 { + + @Override + public Class getMessageClassType() { + return RegisterTMResponse.class; + } + +} diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/serializer/SeataSerializerV1.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/serializer/SeataSerializerV1.java new file mode 100644 index 00000000000..46d5b43a712 --- /dev/null +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/serializer/SeataSerializerV1.java @@ -0,0 +1,74 @@ +package org.apache.seata.serializer.seata.serializer; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.seata.common.util.BufferUtils; +import org.apache.seata.core.protocol.AbstractMessage; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.serializer.seata.MessageCodecFactory; +import org.apache.seata.serializer.seata.MessageSeataCodec; +import org.apache.seata.serializer.seata.SeataSerializer; + +import java.nio.ByteBuffer; + +/** + * SeataSerializer of V1 + **/ +public class SeataSerializerV1 implements Serializer { + + private static volatile SeataSerializerV1 instance; + + protected SeataSerializerV1() { + } + + public static SeataSerializerV1 getInstance() { + if (instance == null) { + synchronized (SeataSerializerV1.class) { + if (instance == null) { + instance = new SeataSerializerV1(); + } + } + } + return instance; + } + + @Override + public byte[] serialize(T t) { + if (!(t instanceof AbstractMessage)) { + throw new IllegalArgumentException("AbstractMessage isn't available."); + } + AbstractMessage abstractMessage = (AbstractMessage) t; + //type code + short typecode = abstractMessage.getTypeCode(); + //msg codec + MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode, protocolVersion()); + //get empty ByteBuffer + ByteBuf out = Unpooled.buffer(1024); + //msg encode + messageCodec.encode(t, out); + byte[] body = new byte[out.readableBytes()]; + out.readBytes(body); + + ByteBuffer byteBuffer; + + //typecode + body + byteBuffer = ByteBuffer.allocate(2 + body.length); + byteBuffer.putShort(typecode); + byteBuffer.put(body); + + BufferUtils.flip(byteBuffer); + byte[] content = new byte[byteBuffer.limit()]; + byteBuffer.get(content); + return content; + } + + @Override + public T deserialize(byte[] bytes) { + return SeataSerializer.deserializeByVersion(bytes, protocolVersion()); + } + + public byte protocolVersion(){ + return ProtocolConstants.VERSION_1; + } +} diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/serializer/SeataSerializerV2.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/serializer/SeataSerializerV2.java new file mode 100644 index 00000000000..5ab44f8a92b --- /dev/null +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/serializer/SeataSerializerV2.java @@ -0,0 +1,28 @@ +package org.apache.seata.serializer.seata.serializer; + +import org.apache.seata.core.protocol.ProtocolConstants; + +/** + * the type SeataSerializer(V2) + **/ +public class SeataSerializerV2 extends SeataSerializerV1{ + private static volatile SeataSerializerV2 instance; + + protected SeataSerializerV2() { + } + + public static SeataSerializerV2 getInstance() { + if (instance == null) { + synchronized (SeataSerializerV2.class) { + if (instance == null) { + instance = new SeataSerializerV2(); + } + } + } + return instance; + } + + public byte protocolVersion(){ + return ProtocolConstants.VERSION_2; + } +} diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java index 9a46210aee2..551a3b90bc1 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java @@ -22,6 +22,8 @@ import org.apache.seata.common.XID; import org.apache.seata.common.util.NetUtil; import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.protocol.RegisterTMRequest; +import org.apache.seata.core.protocol.RegisterTMResponse; import org.apache.seata.core.protocol.transaction.GlobalCommitRequest; import org.apache.seata.core.protocol.transaction.GlobalCommitResponse; import org.apache.seata.saga.engine.db.AbstractServerTest; @@ -189,4 +191,48 @@ public void testSendMsgWithResponse() throws Exception { nettyRemotingServer.destroy(); tmNettyRemotingClient.destroy(); } + + @Test + public void testRegisterTMCodec() throws Exception { + ThreadPoolExecutor workingThreads = initMessageExecutor(); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); + new Thread(() -> { + SessionHolder.init(null); + nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer)); + // set registry + XID.setIpAddress(NetUtil.getLocalIp()); + XID.setPort(8091); + // init snowflake for transactionId, branchId + UUIDGenerator.init(1L); + nettyRemotingServer.init(); + }).start(); + Thread.sleep(3000); + + String applicationId = "app 1"; + String transactionServiceGroup = "default_tx_group"; + TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); + tmNettyRemotingClient.init(); + + String serverAddress = "0.0.0.0:8091"; + Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); + Assertions.assertNotNull(channel); + + // test old version + RegisterTMRequest request = new RegisterTMRequest(applicationId, transactionServiceGroup); + // todo + request.setVersion(""); + RegisterTMResponse response = (RegisterTMResponse)tmNettyRemotingClient.sendSyncRequest(request); + + + + +// Assertions.assertNotNull(globalCommitResponse); +// Assertions.assertEquals(GlobalStatus.Finished, globalCommitResponse.getGlobalStatus()); + + + + nettyRemotingServer.destroy(); + tmNettyRemotingClient.destroy(); + } + }