From 2d128274804eaf62f254e5d35da99c24241305a3 Mon Sep 17 00:00:00 2001 From: iYaDongWang <571905654@qq.com> Date: Tue, 9 Jan 2024 20:26:58 +0800 Subject: [PATCH] nacos to nacos support all groups sync --- .../constant/SkyWalkerConstants.java | 2 + .../nacossync/dao/ClusterAccessService.java | 6 +- .../nacossync/dao/TaskAccessService.java | 3 +- .../pojo/request/CatalogServiceResult.java | 36 +++++ .../nacossync/pojo/view/ServiceView.java | 38 ++++++ .../service/NacosEnhanceNamingService.java | 112 +++++++++++++++ .../processor/TaskAddAllProcessor.java | 127 +----------------- .../timer/CheckRunningStatusAllThread.java | 84 ++++++++---- 8 files changed, 259 insertions(+), 149 deletions(-) create mode 100644 nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/request/CatalogServiceResult.java create mode 100644 nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/view/ServiceView.java create mode 100644 nacossync-worker/src/main/java/com/alibaba/nacossync/service/NacosEnhanceNamingService.java diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java index 0379e73e..4486507f 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java @@ -38,4 +38,6 @@ public class SkyWalkerConstants { public static final String SYNC_INSTANCE_TAG="sync.instance.tag"; public static final Integer MAX_THREAD_NUM = 200; + public static final String ALL = "ALL"; + } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java index 2e11ca33..e5d0b9df 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java @@ -107,7 +107,11 @@ private List getPredicates(Root root, CriteriaBuilder crit public int findClusterLevel(String sourceClusterId){ ClusterDO clusterDO = clusterRepository.findByClusterId(sourceClusterId); if (clusterDO != null) { - return clusterDO.getClusterLevel(); + Integer level = clusterDO.getClusterLevel(); + if (null == level) { + return 0; + } + return level; } return -1; } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java index 649eaf41..502e82f3 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java @@ -16,6 +16,7 @@ */ package com.alibaba.nacossync.dao; +import com.alibaba.nacossync.constant.SkyWalkerConstants; import com.alibaba.nacossync.pojo.QueryCondition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; @@ -116,7 +117,7 @@ private Page getTaskDOS(QueryCondition queryCondition, Pageable pageable } public List findServiceNameIsNull() { - return taskRepository.findAllByServiceNameEquals("ALL"); + return taskRepository.findAllByServiceNameEquals(SkyWalkerConstants.ALL); } } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/request/CatalogServiceResult.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/request/CatalogServiceResult.java new file mode 100644 index 00000000..3cba75a3 --- /dev/null +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/request/CatalogServiceResult.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.nacossync.pojo.request; + +import com.alibaba.nacossync.pojo.view.ServiceView; +import lombok.Data; + +import java.util.List; + +/** + * @author NacosSync + * @since 2024-01-04 15:54:20 + */ +@Data +public class CatalogServiceResult { + /** + * count,not equal serviceList.size . + */ + private int count; + + private List serviceList; +} diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/view/ServiceView.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/view/ServiceView.java new file mode 100644 index 00000000..f4917541 --- /dev/null +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/pojo/view/ServiceView.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.nacossync.pojo.view; + +import lombok.Data; + +/** + * @author NacosSync + * @since 2024-01-04 15:54:36 + */ +@Data +public class ServiceView { + private String name; + + private String groupName; + + private int clusterCount; + + private int ipCount; + + private int healthyInstanceCount; + + private String triggerFlag; +} diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/service/NacosEnhanceNamingService.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/service/NacosEnhanceNamingService.java new file mode 100644 index 00000000..774709b0 --- /dev/null +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/service/NacosEnhanceNamingService.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.nacossync.service; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.CommonParams; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.client.naming.NacosNamingService; +import com.alibaba.nacos.client.naming.net.NamingProxy; +import com.alibaba.nacos.client.naming.utils.UtilAndComs; +import com.alibaba.nacos.common.utils.HttpMethod; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacossync.pojo.request.CatalogServiceResult; +import org.springframework.util.ReflectionUtils; + +import javax.annotation.Nullable; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static com.alibaba.nacossync.constant.SkyWalkerConstants.GROUP_NAME_PARAM; +import static com.alibaba.nacossync.constant.SkyWalkerConstants.PAGE_NO; +import static com.alibaba.nacossync.constant.SkyWalkerConstants.PAGE_SIZE; +import static com.alibaba.nacossync.constant.SkyWalkerConstants.SERVICE_NAME_PARAM; + +/** + * @author NacosSync + * @since 2024-01-04 15:53:40 + */ +public class NacosEnhanceNamingService { + + + protected NamingService delegate; + + protected NamingProxy serverProxy; + + public NacosEnhanceNamingService(NamingService namingService) { + if (!(namingService instanceof NacosNamingService)) { + throw new IllegalArgumentException( + "namingService only support instance of com.alibaba.nacos.client.naming.NacosNamingService."); + } + this.delegate = namingService; + + // serverProxy + final Field serverProxyField = ReflectionUtils.findField(NacosNamingService.class, "serverProxy"); + assert serverProxyField != null; + ReflectionUtils.makeAccessible(serverProxyField); + this.serverProxy = (NamingProxy) ReflectionUtils.getField(serverProxyField, delegate); + } + + public CatalogServiceResult catalogServices(@Nullable String serviceName, @Nullable String group) + throws NacosException { + int pageNo = 1; // start with 1 + int pageSize = 100; + + final CatalogServiceResult result = catalogServices(serviceName, group, pageNo, pageSize); + + CatalogServiceResult tmpResult = result; + + while (Objects.nonNull(tmpResult) && tmpResult.getServiceList().size() >= pageSize) { + pageNo++; + tmpResult = catalogServices(serviceName, group, pageNo, pageSize); + + if (tmpResult != null) { + result.getServiceList().addAll(tmpResult.getServiceList()); + } + } + + return result; + } + + /** + * @see com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfoDirectlyFromServer(String, String) + */ + public CatalogServiceResult catalogServices(@Nullable String serviceName, @Nullable String group, int pageNo, + int pageSize) throws NacosException { + + // pageNo + // pageSize + // serviceNameParam + // groupNameParam + final Map params = new HashMap<>(8); + params.put(CommonParams.NAMESPACE_ID, serverProxy.getNamespaceId()); + params.put(SERVICE_NAME_PARAM, serviceName); + params.put(GROUP_NAME_PARAM, group); + params.put(PAGE_NO, String.valueOf(pageNo)); + params.put(PAGE_SIZE, String.valueOf(pageSize)); + + final String result = this.serverProxy.reqApi(UtilAndComs.nacosUrlBase + "/catalog/services", params, + HttpMethod.GET); + if (StringUtils.isNotEmpty(result)) { + return JacksonUtils.toObj(result, CatalogServiceResult.class); + } + return null; + } +} diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddAllProcessor.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddAllProcessor.java index c02cb676..91b0113b 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddAllProcessor.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/template/processor/TaskAddAllProcessor.java @@ -17,15 +17,7 @@ package com.alibaba.nacossync.template.processor; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.naming.CommonParams; import com.alibaba.nacos.api.naming.NamingService; -import com.alibaba.nacos.client.naming.NacosNamingService; -import com.alibaba.nacos.client.naming.net.NamingProxy; -import com.alibaba.nacos.client.naming.utils.UtilAndComs; -import com.alibaba.nacos.common.utils.HttpMethod; -import com.alibaba.nacos.common.utils.JacksonUtils; -import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacossync.constant.TaskStatusEnum; import com.alibaba.nacossync.dao.ClusterAccessService; import com.alibaba.nacossync.dao.TaskAccessService; @@ -34,28 +26,19 @@ import com.alibaba.nacossync.extension.holder.NacosServerHolder; import com.alibaba.nacossync.pojo.model.ClusterDO; import com.alibaba.nacossync.pojo.model.TaskDO; +import com.alibaba.nacossync.pojo.request.CatalogServiceResult; import com.alibaba.nacossync.pojo.request.TaskAddAllRequest; import com.alibaba.nacossync.pojo.request.TaskAddRequest; import com.alibaba.nacossync.pojo.result.TaskAddResult; +import com.alibaba.nacossync.pojo.view.ServiceView; +import com.alibaba.nacossync.service.NacosEnhanceNamingService; import com.alibaba.nacossync.template.Processor; import com.alibaba.nacossync.util.SkyWalkerUtil; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import org.springframework.util.ReflectionUtils; -import javax.annotation.Nullable; -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Objects; -import static com.alibaba.nacossync.constant.SkyWalkerConstants.GROUP_NAME_PARAM; -import static com.alibaba.nacossync.constant.SkyWalkerConstants.PAGE_NO; -import static com.alibaba.nacossync.constant.SkyWalkerConstants.PAGE_SIZE; -import static com.alibaba.nacossync.constant.SkyWalkerConstants.SERVICE_NAME_PARAM; - /** * @author NacosSync * @version $Id: TaskAddAllProcessor.java, v 0.1 2022-03-23 PM11:40 NacosSync Exp $$ @@ -103,7 +86,7 @@ public void process(TaskAddAllRequest addAllRequest, TaskAddResult taskAddResult throw new SkyWalkerException("only support sync type that the source of the Nacos."); } - final EnhanceNamingService enhanceNamingService = new EnhanceNamingService(sourceNamingService); + final NacosEnhanceNamingService enhanceNamingService = new NacosEnhanceNamingService(sourceNamingService); final CatalogServiceResult catalogServiceResult = enhanceNamingService.catalogServices(null, null); if (catalogServiceResult == null || catalogServiceResult.getCount() <= 0) { throw new SkyWalkerException("sourceCluster data empty"); @@ -146,105 +129,7 @@ private void dealTask(TaskAddAllRequest addAllRequest, TaskAddRequest taskAddReq } taskAccessService.addTask(taskDO); } - - static class EnhanceNamingService { - - protected NamingService delegate; - - protected NamingProxy serverProxy; - - protected EnhanceNamingService(NamingService namingService) { - if (!(namingService instanceof NacosNamingService)) { - throw new IllegalArgumentException( - "namingService only support instance of com.alibaba.nacos.client.naming.NacosNamingService."); - } - this.delegate = namingService; - - // serverProxy - final Field serverProxyField = ReflectionUtils.findField(NacosNamingService.class, "serverProxy"); - assert serverProxyField != null; - ReflectionUtils.makeAccessible(serverProxyField); - this.serverProxy = (NamingProxy) ReflectionUtils.getField(serverProxyField, delegate); - } - - public CatalogServiceResult catalogServices(@Nullable String serviceName, @Nullable String group) - throws NacosException { - int pageNo = 1; // start with 1 - int pageSize = 100; - - final CatalogServiceResult result = catalogServices(serviceName, group, pageNo, pageSize); - - CatalogServiceResult tmpResult = result; - - while (Objects.nonNull(tmpResult) && tmpResult.serviceList.size() >= pageSize) { - pageNo++; - tmpResult = catalogServices(serviceName, group, pageNo, pageSize); - - if (tmpResult != null) { - result.serviceList.addAll(tmpResult.serviceList); - } - } - - return result; - } - - /** - * @see com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfoDirectlyFromServer(String, String) - */ - public CatalogServiceResult catalogServices(@Nullable String serviceName, @Nullable String group, int pageNo, - int pageSize) throws NacosException { - - // pageNo - // pageSize - // serviceNameParam - // groupNameParam - final Map params = new HashMap<>(8); - params.put(CommonParams.NAMESPACE_ID, serverProxy.getNamespaceId()); - params.put(SERVICE_NAME_PARAM, serviceName); - params.put(GROUP_NAME_PARAM, group); - params.put(PAGE_NO, String.valueOf(pageNo)); - params.put(PAGE_SIZE, String.valueOf(pageSize)); - - final String result = this.serverProxy.reqApi(UtilAndComs.nacosUrlBase + "/catalog/services", params, - HttpMethod.GET); - if (StringUtils.isNotEmpty(result)) { - return JacksonUtils.toObj(result, CatalogServiceResult.class); - } - return null; - } - - } - - /** - * Copy from Nacos Server. - */ - @Data - static class ServiceView { - - private String name; - - private String groupName; - - private int clusterCount; - - private int ipCount; - - private int healthyInstanceCount; - - private String triggerFlag; - - } - - @Data - static class CatalogServiceResult { - - /** - * count,not equal serviceList.size . - */ - private int count; - - private List serviceList; - - } + + } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java index bd332220..c51e4818 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/timer/CheckRunningStatusAllThread.java @@ -17,21 +17,28 @@ package com.alibaba.nacossync.timer; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.ListView; import com.alibaba.nacos.client.naming.utils.CollectionUtils; import com.alibaba.nacossync.cache.SkyWalkerCacheServices; import com.alibaba.nacossync.constant.MetricsStatisticsType; +import com.alibaba.nacossync.constant.SkyWalkerConstants; import com.alibaba.nacossync.constant.TaskStatusEnum; import com.alibaba.nacossync.dao.TaskAccessService; import com.alibaba.nacossync.extension.holder.NacosServerHolder; import com.alibaba.nacossync.monitor.MetricsManager; import com.alibaba.nacossync.pojo.model.TaskDO; +import com.alibaba.nacossync.pojo.request.CatalogServiceResult; +import com.alibaba.nacossync.pojo.view.ServiceView; +import com.alibaba.nacossync.service.NacosEnhanceNamingService; import com.google.common.eventbus.EventBus; import lombok.extern.slf4j.Slf4j; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -43,19 +50,19 @@ */ @Slf4j public class CheckRunningStatusAllThread implements Runnable{ - + private MetricsManager metricsManager; - + private SkyWalkerCacheServices skyWalkerCacheServices; - + private TaskAccessService taskAccessService; - + private EventBus eventBus; - + private NacosServerHolder nacosServerHolder; - + private FastSyncHelper fastSyncHelper; - + public CheckRunningStatusAllThread(MetricsManager metricsManager, SkyWalkerCacheServices skyWalkerCacheServices, TaskAccessService taskAccessService, EventBus eventBus, NacosServerHolder nacosServerHolder, FastSyncHelper fastSyncHelper) { @@ -66,9 +73,9 @@ public CheckRunningStatusAllThread(MetricsManager metricsManager, SkyWalkerCache this.nacosServerHolder = nacosServerHolder; this.fastSyncHelper = fastSyncHelper; } - + /** - * 根据ns级别进行数据同步 + * 根据ns/cluster级别进行数据同步 */ @Override public void run() { @@ -81,23 +88,27 @@ public void run() { return; } for (TaskDO taskDO : taskDOS) { - List serviceNameList = getServiceNameList(taskDO); - if (CollectionUtils.isEmpty(serviceNameList)) { - continue; - } - - //如果是null,证明此时没有处理完成 - List filterService = serviceNameList.stream() - .filter(serviceName -> skyWalkerCacheServices.getFinishedTask(taskDO.getTaskId() + serviceName ) == null) - .collect(Collectors.toList()); - - if (CollectionUtils.isEmpty(filterService)) { + Map> serviceNameListGroupByGroupName = getServiceNameListGroupByGroupName(taskDO); + if (serviceNameListGroupByGroupName.isEmpty()) { continue; } - - // 当删除任务后,此时任务的状态为DELETE,不会执行数据同步 - if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) { - fastSyncHelper.syncWithThread(taskDO, filterService); + for (Map.Entry> entry : serviceNameListGroupByGroupName.entrySet()) { + taskDO.setGroupName(entry.getKey()); + List serviceNameList = entry.getValue(); + + //如果是null,证明此时没有处理完成 + List filterService = serviceNameList.stream() + .filter(serviceName -> skyWalkerCacheServices.getFinishedTask(taskDO.getTaskId() + serviceName ) == null) + .collect(Collectors.toList()); + + if (CollectionUtils.isEmpty(filterService)) { + continue; + } + + // 当删除任务后,此时任务的状态为DELETE,不会执行数据同步 + if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) { + fastSyncHelper.syncWithThread(taskDO, filterService); + } } } }catch (Exception e) { @@ -105,7 +116,7 @@ public void run() { } metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK, System.currentTimeMillis() - startTime); } - + /** * get serviceName list. * @param taskDO task info @@ -120,7 +131,28 @@ private List getServiceNameList(TaskDO taskDO) { } catch (Exception e) { log.error("query service list failure",e); } - + return Collections.emptyList(); } + + private Map> getServiceNameListGroupByGroupName(TaskDO taskDO) throws NacosException { + Map> map = new HashMap<>(); + NamingService namingService = nacosServerHolder.get(taskDO.getSourceClusterId()); + if (SkyWalkerConstants.ALL.equals(taskDO.getGroupName())) { + // 如果serviceName和GroutName都是ALL,则进行集群全量同步 + NacosEnhanceNamingService enhanceNamingService = new NacosEnhanceNamingService(namingService); + CatalogServiceResult catalogServiceResult = enhanceNamingService.catalogServices(null, null); + if (catalogServiceResult == null || catalogServiceResult.getCount() <= 0) { + return map; + } + List serviceList = catalogServiceResult.getServiceList(); + return serviceList.stream() + .collect(Collectors.groupingBy(ServiceView::getGroupName, Collectors.mapping(ServiceView::getName, Collectors.toList()))); + + } + ListView servicesOfServer = namingService.getServicesOfServer(0, Integer.MAX_VALUE, + taskDO.getGroupName()); + map.put(taskDO.getGroupName(), servicesOfServer.getData()); + return map; + } }