Skip to content

Commit

Permalink
1.Add logic to remove ephemeral nodes upon disconnection
Browse files Browse the repository at this point in the history
2.Add corresponding test cases
3.Adjust the package locations and refactor
  • Loading branch information
MatthewAden committed Jul 29, 2024
1 parent 623e3cb commit 6f299ab
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class RaftRegistryProperties {
private Duration cliTimeout = Duration.ofSeconds(5);
private Duration refreshLeaderTimeout = Duration.ofSeconds(2);
private Duration connectStateCheckInterval = Duration.ofSeconds(2);
private Duration heartBeatTimeOut = Duration.ofSeconds(20);
private int subscribeListenerThreadPoolSize = 1;
private int connectionListenerThreadPoolSize = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public interface IRaftRegisterClient extends AutoCloseable {
* <p>
* If the key already exists, then update the value. If the key does not exist, then insert a new key-value pair.
*
* @param key the key of the register data
* @param value the value to be associated with the key
* @param deleteOnDisconnect if true, the data will be deleted when the client disconnects
* @param key the key of the register data
* @param value the value to be associated with the key
* @param deleteOnDisconnect if true, the key-value pair will be deleted when the client disconnects
*/
void putRegistryData(String key, String value, boolean deleteOnDisconnect);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.plugin.registry.raft.IRaftConnectionStateManager;
import org.apache.dolphinscheduler.plugin.registry.raft.IRaftLockManager;
import org.apache.dolphinscheduler.plugin.registry.raft.IRaftSubscribeDataManager;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftConnectionStateManager;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftLockManager;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftSubscribeDataManager;
import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftConnectionStateManager;
import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftLockManager;
import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftSubscribeDataManager;
import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftConnectionStateManager;
import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftLockManager;
import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftSubscribeDataManager;
import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeType;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.RegistryException;
Expand Down Expand Up @@ -116,16 +117,22 @@ public void subscribeRaftRegistryDataChange(String path, SubscribeListener liste

@Override
public String getRegistryDataByKey(String key) {
String value = readUtf8(rheaKvStore.bGet(key));
if (value == null) {
throw new RegistryException("key does not exist");
String compositeValue = readUtf8(rheaKvStore.bGet(key));
if (compositeValue == null) {
throw new RegistryException("key does not exist:" + key);
}
return value;
String[] nodeTypeAndValue = compositeValue.split("#");
if (nodeTypeAndValue.length != 2) {
throw new RegistryException("value format is incorrect for key: " + key + ", value: " + compositeValue);
}
return nodeTypeAndValue[1];
}

@Override
public void putRegistryData(String key, String value, boolean deleteOnDisconnect) {
rheaKvStore.bPut(key, writeUtf8(value));
NodeType nodeType = deleteOnDisconnect ? NodeType.EPHEMERAL : NodeType.PERSISTENT;
String compositeValue = nodeType.getName() + "#" + value;
rheaKvStore.bPut(key, writeUtf8(compositeValue));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft;
package org.apache.dolphinscheduler.plugin.registry.raft.manage;

import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft;
package org.apache.dolphinscheduler.plugin.registry.raft.manage;

/**
* Interface for managing locks in a raft registry client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft;
package org.apache.dolphinscheduler.plugin.registry.raft.manage;

import org.apache.dolphinscheduler.registry.api.SubscribeListener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft;
package org.apache.dolphinscheduler.plugin.registry.raft.manage;

import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft;
package org.apache.dolphinscheduler.plugin.registry.raft.manage;

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.raft.model.RaftLockEntry;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft;
package org.apache.dolphinscheduler.plugin.registry.raft.manage;

import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeItem;
import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeType;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
Expand Down Expand Up @@ -76,37 +81,60 @@ public void addDataSubscribeListener(String path, SubscribeListener listener) {

private class SubscribeCheckTask implements Runnable {

private final Map<String, String> oldDataMap = new ConcurrentHashMap<>();
private final Map<String, NodeItem> oldDataMap = new ConcurrentHashMap<>();

@Override
public void run() {
final Map<String, String> newDataMap = getNodeDataMap();
if (dataSubScribeMap.isEmpty() || newDataMap.isEmpty()) {
return;
}
// find the different
final Map<String, String> addedData = new HashMap<>();
final Map<String, String> deletedData = new HashMap<>();
final Map<String, String> updatedData = new HashMap<>();
for (Map.Entry<String, String> entry : newDataMap.entrySet()) {
final String oldData = oldDataMap.get(entry.getKey());
if (oldData == null) {
addedData.put(entry.getKey(), entry.getValue());
} else {
if (!oldData.equals(entry.getValue())) {
updatedData.put(entry.getKey(), entry.getValue());
try {
final Map<String, NodeItem> newDataMap = getNodeDataMap();
if (dataSubScribeMap.isEmpty() || newDataMap.isEmpty()) {
return;
}
// find the different
final Map<String, String> addedData = new HashMap<>();
final Map<String, String> deletedData = new HashMap<>();
final Map<String, String> updatedData = new HashMap<>();
for (Map.Entry<String, NodeItem> entry : newDataMap.entrySet()) {
final NodeItem oldData = oldDataMap.get(entry.getKey());
if (oldData == null) {
addedData.put(entry.getKey(), entry.getValue().getNodeValue());
} else if (NodeType.EPHEMERAL.getName().equals(entry.getValue().getNodeType())
&& isUnHealthy(entry.getValue().getNodeValue())) {
kvStore.bDelete(entry.getKey());
newDataMap.remove(entry.getKey(), entry.getValue());
} else if (!oldData.getNodeValue().equals(entry.getValue().getNodeValue())) {
updatedData.put(entry.getKey(), entry.getValue().getNodeValue());
}
}
for (Map.Entry<String, NodeItem> entry : oldDataMap.entrySet()) {
if (!newDataMap.containsKey(entry.getKey())) {
deletedData.put(entry.getKey(), entry.getValue().getNodeValue());
}
}
oldDataMap.clear();
oldDataMap.putAll(newDataMap);
// trigger listener
triggerListener(addedData, deletedData, updatedData);
} catch (Exception ex) {
log.error("Error in SubscribeCheckTask run method", ex);
}
for (Map.Entry<String, String> entry : oldDataMap.entrySet()) {
if (!newDataMap.containsKey(entry.getKey())) {
deletedData.put(entry.getKey(), entry.getValue());
}

private boolean isUnHealthy(String heartBeat) {
try {
// consider this not a valid heartbeat instance, do not check
if (heartBeat == null || !heartBeat.contains("reportTime")) {
return false;
}
BaseHeartBeat baseHeartBeat = JSONUtils.parseObject(heartBeat, BaseHeartBeat.class);
if (baseHeartBeat != null) {
return System.currentTimeMillis() - baseHeartBeat.getReportTime() > properties.getHeartBeatTimeOut()
.toMillis();
}
} catch (Exception ex) {
log.error("Fail to parse heartBeat : {}", heartBeat, ex);
}
oldDataMap.clear();
oldDataMap.putAll(newDataMap);
// trigger listener
triggerListener(addedData, deletedData, updatedData);
return false;
}

private void triggerListener(Map<String, String> addedData, Map<String, String> deletedData,
Expand All @@ -126,20 +154,41 @@ private void triggerListener(Map<String, String> addedData, Map<String, String>
}
}

private Map<String, String> getNodeDataMap() {
final Map<String, String> dataMap = new HashMap<>();
private Map<String, NodeItem> getNodeDataMap() {
final Map<String, NodeItem> nodeItemMap = new HashMap<>();
final List<KVEntry> entryList = kvStore.bScan(RegistryNodeType.ALL_SERVERS.getRegistryPath(),
RegistryNodeType.ALL_SERVERS.getRegistryPath() + Constants.SINGLE_SLASH + Constants.RAFT_END_KEY);

for (KVEntry kvEntry : entryList) {
final String entryKey = readUtf8(kvEntry.getKey());
final String entryValue = readUtf8(kvEntry.getValue());
if (StringUtils.isEmpty(entryValue)
final String compositeValue = readUtf8(kvEntry.getValue());

if (StringUtils.isEmpty(compositeValue)
|| !entryKey.startsWith(RegistryNodeType.ALL_SERVERS.getRegistryPath())) {
continue;
}
dataMap.put(entryKey, entryValue);

String[] nodeTypeAndValue = parseCompositeValue(compositeValue);
if (nodeTypeAndValue.length < 2) {
continue;
}
String nodeType = nodeTypeAndValue[0];
String nodeValue = nodeTypeAndValue[1];

nodeItemMap.put(entryKey, NodeItem.builder().nodeValue(nodeValue).nodeType(nodeType).build());
}
return nodeItemMap;
}

private String[] parseCompositeValue(String compositeValue) {
String[] nodeTypeAndValue = compositeValue.split("#");
if (nodeTypeAndValue.length < 2) {
log.error("Invalid compositeValue: {}", compositeValue);
return new String[]{};
}
return dataMap;
String nodeType = nodeTypeAndValue[0];
String nodeValue = nodeTypeAndValue[1];
return new String[]{nodeType, nodeValue};
}

private void triggerListener(Map<String, String> nodeDataMap, String subscribeKey,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class NodeItem {

private String nodeType;
private String nodeValue;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.raft.model;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum NodeType {

EPHEMERAL("ephemeralNode"),
PERSISTENT("persistentNode");
private final String name;
}
Loading

0 comments on commit 6f299ab

Please sign in to comment.