Skip to content

Commit

Permalink
Fix the abnormal startup of the raft cluster(apache#10874
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewAden committed Aug 19, 2024
1 parent e79a641 commit c638010
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.Map;
import java.util.Set;

import javax.annotation.PostConstruct;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -59,14 +61,22 @@ public class RegistryClient {

public RegistryClient(Registry registry) {
this.registry = registry;
if (!registry.exists(RegistryNodeType.MASTER.getRegistryPath())) {
registry.put(RegistryNodeType.MASTER.getRegistryPath(), EMPTY, false);
}
if (!registry.exists(RegistryNodeType.WORKER.getRegistryPath())) {
registry.put(RegistryNodeType.WORKER.getRegistryPath(), EMPTY, false);
}
if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath())) {
registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false);
}

@PostConstruct
public void initializeRegistryPaths() {
try {
if (!registry.exists(RegistryNodeType.MASTER.getRegistryPath())) {
registry.put(RegistryNodeType.MASTER.getRegistryPath(), EMPTY, false);
}
if (!registry.exists(RegistryNodeType.WORKER.getRegistryPath())) {
registry.put(RegistryNodeType.WORKER.getRegistryPath(), EMPTY, false);
}
if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath())) {
registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false);
}
} catch (Exception e) {
log.error("Failed to initialize registry paths", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.registry.raft.client.IRaftRegisterClient;
import org.apache.dolphinscheduler.plugin.registry.raft.client.RaftRegisterClient;
import org.apache.dolphinscheduler.plugin.registry.raft.client.IRaftRegistryClient;
import org.apache.dolphinscheduler.plugin.registry.raft.client.RaftRegistryClient;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
Expand All @@ -37,21 +37,19 @@
public class RaftRegistry implements Registry {

private static final long RECONNECT_WAIT_TIME_MS = 50L;
private final IRaftRegisterClient raftRegisterClient;
private final IRaftRegistryClient raftRegistryClient;
public RaftRegistry(RaftRegistryProperties raftRegistryProperties) {
this.raftRegisterClient = new RaftRegisterClient(raftRegistryProperties);
this.raftRegistryClient = new RaftRegistryClient(raftRegistryProperties);
}

@Override
public void start() {
log.info("starting raft registry...");
raftRegisterClient.start();
log.info("raft registry started successfully");
raftRegistryClient.start();
}

@Override
public boolean isConnected() {
return raftRegisterClient.isConnectivity();
return raftRegistryClient.isConnectivity();
}

@Override
Expand All @@ -61,7 +59,7 @@ public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryExcept
long endTimeMillis = timeout.toMillis() > 0 ? startTimeMillis + timeout.toMillis() : Long.MAX_VALUE;

while (System.currentTimeMillis() < endTimeMillis) {
if (raftRegisterClient.isConnectivity()) {
if (raftRegistryClient.isConnectivity()) {
return;
}
}
Expand All @@ -75,68 +73,66 @@ public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryExcept
public void subscribe(String path, SubscribeListener listener) {
checkNotNull(path);
checkNotNull(listener);
raftRegisterClient.subscribeRaftRegistryDataChange(path, listener);
raftRegistryClient.subscribeRaftRegistryDataChange(path, listener);
}

@Override
public void addConnectionStateListener(ConnectionListener listener) {
checkNotNull(listener);
raftRegisterClient.subscribeConnectionStateChange(listener);
raftRegistryClient.subscribeConnectionStateChange(listener);
}

@Override
public String get(String key) {
checkNotNull(key);
return raftRegisterClient.getRegistryDataByKey(key);
return raftRegistryClient.getRegistryDataByKey(key);
}

@Override
public void put(String key, String value, boolean deleteOnDisconnect) {
checkNotNull(key);
raftRegisterClient.putRegistryData(key, value, deleteOnDisconnect);
raftRegistryClient.putRegistryData(key, value, deleteOnDisconnect);
}

@Override
public void delete(String key) {
checkNotNull(key);
raftRegisterClient.deleteRegistryDataByKey(key);
raftRegistryClient.deleteRegistryDataByKey(key);
}

@Override
public Collection<String> children(String key) {
checkNotNull(key);
return raftRegisterClient.getRegistryDataChildren(key);
return raftRegistryClient.getRegistryDataChildren(key);
}

@Override
public boolean exists(String key) {
checkNotNull(key);
return raftRegisterClient.existRaftRegistryDataKey(key);
return raftRegistryClient.existRaftRegistryDataKey(key);
}

@Override
public boolean acquireLock(String key) {
checkNotNull(key);
return raftRegisterClient.acquireRaftRegistryLock(key);
return raftRegistryClient.acquireRaftRegistryLock(key);
}

@Override
public boolean acquireLock(String key, long timeout) {
checkNotNull(key);
return raftRegisterClient.acquireRaftRegistryLock(key, timeout);
return raftRegistryClient.acquireRaftRegistryLock(key, timeout);
}

@Override
public boolean releaseLock(String key) {
checkNotNull(key);
return raftRegisterClient.releaseRaftRegistryLock(key);
return raftRegistryClient.releaseRaftRegistryLock(key);
}

@Override
public void close() {
log.info("closing raft registry...");
raftRegisterClient.close();
log.info("raft registry closed");
raftRegistryClient.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.dolphinscheduler.plugin.registry.raft;

import org.apache.dolphinscheduler.plugin.registry.raft.server.RaftRegisterServer;
import org.apache.dolphinscheduler.plugin.registry.raft.server.RaftRegistryServer;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -34,37 +34,37 @@
public class RaftRegistryAutoConfiguration {

public RaftRegistryAutoConfiguration() {
log.info("Load RaftRegisterAutoConfiguration");
log.info("Load RaftRegistryAutoConfiguration");
}

@Bean
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "master")
public RaftRegisterServer raftRegisterServer(RaftRegistryProperties raftRegistryProperties) {
RaftRegisterServer raftRegisterServer = new RaftRegisterServer(raftRegistryProperties);
raftRegisterServer.start();
return raftRegisterServer;
public RaftRegistryServer raftRegistryServer(RaftRegistryProperties raftRegistryProperties) {
RaftRegistryServer raftRegistryServer = new RaftRegistryServer(raftRegistryProperties);
raftRegistryServer.start();
return raftRegistryServer;
}

@Bean
@DependsOn("raftRegisterServer")
@DependsOn("raftRegistryServer")
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "master")
public RaftRegistry masterRegisterClient(RaftRegistryProperties raftRegistryProperties) {
public RaftRegistry masterRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties);
raftRegistry.start();
return raftRegistry;
}

@Bean
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "worker")
public RaftRegistry workerRegisterClient(RaftRegistryProperties raftRegistryProperties) {
public RaftRegistry workerRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties);
raftRegistry.start();
return raftRegistry;
}

@Bean
@ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "api")
public RaftRegistry apiRegisterClient(RaftRegistryProperties raftRegistryProperties) {
public RaftRegistry apiRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties);
raftRegistry.start();
return raftRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import java.util.Collection;

public interface IRaftRegisterClient extends AutoCloseable {
public interface IRaftRegistryClient extends AutoCloseable {

/**
* Start the raft registry client. Once started, the client will connect to the raft registry server and then it can be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import com.alipay.sofa.jraft.rhea.storage.KVEntry;

@Slf4j
public class RaftRegisterClient implements IRaftRegisterClient {
public class RaftRegistryClient implements IRaftRegistryClient {

private final RheaKVStore rheaKvStore;
private final RaftRegistryProperties raftRegistryProperties;
Expand All @@ -61,7 +61,7 @@ public class RaftRegisterClient implements IRaftRegisterClient {
private final IRaftLockManager raftLockManager;
private volatile boolean started;
private static final String MASTER_MODULE = "master";
public RaftRegisterClient(RaftRegistryProperties raftRegistryProperties) {
public RaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
this.raftRegistryProperties = raftRegistryProperties;
this.rheaKvStore = new DefaultRheaKVStore();
this.raftConnectionStateManager = new RaftConnectionStateManager(raftRegistryProperties);
Expand Down Expand Up @@ -90,14 +90,16 @@ private void initRheakv() {
@Override
public void start() {
if (this.started) {
log.info("RaftRegisterClient is already started");
log.info("RaftRegistryClient is already started");
return;
}
log.info("starting raft client registry...");
if (raftRegistryProperties.getModule().equals(MASTER_MODULE)) {
raftSubscribeDataManager.start();
}
raftConnectionStateManager.start();
this.started = true;
log.info("raft client registry started successfully");
}

@Override
Expand Down Expand Up @@ -154,25 +156,25 @@ public Collection<String> getRegistryDataChildren(String key) {
}
List<KVEntry> kvEntries = rheaKvStore.bScan(basePath + Constants.SINGLE_SLASH,
basePath + Constants.SINGLE_SLASH + Constants.RAFT_END_KEY);
return getRegisterList(kvEntries);
return getRegistryList(kvEntries);
}

@Override
public boolean existRaftRegistryDataKey(String key) {
return rheaKvStore.bContainsKey(key);
}

private Collection<String> getRegisterList(List<KVEntry> kvEntries) {
private Collection<String> getRegistryList(List<KVEntry> kvEntries) {
if (kvEntries == null || kvEntries.isEmpty()) {
return new ArrayList<>();
}
List<String> registerList = new ArrayList<>();
List<String> registryList = new ArrayList<>();
for (KVEntry kvEntry : kvEntries) {
String entryKey = readUtf8(kvEntry.getKey());
String childKey = entryKey.substring(entryKey.lastIndexOf(Constants.SINGLE_SLASH) + 1);
registerList.add(childKey);
registryList.add(childKey);
}
return registerList;
return registryList;
}

@Override
Expand All @@ -182,7 +184,7 @@ public boolean acquireRaftRegistryLock(String lockKey) {
} catch (Exception ex) {
log.error("acquire raft registry lock error", ex);
raftLockManager.releaseLock(lockKey);
throw new RegistryException("acquire raft register lock error: " + lockKey, ex);
throw new RegistryException("acquire raft registry lock error: " + lockKey, ex);
}
}

Expand All @@ -193,7 +195,7 @@ public boolean acquireRaftRegistryLock(String lockKey, long timeout) {
} catch (Exception ex) {
log.error("acquire raft registry lock error", ex);
raftLockManager.releaseLock(lockKey);
throw new RegistryException("acquire raft register lock error: " + lockKey, ex);
throw new RegistryException("acquire raft registry lock error: " + lockKey, ex);
}
}

Expand All @@ -209,11 +211,11 @@ public boolean releaseRaftRegistryLock(String lockKey) {

@Override
public void close() {
log.info("start close raft register client");
log.info("ready to close raft registry client");
if (rheaKvStore != null) {
rheaKvStore.shutdown();
}
this.started = false;
log.info("closed raft register client");
log.info("closed raft registry client");
}
}
Loading

0 comments on commit c638010

Please sign in to comment.