diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 8ba55503320..cfd89f80cec 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -8,6 +8,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6881](https://github.com/apache/incubator-seata/pull/6881)] support grpc
- [[#6864](https://github.com/apache/incubator-seata/pull/6864)] support shentong database
- [[#6974](https://github.com/apache/incubator-seata/pull/6974)] support fastjson2 undolog parser
+- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] support grpc serializer
### bugfix:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 915a039cbb9..6aeb138b529 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -8,6 +8,7 @@
- [[#6881](https://github.com/apache/incubator-seata/pull/6881)] client和server支持grpc协议
- [[#6864](https://github.com/apache/incubator-seata/pull/6864)] 支持神通数据库(oscar)
- [[#6974](https://github.com/apache/incubator-seata/pull/6974)] 支持UndoLog的fastjson2序列化方式
+- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] 支持grpc序列化器
### bugfix:
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java
index e227d5dc7c7..5544e994a55 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java
@@ -94,7 +94,7 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc
bodyBytes = compressor.decompress(bodyBytes);
}
String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header);
- int codec = StringUtils.isBlank(codecValue) ? SerializerType.PROTOBUF.getCode()
+ int codec = StringUtils.isBlank(codecValue) ? SerializerType.GRPC.getCode()
: Integer.parseInt(codecValue);
SerializerType serializerType = SerializerType.getByCode(codec);
rpcMsg.setCodec(serializerType.getCode());
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java
index dbbbfe1be48..2601a2f0a6e 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java
@@ -64,14 +64,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ByteString dataBytes;
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
- Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode()));
+ Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()));
byte[] serializedBytes = serializer.serialize(body);
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
dataBytes = ByteString.copyFrom(compressor.compress(serializedBytes));
} else {
dataBytes = ByteString.EMPTY;
}
- headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode()));
+ headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode()));
headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor()));
GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder()
.putAllHeadMap(headMap)
diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java
index 56fd8136d17..39772b34ce0 100644
--- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java
+++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java
@@ -70,7 +70,14 @@ public enum SerializerType {
* Math.pow(2, 6)
*/
FASTJSON2((byte)0x64),
- ;
+
+
+ /**
+ * The grpc
+ *
+ * Math.pow(2, 7)
+ */
+ GRPC((byte) 0x128);
private final byte code;
diff --git a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java
new file mode 100644
index 00000000000..2ef8eac784e
--- /dev/null
+++ b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.serializer.protobuf;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Message;
+import org.apache.seata.common.exception.ShouldNeverHappenException;
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.core.serializer.Serializer;
+import org.apache.seata.serializer.protobuf.convertor.PbConvertor;
+import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager;
+
+@LoadLevel(name = "GRPC")
+public class GrpcSerializer implements Serializer {
+ @Override
+ public byte[] serialize(T t) {
+ PbConvertor pbConvertor = ProtobufConvertManager.getInstance()
+ .fetchConvertor(t.getClass().getName());
+ Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t));
+
+ return grpcBody.toByteArray();
+ }
+
+ @Override
+ public T deserialize(byte[] bytes) {
+ try {
+ Any body = Any.parseFrom(bytes);
+ final Class clazz = ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl()));
+ if (body.is(clazz)) {
+ Object ob = body.unpack(clazz);
+ PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName());
+
+ return (T) pbConvertor.convert2Model(ob);
+ }
+ } catch (Throwable e) {
+ throw new ShouldNeverHappenException("GrpcSerializer deserialize error", e);
+ }
+
+ return null;
+ }
+
+ private String getTypeNameFromTypeUrl(String typeUri) {
+ int pos = typeUri.lastIndexOf('/');
+ return pos == -1 ? "" : typeUri.substring(pos + 1);
+ }
+}
diff --git a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
index 71098c53674..f6fbf709dea 100644
--- a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
+++ b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
@@ -14,4 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-org.apache.seata.serializer.protobuf.ProtobufSerializer
\ No newline at end of file
+org.apache.seata.serializer.protobuf.ProtobufSerializer
+org.apache.seata.serializer.protobuf.GrpcSerializer
\ No newline at end of file
diff --git a/test/pom.xml b/test/pom.xml
index d35f25bad5e..e9991c688db 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -71,6 +71,11 @@
seata-tm
${project.version}
+
+ org.apache.seata
+ seata-serializer-protobuf
+ ${project.version}
+
io.grpc
grpc-alts
diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java
index 0d63d2eb70f..042160a9ba2 100644
--- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java
+++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.seata.core.rpc.netty.mockserver;
+import com.google.protobuf.Any;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
@@ -25,6 +26,8 @@
import org.apache.seata.core.protocol.generated.GrpcMessageProto;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
+import org.apache.seata.core.rpc.netty.grpc.GrpcHeaderEnum;
+import org.apache.seata.core.serializer.SerializerType;
import org.apache.seata.mockserver.MockServer;
import org.apache.seata.serializer.protobuf.generated.*;
import org.apache.seata.core.protocol.generated.SeataServiceGrpc;
@@ -69,7 +72,7 @@ private GrpcMessageProto getRegisterTMRequest() {
.setAbstractIdentifyRequest(abstractIdentifyRequestProto)
.build();
- return GrpcMessageProto.newBuilder().setBody(registerTMRequestProto.toByteString()).build();
+ return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(registerTMRequestProto).toByteString()).build();
}
private GrpcMessageProto getGlobalBeginRequest() {
@@ -77,7 +80,7 @@ private GrpcMessageProto getGlobalBeginRequest() {
.setTransactionName("test-transaction")
.setTimeout(2000)
.build();
- return GrpcMessageProto.newBuilder().setBody(globalBeginRequestProto.toByteString()).build();
+ return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalBeginRequestProto).toByteString()).build();
}
private GrpcMessageProto getBranchRegisterRequest() {
@@ -89,7 +92,7 @@ private GrpcMessageProto getBranchRegisterRequest() {
.setApplicationData("{\"mock\":\"mock\"}")
.build();
- return GrpcMessageProto.newBuilder().setBody(branchRegisterRequestProto.toByteString()).build();
+ return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(branchRegisterRequestProto).toByteString()).build();
}
private GrpcMessageProto getGlobalCommitRequest() {
@@ -100,7 +103,7 @@ private GrpcMessageProto getGlobalCommitRequest() {
.setAbstractGlobalEndRequest(globalEndRequestProto)
.build();
- return GrpcMessageProto.newBuilder().setBody(globalCommitRequestProto.toByteString()).build();
+ return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalCommitRequestProto).toByteString()).build();
}
private GrpcMessageProto getGlobalRollbackRequest() {
@@ -111,7 +114,7 @@ private GrpcMessageProto getGlobalRollbackRequest() {
.setAbstractGlobalEndRequest(globalEndRequestProto)
.build();
- return GrpcMessageProto.newBuilder().setBody(globalRollbackRequestProto.toByteString()).build();
+ return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalRollbackRequestProto).toByteString()).build();
}
@Test
diff --git a/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
new file mode 100644
index 00000000000..81c5235e259
--- /dev/null
+++ b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seata.serializer.protobuf.GrpcSerializer
\ No newline at end of file