Skip to content

Commit

Permalink
Fix registry repeat export same service
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed Jun 20, 2023
1 parent ea35f7e commit e42951b
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.registry;

import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.registry.integration.ExporterFactory;
import org.apache.dubbo.registry.support.RegistryManager;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
Expand All @@ -26,7 +27,8 @@
public class RegistryScopeModelInitializer implements ScopeModelInitializer {
@Override
public void initializeFrameworkModel(FrameworkModel frameworkModel) {

ScopeBeanFactory beanFactory = frameworkModel.getBeanFactory();
beanFactory.registerBean(ExporterFactory.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.integration;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.model.FrameworkModel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExporterFactory {
private final Map<String, ReferenceCountExporter<?>> exporters = new ConcurrentHashMap<>();

private final Protocol protocol;

public ExporterFactory(FrameworkModel frameworkModel) {
this.protocol = frameworkModel.getExtensionLoader(Protocol.class).getAdaptiveExtension();
}

protected ReferenceCountExporter<?> createExporter(String providerKey, Invoker<?> invoker) {
return exporters.computeIfAbsent(providerKey,
key -> new ReferenceCountExporter<>(protocol.export(invoker), key, this));
}

protected void remove(String key, ReferenceCountExporter<?> exporter) {
exporters.remove(key, exporter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.integration;

import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;

import java.util.concurrent.atomic.AtomicInteger;

public class ReferenceCountExporter<T> implements Exporter<T> {
private final Exporter<T> exporter;
private final String providerKey;
private final ExporterFactory exporterFactory;
private final AtomicInteger count = new AtomicInteger(0);

public ReferenceCountExporter(Exporter<T> exporter, String providerKey, ExporterFactory exporterFactory) {
this.exporter = exporter;
this.providerKey = providerKey;
this.exporterFactory = exporterFactory;
}

@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}

public void increaseCount() {
count.incrementAndGet();
}

@Override
public void unexport() {
if (count.decrementAndGet() == 0) {
exporter.unexport();
}
exporterFactory.remove(providerKey, this);
}

@Override
public void register() {

}

@Override
public void unregister() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public class RegistryProtocol implements Protocol, ScopeModelAware {
private ConcurrentMap<URL, ReExportTask> reExportFailedTasks = new ConcurrentHashMap<>();
private HashedWheelTimer retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboReexportTimer", true), DEFAULT_REGISTRY_RETRY_PERIOD, TimeUnit.MILLISECONDS, 128);
private FrameworkModel frameworkModel;
private ExporterFactory exporterFactory;

//Filter the parameters that do not need to be output in url(Starting with .)
private static String[] getFilteredKeys(URL url) {
Expand All @@ -190,6 +191,7 @@ public RegistryProtocol() {
@Override
public void setFrameworkModel(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
this.exporterFactory = frameworkModel.getBeanFactory().getBean(ExporterFactory.class);
}

public void setProtocol(Protocol protocol) {
Expand Down Expand Up @@ -314,9 +316,10 @@ private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originIn
String registryUrlKey = getRegistryUrlKey(originInvoker);

return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(providerUrlKey, _k -> new ConcurrentHashMap<>())
.computeIfAbsent(registryUrlKey, s ->{
.computeIfAbsent(registryUrlKey, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
return new ExporterChangeableWrapper<>(
(ReferenceCountExporter<T>) exporterFactory.createExporter(providerUrlKey, invokerDelegate), originInvoker);
});
}

Expand Down Expand Up @@ -953,8 +956,9 @@ private class ExporterChangeableWrapper<T> implements Exporter<T> {
private NotifyListener notifyListener;
private final AtomicBoolean registered = new AtomicBoolean(false);

public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
public ExporterChangeableWrapper(ReferenceCountExporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
exporter.increaseCount();
this.originInvoker = originInvoker;
FrameworkExecutorRepository frameworkExecutorRepository = originInvoker.getUrl().getOrDefaultFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class);
Expand Down

0 comments on commit e42951b

Please sign in to comment.