diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index dd0ceb841f2..312a641200f 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -18,7 +18,6 @@ /** * The type Configuration keys. - * */ public interface ConfigurationKeys { /** @@ -68,6 +67,11 @@ public interface ConfigurationKeys { */ String SEATA_PREFIX = SEATA_FILE_ROOT_CONFIG + "."; + /** + * The constant SECURITY_PREFIX + */ + String SECURITY_PREFIX = "security."; + /** * The constant SERVICE_PREFIX. */ @@ -1011,4 +1015,24 @@ public interface ConfigurationKeys { * The constant SERVER_APPLICATION_DATA_SIZE_CHECK */ String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + "applicationDataLimitCheck"; + + /** + * The constant SECURITY_USERNAME; + */ + String SECURITY_USERNME = SECURITY_PREFIX + "username"; + + /** + * The constant SECURITY_PASSWORD; + */ + String SECURITY_PASSWORD = SECURITY_PREFIX + "password"; + + /** + * The constant SECURITY_SECRET_KEY; + */ + String SECURITY_SECRET_KEY = SECURITY_PREFIX + "secretKey"; + + /** + * The constant SECURITY_TOKEN_VALID_TIME; + */ + String SECURITY_TOKEN_VALID_TIME = SECURITY_PREFIX + "tokenValidityInMilliseconds"; } diff --git a/common/src/main/java/org/apache/seata/common/util/StringUtils.java b/common/src/main/java/org/apache/seata/common/util/StringUtils.java index 5bfe519eb3a..ffa479ace6a 100644 --- a/common/src/main/java/org/apache/seata/common/util/StringUtils.java +++ b/common/src/main/java/org/apache/seata/common/util/StringUtils.java @@ -27,6 +27,7 @@ import java.util.Date; import java.util.Iterator; import java.util.Map; +import java.util.HashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,6 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_KV_CHAR; +import static org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_SPLIT_CHAR; + /** * The type String utils. * @@ -427,4 +431,36 @@ public static String join(Iterator iterator, String separator) { return builder.toString(); } + public static HashMap string2Map(String inputString) { + HashMap resultMap = new HashMap<>(); + if (StringUtils.isBlank(inputString)) { + return resultMap; + } + String[] keyValuePairs = inputString.split(EXTRA_DATA_SPLIT_CHAR); + for (String pair : keyValuePairs) { + String[] keyValue = pair.trim().split(EXTRA_DATA_KV_CHAR); + if (keyValue.length == 2) { + resultMap.put(keyValue[0].trim(), keyValue[1].trim()); + } + } + return resultMap; + } + + public static String map2String(HashMap inputMap) { + if (inputMap == null || inputMap.isEmpty()) { + return ""; + } + StringBuilder resultString = new StringBuilder(); + for (Map.Entry entry : inputMap.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + String pair = key + EXTRA_DATA_KV_CHAR + value + EXTRA_DATA_SPLIT_CHAR; + resultString.append(pair); + } + if (resultString.length() > 0) { + resultString.deleteCharAt(resultString.length() - 1); + } + return resultString.toString(); + } + } diff --git a/core/src/main/java/org/apache/seata/core/auth/JwtAuthManager.java b/core/src/main/java/org/apache/seata/core/auth/JwtAuthManager.java new file mode 100644 index 00000000000..ded66c07379 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/auth/JwtAuthManager.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.auth; + + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.ConfigurationFactory; + +import java.util.HashMap; + + +public class JwtAuthManager { + private String accessToken; + + private String username; + + private String password; + + public final static String PRO_USERNAME = "username"; + + public final static String PRO_PASSWORD = "password"; + + public final static String PRO_TOKEN = "token"; + + private static JwtAuthManager instance; + + private JwtAuthManager() { + } + + public static JwtAuthManager getInstance() { + if (instance == null) { + instance = new JwtAuthManager(); + } + return instance; + } + + public void init() { + username = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.SECURITY_USERNME); + password = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.SECURITY_PASSWORD); + } + + public String getToken() { + return accessToken; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public void refreshToken(String newToken) { + accessToken = newToken; + } + + public void setAccessToken(String token) { + accessToken = token; + } + + public static String refreshAuthData(String extraData) { + HashMap extraDataMap = StringUtils.string2Map(extraData); + extraDataMap.remove(PRO_TOKEN); + if (null != getInstance().getToken()) { + extraDataMap.put(PRO_TOKEN, getInstance().getToken()); + } else if (null != getInstance().getUsername() && null != getInstance().getPassword()) { + extraDataMap.put(PRO_USERNAME, getInstance().getUsername()); + extraDataMap.put(PRO_PASSWORD, getInstance().getPassword()); + } + return StringUtils.map2String(extraDataMap); + } + + +} diff --git a/core/src/main/java/org/apache/seata/core/protocol/RegisterRMRequest.java b/core/src/main/java/org/apache/seata/core/protocol/RegisterRMRequest.java index d5e6c3edead..5581210ca02 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RegisterRMRequest.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RegisterRMRequest.java @@ -41,6 +41,17 @@ public RegisterRMRequest(String applicationId, String transactionServiceGroup) { super(applicationId, transactionServiceGroup); } + /** + * Instantiates a new Register rm request. + * + * @param applicationId the application id + * @param transactionServiceGroup the transaction service group + * @param extraData the extra data + */ + public RegisterRMRequest(String applicationId, String transactionServiceGroup, String extraData) { + super(applicationId, transactionServiceGroup, extraData); + } + private String resourceIds; /** diff --git a/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java b/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java index d3338eb7f10..71e53e61a7c 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java +++ b/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java @@ -32,7 +32,13 @@ public enum ResultCode { * Success result code. */ // Success - Success; + Success, + + /** + * Retry result code. + */ + // Retry + Retry; /** * Get result code. diff --git a/core/src/main/java/org/apache/seata/core/rpc/RegisterCheckAuthHandler.java b/core/src/main/java/org/apache/seata/core/rpc/RegisterCheckAuthHandler.java index c17cbbad1f4..ef8249a4582 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/RegisterCheckAuthHandler.java +++ b/core/src/main/java/org/apache/seata/core/rpc/RegisterCheckAuthHandler.java @@ -16,6 +16,8 @@ */ package org.apache.seata.core.rpc; +import org.apache.seata.common.exception.RetryableException; +import org.apache.seata.core.protocol.AbstractIdentifyRequest; import org.apache.seata.core.protocol.RegisterRMRequest; import org.apache.seata.core.protocol.RegisterTMRequest; @@ -31,7 +33,7 @@ public interface RegisterCheckAuthHandler { * @param request the request * @return the boolean */ - boolean regTransactionManagerCheckAuth(RegisterTMRequest request); + boolean regTransactionManagerCheckAuth(RegisterTMRequest request) throws RetryableException; /** * Reg resource manager check auth boolean. @@ -39,5 +41,11 @@ public interface RegisterCheckAuthHandler { * @param request the request * @return the boolean */ - boolean regResourceManagerCheckAuth(RegisterRMRequest request); + boolean regResourceManagerCheckAuth(RegisterRMRequest request) throws RetryableException; + + /** + * Refresh token + * @return the String + */ + String refreshAuthToken(AbstractIdentifyRequest abstractIdentifyRequest) ; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 2b7b35aca00..cf22e8fc567 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -18,6 +18,7 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; + import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler.Sharable; @@ -43,6 +45,7 @@ import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.auth.JwtAuthManager; import org.apache.seata.core.protocol.AbstractMessage; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.MergeMessage; @@ -63,15 +66,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.apache.seata.common.exception.FrameworkErrorCode.NoAvailableService; /** * The netty remoting client. - * */ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemotingClient.class); + private static final String PRO_NEW_TOKEN = "newToken"; private static final String MSG_ID_PREFIX = "msgId:"; private static final String FUTURES_PREFIX = "futures:"; private static final String SINGLE_LOG_POSTFIX = ";"; @@ -91,7 +95,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting /** * When batch sending is enabled, the message will be stored to basketMap - * Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable} + * Send via asynchronous thread {@link org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable} * {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()} */ protected final ConcurrentHashMap> basketMap = new ConcurrentHashMap<>(); @@ -100,10 +104,12 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private final NettyPoolKey.TransactionRole transactionRole; private ExecutorService mergeSendExecutorService; private TransactionMessageHandler transactionMessageHandler; + protected JwtAuthManager jwtAuthManager = JwtAuthManager.getInstance(); protected volatile boolean enableClientBatchSendRequest; @Override public void init() { + jwtAuthManager.init(); timerExecutor.scheduleAtFixedRate(() -> { try { clientChannelManager.reconnect(getTransactionServiceGroup()); @@ -172,7 +178,7 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { } catch (Exception exx) { LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), serverAddress, rpcMessage.getBody()); if (exx instanceof TimeoutException) { - throw (TimeoutException)exx; + throw (TimeoutException) exx; } else { throw new RuntimeException(exx); } @@ -295,6 +301,21 @@ protected String getXid(Object msg) { return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid; } + protected String getAuthData() { + return JwtAuthManager.refreshAuthData(null); + } + + protected void refreshAuthToken(String extraData) { + if (StringUtils.isBlank(extraData)) { + return; + } + HashMap extraDataMap = StringUtils.string2Map(extraData); + String newToken = extraDataMap.get(PRO_NEW_TOKEN); + if (StringUtils.isNotBlank(newToken)) { + jwtAuthManager.refreshToken(newToken); + } + } + private String getThreadPrefix() { return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name(); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyPoolableFactory.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyPoolableFactory.java index 7755091eaf6..cfed1244bd7 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyPoolableFactory.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyPoolableFactory.java @@ -21,15 +21,19 @@ import io.netty.channel.Channel; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.util.NetUtil; +import org.apache.seata.core.auth.JwtAuthManager; +import org.apache.seata.core.protocol.AbstractIdentifyRequest; +import org.apache.seata.core.protocol.RegisterRMRequest; +import org.apache.seata.core.protocol.RegisterTMRequest; import org.apache.seata.core.protocol.RegisterRMResponse; import org.apache.seata.core.protocol.RegisterTMResponse; +import org.apache.seata.core.protocol.ResultCode; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * The type Netty key poolable factory. - * */ public class NettyPoolableFactory implements KeyedPoolableObjectFactory { @@ -64,6 +68,19 @@ public Channel makeObject(NettyPoolKey key) { } try { response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage()); + if (isRegisterExpired(response, key.getTransactionRole())) { + // relogin to get token + JwtAuthManager.getInstance().refreshToken(null); + AbstractIdentifyRequest request; + if (key.getTransactionRole().equals(NettyPoolKey.TransactionRole.TMROLE)) { + request = (RegisterTMRequest) key.getMessage(); + } else { + request = (RegisterRMRequest) key.getMessage(); + } + String identifyExtraData = JwtAuthManager.refreshAuthData(request.getExtraData()); + request.setExtraData(identifyExtraData); + response = rpcRemotingClient.sendSyncRequest(tmpChannel, request); + } if (!isRegisterSuccess(response, key.getTransactionRole())) { rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage()); } else { @@ -85,6 +102,26 @@ public Channel makeObject(NettyPoolKey key) { return channelToServer; } + private boolean isRegisterExpired(Object response, NettyPoolKey.TransactionRole transactionRole) { + if (response == null) { + return false; + } + if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) { + if (!(response instanceof RegisterTMResponse)) { + return false; + } + RegisterTMResponse registerTMResponse = (RegisterTMResponse) response; + return ResultCode.Retry.equals(registerTMResponse.getResultCode()); + } else if (transactionRole.equals(NettyPoolKey.TransactionRole.RMROLE)) { + if (!(response instanceof RegisterRMResponse)) { + return false; + } + RegisterRMResponse registerRMResponse = (RegisterRMResponse) response; + return ResultCode.Retry.equals(registerRMResponse.getResultCode()); + } + return false; + } + private boolean isRegisterSuccess(Object response, NettyPoolKey.TransactionRole transactionRole) { if (response == null) { return false; @@ -93,13 +130,13 @@ private boolean isRegisterSuccess(Object response, NettyPoolKey.TransactionRole if (!(response instanceof RegisterTMResponse)) { return false; } - RegisterTMResponse registerTMResponse = (RegisterTMResponse)response; + RegisterTMResponse registerTMResponse = (RegisterTMResponse) response; return registerTMResponse.isIdentified(); } else if (transactionRole.equals(NettyPoolKey.TransactionRole.RMROLE)) { if (!(response instanceof RegisterRMResponse)) { return false; } - RegisterRMResponse registerRMResponse = (RegisterRMResponse)response; + RegisterRMResponse registerRMResponse = (RegisterRMResponse) response; return registerRMResponse.isIdentified(); } return false; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 61f23687d98..0b0391bb118 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -55,7 +55,6 @@ /** * The Rm netty client. - * */ public final class RmNettyRemotingClient extends AbstractNettyRemotingClient { @@ -78,11 +77,11 @@ public void init() { // Found one or more resources that were registered before initialization if (resourceManager != null - && !resourceManager.getManagedResources().isEmpty() - && StringUtils.isNotBlank(transactionServiceGroup)) { + && !resourceManager.getManagedResources().isEmpty() + && StringUtils.isNotBlank(transactionServiceGroup)) { boolean failFast = ConfigurationFactory.getInstance().getBoolean( - ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, - DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST); + ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, + DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST); getClientChannelManager().initReconnect(transactionServiceGroup, failFast); } } @@ -93,7 +92,7 @@ private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutor super(nettyClientConfig, eventExecutorGroup, messageExecutor, TransactionRole.RMROLE); // set enableClientBatchSendRequest this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST, - ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_CLIENT_BATCH_SEND_REQUEST,DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST)); + ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_CLIENT_BATCH_SEND_REQUEST, DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST)); ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { @@ -172,8 +171,9 @@ public void setResourceManager(ResourceManager resourceManager) { @Override public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage) { - RegisterRMRequest registerRMRequest = (RegisterRMRequest)requestMessage; - RegisterRMResponse registerRMResponse = (RegisterRMResponse)response; + RegisterRMRequest registerRMRequest = (RegisterRMRequest) requestMessage; + RegisterRMResponse registerRMResponse = (RegisterRMResponse) response; + refreshAuthToken(registerRMResponse.getExtraData()); if (LOGGER.isInfoEnabled()) { LOGGER.info("register RM success. client version:{}, server version:{},channel:{}", registerRMRequest.getVersion(), registerRMResponse.getVersion(), channel); } @@ -190,8 +190,8 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r @Override public void onRegisterMsgFail(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage) { - RegisterRMRequest registerRMRequest = (RegisterRMRequest)requestMessage; - RegisterRMResponse registerRMResponse = (RegisterRMResponse)response; + RegisterRMRequest registerRMRequest = (RegisterRMRequest) requestMessage; + RegisterRMResponse registerRMResponse = (RegisterRMResponse) response; String errMsg = String.format( "register RM failed. client version: %s,server version: %s, errorMsg: %s, " + "channel: %s", registerRMRequest.getVersion(), registerRMResponse.getVersion(), registerRMResponse.getMsg(), channel); throw new FrameworkException(errMsg); @@ -218,8 +218,8 @@ public void registerResource(String resourceGroupId, String resourceId) { if (getClientChannelManager().getChannels().isEmpty()) { boolean failFast = ConfigurationFactory.getInstance().getBoolean( - ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, - DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST); + ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, + DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST); getClientChannelManager().initReconnect(transactionServiceGroup, failFast); return; } @@ -236,7 +236,7 @@ public void registerResource(String resourceGroupId, String resourceId) { } public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) { - RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup); + RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup, getExtraData()); message.setResourceIds(resourceId); try { super.sendAsyncRequest(channel, message); @@ -289,12 +289,16 @@ protected Function getPoolKeyFunction() { if (resourceIds != null && LOGGER.isInfoEnabled()) { LOGGER.info("RM will register :{}", resourceIds); } - RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup); + RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup, getExtraData()); message.setResourceIds(resourceIds); return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message); }; } + private String getExtraData() { + return getAuthData(); + } + @Override protected String getTransactionServiceGroup() { return transactionServiceGroup; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index f9efc611679..ab4d35b5e6b 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -45,6 +45,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.seata.core.constants.ConfigurationKeys.EXTRA_DATA_KV_CHAR; +import static org.apache.seata.core.constants.ConfigurationKeys.EXTRA_DATA_SPLIT_CHAR; +import static org.apache.seata.core.constants.ConfigurationKeys.SEATA_ACCESS_KEY; +import static org.apache.seata.core.constants.ConfigurationKeys.SEATA_SECRET_KEY; + /** * The rm netty client. * @@ -163,7 +168,7 @@ protected void setAccessKey(String accessKey) { this.accessKey = accessKey; return; } - this.accessKey = System.getProperty(org.apache.seata.common.ConfigurationKeys.SEATA_ACCESS_KEY); + this.accessKey = System.getProperty(SEATA_ACCESS_KEY); } /** @@ -176,7 +181,7 @@ protected void setSecretKey(String secretKey) { this.secretKey = secretKey; return; } - this.secretKey = System.getProperty(org.apache.seata.common.ConfigurationKeys.SEATA_SECRET_KEY); + this.secretKey = System.getProperty(SEATA_SECRET_KEY); } @Override @@ -211,6 +216,7 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r AbstractMessage requestMessage) { RegisterTMRequest registerTMRequest = (RegisterTMRequest) requestMessage; RegisterTMResponse registerTMResponse = (RegisterTMResponse) response; + refreshAuthToken(registerTMResponse.getExtraData()); if (LOGGER.isInfoEnabled()) { LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel); } @@ -270,15 +276,12 @@ private String getExtraData() { } String digest = signer.sign(digestSource, secretKey); StringBuilder sb = new StringBuilder(); - sb.append(RegisterTMRequest.UDATA_AK).append(org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_KV_CHAR).append(accessKey).append( - org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_SPLIT_CHAR); - sb.append(RegisterTMRequest.UDATA_DIGEST).append(org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_KV_CHAR).append(digest).append( - org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_SPLIT_CHAR); - sb.append(RegisterTMRequest.UDATA_TIMESTAMP).append(org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_KV_CHAR).append(timestamp).append( - org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_SPLIT_CHAR); - sb.append(RegisterTMRequest.UDATA_AUTH_VERSION).append( - org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_KV_CHAR).append(signer.getSignVersion()).append( - org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_SPLIT_CHAR); + sb.append(RegisterTMRequest.UDATA_AK).append(EXTRA_DATA_KV_CHAR).append(accessKey).append(EXTRA_DATA_SPLIT_CHAR); + sb.append(RegisterTMRequest.UDATA_DIGEST).append(EXTRA_DATA_KV_CHAR).append(digest).append(EXTRA_DATA_SPLIT_CHAR); + sb.append(RegisterTMRequest.UDATA_TIMESTAMP).append(EXTRA_DATA_KV_CHAR).append(timestamp).append(EXTRA_DATA_SPLIT_CHAR); + sb.append(RegisterTMRequest.UDATA_AUTH_VERSION).append(EXTRA_DATA_KV_CHAR).append(signer.getSignVersion()).append(EXTRA_DATA_SPLIT_CHAR); + String authExtraData = getAuthData(); + sb.append(authExtraData); return sb.toString(); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java index 622f30039fa..95126bbefa4 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegRmProcessor.java @@ -17,12 +17,14 @@ package org.apache.seata.core.rpc.processor.server; import io.netty.channel.ChannelHandlerContext; +import org.apache.seata.common.exception.RetryableException; import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.util.NetUtil; import org.apache.seata.core.protocol.RegisterRMRequest; import org.apache.seata.core.protocol.RegisterRMResponse; import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.Version; +import org.apache.seata.core.protocol.ResultCode; import org.apache.seata.core.rpc.netty.ChannelManager; import org.apache.seata.core.rpc.RemotingServer; import org.apache.seata.core.rpc.RegisterCheckAuthHandler; @@ -62,15 +64,18 @@ private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress()); boolean isSuccess = false; String errorInfo = StringUtils.EMPTY; + ResultCode resultCode = ResultCode.Failed; try { if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) { ChannelManager.registerRMChannel(message, ctx.channel()); Version.putChannelVersion(ctx.channel(), message.getVersion()); isSuccess = true; + resultCode = ResultCode.Success; if (LOGGER.isDebugEnabled()) { LOGGER.debug("RM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()); } } else { + errorInfo = "RM checkAuth failed!Please check your username/password."; if (LOGGER.isWarnEnabled()) { LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()); } @@ -79,11 +84,19 @@ private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { isSuccess = false; errorInfo = exx.getMessage(); LOGGER.error("RM register fail, error message:{}", errorInfo); + if (exx instanceof RetryableException) { + resultCode = ResultCode.Retry; + } } RegisterRMResponse response = new RegisterRMResponse(isSuccess); + String newToken = checkAuthHandler.refreshAuthToken(message); + if (StringUtils.isNotBlank(newToken)) { + response.setExtraData(newToken); + } if (StringUtils.isNotEmpty(errorInfo)) { response.setMsg(errorInfo); } + response.setResultCode(resultCode); remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response); if (isSuccess && LOGGER.isInfoEnabled()) { LOGGER.info("RM register success,message:{},channel:{},client version:{}", message, ctx.channel(), diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java index 0afee867f8b..01dad2a5221 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java @@ -17,12 +17,14 @@ package org.apache.seata.core.rpc.processor.server; import io.netty.channel.ChannelHandlerContext; +import org.apache.seata.common.exception.RetryableException; import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.util.NetUtil; import org.apache.seata.core.protocol.RegisterTMRequest; import org.apache.seata.core.protocol.RegisterTMResponse; -import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.ResultCode; import org.apache.seata.core.protocol.Version; +import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.rpc.netty.ChannelManager; import org.apache.seata.core.rpc.RemotingServer; import org.apache.seata.core.rpc.RegisterCheckAuthHandler; @@ -63,30 +65,41 @@ private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { Version.putChannelVersion(ctx.channel(), message.getVersion()); boolean isSuccess = false; String errorInfo = StringUtils.EMPTY; + ResultCode resultCode = ResultCode.Failed; try { if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) { ChannelManager.registerTMChannel(message, ctx.channel()); Version.putChannelVersion(ctx.channel(), message.getVersion()); isSuccess = true; + resultCode = ResultCode.Success; if (LOGGER.isDebugEnabled()) { LOGGER.debug("TM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()); } } else { + errorInfo = "TM checkAuth failed!Please check your username/password."; if (LOGGER.isWarnEnabled()) { LOGGER.warn("TM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", - ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()); + ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()); } } } catch (Exception exx) { isSuccess = false; errorInfo = exx.getMessage(); LOGGER.error("TM register fail, error message:{}", errorInfo); + if (exx instanceof RetryableException) { + resultCode = ResultCode.Retry; + } } RegisterTMResponse response = new RegisterTMResponse(isSuccess); + String newToken = checkAuthHandler.refreshAuthToken(message); + if (StringUtils.isNotBlank(newToken)) { + response.setExtraData(newToken); + } if (StringUtils.isNotEmpty(errorInfo)) { response.setMsg(errorInfo); } + response.setResultCode(resultCode); remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response); if (isSuccess && LOGGER.isInfoEnabled()) { LOGGER.info("TM register success,message:{},channel:{},client version:{}", message, ctx.channel(), diff --git a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java index b1c7ef3e741..1db37f41a59 100644 --- a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java +++ b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java @@ -29,8 +29,9 @@ class ResultCodeTest { void getByte() { Assertions.assertEquals(ResultCode.Failed, ResultCode.get((byte) 0)); Assertions.assertEquals(ResultCode.Success, ResultCode.get((byte) 1)); + Assertions.assertEquals(ResultCode.Retry, ResultCode.get((byte) 2)); Assertions.assertThrows(IllegalArgumentException.class, () -> { - ResultCode.get((byte) 2); + ResultCode.get((byte) 3); }); } @@ -38,14 +39,15 @@ void getByte() { void getInt() { Assertions.assertEquals(ResultCode.Failed, ResultCode.get(0)); Assertions.assertEquals(ResultCode.Success, ResultCode.get(1)); + Assertions.assertEquals(ResultCode.Retry, ResultCode.get(2)); Assertions.assertThrows(IllegalArgumentException.class, () -> { - ResultCode.get(2); + ResultCode.get(3); }); } @Test void values() { - Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success}, ResultCode.values()); + Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success,ResultCode.Retry}, ResultCode.values()); } @Test diff --git a/script/client/conf/registry.conf b/script/client/conf/registry.conf index 7bfb4c02b11..541f7ed4faf 100644 --- a/script/client/conf/registry.conf +++ b/script/client/conf/registry.conf @@ -14,7 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +security { + username = + password = +} registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom、raft type = "file" diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index a3d6456e37b..7d6fbf680a8 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -84,6 +84,9 @@ seata.transport.enable-rm-client-batch-send-request=true seata.transport.rpc-rm-request-timeout=15000 seata.transport.rpc-tm-request-timeout=30000 +seata.security.username= +seata.security.password= + seata.config.type=file seata.config.consul.server-addr=127.0.0.1:8500 diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index 7bb002cdf6a..0256301036c 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -27,6 +27,9 @@ seata: scan-packages: firstPackage,secondPackage excludes-for-scanning: firstBeanNameForExclude,secondBeanNameForExclude excludes-for-auto-proxying: firstClassNameForExclude,secondClassNameForExclude + security: + username: + password: client: rm: async-commit-buffer-limit: 10000 diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/SeataProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/SeataProperties.java index 70775df3ba4..a0c241e68cb 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/SeataProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/SeataProperties.java @@ -74,6 +74,16 @@ public class SeataProperties { */ private String secretKey; + /** + * the username + */ + private String username; + + /** + * the password + */ + private String password; + @Autowired private SpringCloudAlibabaConfiguration springCloudAlibabaConfiguration; @@ -180,4 +190,22 @@ public SeataProperties setSecretKey(String secretKey) { this.secretKey = secretKey; return this; } + + public String getUsername() { + return username; + } + + public SeataProperties setUsername(String username) { + this.username = username; + return this; + } + + public String getPassword() { + return password; + } + + public SeataProperties setPassword(String password) { + this.password = password; + return this; + } } diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractIdentifyResponseCodec.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractIdentifyResponseCodec.java index 56840f9e618..3706775f97b 100644 --- a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractIdentifyResponseCodec.java +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractIdentifyResponseCodec.java @@ -23,7 +23,6 @@ /** * The type Abstract identify response. - * */ public abstract class AbstractIdentifyResponseCodec extends AbstractResultMessageCodec { @@ -34,25 +33,38 @@ public Class getMessageClassType() { @Override public void encode(T t, ByteBuf out) { - AbstractIdentifyResponse abstractIdentifyResponse = (AbstractIdentifyResponse)t; + super.encode(t,out); + AbstractIdentifyResponse abstractIdentifyResponse = (AbstractIdentifyResponse) t; boolean identified = abstractIdentifyResponse.isIdentified(); String version = abstractIdentifyResponse.getVersion(); + String extraData = abstractIdentifyResponse.getExtraData(); - out.writeByte(identified ? (byte)1 : (byte)0); + out.writeByte(identified ? (byte) 1 : (byte) 0); if (version != null) { byte[] bs = version.getBytes(UTF8); - out.writeShort((short)bs.length); + out.writeShort((short) bs.length); + if (bs.length > 0) { + out.writeBytes(bs); + } + } else { + out.writeShort((short) 0); + } + + if (extraData != null) { + byte[] bs = extraData.getBytes(UTF8); + out.writeShort((short) bs.length); if (bs.length > 0) { out.writeBytes(bs); } } else { - out.writeShort((short)0); + out.writeShort((short) 0); } } @Override public void decode(T t, ByteBuffer in) { - AbstractIdentifyResponse abstractIdentifyResponse = (AbstractIdentifyResponse)t; + super.decode(t,in); + AbstractIdentifyResponse abstractIdentifyResponse = (AbstractIdentifyResponse) t; abstractIdentifyResponse.setIdentified(in.get() == 1); short len = in.getShort(); @@ -65,6 +77,20 @@ public void decode(T t, ByteBuffer in) { byte[] bs = new byte[len]; in.get(bs); abstractIdentifyResponse.setVersion(new String(bs, UTF8)); + + //ExtraData len + if (in.remaining() < 2) { + return; + } + len = in.getShort(); + + if (in.remaining() >= len) { + bs = new byte[len]; + in.get(bs); + abstractIdentifyResponse.setExtraData(new String(bs, UTF8)); + } else { + //maybe null + } } } diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java index 3eedf496872..6d183939a56 100644 --- a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java @@ -25,7 +25,6 @@ /** * The type Abstract result message codec. - * */ public abstract class AbstractResultMessageCodec extends AbstractMessageCodec { @@ -36,12 +35,15 @@ public Class getMessageClassType() { @Override public void encode(T t, ByteBuf out) { - AbstractResultMessage abstractResultMessage = (AbstractResultMessage)t; + AbstractResultMessage abstractResultMessage = (AbstractResultMessage) t; ResultCode resultCode = abstractResultMessage.getResultCode(); String resultMsg = abstractResultMessage.getMsg(); - - out.writeByte(resultCode.ordinal()); - if (resultCode == ResultCode.Failed) { + if (null != resultCode) { + out.writeByte(resultCode.ordinal()); + } else { + out.writeByte(ResultCode.values().length); + } + if (resultCode == ResultCode.Failed || resultCode == ResultCode.Retry) { if (StringUtils.isNotEmpty(resultMsg)) { String msg; if (resultMsg.length() > Short.MAX_VALUE) { @@ -50,21 +52,24 @@ public void encode(T t, ByteBuf out) { msg = resultMsg; } byte[] bs = msg.getBytes(UTF8); - out.writeShort((short)bs.length); + out.writeShort((short) bs.length); out.writeBytes(bs); } else { - out.writeShort((short)0); + out.writeShort((short) 0); } } } @Override public void decode(T t, ByteBuffer in) { - AbstractResultMessage abstractResultMessage = (AbstractResultMessage)t; - - ResultCode resultCode = ResultCode.get(in.get()); - abstractResultMessage.setResultCode(resultCode); - if (resultCode == ResultCode.Failed) { + AbstractResultMessage abstractResultMessage = (AbstractResultMessage) t; + ResultCode resultCode = null; + byte resultCodeOrdinal = in.get(); + if (resultCodeOrdinal < ResultCode.values().length) { + resultCode = ResultCode.get(resultCodeOrdinal); + abstractResultMessage.setResultCode(resultCode); + } + if (resultCode == ResultCode.Failed || resultCode == ResultCode.Retry) { short len = in.getShort(); if (len > 0) { byte[] msg = new byte[len]; diff --git a/serializer/seata-serializer-seata/src/test/java/org/apache/seata/serializer/seata/protocol/RegisterTMResponseSerializerTest.java b/serializer/seata-serializer-seata/src/test/java/org/apache/seata/serializer/seata/protocol/RegisterTMResponseSerializerTest.java index 349ef194941..4f22cbce9c1 100644 --- a/serializer/seata-serializer-seata/src/test/java/org/apache/seata/serializer/seata/protocol/RegisterTMResponseSerializerTest.java +++ b/serializer/seata-serializer-seata/src/test/java/org/apache/seata/serializer/seata/protocol/RegisterTMResponseSerializerTest.java @@ -57,4 +57,15 @@ public void test_codec(){ // Assert.assertEquals(registerTMResponse2.getMsg(), registerTMResponse.getMsg()); // Assert.assertEquals(registerTMResponse2.getByCode(), registerTMResponse.getByCode()); } + + @Test + public void test_codec1() { + RegisterTMResponse registerTMResponse = new RegisterTMResponse(); + registerTMResponse.setIdentified(true); + registerTMResponse.setVersion("2.1.0-SNAPSHOT"); + byte[] bytes = seataSerializer.serialize(registerTMResponse); + RegisterTMResponse registerTMResponse2 = seataSerializer.deserialize(bytes); + assertThat(registerTMResponse2.isIdentified()).isEqualTo(registerTMResponse.isIdentified()); + assertThat(registerTMResponse2.getVersion()).isEqualTo(registerTMResponse.getVersion()); + } } diff --git a/server/src/main/java/org/apache/seata/server/auth/AbstractCheckAuthHandler.java b/server/src/main/java/org/apache/seata/server/auth/AbstractCheckAuthHandler.java index 705ec2d2c9b..13926c99da2 100644 --- a/server/src/main/java/org/apache/seata/server/auth/AbstractCheckAuthHandler.java +++ b/server/src/main/java/org/apache/seata/server/auth/AbstractCheckAuthHandler.java @@ -16,38 +16,55 @@ */ package org.apache.seata.server.auth; +import org.apache.seata.common.exception.RetryableException; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; +import org.apache.seata.core.protocol.AbstractIdentifyRequest; import org.apache.seata.core.protocol.RegisterRMRequest; import org.apache.seata.core.protocol.RegisterTMRequest; import org.apache.seata.core.rpc.RegisterCheckAuthHandler; +import static org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_KV_CHAR; +import static org.apache.seata.common.ConfigurationKeys.EXTRA_DATA_SPLIT_CHAR; import static org.apache.seata.common.DefaultValues.DEFAULT_SERVER_ENABLE_CHECK_AUTH; /** */ public abstract class AbstractCheckAuthHandler implements RegisterCheckAuthHandler { + private static final String NEW_TOKEN = "newToken"; private static final Boolean ENABLE_CHECK_AUTH = ConfigurationFactory.getInstance().getBoolean( ConfigurationKeys.SERVER_ENABLE_CHECK_AUTH, DEFAULT_SERVER_ENABLE_CHECK_AUTH); @Override - public boolean regTransactionManagerCheckAuth(RegisterTMRequest request) { + public boolean regTransactionManagerCheckAuth(RegisterTMRequest request) throws RetryableException { if (!ENABLE_CHECK_AUTH) { return true; } return doRegTransactionManagerCheck(request); } - public abstract boolean doRegTransactionManagerCheck(RegisterTMRequest request); + public abstract boolean doRegTransactionManagerCheck(RegisterTMRequest request) throws RetryableException; @Override - public boolean regResourceManagerCheckAuth(RegisterRMRequest request) { + public boolean regResourceManagerCheckAuth(RegisterRMRequest request) throws RetryableException { if (!ENABLE_CHECK_AUTH) { return true; } return doRegResourceManagerCheck(request); } - public abstract boolean doRegResourceManagerCheck(RegisterRMRequest request); + public abstract boolean doRegResourceManagerCheck(RegisterRMRequest request) throws RetryableException; + + @Override + public String refreshAuthToken(AbstractIdentifyRequest abstractIdentifyRequest) { + if (needRefreshToken(abstractIdentifyRequest)) { + return NEW_TOKEN + EXTRA_DATA_KV_CHAR + refreshToken(abstractIdentifyRequest) + EXTRA_DATA_SPLIT_CHAR; + } + return null; + } + + public abstract boolean needRefreshToken(AbstractIdentifyRequest abstractIdentifyRequest); + + public abstract String refreshToken(AbstractIdentifyRequest abstractIdentifyRequest); } diff --git a/server/src/main/java/org/apache/seata/server/auth/DefaultCheckAuthHandler.java b/server/src/main/java/org/apache/seata/server/auth/DefaultCheckAuthHandler.java index 05329eb27ee..243617c759f 100644 --- a/server/src/main/java/org/apache/seata/server/auth/DefaultCheckAuthHandler.java +++ b/server/src/main/java/org/apache/seata/server/auth/DefaultCheckAuthHandler.java @@ -17,14 +17,19 @@ package org.apache.seata.server.auth; import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.protocol.AbstractIdentifyRequest; import org.apache.seata.core.protocol.RegisterRMRequest; import org.apache.seata.core.protocol.RegisterTMRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** */ -@LoadLevel(name = "defaultCheckAuthHandler", order = 100) +@LoadLevel(name = "defaultCheckAuthHandler", order = 2) public class DefaultCheckAuthHandler extends AbstractCheckAuthHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCheckAuthHandler.class); + @Override public boolean doRegTransactionManagerCheck(RegisterTMRequest request) { return true; @@ -34,4 +39,15 @@ public boolean doRegTransactionManagerCheck(RegisterTMRequest request) { public boolean doRegResourceManagerCheck(RegisterRMRequest request) { return true; } + + @Override + public boolean needRefreshToken(AbstractIdentifyRequest abstractIdentifyRequest) { + return false; + } + + @Override + public String refreshToken(AbstractIdentifyRequest abstractIdentifyRequest) { + LOGGER.error("This method is not supported."); + return null; + } } diff --git a/server/src/main/java/org/apache/seata/server/auth/JwtCheckAuthHandler.java b/server/src/main/java/org/apache/seata/server/auth/JwtCheckAuthHandler.java new file mode 100644 index 00000000000..3665ddd11c6 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/auth/JwtCheckAuthHandler.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.auth; + + +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jws; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import io.jsonwebtoken.io.Decoders; +import io.jsonwebtoken.ExpiredJwtException; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.exception.RetryableException; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.auth.JwtAuthManager; +import org.apache.seata.core.protocol.AbstractIdentifyRequest; +import org.apache.seata.core.protocol.RegisterRMRequest; +import org.apache.seata.core.protocol.RegisterTMRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.crypto.spec.SecretKeySpec; +import java.util.Date; +import java.util.HashMap; + +@LoadLevel(name = "jwtCheckAuthHandler", order = 1) +public class JwtCheckAuthHandler extends AbstractCheckAuthHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(JwtCheckAuthHandler.class); + + private static final String AUTHORITIES_KEY = "auth"; + + private static final String TOKEN = "token"; + + private static final String PRO_USERNAME = "username"; + + private static final String PRO_PASSWORD = "password"; + + private JwtAuthManager authManager = JwtAuthManager.getInstance(); + + @Override + public boolean doRegTransactionManagerCheck(RegisterTMRequest request) throws RetryableException { + return checkAuthData(request.getExtraData()); + } + + @Override + public boolean doRegResourceManagerCheck(RegisterRMRequest request) throws RetryableException { + return checkAuthData(request.getExtraData()); + } + + @Override + public boolean needRefreshToken(AbstractIdentifyRequest abstractIdentifyRequest) { + try { + if (!checkAuthData(abstractIdentifyRequest.getExtraData())) { + return false; + } + } catch (RetryableException e) { + LOGGER.warn("auth failed!"); + return false; + } + + HashMap authDataMap = StringUtils.string2Map(abstractIdentifyRequest.getExtraData()); + // 1.if use username/password for authentication, need refresh token. + if (!authDataMap.containsKey(TOKEN)) { + return true; + } else { + // 2.if token will be expired, need refresh token. + try { + String accessToken = authDataMap.get(TOKEN); + String secretKey = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.SECURITY_SECRET_KEY); + String tokenValidWindow = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.SECURITY_TOKEN_VALID_TIME); + Jws claimsJws = Jwts.parser().setSigningKey(secretKey).parseClaimsJws(accessToken); + Claims claims = claimsJws.getBody(); + Date expiration = claims.getExpiration(); + if (System.currentTimeMillis() > expiration.getTime() - Long.parseLong(tokenValidWindow) / 3) { + return true; + } + } catch (Exception e) { + LOGGER.error("jwt token authentication failed: " + e); + } + } + return false; + } + + @Override + public String refreshToken(AbstractIdentifyRequest abstractIdentifyRequest) { + String subject; + String secretKey = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.SECURITY_SECRET_KEY); + String expirationMillis = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.SECURITY_TOKEN_VALID_TIME); + if (authManager.getUsername() != null) { + subject = authManager.getUsername(); + } else { + String accessToken = authManager.getToken(); + Jws claimsJws = Jwts.parser().setSigningKey(secretKey).parseClaimsJws(accessToken); + Claims claims = claimsJws.getBody(); + subject = claims.getSubject(); + } + + SecretKeySpec secretKeySpec = new SecretKeySpec(Decoders.BASE64.decode(secretKey), + SignatureAlgorithm.HS256.getJcaName()); + + return Jwts.builder() + .setSubject(subject) + .claim(AUTHORITIES_KEY, "") + .setExpiration(new Date(System.currentTimeMillis() + Long.parseLong(expirationMillis))) + .signWith(secretKeySpec, SignatureAlgorithm.HS256) + .compact(); + } + + private boolean checkAuthData(String extraData) throws RetryableException { + if (null == extraData) { + return false; + } + HashMap extraDataMap = StringUtils.string2Map(extraData); + // 1.check username/password + String username = extraDataMap.get(PRO_USERNAME); + String password = extraDataMap.get(PRO_PASSWORD); + if (null != username && null != password + && StringUtils.equals(username, ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.SECURITY_USERNME)) + && StringUtils.equals(password, ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.SECURITY_PASSWORD))) { + authManager.setUsername(username); + authManager.setPassword(password); + return true; + } else if (extraDataMap.get(TOKEN) != null) { + // 2.check token + try { + String accessToken = extraDataMap.get(TOKEN); + String secretKey = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.SECURITY_SECRET_KEY); + Jwts.parser().setSigningKey(secretKey).parseClaimsJws(accessToken); + authManager.setAccessToken(accessToken); + return true; + } catch (ExpiredJwtException e) { + LOGGER.warn("jwt token has been expired: " + e); + throw new RetryableException(); + } catch (Exception e) { + LOGGER.error("jwt token authentication failed: " + e); + } + } + return false; + } + +} diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.core.rpc.RegisterCheckAuthHandler b/server/src/main/resources/META-INF/services/org.apache.seata.core.rpc.RegisterCheckAuthHandler index 05fdd67be41..437d44a4d50 100644 --- a/server/src/main/resources/META-INF/services/org.apache.seata.core.rpc.RegisterCheckAuthHandler +++ b/server/src/main/resources/META-INF/services/org.apache.seata.core.rpc.RegisterCheckAuthHandler @@ -1 +1,2 @@ +org.apache.seata.server.auth.JwtCheckAuthHandler org.apache.seata.server.auth.DefaultCheckAuthHandler \ No newline at end of file diff --git a/server/src/main/resources/application.yml b/server/src/main/resources/application.yml index a5a7e1f3e32..c5f64eff372 100644 --- a/server/src/main/resources/application.yml +++ b/server/src/main/resources/application.yml @@ -50,6 +50,8 @@ seata: # server: # service-port: 8091 #If not configured, the default is '${server.port} + 1000' security: + username: seata + password: seata secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: diff --git a/test/src/test/resources/registry.conf b/test/src/test/resources/registry.conf index bab6e8ec0ef..13678007b35 100644 --- a/test/src/test/resources/registry.conf +++ b/test/src/test/resources/registry.conf @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "file"