From a1d683a7c9df6e55c0876172bb23bb3cc1f78daf Mon Sep 17 00:00:00 2001 From: paderlol Date: Mon, 15 May 2023 17:33:26 +0800 Subject: [PATCH] 0.4.9-pre (#325) * Feat/sync support2.x#mutiple thread sync02 (#304) * update port * Multithreading sync * solve conflict * imple SyncService * adapter deregister * optimization some code * fix deregister instance equals logic Co-authored-by: Oliver Co-authored-by: paderlol * Optimize the code for assigning tasks. (#320) * Develop (#321) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix #305 (#322) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix .#305 * Fix cyclic dependency code (#323) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix .#305 * Fix cyclic dependency code. * Refactoring the Nacos Sync to Consul Logic (#324) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix .#305 * Fix cyclic dependency code. * Refactoring the Nacos Sync to Consul Logic. --------- Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com> Co-authored-by: Oliver --- nacossync-distribution/bin/nacosSync.sql | 3 + nacossync-worker/pom.xml | 12 + .../cache/SkyWalkerCacheServices.java | 15 + .../constant/SkyWalkerConstants.java | 2 + .../nacossync/dao/ClusterAccessService.java | 8 + .../nacossync/dao/TaskAccessService.java | 4 + .../dao/repository/TaskRepository.java | 7 + .../event/listener/EventListener.java | 4 +- .../extension/SyncManagerService.java | 4 +- .../nacossync/extension/SyncService.java | 7 +- .../holder/AbstractServerHolderImpl.java | 3 +- .../extension/holder/EurekaServerHolder.java | 22 +- .../extension/holder/NacosServerHolder.java | 72 +++- .../extension/impl/AbstractNacosSync.java | 18 +- .../impl/ConsulSyncToNacosServiceImpl.java | 4 +- .../impl/EurekaSyncToNacosServiceImpl.java | 4 +- .../impl/NacosSyncToConsulServiceImpl.java | 161 +++---- .../impl/NacosSyncToNacosServiceImpl.java | 400 ++++++++++++++++-- .../impl/NacosSyncToZookeeperServiceImpl.java | 2 +- .../impl/ZookeeperSyncToNacosServiceImpl.java | 169 ++++---- .../nacossync/pojo/model/ClusterDO.java | 4 +- .../alibaba/nacossync/pojo/model/TaskDO.java | 5 + .../template/processor/TaskAddProcessor.java | 2 +- .../processor/TaskUpdateProcessor.java | 14 +- .../timer/CheckRunningStatusAllThread.java | 126 ++++++ .../nacossync/timer/FastSyncHelper.java | 169 ++++++++ .../nacossync/timer/QuerySyncTaskTimer.java | 16 +- .../com/alibaba/nacossync/util/Tuple.java | 207 +++++++++ .../src/main/resources/application.properties | 1 - .../ConsulSyncToNacosServiceImplTest.java | 4 +- .../EurekaSyncToNacosServiceImplTest.java | 4 +- .../impl/NacosSyncToNacosServiceImplTest.java | 20 +- .../NacosSyncToZookeeperServiceImplTest.java | 4 +- .../ZookeeperSyncToNacosServiceImplTest.java | 6 +- .../nacossync/timer/FastSyncHelperTest.java | 90 ++++ pom.xml | 6 +- 36 files changed, 1337 insertions(+), 262 deletions(-) create mode 100644 nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java create mode 100644 nacossync-worker/src/main/java/com/alibaba/nacossync/timer/FastSyncHelper.java create mode 100644 nacossync-worker/src/main/java/com/alibaba/nacossync/util/Tuple.java create mode 100644 nacossync-worker/src/test/java/com/alibaba/nacossync/timer/FastSyncHelperTest.java diff --git a/nacossync-distribution/bin/nacosSync.sql b/nacossync-distribution/bin/nacosSync.sql index 6ce15180..84b15e74 100644 --- a/nacossync-distribution/bin/nacosSync.sql +++ b/nacossync-distribution/bin/nacosSync.sql @@ -10,6 +10,8 @@ CREATE TABLE `cluster` ( `connect_key_list` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, `user_name` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, `password` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, + `namespace` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, + `cluster_level` int default 0, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; /******************************************/ @@ -39,5 +41,6 @@ CREATE TABLE `task` ( `task_status` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, `version` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, `worker_ip` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, + `status` int default null , PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/nacossync-worker/pom.xml b/nacossync-worker/pom.xml index 315f3698..2a553059 100644 --- a/nacossync-worker/pom.xml +++ b/nacossync-worker/pom.xml @@ -71,6 +71,10 @@ io.springfox springfox-swagger-ui + + org.springframework.boot + spring-boot-starter-test + com.alibaba.nacos @@ -161,6 +165,14 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 9 + 9 + + diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java index 17ceab28..7f4a40fd 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java @@ -96,10 +96,25 @@ public FinishedTask getFinishedTask(TaskDO taskDO) { return finishedTaskMap.get(operationId); } + + public FinishedTask getFinishedTask(String operationId) { + if (StringUtils.isEmpty(operationId)) { + return null; + } + return finishedTaskMap.get(operationId); + } + + public FinishedTask removeFinishedTask(String operationId) { + if (StringUtils.isEmpty(operationId)) { + return null; + } + return finishedTaskMap.remove(operationId); + } public Map getFinishedTaskMap() { return finishedTaskMap; } + } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java index 1b0ba30a..0379e73e 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java @@ -35,5 +35,7 @@ public class SkyWalkerConstants { public static final String GROUP_NAME_PARAM="groupNameParam"; public static final String PAGE_NO="pageNo"; public static final String PAGE_SIZE="pageSize"; + public static final String SYNC_INSTANCE_TAG="sync.instance.tag"; + public static final Integer MAX_THREAD_NUM = 200; } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java index c3246577..2e11ca33 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java @@ -103,4 +103,12 @@ private List getPredicates(Root root, CriteriaBuilder crit predicates.add(criteriaBuilder.like(root.get("clusterName"), "%" + queryCondition.getServiceName() + "%")); return predicates; } + + public int findClusterLevel(String sourceClusterId){ + ClusterDO clusterDO = clusterRepository.findByClusterId(sourceClusterId); + if (clusterDO != null) { + return clusterDO.getClusterLevel(); + } + return -1; + } } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java index 3742c698..649eaf41 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java @@ -114,5 +114,9 @@ private Page getTaskDOS(QueryCondition queryCondition, Pageable pageable }, pageable); } + + public List findServiceNameIsNull() { + return taskRepository.findAllByServiceNameEquals("ALL"); + } } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/repository/TaskRepository.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/repository/TaskRepository.java index af8f787f..fed44c45 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/repository/TaskRepository.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/repository/TaskRepository.java @@ -41,5 +41,12 @@ public interface TaskRepository extends CrudRepository, JpaRepo List findAllByTaskIdIn(List taskIds); List getAllByWorkerIp(String workerIp); + + /** + * query service is all,use ns leven sync data + * @param serviceName + * @return + */ + List findAllByServiceNameEquals(String serviceName); } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/event/listener/EventListener.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/event/listener/EventListener.java index e0b3889e..40b3c12a 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/event/listener/EventListener.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/event/listener/EventListener.java @@ -62,7 +62,7 @@ public void listenerSyncTaskEvent(SyncTaskEvent syncTaskEvent) { try { long start = System.currentTimeMillis(); - if (syncManagerService.sync(syncTaskEvent.getTaskDO())) { + if (syncManagerService.sync(syncTaskEvent.getTaskDO(), null)) { skyWalkerCacheServices.addFinishedTask(syncTaskEvent.getTaskDO()); metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - start); } else { @@ -88,7 +88,5 @@ public void listenerDeleteTaskEvent(DeleteTaskEvent deleteTaskEvent) { } catch (Exception e) { log.warn("listenerDeleteTaskEvent process error", e); } - } - } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncManagerService.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncManagerService.java index 0d6ebc4f..252ee6e9 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncManagerService.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncManagerService.java @@ -52,9 +52,9 @@ public boolean delete(TaskDO taskDO) throws NacosException { } - public boolean sync(TaskDO taskDO) { + public boolean sync(TaskDO taskDO, Integer index) { - return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO); + return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO, index); } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncService.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncService.java index e9f1f09e..44f72fd3 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncService.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncService.java @@ -35,15 +35,18 @@ public interface SyncService { * execute sync * * @param taskDO + * @param index * @return */ - boolean sync(TaskDO taskDO); + boolean sync(TaskDO taskDO, Integer index); /** * Determines that the current instance data is from another source cluster */ default boolean needSync(Map sourceMetaData) { - return StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY)); + boolean syncTag = StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SYNC_INSTANCE_TAG)); + boolean blank = StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY)); + return syncTag && blank; } /** diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java index af477cd5..dfde9e4d 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java @@ -26,7 +26,8 @@ @Slf4j public abstract class AbstractServerHolderImpl implements Holder { - private final Map serviceMap = new ConcurrentHashMap<>(); + protected final Map serviceMap = new ConcurrentHashMap<>(); + @Autowired protected SkyWalkerCacheServices skyWalkerCacheServices; diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java index 81a4b4bf..96a763b6 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package com.alibaba.nacossync.extension.holder; import com.alibaba.nacossync.extension.eureka.EurekaNamingService; @@ -30,12 +31,27 @@ @Service @Slf4j public class EurekaServerHolder extends AbstractServerHolderImpl { + + private static final String HTTP_PREFIX = "http://"; + + private static final String HTTPS_PREFIX = "https://"; + @Override EurekaNamingService createServer(String clusterId, Supplier serverAddressSupplier) throws Exception { - RestTemplateTransportClientFactory restTemplateTransportClientFactory = - new RestTemplateTransportClientFactory(); - EurekaEndpoint eurekaEndpoint = new DefaultEndpoint(serverAddressSupplier.get()); + RestTemplateTransportClientFactory restTemplateTransportClientFactory = new RestTemplateTransportClientFactory(); + EurekaEndpoint eurekaEndpoint = new DefaultEndpoint(addHttpPrefix(serverAddressSupplier.get())); EurekaHttpClient eurekaHttpClient = restTemplateTransportClientFactory.newClient(eurekaEndpoint); return new EurekaNamingService(eurekaHttpClient); } + + public String addHttpPrefix(String input) { + if (input == null || input.isEmpty()) { + return input; + } + if (!input.startsWith(HTTP_PREFIX) && !input.startsWith(HTTPS_PREFIX)) { + input = HTTP_PREFIX + input; + } + + return input; + } } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java index 49d5932a..dda88a53 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java @@ -13,17 +13,23 @@ package com.alibaba.nacossync.extension.holder; import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacossync.constant.SkyWalkerConstants; import com.alibaba.nacossync.dao.ClusterAccessService; +import com.alibaba.nacossync.dao.TaskAccessService; import com.alibaba.nacossync.pojo.model.ClusterDO; +import com.alibaba.nacossync.pojo.model.TaskDO; import com.google.common.base.Joiner; import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.logging.log4j.util.Strings; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** @@ -35,17 +41,30 @@ public class NacosServerHolder extends AbstractServerHolderImpl { private final ClusterAccessService clusterAccessService; + + private final TaskAccessService taskAccessService; + + private static ConcurrentHashMap globalNameService = new ConcurrentHashMap<>(16); - public NacosServerHolder(ClusterAccessService clusterAccessService) { + public NacosServerHolder(ClusterAccessService clusterAccessService, TaskAccessService taskAccessService) { this.clusterAccessService = clusterAccessService; + this.taskAccessService = taskAccessService; } @Override NamingService createServer(String clusterId, Supplier serverAddressSupplier) throws Exception { + String newClusterId; + if (clusterId.contains(":")) { + String[] split = clusterId.split(":"); + newClusterId = split[1]; + } else { + newClusterId = clusterId; + } + //代表此时为组合key,确定target集群中的nameService是不同的 List allClusterConnectKey = skyWalkerCacheServices - .getAllClusterConnectKey(clusterId); - ClusterDO clusterDO = clusterAccessService.findByClusterId(clusterId); + .getAllClusterConnectKey(newClusterId); + ClusterDO clusterDO = clusterAccessService.findByClusterId(newClusterId); String serverList = Joiner.on(",").join(allClusterConnectKey); Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList); @@ -58,6 +77,51 @@ NamingService createServer(String clusterId, Supplier serverAddressSuppl Optional.ofNullable(clusterDO.getPassword()).ifPresent(value -> properties.setProperty(PropertyKeyConst.PASSWORD, value) ); - return NamingFactory.createNamingService(properties); + NamingService namingService = NamingFactory.createNamingService(properties); + globalNameService.put(clusterId,namingService); + return namingService; + } + + /** + * Get NamingService for different clients + * @param clusterId clusterId + * @return Returns Naming Service objects for different clusters + */ + public NamingService getNameService(String clusterId){ + return globalNameService.get(clusterId); + } + + public NamingService getSourceNamingService(String taskId, String sourceClusterId) { + String key = taskId + sourceClusterId; + return serviceMap.computeIfAbsent(key, k->{ + try { + log.info("Starting create source cluster server, key={}", key); + //代表此时为组合key,确定target集群中的nameService是不同的 + List allClusterConnectKey = skyWalkerCacheServices + .getAllClusterConnectKey(sourceClusterId); + ClusterDO clusterDO = clusterAccessService.findByClusterId(sourceClusterId); + TaskDO task = taskAccessService.findByTaskId(taskId); + String serverList = Joiner.on(",").join(allClusterConnectKey); + Properties properties = new Properties(); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList); + properties.setProperty(PropertyKeyConst.NAMESPACE, Optional.ofNullable(clusterDO.getNamespace()).orElse( + Strings.EMPTY)); + Optional.ofNullable(clusterDO.getUserName()).ifPresent(value -> + properties.setProperty(PropertyKeyConst.USERNAME, value) + ); + + Optional.ofNullable(clusterDO.getPassword()).ifPresent(value -> + properties.setProperty(PropertyKeyConst.PASSWORD, value) + ); + properties.setProperty(SkyWalkerConstants.SOURCE_CLUSTERID_KEY,task.getSourceClusterId()); + properties.setProperty(SkyWalkerConstants.DEST_CLUSTERID_KEY,task.getDestClusterId()); + return NamingFactory.createNamingService(properties); + }catch (NacosException e) { + log.error("start source server fail,taskId:{},sourceClusterId:{}" + , taskId, sourceClusterId, e); + return null; + } + }); + } } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/AbstractNacosSync.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/AbstractNacosSync.java index bd8aab7c..9e4bda19 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/AbstractNacosSync.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/AbstractNacosSync.java @@ -101,7 +101,7 @@ public boolean delete(TaskDO taskDO) { } @Override - public boolean sync(TaskDO taskDO) { + public boolean sync(TaskDO taskDO, Integer index) { String taskId = taskDO.getTaskId(); try { NamingService sourceNamingService = nacosServerHolder.get(taskDO.getSourceClusterId()); @@ -128,7 +128,7 @@ public boolean sync(TaskDO taskDO) { return true; } - private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingService) throws NacosException { + private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingService) throws Exception { if (syncTaskTap.putIfAbsent(taskId, 1) != null) { log.info("任务Id:{}上一个同步任务尚未结束", taskId); return; @@ -172,7 +172,7 @@ private void syncNewInstance(TaskDO taskDO, List sourceInstances) thro } - private void removeInvalidInstance(TaskDO taskDO, List sourceInstances) throws NacosException { + private void removeInvalidInstance(TaskDO taskDO, List sourceInstances) throws Exception { String taskId = taskDO.getTaskId(); if (this.sourceInstanceSnapshot.containsKey(taskId)) { Set oldInstanceKeys = this.sourceInstanceSnapshot.get(taskId); @@ -187,13 +187,23 @@ private void removeInvalidInstance(TaskDO taskDO, List sourceInstances } } + @Override + public boolean needDelete(Map destMetaData, TaskDO taskDO) { + return SyncService.super.needDelete(destMetaData, taskDO); + } + + @Override + public boolean needSync(Map sourceMetaData) { + return SyncService.super.needSync(sourceMetaData); + } + public abstract String composeInstanceKey(String ip, int port); public abstract void register(TaskDO taskDO, Instance instance); public abstract void deregisterInstance(TaskDO taskDO) throws Exception; - public abstract void removeInvalidInstance(TaskDO taskDO, Set invalidInstanceKeys); + public abstract void removeInvalidInstance(TaskDO taskDO, Set invalidInstanceKeys) throws Exception; public NacosServerHolder getNacosServerHolder() { return nacosServerHolder; diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java index 3db72cd8..1f8e3b59 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java @@ -96,7 +96,7 @@ public boolean delete(TaskDO taskDO) { } @Override - public boolean sync(TaskDO taskDO) { + public boolean sync(TaskDO taskDO, Integer index) { try { ConsulClient consulClient = consulServerHolder.get(taskDO.getSourceClusterId()); NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId()); @@ -106,7 +106,7 @@ public boolean sync(TaskDO taskDO) { Set instanceKeys = new HashSet<>(); overrideAllInstance(taskDO, destNamingService, healthServiceList, instanceKeys); cleanAllOldInstance(taskDO, destNamingService, instanceKeys); - specialSyncEventBus.subscribe(taskDO, this::sync); + specialSyncEventBus.subscribe(taskDO, t->sync(t, index)); } catch (Exception e) { log.error("Sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e); metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java index 1aab27bc..e2b4c634 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java @@ -87,7 +87,7 @@ public boolean delete(TaskDO taskDO) { } @Override - public boolean sync(TaskDO taskDO) { + public boolean sync(TaskDO taskDO,Integer index) { try { EurekaNamingService eurekaNamingService = eurekaServerHolder.get(taskDO.getSourceClusterId()); @@ -107,7 +107,7 @@ public boolean sync(TaskDO taskDO) { } addValidInstance(taskDO, destNamingService, eurekaInstances); } - specialSyncEventBus.subscribe(taskDO, this::sync); + specialSyncEventBus.subscribe(taskDO, t->sync(t, index)); } catch (Exception e) { log.error("sync task from eureka to nacos was failed, taskId:{}", taskDO.getTaskId(), e); metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java index f8aa86f5..2dd70ca8 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java @@ -10,150 +10,95 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package com.alibaba.nacossync.extension.impl; -import com.alibaba.nacos.api.naming.NamingService; -import com.alibaba.nacos.api.naming.listener.EventListener; -import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacossync.cache.SkyWalkerCacheServices; import com.alibaba.nacossync.constant.ClusterTypeEnum; -import com.alibaba.nacossync.constant.MetricsStatisticsType; import com.alibaba.nacossync.constant.SkyWalkerConstants; -import com.alibaba.nacossync.extension.SyncService; import com.alibaba.nacossync.extension.annotation.NacosSyncService; import com.alibaba.nacossync.extension.holder.ConsulServerHolder; -import com.alibaba.nacossync.extension.holder.NacosServerHolder; -import com.alibaba.nacossync.monitor.MetricsManager; import com.alibaba.nacossync.pojo.model.TaskDO; import com.alibaba.nacossync.util.ConsulUtils; -import com.alibaba.nacossync.util.NacosUtils; import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.QueryParams; import com.ecwid.consul.v1.Response; import com.ecwid.consul.v1.agent.model.NewService; import com.ecwid.consul.v1.health.model.HealthService; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; -import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; /** * @author zhanglong */ @Slf4j @NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS, destinationCluster = ClusterTypeEnum.CONSUL) -public class NacosSyncToConsulServiceImpl implements SyncService { - - private Map nacosListenerMap = new ConcurrentHashMap<>(); - - private final MetricsManager metricsManager; - +public class NacosSyncToConsulServiceImpl extends AbstractNacosSync { + + private final SkyWalkerCacheServices skyWalkerCacheServices; - - private final NacosServerHolder nacosServerHolder; + private final ConsulServerHolder consulServerHolder; - - public NacosSyncToConsulServiceImpl(MetricsManager metricsManager, SkyWalkerCacheServices skyWalkerCacheServices, - NacosServerHolder nacosServerHolder, ConsulServerHolder consulServerHolder) { - this.metricsManager = metricsManager; + + public NacosSyncToConsulServiceImpl(SkyWalkerCacheServices skyWalkerCacheServices, + ConsulServerHolder consulServerHolder) { this.skyWalkerCacheServices = skyWalkerCacheServices; - this.nacosServerHolder = nacosServerHolder; this.consulServerHolder = consulServerHolder; } - + + @Override - public boolean delete(TaskDO taskDO) { - try { - - NamingService sourceNamingService = - nacosServerHolder.get(taskDO.getSourceClusterId()); - ConsulClient consulClient = consulServerHolder.get(taskDO.getDestClusterId()); - - sourceNamingService.unsubscribe(taskDO.getServiceName(), - NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), nacosListenerMap.get(taskDO.getTaskId())); - - // 删除目标集群中同步的实例列表 - Response> serviceResponse = - consulClient.getHealthServices(taskDO.getServiceName(), true, QueryParams.DEFAULT); - List healthServices = serviceResponse.getValue(); - for (HealthService healthService : healthServices) { - - if (needDelete(ConsulUtils.transferMetadata(healthService.getService().getTags()), taskDO)) { - consulClient.agentServiceDeregister(URLEncoder - .encode(healthService.getService().getId(), StandardCharsets.UTF_8.name())); - } + public String composeInstanceKey(String ip, int port) { + return ip + ":" + port; + } + + @Override + public void register(TaskDO taskDO, Instance instance) { + ConsulClient consulClient = consulServerHolder.get(taskDO.getDestClusterId()); + consulClient.agentServiceRegister(buildSyncInstance(instance, taskDO)); + } + + @Override + public void deregisterInstance(TaskDO taskDO) throws Exception { + ConsulClient consulClient = consulServerHolder.get(taskDO.getDestClusterId()); + Response> serviceResponse = consulClient.getHealthServices(taskDO.getServiceName(), true, + QueryParams.DEFAULT); + List healthServices = serviceResponse.getValue(); + for (HealthService healthService : healthServices) { + + if (needDelete(ConsulUtils.transferMetadata(healthService.getService().getTags()), taskDO)) { + consulClient.agentServiceDeregister( + URLEncoder.encode(healthService.getService().getId(), StandardCharsets.UTF_8.name())); } - } catch (Exception e) { - log.error("delete a task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e); - metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR); - return false; } - return true; } - + @Override - public boolean sync(TaskDO taskDO) { - try { - NamingService sourceNamingService = - nacosServerHolder.get(taskDO.getSourceClusterId()); - ConsulClient consulClient = consulServerHolder.get(taskDO.getDestClusterId()); - - nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> { - if (event instanceof NamingEvent) { - try { - Set instanceKeySet = new HashSet<>(); - List sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), - NacosUtils.getGroupNameOrDefault(taskDO.getGroupName())); - // 先将新的注册一遍 - for (Instance instance : sourceInstances) { - if (needSync(instance.getMetadata())) { - consulClient.agentServiceRegister(buildSyncInstance(instance, taskDO)); - instance.getInstanceId(); - instanceKeySet.add(composeInstanceKey(instance.getIp(), instance.getPort())); - } - } - - // 再将不存在的删掉 - Response> serviceResponse = - consulClient.getHealthServices(taskDO.getServiceName(), true, QueryParams.DEFAULT); - List healthServices = serviceResponse.getValue(); - for (HealthService healthService : healthServices) { - - if (needDelete(ConsulUtils.transferMetadata(healthService.getService().getTags()), taskDO) - && !instanceKeySet.contains(composeInstanceKey(healthService.getService().getAddress(), - healthService.getService().getPort()))) { - consulClient.agentServiceDeregister(URLEncoder - .encode(healthService.getService().getId(), StandardCharsets.UTF_8.toString())); - } - } - } catch (Exception e) { - log.error("event process fail, taskId:{}", taskDO.getTaskId(), e); - metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); - } - } - }); - - sourceNamingService.subscribe(taskDO.getServiceName(), - NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), nacosListenerMap.get(taskDO.getTaskId())); - } catch (Exception e) { - log.error("sync task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e); - metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); - return false; + public void removeInvalidInstance(TaskDO taskDO, Set invalidInstanceKeys) + throws UnsupportedEncodingException { + ConsulClient consulClient = consulServerHolder.get(taskDO.getDestClusterId()); + Response> serviceResponse = consulClient.getHealthServices(taskDO.getServiceName(), true, + QueryParams.DEFAULT); + List healthServices = serviceResponse.getValue(); + for (HealthService healthService : healthServices) { + + if (needDelete(ConsulUtils.transferMetadata(healthService.getService().getTags()), taskDO) + && !invalidInstanceKeys.contains(composeInstanceKey(healthService.getService().getAddress(), + healthService.getService().getPort()))) { + consulClient.agentServiceDeregister( + URLEncoder.encode(healthService.getService().getId(), StandardCharsets.UTF_8.toString())); + } } - return true; } - - private String composeInstanceKey(String ip, int port) { - return ip + ":" + port; - } - + public NewService buildSyncInstance(Instance instance, TaskDO taskDO) { NewService newService = new NewService(); newService.setAddress(instance.getIp()); @@ -162,13 +107,13 @@ public NewService buildSyncInstance(Instance instance, TaskDO taskDO) { newService.setId(instance.getInstanceId()); List tags = Lists.newArrayList(); tags.addAll(instance.getMetadata().entrySet().stream() - .map(entry -> String.join("=", entry.getKey(), entry.getValue())).collect(Collectors.toList())); + .map(entry -> String.join("=", entry.getKey(), entry.getValue())).collect(Collectors.toList())); tags.add(String.join("=", SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId())); tags.add(String.join("=", SkyWalkerConstants.SYNC_SOURCE_KEY, - skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode())); + skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode())); tags.add(String.join("=", SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId())); newService.setTags(tags); return newService; } - + } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java index 4cc3b830..89eced92 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java @@ -15,21 +15,42 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.api.naming.pojo.ListView; +import com.alibaba.nacos.common.utils.CollectionUtils; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacossync.cache.SkyWalkerCacheServices; import com.alibaba.nacossync.constant.ClusterTypeEnum; +import com.alibaba.nacossync.constant.MetricsStatisticsType; import com.alibaba.nacossync.constant.SkyWalkerConstants; +import com.alibaba.nacossync.dao.ClusterAccessService; +import com.alibaba.nacossync.extension.SyncService; import com.alibaba.nacossync.extension.annotation.NacosSyncService; +import com.alibaba.nacossync.extension.holder.NacosServerHolder; +import com.alibaba.nacossync.monitor.MetricsManager; import com.alibaba.nacossync.pojo.model.TaskDO; +import com.alibaba.nacossync.template.processor.TaskUpdateProcessor; +import com.alibaba.nacossync.timer.FastSyncHelper; +import com.alibaba.nacossync.util.StringUtils; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import static com.alibaba.nacossync.constant.SkyWalkerConstants.SOURCE_CLUSTERID_KEY; import static com.alibaba.nacossync.util.NacosUtils.getGroupNameOrDefault; /** @@ -39,58 +60,381 @@ @Slf4j @NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS, destinationCluster = ClusterTypeEnum.NACOS) -public class NacosSyncToNacosServiceImpl extends AbstractNacosSync { +public class NacosSyncToNacosServiceImpl implements SyncService, InitializingBean { + private Map listenerMap = new ConcurrentHashMap<>(); + + private final Map syncTaskTap = new ConcurrentHashMap<>(); + + @Autowired + private MetricsManager metricsManager; @Autowired private SkyWalkerCacheServices skyWalkerCacheServices; + @Autowired + private NacosServerHolder nacosServerHolder; + + private ConcurrentHashMap allSyncTaskMap = new ConcurrentHashMap<>(); + + @Autowired + private ClusterAccessService clusterAccessService; + + public static Map> serviceClient = new ConcurrentHashMap<>(); + + @Autowired + private FastSyncHelper fastSyncHelper; + + @Autowired + private TaskUpdateProcessor taskUpdateProcessor; + + /** + * 因为网络故障等原因,nacos sync的同步任务会失败,导致目标集群注册中心缺少同步实例, 为避免目标集群注册中心长时间缺少同步实例,每隔5分钟启动一个兜底工作线程执行一遍全部的同步任务。 + */ + @Override - public String composeInstanceKey(String ip, int port) { - return ip + ":" + port; + public void afterPropertiesSet() { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("com.alibaba.nacossync.basic.synctask"); + return t; + }); + + executorService.scheduleWithFixedDelay(() -> { + if (allSyncTaskMap.size() == 0) { + return; + } + + try { + Collection taskCollections = allSyncTaskMap.values(); + List taskDOList = new ArrayList<>(taskCollections); + + if (CollectionUtils.isNotEmpty(taskDOList)) { + fastSyncHelper.syncWithThread(taskDOList, this::timeSync); + } + + } catch (Throwable e) { + log.warn("basic synctask thread error", e); + } + }, 0, 300, TimeUnit.SECONDS); } @Override - public void register(TaskDO taskDO, Instance instance) { - NamingService destNamingService = getNacosServerHolder().get(taskDO.getDestClusterId()); + public boolean delete(TaskDO taskDO) { try { - destNamingService.registerInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), - buildSyncInstance(instance, taskDO)); - } catch (NacosException e) { - log.error("Register instance={} to Nacos failed", taskDO.getServiceName(), e); + NamingService sourceNamingService = nacosServerHolder.getSourceNamingService(taskDO.getTaskId(), + taskDO.getSourceClusterId()); + + if ("ALL".equals(taskDO.getServiceName())) { + String operationId = taskUpdateProcessor.getTaskIdAndOperationIdMap(taskDO.getTaskId()); + if (!StringUtils.isEmpty(operationId)) { + allSyncTaskMap.remove(operationId); + } + + //处理group级别的服务任务删除 + ListView servicesOfServer = sourceNamingService.getServicesOfServer(0, Integer.MAX_VALUE, + taskDO.getGroupName()); + List serviceNames = servicesOfServer.getData(); + for (String serviceName : serviceNames) { + String operationKey = taskDO.getTaskId() + serviceName; + skyWalkerCacheServices.removeFinishedTask(operationKey); + allSyncTaskMap.remove(operationKey); + NamingService destNamingService = popNamingService(taskDO); + sourceNamingService.unsubscribe(serviceName, getGroupNameOrDefault(taskDO.getGroupName()), + listenerMap.remove(taskDO.getTaskId() + serviceName)); + + List sourceInstances = sourceNamingService.getAllInstances(serviceName, + getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), false); + for (Instance instance : sourceInstances) { + if (needSync(instance.getMetadata())) { + destNamingService.deregisterInstance(serviceName, + getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort()); + } + } + } + } else { + //处理服务级别的任务删除 + String operationId = taskUpdateProcessor.getTaskIdAndOperationIdMap(taskDO.getTaskId()); + if (StringUtils.isEmpty(operationId)) { + log.warn("operationId is null data synchronization is not currently performed.{}", operationId); + return false; + } + + sourceNamingService.unsubscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), + listenerMap.remove(operationId)); + List sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), + getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), false); + + NamingService destNamingService = popNamingService(taskDO); + for (Instance instance : sourceInstances) { + if (needSync(instance.getMetadata())) { + destNamingService.deregisterInstance(taskDO.getServiceName(), + getGroupNameOrDefault(taskDO.getGroupName()), instance); + } + } + // 移除任务 + skyWalkerCacheServices.removeFinishedTask(operationId); + // 移除所有需要同步的Task + allSyncTaskMap.remove(operationId); + } + } catch (Exception e) { + log.error("delete task from nacos to nacos was failed, operationalId:{}", taskDO.getOperationId(), e); + metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR); + return false; } + return true; } @Override - public void deregisterInstance(TaskDO taskDO) throws Exception { - NamingService destNamingService = getNacosServerHolder().get(taskDO.getDestClusterId()); - List allInstances = destNamingService.getAllInstances(taskDO.getServiceName(), - getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true); - for (Instance instance : allInstances) { - if (needDelete(instance.getMetadata(), taskDO)) { - destNamingService.deregisterInstance(taskDO.getServiceName(), - getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort()); + public boolean sync(TaskDO taskDO, Integer index) { + log.info("线程 {} 开始同步 {} ", Thread.currentThread().getId(), System.currentTimeMillis()); + String operationId = taskDO.getOperationId(); + try { + NamingService sourceNamingService = nacosServerHolder.getSourceNamingService(taskDO.getTaskId(), + taskDO.getSourceClusterId()); + NamingService destNamingService = getDestNamingService(taskDO, index); + allSyncTaskMap.put(operationId, taskDO); + //防止暂停同步任务后,重新同步/或删除任务以后新建任务不会再接收到新的事件导致不能同步,所以每次订阅事件之前,先全量同步一次任务 + long startTime = System.currentTimeMillis(); + doSync(operationId, taskDO, sourceNamingService, destNamingService); + log.info("同步一个服务注册耗时:{} ms", System.currentTimeMillis() - startTime); + this.listenerMap.putIfAbsent(operationId, event -> { + if (event instanceof NamingEvent) { + NamingEvent namingEvent = (NamingEvent) event; + log.info("监听到服务{}信息改变, taskId:{},实例数:{},发起同步", namingEvent.getServiceName(), + operationId, namingEvent.getInstances() == null ? null : namingEvent.getInstances().size()); + try { + doSync(operationId, taskDO, sourceNamingService, destNamingService); + log.info("监听到服务{}同步结束", namingEvent.getServiceName()); + } catch (Exception e) { + log.error("event process fail, operationId:{}", operationId, e); + metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); + } + } + }); + sourceNamingService.subscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), + listenerMap.get(operationId)); + } catch (Exception e) { + log.error("sync task from nacos to nacos was failed, operationId:{}", operationId, e); + metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); + return false; + } + return true; + } + + /** + * basic sync + * + * @param taskDO + */ + public void timeSync(TaskDO taskDO) { + log.debug("线程{}开始同步{}", Thread.currentThread().getId(), System.currentTimeMillis()); + String operationId = taskDO.getOperationId(); + try { + NamingService sourceNamingService = nacosServerHolder.getSourceNamingService(taskDO.getTaskId(), + taskDO.getSourceClusterId()); + //获取目标集群client + NamingService destNamingService = popNamingService(taskDO); + long startTime = System.currentTimeMillis(); + doSync(operationId, taskDO, sourceNamingService, destNamingService); + log.info("同步一个服务注册耗时:{} ms", System.currentTimeMillis() - startTime); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private NamingService getDestNamingService(TaskDO taskDO, Integer index) { + String key = taskDO.getSourceClusterId() + ":" + taskDO.getDestClusterId() + ":" + index; + return nacosServerHolder.get(key); + } + + private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingService, + NamingService destNamingService) throws NacosException { + if (syncTaskTap.putIfAbsent(taskId, 1) != null) { + log.info("任务Id:{}上一个同步任务尚未结束", taskId); + return; + } + //记录目标集群的Client + recordNamingService(taskDO, destNamingService); + try { + + List sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), + getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true); + + int level = clusterAccessService.findClusterLevel(taskDO.getSourceClusterId()); + if (CollectionUtils.isNotEmpty(sourceInstances) && sourceInstances.get(0).isEphemeral()) { + //处临实例的批量数据同步,需要获取当前所有的服务实例,TODO,当Client为1.x的时候,执行和持久化实例一样的同步方式 + handlerPersistenceInstance(taskDO, destNamingService, sourceInstances, level); + } else if (CollectionUtils.isEmpty(sourceInstances)) { + //如果当前源集群是空的 ,那么直接注销目标集群的实例 + log.debug("service {} need sync Ephemeral instance num is null: serviceName ", taskDO.getServiceName()); + processDeRegisterInstances(taskDO, destNamingService); + } else { + //处临持久化实例的批量数据同步 + handlerPersistenceInstance(taskDO, destNamingService, sourceInstances, level); } + } finally { + syncTaskTap.remove(taskId); } } - @Override - public void removeInvalidInstance(TaskDO taskDO, Set invalidInstanceKeys) { - NamingService destNamingService = getNacosServerHolder().get(taskDO.getDestClusterId()); + private void handlerPersistenceInstance(TaskDO taskDO, NamingService destNamingService, + List sourceInstances, int level) throws NacosException { + List needRegisterInstance = new ArrayList<>(); + for (Instance instance : sourceInstances) { + if (needSync(instance.getMetadata(), level, taskDO.getDestClusterId())) { + needRegisterInstance.add(instance); + } + } + List destAllInstances = destNamingService.getAllInstances(taskDO.getServiceName(), + getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true); - for (String instanceKey : invalidInstanceKeys) { - String[] split = instanceKey.split(":", -1); - try { + // 获取目标集群自己已经同步的实例 + List destHasSyncInstances = destAllInstances.stream() + .filter(instance -> hasSync(instance, taskDO.getSourceClusterId())).collect(Collectors.toList()); + + //获取新增的实例,遍历新增 + List newInstances = new ArrayList<>(needRegisterInstance); + instanceRemove(destHasSyncInstances, newInstances); + //注册 + for (Instance newInstance : newInstances) { + destNamingService.registerInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), + buildSyncInstance(newInstance, taskDO)); + } + + List notRemoveInstances = new ArrayList<>(); + for (Instance destHasSyncInstance : destHasSyncInstances) { + for (Instance instance : needRegisterInstance) { + if (instanceEquals(destHasSyncInstance, instance)) { + notRemoveInstances.add(destHasSyncInstance); + } + } + } + destHasSyncInstances.removeAll(notRemoveInstances); + + if (CollectionUtils.isNotEmpty(destHasSyncInstances)) { + log.info("taskid:{},服务 {} 发生反注册,执行数量 {} ", taskDO.getTaskId(), taskDO.getServiceName(), + destHasSyncInstances.size()); + } + + for (Instance destAllInstance : destHasSyncInstances) { + destNamingService.deregisterInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), + destAllInstance); + } + } + + + public static boolean instanceEquals(Instance ins1, Instance ins2) { + return (ins1.getIp().equals(ins2.getIp())) && (ins1.getPort() == ins2.getPort()) && (ins1.getWeight() + == ins2.getWeight()) && (ins1.isHealthy() == ins2.isHealthy()) && (ins1.isEphemeral() + == ins2.isEphemeral()) && (ins1.getClusterName().equals(ins2.getClusterName())) + && (ins1.getServiceName().equals(ins2.getServiceName())); + } + + private void instanceRemove(List destHasSyncInstances, List newInstances) { + List needRemoveInstance = new ArrayList<>(); + for (Instance destHasSyncInstance : destHasSyncInstances) { + for (Instance newInstance : newInstances) { + if (destHasSyncInstance.equals(newInstance)) { + //如果目标集群已经存在了源集群同步过来的实例,就不需要同步了 + needRemoveInstance.add(newInstance); + } + } + } + // eg:A Cluster 已经同步到 B Cluster的实例数据,就不需要再重复同步过来了 + newInstances.removeAll(needRemoveInstance); + } + + private boolean hasSync(Instance instance, String sourceClusterId) { + if (instance.getMetadata() != null) { + String sourceClusterKey = instance.getMetadata().get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY); + return sourceClusterKey != null && sourceClusterKey.equals(sourceClusterId); + } + return false; + } + + + /** + * 当源集群需要同步的实例个数为0时,目标集群如果还有源集群同步的实例,执行反注册 + * + * @param taskDO + * @param destNamingService + * @throws NacosException + */ + private void processDeRegisterInstances(TaskDO taskDO, NamingService destNamingService) throws NacosException { + //如果此时sourceInstance中的实例为空,证明此时实例下线或实例不存在 + List destInstances = destNamingService.getAllInstances(taskDO.getServiceName(), + getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), false); + // 如果目标集群中的数据实例也为空了,则测试无需操作 + if (CollectionUtils.isEmpty(destInstances)) { + return; + } + deRegisterFilter(destInstances, taskDO.getSourceClusterId()); + if (CollectionUtils.isNotEmpty(destInstances)) { + //逐个执行反注册,拿出一个实例即可, 需要处理redo,否则会被重新注册上来 + for (Instance destInstance : destInstances) { destNamingService.deregisterInstance(taskDO.getServiceName(), - getGroupNameOrDefault(taskDO.getGroupName()), split[0], Integer.parseInt(split[1])); - } catch (NacosException e) { - log.error("Remove instance={} from Nacos failed", taskDO.getServiceName(), e); - + getGroupNameOrDefault(taskDO.getGroupName()), destInstance); } } } + private void deRegisterFilter(List destInstances, String sourceClusterId) { + List newDestInstance = new ArrayList<>(); + for (Instance destInstance : destInstances) { + Map metadata = destInstance.getMetadata(); + String destSourceClusterId = metadata.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY); + if (needDeregister(destSourceClusterId, sourceClusterId)) { + // 需要执行反注册 + newDestInstance.add(destInstance); + } + } + destInstances = newDestInstance; + } + + private boolean needDeregister(String destClusterId, String sourceClusterId) { + if (!StringUtils.isEmpty(destClusterId)) { + return destClusterId.equals(sourceClusterId); + } + return false; + } + + private boolean needSync(Map sourceMetaData, int level, String destClusterId) { + //普通集群(默认) + if (level == 0) { + return SyncService.super.needSync(sourceMetaData); + } + //中心集群,只要不是目标集群传过来的实例,都需要同步(扩展功能) + if (!destClusterId.equals(sourceMetaData.get(SOURCE_CLUSTERID_KEY))) { + return true; + } + return false; + } + + private void recordNamingService(TaskDO taskDO, NamingService destNamingService) { + String key = buildClientKey(taskDO); + serviceClient.computeIfAbsent(key, clientKey -> { + Set hashSet = new ConcurrentHashSet<>(); + hashSet.add(destNamingService); + return hashSet; + }); + } + + public NamingService popNamingService(TaskDO taskDO) { + String key = buildClientKey(taskDO); + Set namingServices = serviceClient.get(key); + if (CollectionUtils.isNotEmpty(namingServices)) { + return namingServices.iterator().next(); + } + log.warn("{} 无可用 namingservice", key); + return null; + } + + private static String buildClientKey(TaskDO taskDO) { + return taskDO.getId() + ":" + taskDO.getServiceName(); + } private Instance buildSyncInstance(Instance instance, TaskDO taskDO) { Instance temp = new Instance(); @@ -107,6 +451,8 @@ private Instance buildSyncInstance(Instance instance, TaskDO taskDO) { metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY, skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode()); metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId()); + //标识是同步实例 + metaData.put(SkyWalkerConstants.SYNC_INSTANCE_TAG, taskDO.getSourceClusterId() + "@@" + taskDO.getVersion()); temp.setMetadata(metaData); return temp; } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImpl.java index 8caf8ec8..b746d2d6 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImpl.java @@ -125,7 +125,7 @@ public boolean delete(TaskDO taskDO) { } @Override - public boolean sync(TaskDO taskDO) { + public boolean sync(TaskDO taskDO, Integer index) { try { NamingService sourceNamingService = nacosServerHolder.get(taskDO.getSourceClusterId()); diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImpl.java index 8c73e168..d2747723 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImpl.java @@ -10,23 +10,8 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.alibaba.nacossync.extension.impl; -import static com.alibaba.nacossync.util.DubboConstants.ALL_SERVICE_NAME_PATTERN; -import static com.alibaba.nacossync.util.DubboConstants.DUBBO_PATH_FORMAT; -import static com.alibaba.nacossync.util.DubboConstants.DUBBO_ROOT_PATH; -import static com.alibaba.nacossync.util.DubboConstants.GROUP_KEY; -import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_IP_KEY; -import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_PORT_KEY; -import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY; -import static com.alibaba.nacossync.util.DubboConstants.PROTOCOL_KEY; -import static com.alibaba.nacossync.util.DubboConstants.VERSION_KEY; -import static com.alibaba.nacossync.util.DubboConstants.WEIGHT_KEY; -import static com.alibaba.nacossync.util.DubboConstants.ZOOKEEPER_SEPARATOR; -import static com.alibaba.nacossync.util.DubboConstants.createServiceName; -import static com.alibaba.nacossync.util.NacosUtils.getGroupNameOrDefault; -import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString; -import static com.alibaba.nacossync.util.StringUtils.parseQueryString; +package com.alibaba.nacossync.extension.impl; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; @@ -41,6 +26,14 @@ import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder; import com.alibaba.nacossync.monitor.MetricsManager; import com.alibaba.nacossync.pojo.model.TaskDO; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.utils.CloseableUtils; +import org.springframework.beans.factory.annotation.Autowired; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -49,13 +42,22 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.utils.CloseableUtils; -import org.springframework.beans.factory.annotation.Autowired; + +import static com.alibaba.nacossync.util.DubboConstants.ALL_SERVICE_NAME_PATTERN; +import static com.alibaba.nacossync.util.DubboConstants.DUBBO_PATH_FORMAT; +import static com.alibaba.nacossync.util.DubboConstants.DUBBO_ROOT_PATH; +import static com.alibaba.nacossync.util.DubboConstants.GROUP_KEY; +import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_IP_KEY; +import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_PORT_KEY; +import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY; +import static com.alibaba.nacossync.util.DubboConstants.PROTOCOL_KEY; +import static com.alibaba.nacossync.util.DubboConstants.VERSION_KEY; +import static com.alibaba.nacossync.util.DubboConstants.WEIGHT_KEY; +import static com.alibaba.nacossync.util.DubboConstants.ZOOKEEPER_SEPARATOR; +import static com.alibaba.nacossync.util.DubboConstants.createServiceName; +import static com.alibaba.nacossync.util.NacosUtils.getGroupNameOrDefault; +import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString; +import static com.alibaba.nacossync.util.StringUtils.parseQueryString; /** * @author paderlol @@ -65,40 +67,41 @@ @Slf4j @NacosSyncService(sourceCluster = ClusterTypeEnum.ZK, destinationCluster = ClusterTypeEnum.NACOS) public class ZookeeperSyncToNacosServiceImpl implements SyncService { - + @Autowired private MetricsManager metricsManager; - + /** * Listener cache of Zookeeper format taskId -> PathChildrenCache instance */ private Map treeCacheMap = new ConcurrentHashMap<>(); + /** * service name cache */ private Map nacosServiceNameMap = new ConcurrentHashMap<>(); - + private final ZookeeperServerHolder zookeeperServerHolder; - + private final NacosServerHolder nacosServerHolder; - + private final SkyWalkerCacheServices skyWalkerCacheServices; - + @Autowired public ZookeeperSyncToNacosServiceImpl(ZookeeperServerHolder zookeeperServerHolder, - NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) { + NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) { this.zookeeperServerHolder = zookeeperServerHolder; this.nacosServerHolder = nacosServerHolder; this.skyWalkerCacheServices = skyWalkerCacheServices; } - + @Override - public boolean sync(TaskDO taskDO) { + public boolean sync(TaskDO taskDO, Integer index) { try { if (treeCacheMap.containsKey(taskDO.getTaskId())) { return true; } - + TreeCache treeCache = getTreeCache(taskDO); NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId()); // 初次执行任务统一注册所有实例 @@ -106,7 +109,7 @@ public boolean sync(TaskDO taskDO) { //注册ZK监听 Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> { try { - + String path = event.getData().getPath(); Map queryParam = parseQueryString(path); if (isMatch(taskDO, queryParam) && needSync(queryParam)) { @@ -116,7 +119,7 @@ public boolean sync(TaskDO taskDO) { log.error("event process from Zookeeper to Nacos was failed, taskId:{}", taskDO.getTaskId(), e); metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); } - + }); } catch (Exception e) { log.error("sync task from Zookeeper to Nacos was failed, taskId:{}", taskDO.getTaskId(), e); @@ -125,37 +128,35 @@ public boolean sync(TaskDO taskDO) { } return true; } - + private void processEvent(TaskDO taskDO, NamingService destNamingService, TreeCacheEvent event, String path, - Map queryParam) throws NacosException { + Map queryParam) throws NacosException { if (!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) { return; } - + Map ipAndPortParam = parseIpAndPortString(path); Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO); String serviceName = queryParam.get(INTERFACE_KEY); switch (event.getType()) { case NODE_ADDED: case NODE_UPDATED: - - destNamingService.registerInstance( - getServiceNameFromCache(serviceName, queryParam), getGroupNameOrDefault(taskDO.getGroupName()), - instance); + + destNamingService.registerInstance(getServiceNameFromCache(serviceName, queryParam), + getGroupNameOrDefault(taskDO.getGroupName()), instance); break; case NODE_REMOVED: - - destNamingService.deregisterInstance( - getServiceNameFromCache(serviceName, queryParam), getGroupNameOrDefault(taskDO.getGroupName()), - ipAndPortParam.get(INSTANCE_IP_KEY), - Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY))); + + destNamingService.deregisterInstance(getServiceNameFromCache(serviceName, queryParam), + getGroupNameOrDefault(taskDO.getGroupName()), ipAndPortParam.get(INSTANCE_IP_KEY), + Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY))); nacosServiceNameMap.remove(serviceName); break; default: break; } } - + private void registerAllInstances(TaskDO taskDO, NamingService destNamingService) throws Exception { CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId()); if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) { @@ -168,9 +169,9 @@ private void registerAllInstances(TaskDO taskDO, NamingService destNamingService } } } - + private void registerALLInstances0(TaskDO taskDO, NamingService destNamingService, CuratorFramework zk, - String serviceName) throws Exception { + String serviceName) throws Exception { String path = String.format(DUBBO_PATH_FORMAT, serviceName); if (zk.getChildren() == null) { return; @@ -182,58 +183,55 @@ private void registerALLInstances0(TaskDO taskDO, NamingService destNamingServic Map ipAndPortParam = parseIpAndPortString(path + ZOOKEEPER_SEPARATOR + provider); Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO); destNamingService.registerInstance(getServiceNameFromCache(serviceName, queryParam), - getGroupNameOrDefault(taskDO.getGroupName()), - instance); + getGroupNameOrDefault(taskDO.getGroupName()), instance); } } } - + @Override public boolean delete(TaskDO taskDO) { if (taskDO.getServiceName() == null) { return true; } try { - + CloseableUtils.closeQuietly(treeCacheMap.get(taskDO.getTaskId())); NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId()); if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) { if (nacosServiceNameMap.containsKey(taskDO.getServiceName())) { - List allInstances = - destNamingService.getAllInstances(nacosServiceNameMap.get(taskDO.getServiceName()), + List allInstances = destNamingService.getAllInstances( + nacosServiceNameMap.get(taskDO.getServiceName()), getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true); for (Instance instance : allInstances) { if (needDelete(instance.getMetadata(), taskDO)) { destNamingService.deregisterInstance(instance.getServiceName(), - getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), - instance.getPort()); + getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort()); } nacosServiceNameMap.remove(taskDO.getServiceName()); - + } } } else { Set serviceNames = nacosServiceNameMap.keySet(); for (String serviceName : serviceNames) { - + if (nacosServiceNameMap.containsKey(serviceName)) { - List allInstances = - destNamingService.getAllInstances(serviceName, getGroupNameOrDefault(taskDO.getGroupName()), - new ArrayList<>(), true); + List allInstances = destNamingService.getAllInstances(serviceName, + getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true); for (Instance instance : allInstances) { if (needDelete(instance.getMetadata(), taskDO)) { destNamingService.deregisterInstance(instance.getServiceName(), - getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), - instance.getPort()); + getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), + instance.getPort()); } nacosServiceNameMap.remove(serviceName); - + } } } } - - + + } catch (Exception e) { log.error("delete task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e); metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR); @@ -241,15 +239,14 @@ public boolean delete(TaskDO taskDO) { } return true; } - + /** * fetch the Path cache when the task sync */ protected TreeCache getTreeCache(TaskDO taskDO) { return treeCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> { try { - TreeCache treeCache = - new TreeCache(zookeeperServerHolder.get(taskDO.getSourceClusterId()), + TreeCache treeCache = new TreeCache(zookeeperServerHolder.get(taskDO.getSourceClusterId()), DUBBO_ROOT_PATH); treeCache.start(); return treeCache; @@ -258,21 +255,25 @@ protected TreeCache getTreeCache(TaskDO taskDO) { return null; } }); - + } - + /** * The instance information that needs to be synchronized is matched based on the dubbo version and the grouping * name */ protected boolean isMatch(TaskDO taskDO, Map queryParam) { - Predicate isVersionEq = (task) -> StringUtils.isBlank(taskDO.getVersion()) - || StringUtils.equals(task.getVersion(), queryParam.get(VERSION_KEY)); - Predicate isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName()) - || StringUtils.equals(task.getGroupName(), queryParam.get(GROUP_KEY)); - return isVersionEq.and(isGroupEq).test(taskDO); + Predicate isVersionEq = (task) -> StringUtils.isBlank(taskDO.getVersion()) || StringUtils.equals( + task.getVersion(), queryParam.get(VERSION_KEY)); + Predicate isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName()) || StringUtils.equals( + task.getGroupName(), queryParam.get(GROUP_KEY)); + Predicate isServiceEq = (task) -> StringUtils.isNotBlank(taskDO.getServiceName()) && StringUtils.equals( + task.getServiceName(), queryParam.get(INTERFACE_KEY)); + Predicate isMatchAll = (task) -> StringUtils.isNotBlank(taskDO.getServiceName()) && StringUtils.equals( + taskDO.getServiceName(), ALL_SERVICE_NAME_PATTERN); + return isVersionEq.and(isGroupEq).and(isServiceEq).or(isMatchAll).test(taskDO); } - + /** * create Nacos service instance * @@ -280,24 +281,24 @@ protected boolean isMatch(TaskDO taskDO, Map queryParam) { * @param ipAndPortMap dubbo ip and address */ protected Instance buildSyncInstance(Map queryParam, Map ipAndPortMap, - TaskDO taskDO) { + TaskDO taskDO) { Instance temp = new Instance(); temp.setIp(ipAndPortMap.get(INSTANCE_IP_KEY)); temp.setPort(Integer.parseInt(ipAndPortMap.get(INSTANCE_PORT_KEY))); temp.setServiceName(getServiceNameFromCache(taskDO.getTaskId(), queryParam)); temp.setWeight(Double.parseDouble(queryParam.get(WEIGHT_KEY) == null ? "1.0" : queryParam.get(WEIGHT_KEY))); temp.setHealthy(true); - + Map metaData = new HashMap<>(queryParam); metaData.put(PROTOCOL_KEY, ipAndPortMap.get(PROTOCOL_KEY)); metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId()); metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY, - skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode()); + skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode()); metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId()); temp.setMetadata(metaData); return temp; } - + /** * cteate Dubbo service name * @@ -307,5 +308,5 @@ protected Instance buildSyncInstance(Map queryParam, Map queryParam) { return nacosServiceNameMap.computeIfAbsent(serviceName, (key) -> createServiceName(queryParam)); } - + } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/ClusterDO.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/ClusterDO.java index 0018af86..67fcc4ee 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/ClusterDO.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/ClusterDO.java @@ -67,5 +67,7 @@ public class ClusterDO implements Serializable { private String password; private String namespace; - + + private Integer clusterLevel; + } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/TaskDO.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/TaskDO.java index 47372aa0..7b6f0a8f 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/TaskDO.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/model/TaskDO.java @@ -70,4 +70,9 @@ public class TaskDO implements Serializable { * operation id,The operation id follow when the task status changes */ private String operationId; + + /** + * current task status + */ + private Integer status; } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddProcessor.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddProcessor.java index 3316b4e0..c9f981e4 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddProcessor.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddProcessor.java @@ -89,7 +89,7 @@ public void process(TaskAddRequest taskAddRequest, TaskAddResult taskAddResult, taskDO.setTaskStatus(TaskStatusEnum.SYNC.getCode()); taskDO.setWorkerIp(SkyWalkerUtil.getLocalIp()); taskDO.setOperationId(SkyWalkerUtil.generateOperationId()); - + } else { taskDO.setTaskStatus(TaskStatusEnum.SYNC.getCode()); diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskUpdateProcessor.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskUpdateProcessor.java index 84fbe90e..8b8d4e4a 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskUpdateProcessor.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskUpdateProcessor.java @@ -30,6 +30,9 @@ import com.alibaba.nacossync.pojo.request.TaskUpdateRequest; import com.alibaba.nacossync.template.Processor; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * @author NacosSync * @version $Id: TaskUpdateProcessor.java, v 0.1 2018-10-17 PM11:11 NacosSync Exp $$ @@ -39,6 +42,8 @@ public class TaskUpdateProcessor implements Processor { @Autowired private TaskAccessService taskAccessService; + + private Map taskIdAndOperationIdMap = new ConcurrentHashMap<>(); @Override public void process(TaskUpdateRequest taskUpdateRequest, BaseResult baseResult, @@ -56,10 +61,17 @@ public void process(TaskUpdateRequest taskUpdateRequest, BaseResult baseResult, throw new SkyWalkerException("taskDo is null ,taskId is :" + taskUpdateRequest.getTaskId()); } - + taskDO.setTaskStatus(taskUpdateRequest.getTaskStatus()); + //在id生成之前保存好操作id,可以在删除操作里面进行 + taskIdAndOperationIdMap.put(taskDO.getTaskId(),taskDO.getOperationId()); + taskDO.setOperationId(SkyWalkerUtil.generateOperationId()); taskAccessService.addTask(taskDO); } + + public String getTaskIdAndOperationIdMap(String taskId) { + return taskIdAndOperationIdMap.remove(taskId); + } } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java new file mode 100644 index 00000000..bd332220 --- /dev/null +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacossync.timer; + +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.pojo.ListView; +import com.alibaba.nacos.client.naming.utils.CollectionUtils; +import com.alibaba.nacossync.cache.SkyWalkerCacheServices; +import com.alibaba.nacossync.constant.MetricsStatisticsType; +import com.alibaba.nacossync.constant.TaskStatusEnum; +import com.alibaba.nacossync.dao.TaskAccessService; +import com.alibaba.nacossync.extension.holder.NacosServerHolder; +import com.alibaba.nacossync.monitor.MetricsManager; +import com.alibaba.nacossync.pojo.model.TaskDO; +import com.google.common.eventbus.EventBus; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * when the database task service name is empty, check all the services in the cluster and create a synchronization task. + * @ClassName: CheckRunningStatusAllThread + * @Author: ChenHao26 + * @Date: 2022/7/20 10:30 + * @Description: muti sync data + */ +@Slf4j +public class CheckRunningStatusAllThread implements Runnable{ + + private MetricsManager metricsManager; + + private SkyWalkerCacheServices skyWalkerCacheServices; + + private TaskAccessService taskAccessService; + + private EventBus eventBus; + + private NacosServerHolder nacosServerHolder; + + private FastSyncHelper fastSyncHelper; + + public CheckRunningStatusAllThread(MetricsManager metricsManager, SkyWalkerCacheServices skyWalkerCacheServices, + TaskAccessService taskAccessService, EventBus eventBus, NacosServerHolder nacosServerHolder, + FastSyncHelper fastSyncHelper) { + this.metricsManager = metricsManager; + this.skyWalkerCacheServices = skyWalkerCacheServices; + this.taskAccessService = taskAccessService; + this.eventBus = eventBus; + this.nacosServerHolder = nacosServerHolder; + this.fastSyncHelper = fastSyncHelper; + } + + /** + * 根据ns级别进行数据同步 + */ + @Override + public void run() { + Long startTime = System.currentTimeMillis(); + try { + List taskDOS = taskAccessService.findServiceNameIsNull() + .stream().filter(t -> t.getStatus() == null || t.getStatus() == 0) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(taskDOS)) { + return; + } + for (TaskDO taskDO : taskDOS) { + List serviceNameList = getServiceNameList(taskDO); + if (CollectionUtils.isEmpty(serviceNameList)) { + continue; + } + + //如果是null,证明此时没有处理完成 + List filterService = serviceNameList.stream() + .filter(serviceName -> skyWalkerCacheServices.getFinishedTask(taskDO.getTaskId() + serviceName ) == null) + .collect(Collectors.toList()); + + if (CollectionUtils.isEmpty(filterService)) { + continue; + } + + // 当删除任务后,此时任务的状态为DELETE,不会执行数据同步 + if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) { + fastSyncHelper.syncWithThread(taskDO, filterService); + } + } + }catch (Exception e) { + log.warn("CheckRunningStatusThread Exception ", e); + } + metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK, System.currentTimeMillis() - startTime); + } + + /** + * get serviceName list. + * @param taskDO task info + * @return service list or empty list + */ + private List getServiceNameList(TaskDO taskDO) { + NamingService namingService = nacosServerHolder.get(taskDO.getSourceClusterId()); + try { + ListView servicesOfServer = namingService.getServicesOfServer(0, Integer.MAX_VALUE, + taskDO.getGroupName()); + return servicesOfServer.getData(); + } catch (Exception e) { + log.error("query service list failure",e); + } + + return Collections.emptyList(); + } +} diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/FastSyncHelper.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/FastSyncHelper.java new file mode 100644 index 00000000..f379bf1a --- /dev/null +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/FastSyncHelper.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacossync.timer; + +import com.alibaba.nacossync.cache.SkyWalkerCacheServices; +import com.alibaba.nacossync.constant.MetricsStatisticsType; +import com.alibaba.nacossync.extension.SyncManagerService; +import com.alibaba.nacossync.monitor.MetricsManager; +import com.alibaba.nacossync.pojo.model.TaskDO; +import com.alibaba.nacossync.util.Tuple; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +import static com.alibaba.nacossync.constant.SkyWalkerConstants.MAX_THREAD_NUM; + +/** + * multi-threaded synchronization Task DO. + * + * @ClassName: FastSyncHelper + * @Author: ChenHao26 + * @Date: 2022/7/19 17:02 + * @Description: + */ +@Service +@Slf4j +public class FastSyncHelper { + + private final SkyWalkerCacheServices skyWalkerCacheServices; + + private final MetricsManager metricsManager; + + private final SyncManagerService syncManagerService; + + + private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM); + + public FastSyncHelper(SkyWalkerCacheServices skyWalkerCacheServices, MetricsManager metricsManager, + SyncManagerService syncManagerService) { + this.skyWalkerCacheServices = skyWalkerCacheServices; + this.metricsManager = metricsManager; + this.syncManagerService = syncManagerService; + } + + + /** + * every 200 services start a thread to perform synchronization. + * + * @param taskDOS task list + */ + public void syncWithThread(List taskDOS, Consumer timeSyncInvoke) { + sync(taskDOS, tuple -> { + for (TaskDO task : tuple.getT2()) { + //执行兜底的定时同步 + timeSyncInvoke.accept(task); + } + }); + } + + + /** + * every 200 services start a thread to perform synchronization. + * + * @param taskDO task info + * @param filterServices filterServices + */ + public void syncWithThread(TaskDO taskDO, List filterServices) { + sync(filterServices, tuple -> { + // 执行数据同步 + for (String serviceName : tuple.getT2()) { + syncByIndex(taskDO, serviceName, tuple.getT1()); + } + }); + } + + public void sync(List items, Consumer>> itemConsumer) { + long startTime = System.currentTimeMillis(); + List>> taskGroupList = averageAssign(items, MAX_THREAD_NUM); + + // 等待所有任务完成 + CompletableFuture allTasks = CompletableFuture.allOf(taskGroupList.stream() + .map(tuple -> CompletableFuture.runAsync(() -> performSync(tuple, itemConsumer), executorService)) + .toArray(CompletableFuture[]::new)); + try { + allTasks.get(); + } catch (Exception e) { + e.printStackTrace(); + } + + log.info("新增同步任务数量 {}, 执行耗时:{}ms", items.size(), System.currentTimeMillis() - startTime); + } + + private void performSync(Tuple> tuple, Consumer>> itemConsumer) { + if (tuple == null || tuple.getT2() == null || tuple.getT2().isEmpty()) { + return; + } + itemConsumer.accept(tuple); + + } + + + private void syncByIndex(TaskDO taskDO, String serviceName, int index) { + long startTime = System.currentTimeMillis(); + TaskDO task = new TaskDO(); + BeanUtils.copyProperties(taskDO, task); + task.setServiceName(serviceName); + task.setOperationId(taskDO.getTaskId() + serviceName); + if (syncManagerService.sync(task, index)) { + skyWalkerCacheServices.addFinishedTask(task); + log.info("sync thread : {} sync finish ,time consuming :{}", Thread.currentThread().getId(), + System.currentTimeMillis() - startTime); + metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - startTime); + } else { + log.warn("listenerSyncTaskEvent sync failure."); + } + } + + /** + * 将一个List均分成n个list,主要通过偏移量来实现的 + * + * @param source 源集合 + * @param limit 最大值 + * @return + */ + public static List>> averageAssign(List source, int limit) { + if (null == source || source.isEmpty()) { + return Collections.emptyList(); + } + int size = source.size(); + List>> result = new ArrayList<>(); + // 通过减去1并加1,我们可以确保将多余的元素放在最后一个子列表中。在上述示例中,计算结果为 ((10 - 1) / 3 + 1) = 4,我们创建了4个子列表,其中最后一个子列表包含2个元素,而不是1个。这样可以更均匀地分配源列表的元素. + int listCount = (int) Math.ceil((double) source.size() / limit); // 计算子列表数量,使用 Math.ceil 向上取整,确保多余的元素放在最后一个子列表中 + int remainder = source.size() % listCount; // 计算多余的元素数量 + int assigned = 0; // 记录已分配的元素索引 + for (int i = 0; i < listCount; i++) { + int sublistSize = size / listCount + (remainder-- > 0 ? 1 : 0); // 计算子列表大小,平均分配元素,并在有多余元素时将其分配到子列表中 + List sublist = source.subList(assigned, assigned + sublistSize); // 获取子列表 + result.add(Tuple.of(i, sublist)); // 将子列表添加到结果列表 + assigned += sublistSize; // 更新已分配的元素索引 + } + + return result; + } + + +} diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/QuerySyncTaskTimer.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/QuerySyncTaskTimer.java index 1c72d0e9..b840ac85 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/QuerySyncTaskTimer.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/QuerySyncTaskTimer.java @@ -22,6 +22,8 @@ import com.alibaba.nacossync.dao.TaskAccessService; import com.alibaba.nacossync.event.DeleteTaskEvent; import com.alibaba.nacossync.event.SyncTaskEvent; +import com.alibaba.nacossync.extension.SyncManagerService; +import com.alibaba.nacossync.extension.holder.NacosServerHolder; import com.alibaba.nacossync.monitor.MetricsManager; import com.alibaba.nacossync.pojo.model.TaskDO; import com.google.common.eventbus.EventBus; @@ -54,13 +56,25 @@ public class QuerySyncTaskTimer implements CommandLineRunner { @Autowired private ScheduledExecutorService scheduledExecutorService; + + @Autowired + private NacosServerHolder nacosServerHolder; + + @Autowired + private SyncManagerService syncManagerService; + + @Autowired + private FastSyncHelper fastSyncHelper; @Override public void run(String... args) { /** Fetch the task list from the database every 3 seconds */ scheduledExecutorService.scheduleWithFixedDelay(new CheckRunningStatusThread(), 0, 3000, TimeUnit.MILLISECONDS); - + + scheduledExecutorService.scheduleWithFixedDelay(new CheckRunningStatusAllThread(metricsManager,skyWalkerCacheServices, + taskAccessService,eventBus, nacosServerHolder, fastSyncHelper), 0, 3000, + TimeUnit.MILLISECONDS); } private class CheckRunningStatusThread implements Runnable { diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/util/Tuple.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/util/Tuple.java new file mode 100644 index 00000000..0c94d112 --- /dev/null +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/util/Tuple.java @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacossync.util; + +import reactor.util.annotation.NonNull; +import reactor.util.annotation.Nullable; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** + * A tuple that holds two non-null values. + * + * @param The type of the first non-null value held by this tuple + * @param The type of the second non-null value held by this tuple + * @author Jon Brisbin + * @author Stephane Maldini + */ +@SuppressWarnings("rawtypes") +public class Tuple implements Iterable, Serializable { + + private static final long serialVersionUID = -3518082018884860684L; + + @NonNull + final T1 t1; + + @NonNull + final T2 t2; + + Tuple(T1 t1, T2 t2) { + this.t1 = Objects.requireNonNull(t1, "t1"); + this.t2 = Objects.requireNonNull(t2, "t2"); + } + + /** + * Type-safe way to get the first object of this {@link Tuples}. + * + * @return The first object + */ + public T1 getT1() { + return t1; + } + + /** + * Type-safe way to get the second object of this {@link Tuples}. + * + * @return The second object + */ + public T2 getT2() { + return t2; + } + + /** + * Map the left-hand part (T1) of this {@link Tuple} into a different value and type, keeping the right-hand part + * (T2). + * + * @param mapper the mapping {@link Function} for the left-hand part + * @param the new type for the left-hand part + * @return a new {@link Tuple2} with a different left (T1) value + */ + public Tuple mapT1(Function mapper) { + return new Tuple<>(mapper.apply(t1), t2); + } + + /** + * Map the right-hand part (T2) of this {@link Tuple} into a different value and type, keeping the left-hand part + * (T1). + * + * @param mapper the mapping {@link Function} for the right-hand part + * @param the new type for the right-hand part + * @return a new {@link Tuple2} with a different right (T2) value + */ + public Tuple mapT2(Function mapper) { + return new Tuple<>(t1, mapper.apply(t2)); + } + + /** + * Get the object at the given index. + * + * @param index The index of the object to retrieve. Starts at 0. + * @return The object or {@literal null} if out of bounds. + */ + @Nullable + public Object get(int index) { + switch (index) { + case 0: + return t1; + case 1: + return t2; + default: + return null; + } + } + + /** + * Turn this {@code Tuple} into a {@link List List<Object>}. The list isn't tied to this Tuple but is a + * copy with limited mutability ({@code add} and {@code remove} are not supported, but {@code set} + * is). + * + * @return A copy of the tuple as a new {@link List List<Object>}. + */ + public List toList() { + return Arrays.asList(toArray()); + } + + /** + * Turn this {@code Tuple} into a plain {@code Object[]}. The array isn't tied to this Tuple but is a + * copy. + * + * @return A copy of the tuple as a new {@link Object Object[]}. + */ + public Object[] toArray() { + return new Object[] {t1, t2}; + } + + /** + * Return an immutable {@link Iterator Iterator<Object>} around the content of this + * {@code Tuple}. + * + * @return An unmodifiable {@link Iterator} over the elements in this Tuple. + * @implNote As an {@link Iterator} is always tied to its {@link Iterable} source by definition, the iterator cannot + * be mutable without the iterable also being mutable. Since {@link Tuples} are immutable, so is + * the {@link Iterator} returned by this method. + */ + @Override + public Iterator iterator() { + return Collections.unmodifiableList(toList()).iterator(); + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Tuple tuple = (Tuple) o; + + return t1.equals(tuple.t1) && t2.equals(tuple.t2); + + } + + @Override + public int hashCode() { + int result = size(); + result = 31 * result + t1.hashCode(); + result = 31 * result + t2.hashCode(); + return result; + } + + /** + * Return the number of elements in this {@literal Tuples}. + * + * @return The size of this {@literal Tuples}. + */ + public int size() { + return 2; + } + + /** + * A Tuple String representation is the comma separated list of values, enclosed in square brackets. + * + * @return the Tuple String representation + */ + @Override + public final String toString() { + Object[] values = toArray(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < values.length; i++) { + Object t = values[i]; + if (i != 0) { + sb.append(','); + } + if (t != null) { + sb.append(t); + } + } + return sb.insert(0, '[').append(']').toString(); + } + + public static Tuple of(T1 t1, T2 t2) { + return new Tuple<>(t1, t2); + } +} diff --git a/nacossync-worker/src/main/resources/application.properties b/nacossync-worker/src/main/resources/application.properties index 5d59a44b..0644087c 100644 --- a/nacossync-worker/src/main/resources/application.properties +++ b/nacossync-worker/src/main/resources/application.properties @@ -11,6 +11,5 @@ spring.cloud.discovery.enabled=false spring.datasource.url=jdbc:mysql://127.0.0.1:3306/nacos_sync?characterEncoding=utf8 spring.datasource.username=root spring.datasource.password=root - management.endpoints.web.exposure.include=* management.endpoint.health.show-details=always diff --git a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImplTest.java b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImplTest.java index 46ccd300..64d6b22d 100644 --- a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImplTest.java +++ b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImplTest.java @@ -67,7 +67,7 @@ public void testConsulSyncToNacos() throws Exception { TaskDO taskDO = mock(TaskDO.class); mockSync(taskDO); // TODO Test the core logic in the future - Assert.assertTrue(consulSyncToNacosService.sync(taskDO)); + Assert.assertTrue(consulSyncToNacosService.sync(taskDO,null)); } @Test @@ -80,7 +80,7 @@ public void testConsulDeleteSyncToNacos() throws Exception { @Test(expected = Exception.class) public void testConsulSyncToNacosWithException() throws Exception { - Assert.assertFalse(consulSyncToNacosService.sync(null)); + Assert.assertFalse(consulSyncToNacosService.sync(null,null)); } @Test(expected = Exception.class) diff --git a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImplTest.java b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImplTest.java index c1fc59d4..4067a5e0 100644 --- a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImplTest.java +++ b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImplTest.java @@ -61,7 +61,7 @@ public class EurekaSyncToNacosServiceImplTest { public void testEurekaSyncToNacos() throws Exception { TaskDO taskDO = mock(TaskDO.class); mockSync(taskDO); - Assert.assertTrue(eurekaSyncToNacosService.sync(taskDO)); + Assert.assertTrue(eurekaSyncToNacosService.sync(taskDO,null)); } @@ -74,7 +74,7 @@ public void testEurekaDeleteSyncToNacos() throws Exception { } @Test(expected = Exception.class) public void testEurekaSyncToNacosWithException() throws Exception { - Assert.assertFalse(eurekaSyncToNacosService.sync(null)); + Assert.assertFalse(eurekaSyncToNacosService.sync(null, null)); } @Test(expected = Exception.class) public void testEurekaDeleteToNacosWithException() throws Exception { diff --git a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImplTest.java b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImplTest.java index bed06822..81db1454 100644 --- a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImplTest.java +++ b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImplTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacossync.constant.SkyWalkerConstants; @@ -51,7 +52,22 @@ public void testZookeeperSyncToNacos() throws Exception { TaskDO taskDO = mock(TaskDO.class); mockSync(taskDO); // TODO Test the core logic in the future - Assert.assertTrue(nacosSyncToNacosService.sync(taskDO)); + Assert.assertTrue(nacosSyncToNacosService.sync(taskDO,null)); + } + + @Test + public void testZookeeperSyncToNacosWithTimeSync() throws Exception { + TaskDO taskDO = mock(TaskDO.class); + try { + nacosSyncToNacosService.timeSync(taskDO); + }catch (Exception e) { + Assert.assertEquals(e, NacosException.class); + } + } + + @Test(expected = Exception.class) + public void testZookeeperSyncToNacosWithTimeSync2() throws Exception { + nacosSyncToNacosService.timeSync(null); } @Test @@ -64,7 +80,7 @@ public void testDeleteSyncToNacos() throws Exception { @Test(expected = Exception.class) public void testNacosSyncToNacosWithException() throws Exception { - Assert.assertFalse(nacosSyncToNacosService.sync(null)); + Assert.assertFalse(nacosSyncToNacosService.sync(null, null)); } @Test(expected = Exception.class) public void testNacosDeleteToNacosWithException() throws Exception { diff --git a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImplTest.java b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImplTest.java index 920cc834..797c68de 100644 --- a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImplTest.java +++ b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/NacosSyncToZookeeperServiceImplTest.java @@ -60,7 +60,7 @@ public void testNacosDeleteToZookeeper() throws Exception { @Test(expected = Exception.class) public void testNacosSyncToZookeeperWithException() throws Exception { - Assert.assertFalse(nacosSyncToZookeeperService.sync(null)); + Assert.assertFalse(nacosSyncToZookeeperService.sync(null,null)); } @Test(expected = Exception.class) public void testNacosDeleteToZookeeperWithException() throws Exception { @@ -75,7 +75,7 @@ public boolean mockSync(TaskDO taskDO) throws Exception { doReturn(sourceNamingService).when(nacosServerHolder).get(any()); //TODO Test the core logic in the future - return nacosSyncToZookeeperService.sync(taskDO); + return nacosSyncToZookeeperService.sync(taskDO, null); } public boolean mockDelete(TaskDO taskDO) throws Exception { diff --git a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImplTest.java b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImplTest.java index cba4a951..bfa5022e 100644 --- a/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImplTest.java +++ b/nacossync-worker/src/test/java/com/alibaba/nacossync/extension/impl/ZookeeperSyncToNacosServiceImplTest.java @@ -71,7 +71,7 @@ public void testZookeeperSyncToNacos() { public void testZookeeperDeleteToNacos() throws Exception { TaskDO taskDO = mock(TaskDO.class); mockSync(taskDO); - zookeeperSyncToNacosService.sync(taskDO); + zookeeperSyncToNacosService.sync(taskDO,null); Assert.assertTrue(mockDelete(taskDO)); @@ -79,7 +79,7 @@ public void testZookeeperDeleteToNacos() throws Exception { @Test(expected = Exception.class) public void tesZookeeperSyncToNacosWithException() throws Exception { - Assert.assertFalse(zookeeperSyncToNacosService.sync(null)); + Assert.assertFalse(zookeeperSyncToNacosService.sync(null, null)); } @Test(expected = Exception.class) @@ -100,7 +100,7 @@ public boolean mockSync(TaskDO taskDO) { when(treeCache.getCurrentData(any())).thenReturn(childData); doReturn(ClusterTypeEnum.ZK).when(skyWalkerCacheServices).getClusterType(any()); when(treeCache.getListenable()).thenReturn(listeners); - return zookeeperSyncToNacosService.sync(taskDO); + return zookeeperSyncToNacosService.sync(taskDO,null); } public boolean mockDelete(TaskDO taskDO) throws Exception { diff --git a/nacossync-worker/src/test/java/com/alibaba/nacossync/timer/FastSyncHelperTest.java b/nacossync-worker/src/test/java/com/alibaba/nacossync/timer/FastSyncHelperTest.java new file mode 100644 index 00000000..b02e6e9d --- /dev/null +++ b/nacossync-worker/src/test/java/com/alibaba/nacossync/timer/FastSyncHelperTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacossync.timer; + +import com.alibaba.nacossync.cache.SkyWalkerCacheServices; +import com.alibaba.nacossync.extension.SyncManagerService; +import com.alibaba.nacossync.extension.impl.NacosSyncToNacosServiceImpl; +import com.alibaba.nacossync.monitor.MetricsManager; +import com.alibaba.nacossync.pojo.model.TaskDO; +import com.alibaba.nacossync.util.Tuple; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; + +/** + * test data synchronization. + * + * @ClassName: FastSyncHelperTest + * @Author: ChenHao26 + * @Date: 2022/7/26 15:16 + * @Description: + */ +@RunWith(MockitoJUnitRunner.class) +public class FastSyncHelperTest { + + @Mock + private SkyWalkerCacheServices skyWalkerCacheServices; + + @Mock + private MetricsManager metricsManager; + + @Mock + private SyncManagerService syncManagerService; + + @Mock + private NacosSyncToNacosServiceImpl nacosSyncToNacosService; + + @InjectMocks + @Spy + private FastSyncHelper fastSyncHelper; + + @Test + public void testSyncWithThread() { + TaskDO taskDO = mock(TaskDO.class); + List list = new ArrayList<>(); + list.add(taskDO); + try { + fastSyncHelper.syncWithThread(list, nacosSyncToNacosService::timeSync); + } catch (Exception e) { + Assert.assertEquals(e, InterruptedException.class); + e.printStackTrace(); + } + } + + @Test + public void testAverageAssign() { + int limit = 2; + List sourceList = new ArrayList<>(); + sourceList.add("1"); + sourceList.add("2"); + sourceList.add("3"); + List>> lists = FastSyncHelper.averageAssign(sourceList, limit); + Assert.assertEquals(lists.get(0).getT2().size(), limit); + Assert.assertNotEquals(lists.get(0).getT2().size(), 3); + } +} diff --git a/pom.xml b/pom.xml index 261c8e20..539b3a45 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ pom - 0.4.8 + 0.4.9 2.5.14 2020.0.2 UTF-8 @@ -39,8 +39,8 @@ 3.12.0 30.1-jre 2.2 - 1.8 - 1.8 + 11 + 11 3.2.0 1.4.1 3.0.1