Skip to content

Commit

Permalink
feature: client discovers raft nodes through the naming server (#7183)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Mar 9, 2025
1 parent 62aa6af commit d917fcf
Show file tree
Hide file tree
Showing 19 changed files with 250 additions and 74 deletions.
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] support raft mode registry to namingserver
- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] Implement scheduled handling for end status transaction
- [[#7171](https://github.com/apache/incubator-seata/pull/7171)] support EpollEventLoopGroup in client
- [[#7183](https://github.com/apache/incubator-seata/pull/7183)] client discovers raft nodes through the naming server
- [[#7182](https://github.com/apache/incubator-seata/pull/7182)] use the ip of the peerId as the host of the raft node
- [[#7181](https://github.com/apache/incubator-seata/pull/7181)] raft implements domain name resolution and selects peerId



### bugfix:

- [[#7104](https://github.com/apache/incubator-seata/pull/7104)] fix impl of supportsSourceType is not defined
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] 支持raft集群注册至namingserver
- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] 实现对残留的end状态事务定时处理
- [[#7171](https://github.com/apache/incubator-seata/pull/7171)] 客户端支持 EpollEventLoopGroup
- [[#7183](https://github.com/apache/incubator-seata/pull/7183)] 客户端支持通过namingserver发现raft节点
- [[#7182](https://github.com/apache/incubator-seata/pull/7182)] 采用peerId的ip作为raft节点的host
- [[#7181](https://github.com/apache/incubator-seata/pull/7181)] raft实现域名解析并选择peerId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,11 +1116,6 @@ public interface ConfigurationKeys {
*/
String SERVER_RAFT_COMPRESSOR = SERVER_RAFT + "compressor";

/**
* The constant CLIENT_METADATA_MAX_AGE_MS.
*/
String CLIENT_METADATA_MAX_AGE_MS = CLIENT_PREFIX + "metadataMaxAgeMs";

/**
* The constant IS_USE_CLOUD_NAMESPACE_PARSING.
*/
Expand Down
5 changes: 5 additions & 0 deletions common/src/main/java/org/apache/seata/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,9 @@ public interface Constants {
*/
int DEAD_LOCK_ERROR_CODE = 1213;

/**
* The constant RAFT_GROUP_HEADER
*/
String RAFT_GROUP_HEADER = "X-SEATA-RAFT-GROUP";

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class GlobalLockInfo extends React.Component<GlobalProps, GlobalLockInfoState> {
cluster: selectedNamespace ? selectedNamespace.clusters[0] : undefined,
},
clusters: selectedNamespace ? selectedNamespace.clusters : [],
vgroups: selectedNamespace ? selectedNamespace.vgroups : [],
});
this.search();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ class TransactionInfo extends React.Component<GlobalProps, TransactionInfoState>
cluster: selectedNamespace ? selectedNamespace.clusters[0] : undefined,
},
clusters: selectedNamespace ? selectedNamespace.clusters : [],
vgroups: selectedNamespace ? selectedNamespace.vgroups : [],
});
this.search();
} else {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class RaftRegistryServiceImpl implements RegistryService<ConfigChangeList

private static final String TOKEN_VALID_TIME_MS_KEY = "tokenValidityInMilliseconds";

private static final String META_DATA_MAX_AGE_MS = "metadataMaxAgeMs";

private static final long TOKEN_EXPIRE_TIME_IN_MILLISECONDS;

private static final String USERNAME;
Expand Down Expand Up @@ -175,7 +177,7 @@ protected static void startQueryMetadata() {
REFRESH_METADATA_EXECUTOR = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("refreshMetadata", 1, true));
REFRESH_METADATA_EXECUTOR.execute(() -> {
long metadataMaxAgeMs = CONFIG.getLong(ConfigurationKeys.CLIENT_METADATA_MAX_AGE_MS, 30000L);
long metadataMaxAgeMs = CONFIG.getLong(getMetadataMaxAgeMs(), 30000L);
long currentTime = System.currentTimeMillis();
while (!CLOSED.get()) {
try {
Expand Down Expand Up @@ -578,4 +580,9 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
return Collections.emptyList();
}

private static String getMetadataMaxAgeMs() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY,
REGISTRY_TYPE, META_DATA_MAX_AGE_MS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
Expand All @@ -48,6 +49,7 @@
import org.springframework.web.client.AsyncRestTemplate;


import static org.apache.seata.common.Constants.RAFT_GROUP_HEADER;
import static org.apache.seata.namingserver.contants.NamingConstant.CONSOLE_PATTERN;

public class ConsoleRemotingFilter implements Filter {
Expand Down Expand Up @@ -80,15 +82,14 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
&& (StringUtils.isNotBlank(cluster) || StringUtils.isNotBlank(vgroup))) {
List<Node> list = null;
if (StringUtils.isNotBlank(vgroup)) {
list = namingManager.getInstancesByVgroupAndNamespace(namespace, vgroup);
list = namingManager.getInstancesByVgroupAndNamespace(namespace, vgroup, StringUtils.equalsIgnoreCase(request.getMethod(), HttpMethod.GET.name()));
} else if (StringUtils.isNotBlank(cluster)) {
list = namingManager.getInstances(namespace, cluster);
}
if (CollectionUtils.isNotEmpty(list)) {
// Randomly select a node from the list
Node node = list.get(ThreadLocalRandom.current().nextInt(list.size()));
Node.Endpoint controlEndpoint = node.getControl();

if (controlEndpoint != null) {
// Construct the target URL
String targetUrl = "http://" + controlEndpoint.getHost() + ":" + controlEndpoint.getPort()
Expand All @@ -97,6 +98,9 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo

// Copy headers from the original request
HttpHeaders headers = new HttpHeaders();
if (node.getRole() == ClusterRole.LEADER) {
headers.add(RAFT_GROUP_HEADER, node.getGroup());
}
Collections.list(request.getHeaderNames())
.forEach(headerName -> headers.add(headerName, request.getHeader(headerName)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,28 @@ public List<Cluster> getClusterListByVgroup(String vGroup, String namespace) {
}

public List<Node> getInstances(String namespace, String clusterName) {
return getInstances(namespace, clusterName, false);
}

public List<Node> getInstances(String namespace, String clusterName, boolean readOnly) {
Map<String, ClusterData> clusterDataHashMap = namespaceClusterDataMap.get(namespace);
ClusterData clusterData = clusterDataHashMap.get(clusterName);
if (clusterData == null) {
LOGGER.warn("no instances in {} : {}", namespace, clusterName);
return Collections.emptyList();
}
return clusterData.getInstanceList();
return readOnly ? clusterData.getInstanceList()
: clusterData.getInstanceList().stream()
.filter(node -> node.getRole() == ClusterRole.LEADER || node.getRole() == ClusterRole.MEMBER)
.collect(Collectors.toList());
}

public List<Node> getInstancesByVgroupAndNamespace(String namespace, String vgroup) {
public List<Node> getInstancesByVgroupAndNamespace(String namespace, String vgroup, boolean readOnly) {
List<Cluster> clusters = getClusterListByVgroup(vgroup, namespace);
if (CollectionUtils.isEmpty(clusters)) {
return Collections.emptyList();
} else {
return getInstances(namespace, clusters.get(0).getClusterName());
return getInstances(namespace, clusters.get(0).getClusterName(),readOnly);
}
}

Expand Down
3 changes: 2 additions & 1 deletion namingserver/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ seata:
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
csrf-ignore-urls: /naming/v1/**
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/health,/error,/naming/v1/**
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/naming/v1/health,/error
10 changes: 3 additions & 7 deletions script/client/conf/registry.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ registry {
server-addr = "127.0.0.1:8081"
namespace = "public"
heartbeat-period = 5000
username = "seata"
password = "seata"
tokenValidityInMilliseconds = 1740000
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
Expand Down Expand Up @@ -93,13 +96,6 @@ registry {
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom
type = "file"
raft {
metadataAaxAgeMs = 30000
serverAddr = "127.0.0.1:7091"
username= "seata"
password= "seata"
tokenValidityInMilliseconds= "1740000"
}
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
Expand Down
4 changes: 4 additions & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ seata.registry.eureka.service-url=http://localhost:8761/eureka
seata.registry.seata.server-addr=127.0.0.1:8081
seata.registry.seata.namespace=public
seata.registry.seata.heartbeat-period=5000
seata.registry.seata.metadata-max-age-ms=30000
seata.registry.seata.username=seata
seata.registry.seata.password=seata
seata.registry.seata.tokenValidityInMilliseconds=1740000

seata.registry.nacos.application=seata-server
seata.registry.nacos.server-addr=127.0.0.1:8848
Expand Down
4 changes: 4 additions & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ seata:
server-addr: 127.0.0.1:8081
namespace: public
heartbeat-period: 5000
metadata-max-age-ms: 30000
username: seata
password: seata
tokenValidityInMilliseconds: 1740000
raft:
server-addr:
metadata-max-age-ms: 30000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public class RegistryNamingServerProperties {

private int heartbeatPeriod = 5000;

private Long metadataMaxAgeMs = 30000L;

private String username;

private String password;

private Long tokenValidityInMilliseconds = 29 * 60 * 1000L;

public String getCluster() {
return cluster;
}
Expand Down Expand Up @@ -63,4 +71,37 @@ public int getHeartbeatPeriod() {
public void setHeartbeatPeriod(int heartbeatPeriod) {
this.heartbeatPeriod = heartbeatPeriod;
}

public Long getMetadataMaxAgeMs() {
return metadataMaxAgeMs;
}

public void setMetadataMaxAgeMs(Long metadataMaxAgeMs) {
this.metadataMaxAgeMs = metadataMaxAgeMs;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public Long getTokenValidityInMilliseconds() {
return tokenValidityInMilliseconds;
}

public void setTokenValidityInMilliseconds(Long tokenValidityInMilliseconds) {
this.tokenValidityInMilliseconds = tokenValidityInMilliseconds;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.server.cluster.raft.context.SeataClusterContext;


import static org.apache.seata.common.Constants.RAFT_GROUP_HEADER;

public class RaftGroupFilter implements Filter {

@Override
Expand All @@ -36,6 +40,9 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest)request;
String unit = httpRequest.getParameter("unit");
if (StringUtils.isBlank(unit)) {
unit = httpRequest.getHeader(RAFT_GROUP_HEADER);
}
if (unit != null) {
SeataClusterContext.bindGroup(unit);
}
Expand Down
15 changes: 4 additions & 11 deletions server/src/main/resources/application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ logging:
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
config:
# support: nacos 、 consul 、 apollo 、 zk 、 etcd3
Expand Down Expand Up @@ -86,6 +82,10 @@ seata:
cluster: default
namespace: public
heartbeat-period: 5000
metadata-max-age-ms: 30000
username: seata
password: seata
tokenValidityInMilliseconds: 1740000
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
Expand Down Expand Up @@ -134,7 +134,6 @@ seata:
group: SEATA_GROUP
address-wait-time: 3000


server:
service-port: 8091 #If not configured, the default is '${server.port} + 1000'
max-commit-retry-timeout: -1
Expand Down Expand Up @@ -242,9 +241,3 @@ seata:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
boss-thread-size: 1
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
csrf-ignore-urls: /metadata/v1/**
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/health,/error
19 changes: 9 additions & 10 deletions server/src/main/resources/application.raft.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ logging:
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
config:
# support: nacos 、 consul 、 apollo 、 zk 、 etcd3
Expand Down Expand Up @@ -76,6 +72,15 @@ seata:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: file
preferred-networks: 30.240.*
seata:
server-addr: 127.0.0.1:8081
cluster: default
namespace: public
heartbeat-period: 5000
metadata-max-age-ms: 30000
username: seata
password: seata
tokenValidityInMilliseconds: 1740000
server:
raft:
group: default
Expand Down Expand Up @@ -159,9 +164,3 @@ seata:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
boss-thread-size: 1
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
csrf-ignore-urls: /metadata/v1/**
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/health,/error
12 changes: 1 addition & 11 deletions server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ logging:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash

console:
user:
username: seata
password: seata
seata:
config:
# support: nacos, consul, apollo, zk, etcd3
Expand All @@ -48,10 +44,4 @@ seata:
# support: file 、 db 、 redis 、 raft
mode: file
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
csrf-ignore-urls: /metadata/v1/**
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/health,/error,/vgroup/v1/**
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'

0 comments on commit d917fcf

Please sign in to comment.