Skip to content

Commit

Permalink
Merge pull request #302 from paderlol/develop
Browse files Browse the repository at this point in the history
Optimize the Nacos-to-Nacos synchronization logic.
  • Loading branch information
paderlol authored Jul 30, 2022
2 parents 4082efa + 5ae4071 commit 0ffdca3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ private void removeInvalidInstance(TaskDO taskDO, List<Instance> sourceInstances

public abstract void register(TaskDO taskDO, Instance instance);

public abstract void deregisterInstance(TaskDO taskDO);
public abstract void deregisterInstance(TaskDO taskDO) throws Exception;

public abstract void removeInvalidInstance(TaskDO taskDO, Set<String> invalidInstanceKeys);


/**
 * Returns the {@code NacosServerHolder} held by this sync service.
 *
 * @return the value of the {@code nacosServerHolder} field
 */
public NacosServerHolder getNacosServerHolder() {
    return this.nacosServerHolder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,25 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.alibaba.nacossync.extension.impl;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.alibaba.nacossync.util.NacosUtils.getGroupNameOrDefault;

Expand All @@ -52,196 +39,59 @@

@Slf4j
@NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS, destinationCluster = ClusterTypeEnum.NACOS)
public class NacosSyncToNacosServiceImpl implements SyncService {

private final Map<String, EventListener> listenerMap = new ConcurrentHashMap<>();

private final Map<String, Set<String>> sourceInstanceSnapshot = new ConcurrentHashMap<>();

private final Map<String, Integer> syncTaskTap = new ConcurrentHashMap<>();

@Autowired
private MetricsManager metricsManager;

public class NacosSyncToNacosServiceImpl extends AbstractNacosSync {


@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;

@Autowired
private NacosServerHolder nacosServerHolder;

private ConcurrentHashMap<String, TaskDO> allSyncTaskMap = new ConcurrentHashMap<String, TaskDO>();

/**
 * Starts the fallback ("basic") synchronization thread.
 *
 * <p>Sync tasks can fail because of network faults and similar problems, which leaves the
 * destination registry without the synchronized instances. To keep the destination from
 * missing instances for a long time, a single daemon worker re-runs every known sync task:
 * the first pass starts immediately and each following pass starts 300 seconds after the
 * previous one finished.
 */
@PostConstruct
public void startBasicSyncTaskThread() {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread worker = new Thread(runnable, "com.alibaba.nacossync.basic.synctask");
        worker.setDaemon(true);
        return worker;
    });

    Runnable fullSyncPass = () -> {
        // Nothing registered yet, so there is nothing to re-sync.
        if (allSyncTaskMap.isEmpty()) {
            return;
        }

        try {
            for (TaskDO task : allSyncTaskMap.values()) {
                String id = task.getTaskId();
                NamingService source = nacosServerHolder.get(task.getSourceClusterId());
                NamingService dest = nacosServerHolder.get(task.getDestClusterId());
                try {
                    doSync(id, task, source, dest);
                } catch (Exception e) {
                    // A single failing task is recorded and must not stop the remaining tasks.
                    log.error("basic synctask process fail, taskId:{}", id, e);
                    metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
                }
            }
        } catch (Throwable e) {
            // Swallow everything: an exception escaping the runnable would cancel the schedule.
            log.warn("basic synctask thread error", e);
        }
    };

    scheduler.scheduleWithFixedDelay(fullSyncPass, 0, 300, TimeUnit.SECONDS);
}



@Override
public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId());
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId());
//移除订阅
sourceNamingService
.unsubscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
listenerMap.remove(taskDO.getTaskId()));
sourceInstanceSnapshot.remove(taskDO.getTaskId());
allSyncTaskMap.remove(taskDO.getTaskId());

// 删除目标集群中同步的实例列表
List<Instance> sourceInstances = sourceNamingService
.getAllInstances(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
new ArrayList<>(), false);
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
destNamingService
.deregisterInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
instance.getIp(),
instance.getPort());
}
}
} catch (Exception e) {
log.error("delete task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
/**
 * Builds the key that identifies an instance, in the form {@code ip:port}.
 *
 * @param ip   the instance IP address
 * @param port the instance port
 * @return {@code ip}, a colon, and {@code port} joined into one string
 */
public String composeInstanceKey(String ip, int port) {
    StringBuilder key = new StringBuilder();
    key.append(ip);
    key.append(':');
    key.append(port);
    return key.toString();
}

@Override
public boolean sync(TaskDO taskDO) {
String taskId = taskDO.getTaskId();
public void register(TaskDO taskDO, Instance instance) {
NamingService destNamingService = getNacosServerHolder().get(taskDO.getDestClusterId());
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId());
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId());
allSyncTaskMap.put(taskId, taskDO);
//防止暂停同步任务后,重新同步/或删除任务以后新建任务不会再接收到新的事件导致不能同步,所以每次订阅事件之前,先全量同步一次任务
doSync(taskId, taskDO, sourceNamingService, destNamingService);
this.listenerMap.putIfAbsent(taskId, event -> {
if (event instanceof NamingEvent) {
try {
doSync(taskId, taskDO, sourceNamingService, destNamingService);
} catch (Exception e) {
log.error("event process fail, taskId:{}", taskId, e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
}
}
});
sourceNamingService.subscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
listenerMap.get(taskId));
} catch (Exception e) {
log.error("sync task from nacos to nacos was failed, taskId:{}", taskId, e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
destNamingService.registerInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
buildSyncInstance(instance, taskDO));
} catch (NacosException e) {
log.error("Register instance={} to Nacos failed", taskDO.getServiceName(), e);
}
return true;
}

private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingService,
NamingService destNamingService) throws NacosException {
if (syncTaskTap.putIfAbsent(taskId, 1) != null) {
log.info("任务Id:{}上一个同步任务尚未结束", taskId);
return;
}
try {
// 直接从本地保存的serviceInfoMap中取订阅的服务实例
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(),

@Override
public void deregisterInstance(TaskDO taskDO) throws Exception {
NamingService destNamingService = getNacosServerHolder().get(taskDO.getDestClusterId());
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true);
// 先删除不存在的
this.removeInvalidInstance(taskDO, destNamingService, sourceInstances);

// 同步实例
this.syncNewInstance(taskDO, destNamingService, sourceInstances);
} finally {
syncTaskTap.remove(taskId);
}
}

private void syncNewInstance(TaskDO taskDO, NamingService destNamingService,
List<Instance> sourceInstances) throws NacosException {
Set<String> latestSyncInstance = new TreeSet<>();
//再次添加新实例
String taskId = taskDO.getTaskId();
Set<String> instanceKeys = sourceInstanceSnapshot.get(taskId);
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
String instanceKey = composeInstanceKey(instance);
if (CollectionUtils.isEmpty(instanceKeys) || !instanceKeys.contains(instanceKey)) {
destNamingService.registerInstance(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()),
buildSyncInstance(instance, taskDO));
}
latestSyncInstance.add(instanceKey);
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort());
}
}

if (CollectionUtils.isNotEmpty(latestSyncInstance)) {
log.info("任务Id:{},已同步实例个数:{}", taskId, latestSyncInstance.size());
sourceInstanceSnapshot.put(taskId, latestSyncInstance);
} else {
// latestSyncInstance为空表示源集群中需要同步的所有实例(即非nacos-sync同步过来的实例)已经下线,清除本地持有快照
sourceInstanceSnapshot.remove(taskId);
}
}


private void removeInvalidInstance(TaskDO taskDO, NamingService destNamingService,
List<Instance> sourceInstances) throws NacosException {
String taskId = taskDO.getTaskId();
if (this.sourceInstanceSnapshot.containsKey(taskId)) {
Set<String> oldInstanceKeys = this.sourceInstanceSnapshot.get(taskId);
Set<String> newInstanceKeys = sourceInstances.stream().map(this::composeInstanceKey)
.collect(Collectors.toSet());
oldInstanceKeys.removeAll(newInstanceKeys);
for (String instanceKey : oldInstanceKeys) {
log.info("任务Id:{},移除无效同步实例:{}", taskId, instanceKey);
String[] split = instanceKey.split(":", -1);
destNamingService
.deregisterInstance(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()), split[0],
Integer.parseInt(split[1]));

/**
 * Deregisters the given instances from the destination cluster of the task.
 *
 * <p>Each key is expected in the {@code ip:port} form. The key is split on its LAST colon so
 * that an IP which itself contains colons (for example an IPv6 literal) is kept intact.
 * A key that cannot be parsed, or a deregistration that fails, is logged and skipped so the
 * remaining keys are still processed.
 *
 * @param taskDO              the sync task; supplies the destination cluster id, service name
 *                            and group name
 * @param invalidInstanceKeys keys of the instances to remove from the destination cluster
 */
@Override
public void removeInvalidInstance(TaskDO taskDO, Set<String> invalidInstanceKeys) {
    NamingService destNamingService = getNacosServerHolder().get(taskDO.getDestClusterId());
    String serviceName = taskDO.getServiceName();
    String groupName = getGroupNameOrDefault(taskDO.getGroupName());

    for (String instanceKey : invalidInstanceKeys) {
        // The port is whatever follows the last colon; everything before it is the IP.
        int separator = instanceKey.lastIndexOf(':');
        if (separator < 0) {
            log.error("Skip malformed instance key={} of service={}", instanceKey, serviceName);
            continue;
        }
        try {
            String ip = instanceKey.substring(0, separator);
            int port = Integer.parseInt(instanceKey.substring(separator + 1));
            destNamingService.deregisterInstance(serviceName, groupName, ip, port);
        } catch (NacosException | NumberFormatException e) {
            // Log the instance key itself (not only the service) so the failure is traceable.
            log.error("Remove instance={} of service={} from Nacos failed", instanceKey, serviceName, e);
        }
    }
}

/**
 * Builds the {@code ip:port} key for the given instance.
 *
 * @param instance the instance whose IP and port form the key
 * @return the instance IP, a colon, and the instance port joined into one string
 */
private String composeInstanceKey(Instance instance) {
    String ip = instance.getIp();
    int port = instance.getPort();
    return ip + ":" + port;
}



private Instance buildSyncInstance(Instance instance, TaskDO taskDO) {
Instance temp = new Instance();
temp.setIp(instance.getIp());
Expand All @@ -252,15 +102,14 @@ private Instance buildSyncInstance(Instance instance, TaskDO taskDO) {
temp.setHealthy(instance.isHealthy());
temp.setWeight(instance.getWeight());
temp.setEphemeral(instance.isEphemeral());
Map<String, String> metaData = new HashMap<>();
metaData.putAll(instance.getMetadata());
Map<String, String> metaData = new HashMap<>(instance.getMetadata());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
temp.setMetadata(metaData);
return temp;
}


}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<packaging>pom</packaging>
<properties>
<revision>0.4.8</revision>
<spring-boot.version>2.4.5</spring-boot.version>
<spring-boot.version>2.5.14</spring-boot.version>
<spring-cloud.version>2020.0.2</spring-cloud.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<zookeeper.version>3.4.9</zookeeper.version>
Expand Down

0 comments on commit 0ffdca3

Please sign in to comment.