Skip to content

Commit 111230d

Browse files
author
Amichai Rothman
committed
ARIES-2120 Change DistributionProvider to return a Closeable ImportedService so that resources can be properly released
1 parent 923d55e commit 111230d

File tree

11 files changed

+105
-41
lines changed

11 files changed

+105
-41
lines changed

itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testDiscoveryExport() throws Exception {
6969
EndpointDescription epd = getEndpoint();
7070
EchoService service = (EchoService)tcpProvider
7171
.importEndpoint(EchoService.class.getClassLoader(),
72-
bundleContext, new Class[]{EchoService.class}, epd);
72+
bundleContext, new Class[]{EchoService.class}, epd).getService();
7373
Assert.assertEquals("test", service.echo("test"));
7474
}
7575

provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator;
3636
import org.apache.aries.rsa.spi.DistributionProvider;
3737
import org.apache.aries.rsa.spi.Endpoint;
38+
import org.apache.aries.rsa.spi.ImportedService;
3839
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
3940
import org.fusesource.hawtdispatch.Dispatch;
4041
import org.fusesource.hawtdispatch.DispatchQueue;
@@ -172,15 +173,16 @@ public void close() throws IOException {
172173
}
173174

174175
@Override
175-
public Object importEndpoint(ClassLoader cl,
176-
BundleContext consumerContext,
177-
Class[] interfaces,
178-
EndpointDescription endpoint)
176+
public ImportedService importEndpoint(ClassLoader cl,
177+
BundleContext consumerContext,
178+
Class[] interfaces,
179+
EndpointDescription endpoint)
179180
throws IntentUnsatisfiedException {
180181

181182
String address = (String) endpoint.getProperties().get(FASTBIN_ADDRESS);
182183
InvocationHandler handler = client.getProxy(address, endpoint.getId(), cl);
183-
return Proxy.newProxyInstance(cl, interfaces, handler);
184+
Object service = Proxy.newProxyInstance(cl, interfaces, handler);
185+
return () -> service;
184186
}
185187

186188
}

provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.aries.rsa.annotations.RSADistributionProvider;
3333
import org.apache.aries.rsa.spi.DistributionProvider;
3434
import org.apache.aries.rsa.spi.Endpoint;
35+
import org.apache.aries.rsa.spi.ImportedService;
3536
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
3637
import org.apache.aries.rsa.util.StringPlus;
3738
import org.osgi.framework.BundleContext;
@@ -126,17 +127,18 @@ private synchronized void removeServer(TcpEndpoint endpoint) {
126127
}
127128

128129
@Override
129-
public Object importEndpoint(ClassLoader cl,
130-
BundleContext consumerContext,
131-
Class[] interfaces,
132-
EndpointDescription endpoint)
130+
public ImportedService importEndpoint(ClassLoader cl,
131+
BundleContext consumerContext,
132+
Class[] interfaces,
133+
EndpointDescription endpoint)
133134
throws IntentUnsatisfiedException {
134135
try {
135136
String endpointId = endpoint.getId();
136137
URI address = new URI(endpointId);
137138
int timeout = new EndpointPropertiesParser(endpoint).getTimeoutMillis();
138139
InvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout);
139-
return Proxy.newProxyInstance(cl, interfaces, handler);
140+
Object service = Proxy.newProxyInstance(cl, interfaces, handler);
141+
return () -> service;
140142
} catch (Exception e) {
141143
throw new RuntimeException(e);
142144
}

provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveService;
3939
import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveServiceImpl;
4040
import org.apache.aries.rsa.spi.Endpoint;
41+
import org.apache.aries.rsa.spi.ImportedService;
4142
import org.apache.aries.rsa.util.EndpointHelper;
4243
import org.easymock.EasyMock;
4344
import org.junit.AfterClass;
@@ -52,6 +53,7 @@ public class TcpProviderPrimitiveTest {
5253

5354
private static PrimitiveService myServiceProxy;
5455
private static Endpoint ep;
56+
private static ImportedService importedService;
5557

5658
@BeforeClass
5759
public static void createServerAndProxy() {
@@ -66,10 +68,11 @@ public static void createServerAndProxy() {
6668
ep = provider.exportService(myService, bc, props, exportedInterfaces);
6769
assertThat(ep.description().getId(), startsWith("tcp://localhost:"));
6870
System.out.println(ep.description());
69-
myServiceProxy = (PrimitiveService)provider.importEndpoint(PrimitiveService.class.getClassLoader(),
70-
bc,
71-
exportedInterfaces,
72-
ep.description());
71+
importedService = provider.importEndpoint(PrimitiveService.class.getClassLoader(),
72+
bc,
73+
exportedInterfaces,
74+
ep.description());
75+
myServiceProxy = (PrimitiveService)importedService.getService();
7376
}
7477

7578
@Test
@@ -164,6 +167,7 @@ public void testDTOAr() {
164167

165168
@AfterClass
166169
public static void close() throws IOException {
170+
importedService.close();
167171
ep.close();
168172
}
169173

provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.aries.rsa.provider.tcp.myservice.MyService;
4848
import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl;
4949
import org.apache.aries.rsa.spi.Endpoint;
50+
import org.apache.aries.rsa.spi.ImportedService;
5051
import org.apache.aries.rsa.util.EndpointHelper;
5152
import org.easymock.EasyMock;
5253
import org.junit.AfterClass;
@@ -67,6 +68,8 @@ public class TcpProviderTest {
6768
private static MyService myServiceProxy2;
6869
private static Endpoint ep;
6970
private static Endpoint ep2;
71+
private static ImportedService importedService;
72+
private static ImportedService importedService2;
7073

7174
protected static int getFreePort() throws IOException {
7275
try (ServerSocket socket = new ServerSocket()) {
@@ -93,16 +96,18 @@ public static void createServerAndProxy() throws IOException {
9396
props.put("aries.rsa.id", "service2");
9497
ep2 = provider.exportService(new MyServiceImpl("service2"), bc, props, exportedInterfaces);
9598
assertThat(ep.description().getId(), startsWith("tcp://localhost:"));
96-
myServiceProxy = (MyService)provider.importEndpoint(
97-
MyService.class.getClassLoader(),
98-
bc,
99-
exportedInterfaces,
100-
ep.description());
101-
myServiceProxy2 = (MyService)provider.importEndpoint(
102-
MyService.class.getClassLoader(),
103-
bc,
104-
exportedInterfaces,
105-
ep2.description());
99+
importedService = provider.importEndpoint(
100+
MyService.class.getClassLoader(),
101+
bc,
102+
exportedInterfaces,
103+
ep.description());
104+
myServiceProxy = (MyService)importedService.getService();
105+
importedService2 = provider.importEndpoint(
106+
MyService.class.getClassLoader(),
107+
bc,
108+
exportedInterfaces,
109+
ep2.description());
110+
myServiceProxy2 = (MyService)importedService2.getService();
106111
}
107112

108113
@Test
@@ -221,7 +226,10 @@ public void testAsyncPromiseException() throws Throwable {
221226

222227
@AfterClass
223228
public static void close() throws IOException {
229+
importedService.close();
230+
importedService2.close();
224231
ep.close();
232+
ep2.close();
225233
}
226234

227235
private void runPerfTest(final MyService myServiceProxy2) throws InterruptedException {

rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
*/
1919
package org.apache.aries.rsa.core;
2020

21+
import java.io.IOException;
2122
import java.security.AccessController;
2223
import java.security.PrivilegedAction;
2324
import java.util.ArrayList;
25+
import java.util.HashMap;
2426
import java.util.List;
27+
import java.util.Map;
2528

2629
import org.apache.aries.rsa.spi.DistributionProvider;
30+
import org.apache.aries.rsa.spi.ImportedService;
2731
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
2832
import org.osgi.framework.Bundle;
2933
import org.osgi.framework.BundleContext;
@@ -44,7 +48,7 @@ public class ClientServiceFactory implements ServiceFactory {
4448
private ImportRegistrationImpl importRegistration;
4549

4650
private boolean closeable;
47-
private int serviceCounter;
51+
private Map<Object, ImportedService> services = new HashMap<>();
4852

4953
public ClientServiceFactory(EndpointDescription endpoint,
5054
DistributionProvider handler, ImportRegistrationImpl ir) {
@@ -63,15 +67,16 @@ public Object getService(final Bundle requestingBundle, final ServiceRegistratio
6367
for (String ifaceName : interfaceNames) {
6468
interfaces.add(consumerLoader.loadClass(ifaceName));
6569
}
66-
Object proxy = AccessController.doPrivileged(new PrivilegedAction<Object>() {
67-
public Object run() {
70+
ImportedService importedService = AccessController.doPrivileged(new PrivilegedAction<ImportedService>() {
71+
public ImportedService run() {
6872
Class<?>[] ifAr = interfaces.toArray(new Class[]{});
6973
return handler.importEndpoint(consumerLoader, consumerContext, ifAr, endpoint);
7074
}
7175
});
7276

77+
Object proxy = importedService.getService();
7378
synchronized (this) {
74-
serviceCounter++;
79+
services.put(proxy, importedService);
7580
}
7681
return proxy;
7782
} catch (IntentUnsatisfiedException iue) {
@@ -85,8 +90,15 @@ public Object run() {
8590

8691
public void ungetService(Bundle requestingBundle, ServiceRegistration sreg, Object serviceObject) {
8792
synchronized (this) {
88-
serviceCounter--;
89-
LOG.debug("Services still provided by this ServiceFactory: {}", serviceCounter);
93+
ImportedService importedService = services.remove(serviceObject);
94+
if (importedService != null) {
95+
try {
96+
importedService.close();
97+
} catch (IOException e) {
98+
LOG.warn("Problem closing imported service proxy {} for {}", serviceObject, requestingBundle, e);
99+
}
100+
}
101+
LOG.debug("Services still provided by this ServiceFactory: {}", services.size());
90102
closeIfUnused();
91103
}
92104
}
@@ -99,7 +111,7 @@ public void setCloseable(boolean closeable) {
99111
}
100112

101113
private synchronized void closeIfUnused() {
102-
if (serviceCounter <= 0 && closeable) {
114+
if (services.isEmpty() && closeable) {
103115
importRegistration.closeAll();
104116
}
105117
}

rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private DistributionProvider mockDistributionProvider(final Object proxy) {
7373
EasyMock.expect(handler.importEndpoint(anyObject(ClassLoader.class),
7474
anyObject(BundleContext.class),
7575
isA(Class[].class),
76-
anyObject(EndpointDescription.class))).andReturn(proxy);
76+
anyObject(EndpointDescription.class))).andReturn(() -> proxy);
7777
EasyMock.replay(handler);
7878
return handler;
7979
}

rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.aries.rsa.core.event.EventProducer;
4444
import org.apache.aries.rsa.spi.DistributionProvider;
4545
import org.apache.aries.rsa.spi.Endpoint;
46+
import org.apache.aries.rsa.spi.ImportedService;
4647
import org.easymock.EasyMock;
4748
import org.easymock.IAnswer;
4849
import org.easymock.IMocksControl;
@@ -429,7 +430,7 @@ public Endpoint exportService(Object serviceO, BundleContext serviceContext,
429430
}
430431

431432
@Override
432-
public Object importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces,
433+
public ImportedService importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces,
433434
EndpointDescription endpoint) {
434435
return null;
435436
}

spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public interface DistributionProvider {
3030

3131
/**
3232
* Called by RemoteServiceAdmin to export a service.
33-
*
33+
* <p>
3434
* The Distribution provider will be called if no config type was set or
3535
* if it supports the config type.
3636
*
@@ -46,14 +46,19 @@ Endpoint exportService(Object serviceO,
4646
Class[] exportedInterfaces);
4747

4848
/**
49+
* Called by RemoteServiceAdmin to import a service,
50+
* i.e. get a proxy that can be used to access the remote service.
51+
* <p>
4952
* @param cl classloader of the consumer bundle
5053
* @param consumerContext bundle context of the consumer bundle
5154
* @param interfaces interfaces of the service to proxy
5255
* @param endpoint description of the remote endpoint
53-
* @return service proxy to be given to the requesting bundle
56+
* @return an ImportedService that provides the service proxy
57+
* to be given to the requesting bundle, and can be closed
58+
* when the service is no longer used
5459
*/
55-
Object importEndpoint(ClassLoader cl,
56-
BundleContext consumerContext,
57-
Class[] interfaces,
58-
EndpointDescription endpoint);
60+
ImportedService importEndpoint(ClassLoader cl,
61+
BundleContext consumerContext,
62+
Class[] interfaces,
63+
EndpointDescription endpoint);
5964
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.apache.aries.rsa.spi;
2+
3+
import java.io.Closeable;
4+
import java.io.IOException;
5+
6+
/**
7+
* Wraps an imported service proxy, while allowing it to be properly
8+
* closed to release resources, close connections, etc.
9+
*/
10+
public interface ImportedService extends Closeable {
11+
12+
/**
13+
* Returns the service proxy to be used by consumer bundles.
14+
*
15+
* @return the service proxy
16+
*/
17+
Object getService();
18+
19+
/**
20+
* Close the service and release its resources.
21+
* <p>
22+
* Implementations should override this to release resources
23+
* associated with the service, close connections, etc.
24+
* when the service is no longer used.
25+
*
26+
* @throws IOException if an error occurs
27+
*/
28+
default void close() throws IOException {
29+
}
30+
}

0 commit comments

Comments
 (0)