From 60a81b1e93b6e2833df99c89ee21c5687512a2e7 Mon Sep 17 00:00:00 2001 From: yiqi <77573225+PleaseGiveMeTheCoke@users.noreply.github.com> Date: Sun, 10 Nov 2024 21:02:06 +0800 Subject: [PATCH] feature: add grpc serializer (#6992) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../core/rpc/netty/grpc/GrpcDecoder.java | 2 +- .../core/rpc/netty/grpc/GrpcEncoder.java | 4 +- .../seata/core/serializer/SerializerType.java | 9 ++- .../serializer/protobuf/GrpcSerializer.java | 60 +++++++++++++++++++ ...rg.apache.seata.core.serializer.Serializer | 3 +- test/pom.xml | 5 ++ .../core/rpc/netty/mockserver/GrpcTest.java | 13 ++-- ...rg.apache.seata.core.serializer.Serializer | 17 ++++++ 10 files changed, 105 insertions(+), 10 deletions(-) create mode 100644 serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java create mode 100644 test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 8ba55503320..cfd89f80cec 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -8,6 +8,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6881](https://github.com/apache/incubator-seata/pull/6881)] support grpc - [[#6864](https://github.com/apache/incubator-seata/pull/6864)] support shentong database - [[#6974](https://github.com/apache/incubator-seata/pull/6974)] support fastjson2 undolog parser +- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] support grpc serializer ### bugfix: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 915a039cbb9..6aeb138b529 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -8,6 +8,7 @@ - [[#6881](https://github.com/apache/incubator-seata/pull/6881)] client和server支持grpc协议 - [[#6864](https://github.com/apache/incubator-seata/pull/6864)] 支持神通数据库(oscar) - [[#6974](https://github.com/apache/incubator-seata/pull/6974)] 支持UndoLog的fastjson2序列化方式 +- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] 支持grpc序列化器 ### bugfix: diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index e227d5dc7c7..5544e994a55 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -94,7 +94,7 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc bodyBytes = compressor.decompress(bodyBytes); } String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); - int codec = StringUtils.isBlank(codecValue) ? SerializerType.PROTOBUF.getCode() + int codec = StringUtils.isBlank(codecValue) ? SerializerType.GRPC.getCode() : Integer.parseInt(codecValue); SerializerType serializerType = SerializerType.getByCode(codec); rpcMsg.setCodec(serializerType.getCode()); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index dbbbfe1be48..2601a2f0a6e 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -64,14 +64,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ByteString dataBytes; if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode())); + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode())); byte[] serializedBytes = serializer.serialize(body); Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); dataBytes = ByteString.copyFrom(compressor.compress(serializedBytes)); } else { dataBytes = ByteString.EMPTY; } - headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode())); + headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())); headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor())); GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder() .putAllHeadMap(headMap) diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java index 56fd8136d17..39772b34ce0 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java @@ -70,7 +70,14 @@ public enum SerializerType { * Math.pow(2, 6) */ FASTJSON2((byte)0x64), - ; + + + /** + * The grpc + *

+ * Math.pow(2, 7) + */ + GRPC((byte) 0x128); private final byte code; diff --git a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java new file mode 100644 index 00000000000..2ef8eac784e --- /dev/null +++ b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.protobuf; + +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import org.apache.seata.common.exception.ShouldNeverHappenException; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.serializer.protobuf.convertor.PbConvertor; +import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager; + +@LoadLevel(name = "GRPC") +public class GrpcSerializer implements Serializer { + @Override + public byte[] serialize(T t) { + PbConvertor pbConvertor = ProtobufConvertManager.getInstance() + .fetchConvertor(t.getClass().getName()); + Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t)); + + return grpcBody.toByteArray(); + } + + @Override + public T deserialize(byte[] bytes) { + try { + Any body = Any.parseFrom(bytes); + final Class clazz = ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl())); + if (body.is(clazz)) { + Object ob = body.unpack(clazz); + PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName()); + + return (T) pbConvertor.convert2Model(ob); + } + } catch (Throwable e) { + throw new ShouldNeverHappenException("GrpcSerializer deserialize error", e); + } + + return null; + } + + private String getTypeNameFromTypeUrl(String typeUri) { + int pos = typeUri.lastIndexOf('/'); + return pos == -1 ? "" : typeUri.substring(pos + 1); + } +} diff --git a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer index 71098c53674..f6fbf709dea 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer +++ b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -14,4 +14,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.serializer.protobuf.ProtobufSerializer \ No newline at end of file +org.apache.seata.serializer.protobuf.ProtobufSerializer +org.apache.seata.serializer.protobuf.GrpcSerializer \ No newline at end of file diff --git a/test/pom.xml b/test/pom.xml index d35f25bad5e..e9991c688db 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -71,6 +71,11 @@ seata-tm ${project.version} + + org.apache.seata + seata-serializer-protobuf + ${project.version} + io.grpc grpc-alts diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java index 0d63d2eb70f..042160a9ba2 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -16,6 +16,7 @@ */ package org.apache.seata.core.rpc.netty.mockserver; +import com.google.protobuf.Any; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; @@ -25,6 +26,8 @@ import org.apache.seata.core.protocol.generated.GrpcMessageProto; import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.apache.seata.core.rpc.netty.grpc.GrpcHeaderEnum; +import org.apache.seata.core.serializer.SerializerType; import org.apache.seata.mockserver.MockServer; import org.apache.seata.serializer.protobuf.generated.*; import org.apache.seata.core.protocol.generated.SeataServiceGrpc; @@ -69,7 +72,7 @@ private GrpcMessageProto getRegisterTMRequest() { .setAbstractIdentifyRequest(abstractIdentifyRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(registerTMRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(registerTMRequestProto).toByteString()).build(); } private GrpcMessageProto getGlobalBeginRequest() { @@ -77,7 +80,7 @@ private GrpcMessageProto getGlobalBeginRequest() { .setTransactionName("test-transaction") .setTimeout(2000) .build(); - return GrpcMessageProto.newBuilder().setBody(globalBeginRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalBeginRequestProto).toByteString()).build(); } private GrpcMessageProto getBranchRegisterRequest() { @@ -89,7 +92,7 @@ private GrpcMessageProto getBranchRegisterRequest() { .setApplicationData("{\"mock\":\"mock\"}") .build(); - return GrpcMessageProto.newBuilder().setBody(branchRegisterRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(branchRegisterRequestProto).toByteString()).build(); } private GrpcMessageProto getGlobalCommitRequest() { @@ -100,7 +103,7 @@ private GrpcMessageProto getGlobalCommitRequest() { .setAbstractGlobalEndRequest(globalEndRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(globalCommitRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalCommitRequestProto).toByteString()).build(); } private GrpcMessageProto getGlobalRollbackRequest() { @@ -111,7 +114,7 @@ private GrpcMessageProto getGlobalRollbackRequest() { .setAbstractGlobalEndRequest(globalEndRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(globalRollbackRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalRollbackRequestProto).toByteString()).build(); } @Test diff --git a/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer new file mode 100644 index 00000000000..81c5235e259 --- /dev/null +++ b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.serializer.protobuf.GrpcSerializer \ No newline at end of file