Skip to content

Commit

Permalink
Adjust to form a raft cluster with all nodes.(#10874
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewAden committed Sep 8, 2024
1 parent fca3a1a commit 9f50e1c
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Introduction

This module is the Raft consensus algorithm registry plugin; it uses a Raft cluster as the registry center.
The Raft registration plugin consists of two parts:
the server and the client. The Master module of DolphinScheduler will form a Raft server cluster,
while the Worker modules and API modules will interact with the Raft server using the Raft client.
This is a registry plugin implemented with the Raft algorithm; all nodes in the cluster participate in the election as members of the Raft cluster.

# How to use

Expand All @@ -16,7 +13,7 @@ you need to set the registry properties in master/worker/api's application.yml,

NOTE: In a production environment, to achieve high availability, the number of master nodes must be odd, e.g. 3 or 5.

master's properties example
properties example
```yaml
registry:
type: raft
Expand All @@ -25,33 +22,10 @@ registry:
server-address: 127.0.0.1
server-port: 8181
log-storage-dir: raft-data/
listener-check-interval: 3s
distributed-lock-timeout: 3s
distributed-lock-retry-interval: 3s
module: master
```
worker's application.yml example
```yaml
registry:
type: raft
cluster-name: dolphinscheduler
server-address-list: 127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183
listener-check-interval: 3s
distributed-lock-timeout: 3s
distributed-lock-retry-interval: 3s
module: worker
```
api's application.yml example
```yaml
registry:
type: raft
cluster-name: dolphinscheduler
server-address-list: 127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183
listener-check-interval: 3s
distributed-lock-timeout: 3s
distributed-lock-retry-interval: 3s
module: api
```
Among them, the "type" must be set to "raft". The "cluster-name" can be set by yourself or use the default one. The "server-address-list" should be set to all addresses of the cluster.
The "server-address" is the IP+PORT where the current application is located. The "server-port" is the port listened to by raft (note that this port differs from the port the application itself starts on and is used only for communication within the raft cluster).
The "log-storage-dir" is the directory where the logs generated by raft are stored.
Then you can start your DolphinScheduler cluster; it will use the raft cluster as the registry center to
store server metadata.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public RaftRegistryAutoConfiguration() {
}

@Bean
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "master")
public RaftRegistryServer raftRegistryServer(RaftRegistryProperties raftRegistryProperties) {
RaftRegistryServer raftRegistryServer = new RaftRegistryServer(raftRegistryProperties);
raftRegistryServer.start();
Expand All @@ -47,24 +46,7 @@ public RaftRegistryServer raftRegistryServer(RaftRegistryProperties raftRegistry

/**
 * Creates the {@link RaftRegistry} bean used when {@code registry.module} is {@code master}.
 * The bean is declared to depend on the {@code raftRegistryServer} bean and is started
 * before it is returned.
 *
 * @param raftRegistryProperties registry configuration handed to the {@link RaftRegistry} constructor
 * @return the started registry instance
 */
@Bean
@DependsOn("raftRegistryServer")
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "master")
public RaftRegistry masterRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
final RaftRegistry masterClient = new RaftRegistry(raftRegistryProperties);
masterClient.start();
return masterClient;
}

/**
 * Creates the {@link RaftRegistry} bean used when {@code registry.module} is {@code worker}.
 * The instance is started before it is returned.
 *
 * @param raftRegistryProperties registry configuration handed to the {@link RaftRegistry} constructor
 * @return the started registry instance
 */
@Bean
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "worker")
public RaftRegistry workerRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
final RaftRegistry workerClient = new RaftRegistry(raftRegistryProperties);
workerClient.start();
return workerClient;
}

@Bean
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "api")
public RaftRegistry apiRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
public RaftRegistry raftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties);
raftRegistry.start();
return raftRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.dolphinscheduler.plugin.registry.raft;

import java.time.Duration;

import lombok.Data;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -36,16 +34,5 @@ public class RaftRegistryProperties {
private String serverAddress;
private int serverPort;
private String logStorageDir;
private Duration distributedLockTimeout = Duration.ofSeconds(3);
private Duration distributedLockRetryInterval = Duration.ofSeconds(5);
private String module = "master";
private Duration listenerCheckInterval = Duration.ofSeconds(3);
private int cliMaxRetries = 3;
private Duration cliTimeout = Duration.ofSeconds(5);
private Duration refreshLeaderTimeout = Duration.ofSeconds(2);
private Duration connectStateCheckInterval = Duration.ofSeconds(2);
private Duration heartBeatTimeOut = Duration.ofSeconds(20);
private int subscribeListenerThreadPoolSize = 1;
private int connectionListenerThreadPoolSize = 1;

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,12 @@ public class RaftRegistryClient implements IRaftRegistryClient {
private final IRaftSubscribeDataManager raftSubscribeDataManager;
private final IRaftLockManager raftLockManager;
private volatile boolean started;
private static final String MASTER_MODULE = "master";
public RaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
this.raftRegistryProperties = raftRegistryProperties;
this.rheaKvStore = new DefaultRheaKVStore();
this.raftConnectionStateManager = new RaftConnectionStateManager(raftRegistryProperties);
this.raftSubscribeDataManager = new RaftSubscribeDataManager(raftRegistryProperties, rheaKvStore);
this.raftLockManager = new RaftLockManager(rheaKvStore, raftRegistryProperties);
this.raftSubscribeDataManager = new RaftSubscribeDataManager(rheaKvStore);
this.raftLockManager = new RaftLockManager(rheaKvStore);

initRheakv();
}
Expand All @@ -80,9 +79,9 @@ private void initRheakv() {
.withFake(true)
.withRegionRouteTableOptionsList(regionRouteTableOptionsList)
.config();
final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
.withClusterName(raftRegistryProperties.getClusterName()) //
.withPlacementDriverOptions(pdOpts) //
final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
.withClusterName(raftRegistryProperties.getClusterName())
.withPlacementDriverOptions(pdOpts)
.config();
this.rheaKvStore.init(opts);
}
Expand All @@ -94,9 +93,7 @@ public void start() {
return;
}
log.info("starting raft client registry...");
if (raftRegistryProperties.getModule().equals(MASTER_MODULE)) {
raftSubscribeDataManager.start();
}
raftSubscribeDataManager.start();
raftConnectionStateManager.start();
this.started = true;
log.info("raft client registry started successfully");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -39,6 +41,10 @@
public class RaftConnectionStateManager implements IRaftConnectionStateManager {

private static final String DEFAULT_REGION_ID = "--1";
private static final Duration CONNECT_STATE_CHECK_INTERVAL = Duration.ofSeconds(2);
private static final int CONNECT_STATE_REFRESH_THREAD_POOL_SIZE = 1;
private static final Duration REFRESH_LEADER_TIME_OUT = Duration.ofSeconds(2);
private static final int MAX_RANDOM_DELAY_MS = 500;
private ConnectionState currentConnectionState;
private final RaftRegistryProperties properties;
private final List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<>();
Expand All @@ -49,22 +55,28 @@ public class RaftConnectionStateManager implements IRaftConnectionStateManager {
public RaftConnectionStateManager(RaftRegistryProperties properties) {
this.properties = properties;
this.cliOptions = new CliOptions();
this.cliOptions.setMaxRetry(properties.getCliMaxRetries());
this.cliOptions.setTimeoutMs((int) properties.getCliTimeout().toMillis());
this.cliClientService = new CliClientServiceImpl();
this.scheduledExecutorService = Executors.newScheduledThreadPool(
properties.getConnectionListenerThreadPoolSize(),
CONNECT_STATE_REFRESH_THREAD_POOL_SIZE,
new ThreadFactoryBuilder().setNameFormat("ConnectionStateRefreshThread").setDaemon(true).build());
}
@Override
public void start() {
cliClientService.init(cliOptions);
scheduledExecutorService.scheduleWithFixedDelay(
new ConnectionStateRefreshTask(connectionListeners),
properties.getConnectStateCheckInterval().toMillis(),
properties.getConnectStateCheckInterval().toMillis(),
getRandomizedDelay(CONNECT_STATE_CHECK_INTERVAL.toMillis()),
getRandomizedDelay(CONNECT_STATE_CHECK_INTERVAL.toMillis()),
TimeUnit.MILLISECONDS);
}

/**
 * Adds a random offset to the given delay.
 *
 * @param baseDelay the base delay; callers in this class pass a value in milliseconds
 * @return {@code baseDelay} plus a random offset in the range [0, MAX_RANDOM_DELAY_MS]
 */
private long getRandomizedDelay(long baseDelay) {
// The offset is drawn from [0, MAX_RANDOM_DELAY_MS]; the "+ 1" makes the upper bound inclusive.
// ThreadLocalRandom is used so that no new Random has to be allocated and seeded per call.
final int randomOffset =
java.util.concurrent.ThreadLocalRandom.current().nextInt(MAX_RANDOM_DELAY_MS + 1);
return baseDelay + randomOffset;
}

@Override
public void addConnectionListener(ConnectionListener listener) {
connectionListeners.add(listener);
Expand Down Expand Up @@ -113,8 +125,7 @@ private ConnectionState getCurrentConnectionState() {
try {
String groupId = properties.getClusterName() + DEFAULT_REGION_ID;
if (RouteTable.getInstance()
.refreshLeader(cliClientService, groupId, (int) properties.getRefreshLeaderTimeout().toMillis())
.isOk()) {
.refreshLeader(cliClientService, groupId, (int) REFRESH_LEADER_TIME_OUT.toMillis()).isOk()) {
return ConnectionState.CONNECTED;
} else {
return ConnectionState.DISCONNECTED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.raft.model.RaftLockEntry;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
Expand All @@ -37,13 +37,13 @@ public class RaftLockManager implements IRaftLockManager {

private final Map<String, RaftLockEntry> distributedLockMap = new ConcurrentHashMap<>();
private final RheaKVStore rheaKvStore;
private final RaftRegistryProperties raftRegistryProperties;
private static final ScheduledExecutorService WATCH_DOG = Executors.newSingleThreadScheduledExecutor();
private static final String LOCK_OWNER_PREFIX = NetUtils.getHost() + "_" + OSUtils.getProcessID() + "_";
private static final Duration DISTRIBUTE_LOCK_TIME_OUT = Duration.ofSeconds(3);
private static final Duration DISTRIBUTE_LOCK_RETRY_INTERVAL = Duration.ofMillis(50);

public RaftLockManager(RheaKVStore rheaKVStore, RaftRegistryProperties raftRegistryProperties) {
public RaftLockManager(RheaKVStore rheaKVStore) {
this.rheaKvStore = rheaKVStore;
this.raftRegistryProperties = raftRegistryProperties;
}

@Override
Expand All @@ -54,8 +54,7 @@ public boolean acquireLock(String lockKey) {
}

final DistributedLock<byte[]> distributedLock = rheaKvStore.getDistributedLock(lockKey,
raftRegistryProperties.getDistributedLockTimeout().toMillis(), TimeUnit.MILLISECONDS, WATCH_DOG);

DISTRIBUTE_LOCK_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS, WATCH_DOG);
while (true) {
if (distributedLock.tryLock()) {
distributedLockMap.put(lockKey, RaftLockEntry.builder().distributedLock(distributedLock)
Expand All @@ -64,7 +63,7 @@ public boolean acquireLock(String lockKey) {
return true;
} else {
// fail to acquire lock
ThreadUtils.sleep(raftRegistryProperties.getDistributedLockRetryInterval().toMillis());
ThreadUtils.sleep(DISTRIBUTE_LOCK_RETRY_INTERVAL.toMillis());
}
}
}
Expand All @@ -77,7 +76,7 @@ public boolean acquireLock(String lockKey, long timeout) {
}
final long endTime = System.currentTimeMillis() + timeout;
final DistributedLock<byte[]> distributedLock = rheaKvStore.getDistributedLock(lockKey,
raftRegistryProperties.getDistributedLockTimeout().toMillis(), TimeUnit.MILLISECONDS, WATCH_DOG);
DISTRIBUTE_LOCK_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS, WATCH_DOG);

while (System.currentTimeMillis() < endTime) {
if (distributedLock.tryLock()) {
Expand All @@ -87,7 +86,7 @@ public boolean acquireLock(String lockKey, long timeout) {
return true;
} else {
// fail to acquire lock
ThreadUtils.sleep(raftRegistryProperties.getDistributedLockRetryInterval().toMillis());
ThreadUtils.sleep(DISTRIBUTE_LOCK_RETRY_INTERVAL.toMillis());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeItem;
import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeType;
import org.apache.dolphinscheduler.registry.api.Event;
Expand All @@ -31,11 +30,13 @@

import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -50,29 +51,35 @@
public class RaftSubscribeDataManager implements IRaftSubscribeDataManager {

private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();

private final RaftRegistryProperties properties;

private final RheaKVStore kvStore;

private final ScheduledExecutorService scheduledExecutorService;
private static final Duration LISTENER_CHECK_INTERVAL = Duration.ofSeconds(2);
private static final Duration HEART_BEAT_TIME_OUT = Duration.ofSeconds(20);
private static final int MAX_RANDOM_DELAY_MS = 500;
private static final int SUBSCRIBE_LISTENER_THREAD_POOL_SIZE = 1;

public RaftSubscribeDataManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
this.properties = properties;
public RaftSubscribeDataManager(RheaKVStore kvStore) {
this.kvStore = kvStore;
this.scheduledExecutorService = Executors.newScheduledThreadPool(
properties.getSubscribeListenerThreadPoolSize(),
SUBSCRIBE_LISTENER_THREAD_POOL_SIZE,
new ThreadFactoryBuilder().setNameFormat("SubscribeListenerCheckThread").setDaemon(true).build());
}

@Override
public void start() {
scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
properties.getListenerCheckInterval().toMillis(),
properties.getListenerCheckInterval().toMillis(),
getRandomizedDelay(LISTENER_CHECK_INTERVAL.toMillis()),
getRandomizedDelay(LISTENER_CHECK_INTERVAL.toMillis()),
TimeUnit.MILLISECONDS);
}

/**
 * Adds a random offset to the given delay.
 *
 * @param baseDelay the base delay; callers in this class pass a value in milliseconds
 * @return {@code baseDelay} plus a random offset in the range [0, MAX_RANDOM_DELAY_MS]
 */
private long getRandomizedDelay(long baseDelay) {
// The offset is drawn from [0, MAX_RANDOM_DELAY_MS]; the "+ 1" makes the upper bound inclusive.
// ThreadLocalRandom is used so that no new Random has to be allocated and seeded per call.
final int randomOffset =
java.util.concurrent.ThreadLocalRandom.current().nextInt(MAX_RANDOM_DELAY_MS + 1);
return baseDelay + randomOffset;
}

@Override
public void addDataSubscribeListener(String path, SubscribeListener listener) {
final List<SubscribeListener> subscribeListeners =
Expand Down Expand Up @@ -132,8 +139,7 @@ private boolean isUnHealthy(String heartBeat) {
}
BaseHeartBeat baseHeartBeat = JSONUtils.parseObject(heartBeat, BaseHeartBeat.class);
if (baseHeartBeat != null) {
return System.currentTimeMillis() - baseHeartBeat.getReportTime() > properties.getHeartBeatTimeOut()
.toMillis();
return System.currentTimeMillis() - baseHeartBeat.getReportTime() > HEART_BEAT_TIME_OUT.toMillis();
}
} catch (Exception ex) {
log.error("Fail to parse heartBeat : {}", heartBeat, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public void start() {
}
log.info("starting raft registry server...");
this.rheaKVStore.init(this.options);
log.info("raft registry server started successfully");
this.started = true;
log.info("raft registry server started successfully");
}

public void stop() {
Expand Down

0 comments on commit 9f50e1c

Please sign in to comment.