Skip to content

Commit

Permalink
feature: add grpc serializer (#6992)
Browse files Browse the repository at this point in the history
  • Loading branch information
PleaseGiveMeTheCoke authored Nov 10, 2024
1 parent 835ef47 commit 60a81b1
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 10 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6881](https://github.com/apache/incubator-seata/pull/6881)] support grpc
- [[#6864](https://github.com/apache/incubator-seata/pull/6864)] support shentong database
- [[#6974](https://github.com/apache/incubator-seata/pull/6974)] support fastjson2 undolog parser
- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] support grpc serializer


### bugfix:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [[#6881](https://github.com/apache/incubator-seata/pull/6881)] client和server支持grpc协议
- [[#6864](https://github.com/apache/incubator-seata/pull/6864)] 支持神通数据库(oscar)
- [[#6974](https://github.com/apache/incubator-seata/pull/6974)] 支持UndoLog的fastjson2序列化方式
- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] 支持grpc序列化器


### bugfix:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc
bodyBytes = compressor.decompress(bodyBytes);
}
String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header);
int codec = StringUtils.isBlank(codecValue) ? SerializerType.PROTOBUF.getCode()
int codec = StringUtils.isBlank(codecValue) ? SerializerType.GRPC.getCode()
: Integer.parseInt(codecValue);
SerializerType serializerType = SerializerType.getByCode(codec);
rpcMsg.setCodec(serializerType.getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ByteString dataBytes;
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode()));
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()));
byte[] serializedBytes = serializer.serialize(body);
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
dataBytes = ByteString.copyFrom(compressor.compress(serializedBytes));
} else {
dataBytes = ByteString.EMPTY;
}
headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode()));
headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode()));
headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor()));
GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder()
.putAllHeadMap(headMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ public enum SerializerType {
* Math.pow(2, 6)
*/
FASTJSON2((byte)0x64),
;


/**
* The grpc
* <p>
* Math.pow(2, 7)
*/
GRPC((byte) 0x128);

private final byte code;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.serializer.protobuf;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import org.apache.seata.common.exception.ShouldNeverHappenException;
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.serializer.protobuf.convertor.PbConvertor;
import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager;

@LoadLevel(name = "GRPC")
public class GrpcSerializer implements Serializer {
@Override
public <T> byte[] serialize(T t) {
PbConvertor pbConvertor = ProtobufConvertManager.getInstance()
.fetchConvertor(t.getClass().getName());
Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t));

return grpcBody.toByteArray();
}

@Override
public <T> T deserialize(byte[] bytes) {
try {
Any body = Any.parseFrom(bytes);
final Class clazz = ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl()));
if (body.is(clazz)) {
Object ob = body.unpack(clazz);
PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName());

return (T) pbConvertor.convert2Model(ob);
}
} catch (Throwable e) {
throw new ShouldNeverHappenException("GrpcSerializer deserialize error", e);
}

return null;
}

private String getTypeNameFromTypeUrl(String typeUri) {
int pos = typeUri.lastIndexOf('/');
return pos == -1 ? "" : typeUri.substring(pos + 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.seata.serializer.protobuf.ProtobufSerializer
org.apache.seata.serializer.protobuf.ProtobufSerializer
org.apache.seata.serializer.protobuf.GrpcSerializer
5 changes: 5 additions & 0 deletions test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
<artifactId>seata-tm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-serializer-protobuf</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-alts</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.core.rpc.netty.mockserver;

import com.google.protobuf.Any;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
Expand All @@ -25,6 +26,8 @@
import org.apache.seata.core.protocol.generated.GrpcMessageProto;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
import org.apache.seata.core.rpc.netty.grpc.GrpcHeaderEnum;
import org.apache.seata.core.serializer.SerializerType;
import org.apache.seata.mockserver.MockServer;
import org.apache.seata.serializer.protobuf.generated.*;
import org.apache.seata.core.protocol.generated.SeataServiceGrpc;
Expand Down Expand Up @@ -69,15 +72,15 @@ private GrpcMessageProto getRegisterTMRequest() {
.setAbstractIdentifyRequest(abstractIdentifyRequestProto)
.build();

return GrpcMessageProto.newBuilder().setBody(registerTMRequestProto.toByteString()).build();
return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(registerTMRequestProto).toByteString()).build();
}

private GrpcMessageProto getGlobalBeginRequest() {
GlobalBeginRequestProto globalBeginRequestProto = GlobalBeginRequestProto.newBuilder()
.setTransactionName("test-transaction")
.setTimeout(2000)
.build();
return GrpcMessageProto.newBuilder().setBody(globalBeginRequestProto.toByteString()).build();
return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalBeginRequestProto).toByteString()).build();
}

private GrpcMessageProto getBranchRegisterRequest() {
Expand All @@ -89,7 +92,7 @@ private GrpcMessageProto getBranchRegisterRequest() {
.setApplicationData("{\"mock\":\"mock\"}")
.build();

return GrpcMessageProto.newBuilder().setBody(branchRegisterRequestProto.toByteString()).build();
return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(branchRegisterRequestProto).toByteString()).build();
}

private GrpcMessageProto getGlobalCommitRequest() {
Expand All @@ -100,7 +103,7 @@ private GrpcMessageProto getGlobalCommitRequest() {
.setAbstractGlobalEndRequest(globalEndRequestProto)
.build();

return GrpcMessageProto.newBuilder().setBody(globalCommitRequestProto.toByteString()).build();
return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalCommitRequestProto).toByteString()).build();
}

private GrpcMessageProto getGlobalRollbackRequest() {
Expand All @@ -111,7 +114,7 @@ private GrpcMessageProto getGlobalRollbackRequest() {
.setAbstractGlobalEndRequest(globalEndRequestProto)
.build();

return GrpcMessageProto.newBuilder().setBody(globalRollbackRequestProto.toByteString()).build();
return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalRollbackRequestProto).toByteString()).build();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.seata.serializer.protobuf.GrpcSerializer

0 comments on commit 60a81b1

Please sign in to comment.