Skip to content

Commit

Permalink
undo log skip
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Nov 7, 2024
1 parent 6cfdf72 commit a2cad95
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 4 deletions.
21 changes: 21 additions & 0 deletions core/src/main/java/org/apache/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class Version {
/**
* The constant VERSION_MAP.
*/
public static final Map<String, String> VERSION_MAP = new ConcurrentHashMap<>();

private Version() {

Expand All @@ -59,6 +60,26 @@ public static String getCurrent() {
return CURRENT;
}

/**
* Put channel version.
*
* @param c the c
* @param v the v
*/
public static void putChannelVersion(Channel c, String v) {
VERSION_MAP.put(NetUtil.toStringAddress(c.remoteAddress()), v);
}

/**
* Gets channel version.
*
* @param c the c
* @return the channel version
*/
public static String getChannelVersion(Channel c) {
return VERSION_MAP.get(NetUtil.toStringAddress(c.remoteAddress()));
}

/**
* Determine whether the client version is greater than or equal to version 1.5.0
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ public class MsgVersionHelper {
private static final List<Short> SKIP_MSG_CODE_V0 = Arrays.asList(MessageType.TYPE_RM_DELETE_UNDOLOG);

public static boolean versionNotSupport(Channel channel, RpcMessage rpcMessage){
if(rpcMessage==null || rpcMessage.getBody() == null){
if(rpcMessage==null || rpcMessage.getBody() == null || channel == null){
return false;
}
Object msg = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
if (rpcContext == null || StringUtils.isBlank(rpcContext.getVersion()) || msg == null) {
String version = Version.getChannelVersion(channel);
if (StringUtils.isBlank(version) || msg == null) {
return false;
}
boolean aboveV0 = Version.isAboveOrEqualVersion071(rpcContext.getVersion());
boolean aboveV0 = Version.isAboveOrEqualVersion071(version);
if(aboveV0 || !(msg instanceof MessageTypeAware)){
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
try {
if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
ChannelManager.registerRMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());
isSuccess = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
Version.putChannelVersion(ctx.channel(), message.getVersion());
boolean isSuccess = false;
String errorInfo = StringUtils.EMPTY;
try {
if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
ChannelManager.registerTMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());
isSuccess = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("TM checkAuth for client:{},vgroup:{},applicationId:{} is OK",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
LOGGER.info("message = " + message);

ChannelManager.registerTMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());

RegisterTMResponse resp = new RegisterTMResponse();
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp);
Expand All @@ -63,6 +64,7 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
LOGGER.info("message = " + message);

ChannelManager.registerRMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());

RegisterRMResponse resp = new RegisterRMResponse();
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp);
Expand Down

0 comments on commit a2cad95

Please sign in to comment.