Skip to content

Commit

Permalink
Merge branch 'master' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
paderlol authored Apr 20, 2021
2 parents e72d3a9 + 144d5f7 commit 7cb9d73
Show file tree
Hide file tree
Showing 22 changed files with 1,139 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class ServiceSync extends React.Component {
<Table dataSource={taskModels} loading={loading}>
<Table.Column title={locale.serviceName} dataIndex="serviceName" />
<Table.Column title={locale.groupName} dataIndex="groupName" />
<Table.Column title={locale.nameSpace} dataIndex="nameSpace" />
<Table.Column
title={locale.sourceCluster}
dataIndex="sourceClusterId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const I18N_CONF = {
nameSpace: '命名空间',
serviceName: '服务名',
groupName: '分组',
nameSpace: '命名空间',
sourceCluster: '源集群',
destCluster: '目标集群',
instancesCount: '实例数',
Expand All @@ -63,6 +64,8 @@ const I18N_CONF = {
serviceNamePlaceholder: '请输入服务名',
groupName: '分组名',
groupNamePlaceholder: '请输入分组名',
nameSpace: '命名空间',
nameSpacePlaceholder: '请输入命名空间',
sourceCluster: '源集群',
destCluster: '目标集群',
version: '版本',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.alibaba.nacossync.constant;

/**
* Created by maj on 2020/11/18.
*/
public enum ShardingLogTypeEnum {

ADD("add", "新增"),

DELETE("DELETE", "删除");

private String type;
private String desc;

ShardingLogTypeEnum(String type, String desc) {
this.type = type;
this.desc = desc;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

public static boolean contains(String type) {

for (ShardingLogTypeEnum shardingLogTypeEnum : ShardingLogTypeEnum.values()) {

if (shardingLogTypeEnum.getType().equals(type)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
*/
package com.alibaba.nacossync.extension;

import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.pojo.model.TaskDO;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;

import java.util.concurrent.ConcurrentHashMap;

import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;

/**
* @author NacosSync
* @version $Id: SyncManagerService.java, v 0.1 2018-09-25 PM5:17 NacosSync Exp $$
Expand All @@ -42,20 +43,17 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
private ApplicationContext applicationContext;

public SyncManagerService(
SkyWalkerCacheServices skyWalkerCacheServices) {
SkyWalkerCacheServices skyWalkerCacheServices) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
}

public boolean delete(TaskDO taskDO) throws NacosException {

return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);

}

public boolean sync(TaskDO taskDO) {

return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO);

}

@Override
Expand All @@ -80,5 +78,4 @@ public SyncService getSyncService(String sourceClusterId, String destClusterId)

return syncServiceMap.get(generateSyncKey(sourceClusterType, destClusterType));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
*/
package com.alibaba.nacossync.extension.impl;

import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;

import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
Expand All @@ -28,17 +25,13 @@
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
import com.alibaba.nacossync.extension.impl.extend.NacosSyncToZookeeperServicesSharding;
import com.alibaba.nacossync.extension.impl.extend.Sharding;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.DubboConstants;
import com.alibaba.nacossync.util.ExpirySet;
import com.google.common.collect.Sets;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
Expand All @@ -47,6 +40,16 @@
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;

/**
* Nacos 同步 Zk 数据
*
Expand Down Expand Up @@ -90,9 +93,16 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {

private final ZookeeperServerHolder zookeeperServerHolder;

private static ExpirySet<String> serviceNameSet = new ExpirySet<String>();

private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

@Resource(type = NacosSyncToZookeeperServicesSharding.class)
private Sharding sharding;

@Autowired
public NacosSyncToZookeeperServiceImpl(SkyWalkerCacheServices skyWalkerCacheServices,
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.nacosServerHolder = nacosServerHolder;
this.zookeeperServerHolder = zookeeperServerHolder;
Expand All @@ -101,9 +111,9 @@ public NacosSyncToZookeeperServiceImpl(SkyWalkerCacheServices skyWalkerCacheServ
@Override
public boolean delete(TaskDO taskDO) {
try {

NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());//
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
EventListener eventListener = nacosListenerMap.remove(taskDO.getTaskId());
PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(taskDO.getTaskId());
sourceNamingService.unsubscribe(taskDO.getServiceName(), eventListener);
Expand All @@ -113,6 +123,7 @@ public boolean delete(TaskDO taskDO) {
for (String instanceUrl : instanceUrlSet) {
client.delete().quietly().forPath(instanceUrl);
}
sharding.stop(taskDO);
} catch (Exception e) {
log.error("delete task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
Expand All @@ -123,37 +134,7 @@ public boolean delete(TaskDO taskDO) {

@Override
public boolean sync(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
if (event instanceof NamingEvent) {
try {

List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);

// 获取之前的备份 删除无效实例
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
// 替换当前备份为最新备份
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
// 尝试恢复因为zk客户端意外断开导致的实例数据
tryToCompensate(taskDO, sourceNamingService, sourceInstances);
} catch (Exception e) {
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);

}
}
});

sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
} catch (Exception e) {
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
sharding.start(taskDO);
return true;
}

Expand All @@ -164,13 +145,13 @@ private void tryToCompensate(TaskDO taskDO, NamingService sourceNamingService, L
pathCache.getListenable().addListener((zkClient, zkEvent) -> {
if (zkEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
List<Instance> allInstances =
sourceNamingService.getAllInstances(taskDO.getServiceName());
sourceNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
String instanceUrl = buildSyncInstance(instance, taskDO);
String zkInstancePath = zkEvent.getData().getPath();
if (zkInstancePath.equals(instanceUrl)) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(zkInstancePath);
.forPath(zkInstancePath);
break;
}
}
Expand All @@ -182,9 +163,9 @@ private void tryToCompensate(TaskDO taskDO, NamingService sourceNamingService, L
}

private void deleteInvalidInstances(TaskDO taskDO, CuratorFramework client, Set<String> newInstanceUrlSet)
throws Exception {
throws Exception {
Set<String> instanceBackup =
instanceBackupMap.getOrDefault(taskDO.getTaskId(), Sets.newHashSet());
instanceBackupMap.getOrDefault(taskDO.getTaskId(), Sets.newHashSet());
for (String instanceUrl : instanceBackup) {
if (newInstanceUrlSet.contains(instanceUrl)) {
continue;
Expand All @@ -194,15 +175,17 @@ private void deleteInvalidInstances(TaskDO taskDO, CuratorFramework client, Set<
}

private HashSet<String> getWaitingToAddInstance(TaskDO taskDO, CuratorFramework client,
List<Instance> sourceInstances) throws Exception {
List<Instance> sourceInstances) throws Exception {
HashSet<String> waitingToAddInstance = new HashSet<>();
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
log.info("nacos->zk ,real sync service :{},and instance :{}", instance.getServiceName(), instance.getIp());
String instanceUrl = buildSyncInstance(instance, taskDO);
if (null == client.checkExists().forPath(instanceUrl)) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(instanceUrl);
if (null != client.checkExists().forPath(instanceUrl)) {
client.delete().quietly().forPath(instanceUrl);
}
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(instanceUrl);
waitingToAddInstance.add(instanceUrl);
}
}
Expand All @@ -214,11 +197,11 @@ protected String buildSyncInstance(Instance instance, TaskDO taskDO) throws Unsu
metaData.putAll(instance.getMetadata());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());

String servicePath = monitorPath.computeIfAbsent(taskDO.getTaskId(),
key -> convertDubboProvidersPath(metaData.get(DubboConstants.INTERFACE_KEY)));
key -> convertDubboProvidersPath(metaData.get(DubboConstants.INTERFACE_KEY)));

return convertDubboFullPathForZk(metaData, servicePath, instance.getIp(), instance.getPort());
}
Expand All @@ -234,7 +217,7 @@ private PathChildrenCache getPathCache(TaskDO taskDO) {
return pathChildrenCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
try {
PathChildrenCache pathChildrenCache = new PathChildrenCache(
zookeeperServerHolder.get(taskDO.getDestClusterId(), ""), monitorPath.get(key), false);
zookeeperServerHolder.get(taskDO.getDestClusterId(), ""), monitorPath.get(key), false);
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
return pathChildrenCache;
} catch (Exception e) {
Expand All @@ -246,4 +229,74 @@ private PathChildrenCache getPathCache(TaskDO taskDO) {
}


private class SyncThread implements Runnable {

NamingService sourceNamingService;

TaskDO taskDO;

CuratorFramework client;

SyncThread(NamingService sourceNamingService, TaskDO taskDO, CuratorFramework client) {
this.sourceNamingService = sourceNamingService;
this.taskDO = taskDO;
this.client = client;
}

@Override
public void run() {
try {

//List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), taskDO.getGroupName());//fix with no group
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);

// 获取之前的备份 删除无效实例
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
// 替换当前备份为最新备份
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
// 尝试恢复因为zk客户端意外断开导致的实例数据
tryToCompensate(taskDO, sourceNamingService, filterNeedSync(sourceInstances));
} catch (Exception e) {
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);

} finally {
//serviceNameSet.remove(((NamingEvent) event).getServiceName());//如果考虑高实时性 可以手动remove 这样时间窗口的大小就不固定 依赖处理速度 窗口大小作为兜底
}
}
}

private List<Instance> filterNeedSync(List<Instance> sourceInstances) {
Iterator<Instance> iterator = sourceInstances.iterator();
while (iterator.hasNext()) {
if (!needSync(iterator.next().getMetadata())) {
iterator.remove();
}
}
return sourceInstances;
}

public boolean addSyncService(TaskDO taskDO) {
try {
NamingService sourceNamingService =
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
if (event instanceof NamingEvent) {
if (serviceNameSet.set(((NamingEvent) event).getServiceName())) {// add event merge
EXECUTOR.execute(new SyncThread(sourceNamingService, taskDO, client));
}
}
});
sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
} catch (Exception e) {
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
return true;
}

}
Loading

0 comments on commit 7cb9d73

Please sign in to comment.