Skip to content

Commit

Permalink
fix:修复ServiceDiscovery中对于map的错误使用 (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Mar 1, 2024
1 parent d16ec7a commit e8b4b2f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,16 @@ public void publishAppMetadata(SubscriberMetadataIdentifier identifier, Metadata

@Override
public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map<String, String> instanceMetadata) {
// 这里由于查询的应用的接口定义数据,这里不能设置 version,必须显示设置 version 为空
GetServiceContractRequest request = new GetServiceContractRequest();
request.setName(formatAppMetaName(identifier));
request.setService(identifier.getApplication());
request.setVersion("");

Optional<ServiceContractProto.ServiceContract> result = getServiceContract(request);
if (!result.isPresent()) {
return new MetadataInfo();
// 这里返回一个空的 MetadataInfo
return MetadataInfo.EMPTY;
}

Map<String, MetadataInfo.ServiceInfo> serviceInfos = new HashMap<>();
Expand All @@ -163,6 +165,7 @@ public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map<
return new MetadataInfo(identifier.getApplication(), identifier.getRevision(), serviceInfos);
}

// toDescriptor 该方法是将 dubbo 接口运维元数据转为北极星的服务契约定义进行存储
private ReportServiceContractRequest toDescriptor(MetadataIdentifier identifier, String serviceDefinitions) {
ReportServiceContractRequest request = new ReportServiceContractRequest();
request.setName(formatMetadataIdentifier(identifier));
Expand All @@ -183,6 +186,12 @@ private ReportServiceContractRequest toDescriptor(MetadataIdentifier identifier,
return request;
}

/**
* 获取服务契约的通用调用接口
*
* @param req {@link GetServiceContractRequest}
* @return {@link Optional<ServiceContractProto.ServiceContract>}
*/
private Optional<ServiceContractProto.ServiceContract> getServiceContract(GetServiceContractRequest req) {
req.setNamespace(config.getNamespace());
req.setProtocol(Consts.DUBBO_PROTOCOL);
Expand All @@ -206,7 +215,11 @@ private Optional<ServiceContractProto.ServiceContract> getServiceContract(GetSer
return Optional.empty();
}


/**
* 上报服务契约定义通用接口
*
* @param req {@link ReportServiceContractRequest}
*/
private void reportServiceContract(ReportServiceContractRequest req) {
req.setNamespace(config.getNamespace());
req.setProtocol(Consts.DUBBO_PROTOCOL);
Expand All @@ -226,6 +239,15 @@ private void reportServiceContract(ReportServiceContractRequest req) {
// ------- 和 Dubbo3 应用级注册发现有关的操作 --------
// ------- 这里必须实现,否则就需要用户指定 providerBy ------

/**
* 存储 dubbo 的接口-应用的 mapping 数据时,这里对接的北极星的服务契约时,服务、版本信息为空,必须显示设置
* InterfaceDescriptor -> 作为记录 dubbo 应用数据
*
* @param serviceKey dubbo 接口名称
* @param application dubbo 应用名称
* @param url {@link URL}
* @return 返回接口-应用 mapping 数据是否发布成功
*/
@Override
public boolean registerServiceAppMapping(String serviceKey, String application, URL url) {
ReportServiceContractRequest request = new ReportServiceContractRequest();
Expand Down Expand Up @@ -322,24 +344,30 @@ public void run() {
try {
GetServiceContractRequest request = new GetServiceContractRequest();
request.setName(formatMappingName(serviceKey));
request.setService("");
request.setVersion("");

Optional<ServiceContractProto.ServiceContract> result = report.getServiceContract(request);
result.ifPresent(serviceContract -> {
ServiceContractProto.ServiceContract saveData = report.mappingSubscribes.get(serviceKey);
boolean needNotify = false;
// 如果之前就不存在这个 mapping 数据,需要触发通知
if (Objects.isNull(saveData)) {
report.mappingSubscribes.put(serviceKey, serviceContract);
needNotify = true;
}
if (Objects.nonNull(saveData)) {
// 如果 revision 信息比较不一致,则表明出现更细,需要触发通知
if (!Objects.equals(saveData.getRevision(), serviceContract.getRevision())) {
report.mappingSubscribes.put(serviceKey, serviceContract);
needNotify = true;
}
}
if (needNotify) {
Set<String> newApplications = getAppNames(serviceContract);
report.logger.info(String.format("receive mapping change event, interface=%s applications=%s", serviceKey, newApplications));
Set<MappingListener> listeners = report.mappingListeners.getOrDefault(serviceKey, Collections.emptySet());
MappingChangedEvent event = new MappingChangedEvent(serviceKey, getAppNames(serviceContract));
MappingChangedEvent event = new MappingChangedEvent(serviceKey, newApplications);
listeners.forEach(mappingListener -> mappingListener.onEvent(event));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,20 @@ public void addServiceInstancesChangedListener(ServiceInstancesChangedListener
}
Set<String> services = listener.getServiceNames();
for (String service : services) {
serviceListeners.computeIfAbsent(service, name -> {
ServiceListener serviceListener = new InnerServiceListener(service);
listenerMap.put(service, serviceListener);
serviceListeners.computeIfAbsent(service, name -> new ConcurrentHashSet<>());
serviceListeners.get(service).add(listener);

// 按照一个 service 一个 ServiceListener 的纬度
listenerMap.computeIfAbsent(service, s -> {
ServiceListener serviceListener = new InnerServiceListener(service);
WatchServiceRequest request = new WatchServiceRequest();
request.setNamespace(operator.getPolarisConfig().getNamespace());
request.setService(service);
request.setListeners(Collections.singletonList(serviceListener));
consumerAPI.watchService(request);
return new ConcurrentHashSet<>();
return serviceListener;
});

serviceListeners.get(service).add(listener);
}
}

Expand All @@ -158,6 +159,7 @@ public void removeServiceInstancesChangedListener(ServiceInstancesChangedListene
Set<String> services = listener.getServiceNames();
for (String service : services) {
Set<ServiceInstancesChangedListener> listeners = serviceListeners.get(service);
listeners.remove(listener);
if (CollectionUtils.isEmpty(listeners)) {
serviceListeners.remove(service);

Expand All @@ -171,9 +173,7 @@ public void removeServiceInstancesChangedListener(ServiceInstancesChangedListene
.build();
consumerAPI.unWatchService(request);
}
continue;
}
listeners.remove(listener);
}
}

Expand All @@ -188,6 +188,7 @@ private InnerServiceListener(String service) {
@Override
public void onEvent(ServiceChangeEvent event) {
String serviceName = event.getServiceKey().getService();
// 注意,这里不能走 Event 里面的服务数据列表,必须要走 ConsumerAPI 重新走正常的 Router 能力过滤掉隔离、权重为0的实例
Instance[] instances = operator.getAvailableInstances(serviceName, true);
if (Objects.isNull(instances) || instances.length == 0) {
return;
Expand Down

0 comments on commit e8b4b2f

Please sign in to comment.