From fd0b8e9d1c6ee68e83870c393e4d0a3af8d0f2e8 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Sat, 1 Mar 2025 20:58:13 -0500 Subject: [PATCH] CXF-8992: WebClient.fromClient() broken due to garbage collection (#2281) (cherry picked from commit 9756ab897d93ffd28cbd86487abd3ae0df19ec47) # Conflicts: # rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/WebClientTest.java --- .../cxf/jaxrs/client/AbstractClient.java | 86 +++++++-- .../apache/cxf/jaxrs/client/WebClient.java | 19 +- .../client/JAXRSClientFactoryBeanTest.java | 43 +++++ .../cxf/jaxrs/client/WebClientTest.java | 166 +++++++++++++++++- 4 files changed, 301 insertions(+), 13 deletions(-) diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java index 3b839638047..9b25a4dddd8 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java @@ -41,10 +41,12 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import javax.xml.stream.XMLStreamWriter; +import jakarta.annotation.Nullable; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.ProcessingException; import jakarta.ws.rs.WebApplicationException; @@ -130,6 +132,34 @@ public abstract class AbstractClient implements Client { protected ClientConfiguration cfg = new ClientConfiguration(); private ClientState state; private final AtomicBoolean closed = new AtomicBoolean(); + private volatile ConfigurationReference configurationReference = new ConfigurationReference(cfg); + + protected static final class ConfigurationReference { + private final AtomicLong refCount = new AtomicLong(1); + private final ClientConfiguration cfg; + + public ConfigurationReference(ClientConfiguration cfg) { + this.cfg = cfg; + } + + public ConfigurationReference acquire() { + refCount.incrementAndGet(); + return this; + } + + public long refCount() { + return refCount.get(); + } + + public long release() { + return refCount.decrementAndGet(); + } + + public ClientConfiguration getConfiguration() { + return cfg; + } + } + protected AbstractClient(ClientState initialState) { this.state = initialState; } @@ -349,7 +379,11 @@ public void close() { if (cfg.getBus() == null) { return; } - cfg.getEndpoint().getCleanupHooks(). + + final ConfigurationReference reference = configurationReference; + final boolean shutdownConfiguration = reference == null || reference.release() == 0L; + if (shutdownConfiguration) { + cfg.getEndpoint().getCleanupHooks(). forEach(c -> { try { c.close(); @@ -357,26 +391,35 @@ public void close() { //ignore } }); - ClientLifeCycleManager mgr = cfg.getBus().getExtension(ClientLifeCycleManager.class); + } + + final ClientLifeCycleManager mgr = cfg.getBus().getExtension(ClientLifeCycleManager.class); if (null != mgr) { mgr.clientDestroyed(new FrontendClientAdapter(getConfiguration())); } - if (cfg.getConduitSelector() instanceof Closeable) { - try { - ((Closeable)cfg.getConduitSelector()).close(); - } catch (IOException e) { - //ignore, we're destroying anyway + if (shutdownConfiguration) { + if (cfg.getConduitSelector() instanceof Closeable) { + try { + ((Closeable)cfg.getConduitSelector()).close(); + } catch (IOException e) { + //ignore, we're destroying anyway + } + } else { + cfg.getConduit().close(); } - } else { - cfg.getConduit().close(); } + + // reset state state.reset(); - if (cfg.isShutdownBusOnClose()) { + + if (shutdownConfiguration && cfg.isShutdownBusOnClose()) { cfg.getBus().shutdown(false); } + state = null; cfg = null; + configurationReference = null; } } @@ -908,6 +951,7 @@ public ClientConfiguration getConfiguration() { protected void setConfiguration(ClientConfiguration config) { cfg = config; + configurationReference = new ConfigurationReference(config); } // Note that some conduit selectors may update Message.ENDPOINT_ADDRESS @@ -1328,4 +1372,26 @@ protected void closeAsyncResponseIfPossible(Response r, Message outMessage, Jaxr protected void handleAsyncFault(Message message) { } } + + /** + * References the configuration (by another client) so it won't shut down till all the + * client instances are closed. + * @param reference configuration reference + */ + protected void setConfigurationReference(ConfigurationReference reference) { + if (reference == null) { + throw new IllegalArgumentException("The configuration reference is not set " + + "(the client was already closed)"); + } + this.configurationReference = reference.acquire(); + this.cfg = configurationReference.getConfiguration(); + } + + /** + * Returns configuration reference + * @return configuration reference + */ + protected @Nullable ConfigurationReference getConfigurationReference() { + return this.configurationReference; + } } diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java index e0db63b1587..7a5dc9d3bea 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java @@ -81,6 +81,9 @@ * */ public class WebClient extends AbstractClient { + // Use client configuration reference instead of sharing the instance directly + public static final String USE_CONFIGURATION_REFERENCE_WHEN_COPY = "use.configuration.reference.when.copy"; + private static final String REQUEST_CLASS = "request.class"; private static final String REQUEST_TYPE = "request.type"; private static final String REQUEST_ANNS = "request.annotations"; @@ -1232,7 +1235,21 @@ protected void doWriteBody(Message outMessage, static void copyProperties(Client toClient, Client fromClient) { AbstractClient newClient = toAbstractClient(toClient); AbstractClient oldClient = toAbstractClient(fromClient); - newClient.setConfiguration(oldClient.getConfiguration()); + final ClientConfiguration oldCfg = oldClient.getConfiguration(); + + boolean useConfigurationReference = true; + if (oldCfg != null && oldCfg.getBus() != null) { + Object useConfigurationReferenceProp = oldCfg.getBus().getProperty(USE_CONFIGURATION_REFERENCE_WHEN_COPY); + if (useConfigurationReferenceProp != null) { + useConfigurationReference = PropertyUtils.isTrue(useConfigurationReferenceProp); + } + } + + if (useConfigurationReference) { + newClient.setConfigurationReference(oldClient.getConfigurationReference()); + } else { + newClient.setConfiguration(oldClient.getConfiguration()); + } } private static AbstractClient toAbstractClient(Object client) { diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JAXRSClientFactoryBeanTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JAXRSClientFactoryBeanTest.java index 94b8fac2d80..f9162ec1b73 100644 --- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JAXRSClientFactoryBeanTest.java +++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JAXRSClientFactoryBeanTest.java @@ -54,6 +54,11 @@ import org.junit.Test; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; @@ -268,6 +273,44 @@ public void testInvokePathEmptyAllowed() throws Exception { assertNotNull(store.getBook("")); } + @Test + public void testCreateClientFrom() throws Exception { + JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean(); + bean.setAddress("http://bar"); + bean.setResourceClass(BookStore.class); + + final Client client = bean.create(); + final WebClient wc = WebClient.fromClient(client); + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(2L)); + + client.close(); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + + wc.close(); + assertThat(wc.getConfigurationReference(), is(nullValue())); + } + + @Test + public void testCreateClientFromAndInvoke() throws Exception { + final SuperBookStore superBookResource = JAXRSClientFactory + .create("http://localhost:9000", SuperBookStore.class); + final Client client = (Client) superBookResource; + final WebClient wc = WebClient.fromClient(client); + + final Book book = superBookResource.getNewBook("id4", true); + assertNotNull(book); + + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(2L)); + + client.close(); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + + wc.close(); + assertThat(wc.getConfigurationReference(), is(nullValue())); + + } private final class TestFeature extends AbstractFeature { private TestInterceptor testInterceptor; diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/WebClientTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/WebClientTest.java index fa90cf37e5b..5ebdae56a86 100644 --- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/WebClientTest.java +++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/WebClientTest.java @@ -22,25 +22,39 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Type; import java.net.URI; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import jakarta.ws.rs.core.Cookie; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.ext.ParamConverter; import jakarta.ws.rs.ext.ParamConverterProvider; +import org.apache.cxf.Bus; +import org.apache.cxf.Bus.BusState; import org.apache.cxf.jaxrs.resources.BookInterface; import org.apache.cxf.jaxrs.resources.BookStore; import org.junit.Test; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - public class WebClientTest { @Test @@ -364,6 +378,154 @@ public void testCookieNoVersion() { containsInAnyOrder("a=1", "b=2")); } + @Test + public void testWebClientClose() { + final WebClient wc = WebClient.create("http://foo").language("en_CA"); + wc.getConfiguration().setShutdownBusOnClose(true); + + final Bus bus = wc.getConfiguration().getBus(); + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + wc.close(); + + assertThat(wc.getConfigurationReference(), is(nullValue())); + assertThat(wc.getConfiguration(), is(nullValue())); + + assertThat(bus.getState(), equalTo(BusState.SHUTDOWN)); + } + + @Test + public void testWebClientFromConcurrently() throws InterruptedException, ExecutionException, TimeoutException { + final WebClient wc = WebClient.create("http://foo").language("en_CA"); + wc.getConfiguration().setShutdownBusOnClose(true); + + final List> futures = new ArrayList<>(100); + final ExecutorService executor = Executors.newFixedThreadPool(8); + for (int i = 0; i < 100; ++i) { + futures.add(executor.submit(() -> WebClient.fromClient(wc))); + } + + final Bus bus = wc.getConfiguration().getBus(); + final List clients = new ArrayList<>(100); + try { + for (Future future: futures) { + final WebClient client = future.get(5, TimeUnit.SECONDS); + assertThat(client, is(not(nullValue()))); + clients.add(client); + } + + assertThat(bus.getState(), anyOf(equalTo(BusState.RUNNING), + equalTo(BusState.INITIALIZING), equalTo(BusState.INITIAL))); + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(101L)); + + for (WebClient client: clients) { + executor.submit(client::close); + } + } finally { + executor.shutdown(); + assertThat(executor.awaitTermination(5, TimeUnit.SECONDS), equalTo(true)); + } + + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + wc.close(); + + assertThat(wc.getConfigurationReference(), is(nullValue())); + assertThat(wc.getConfiguration(), is(nullValue())); + + assertThat(bus.getState(), equalTo(BusState.SHUTDOWN)); + } + + @Test + public void testWebClientFrom() { + final WebClient wc = WebClient.create("http://foo").language("en_CA"); + wc.getConfiguration().setShutdownBusOnClose(true); + + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + + final WebClient wc1 = WebClient.fromClient(wc); + assertThat(wc.getConfigurationReference(), equalTo(wc1.getConfigurationReference())); + assertThat(wc.getConfigurationReference().refCount(), equalTo(2L)); + wc.close(); + + final ClientConfiguration configuration = wc1.getConfiguration(); + assertThat(configuration, is(not(nullValue()))); + assertThat(configuration.getBus().getState(), anyOf(equalTo(BusState.RUNNING), + equalTo(BusState.INITIALIZING), equalTo(BusState.INITIAL))); + + assertThat(wc.getConfigurationReference(), is(nullValue())); + assertThat(wc1.getConfigurationReference().refCount(), equalTo(1L)); + wc1.close(); + + assertThat(wc1.getConfigurationReference(), is(nullValue())); + assertThat(configuration.getBus().getState(), equalTo(BusState.SHUTDOWN)); + } + + @Test + public void testWebClientFromChained() { + final WebClient wc = WebClient.create("http://foo").language("en_CA"); + wc.getConfiguration().setShutdownBusOnClose(true); + + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + + final WebClient wc1 = WebClient.fromClient(wc); + assertThat(wc.getConfigurationReference(), equalTo(wc1.getConfigurationReference())); + assertThat(wc.getConfigurationReference().refCount(), equalTo(2L)); + + final WebClient wc2 = WebClient.fromClient(wc); + assertThat(wc.getConfigurationReference(), equalTo(wc2.getConfigurationReference())); + assertThat(wc.getConfigurationReference().refCount(), equalTo(3L)); + wc.close(); + + final ClientConfiguration configuration1 = wc1.getConfiguration(); + assertThat(configuration1, is(not(nullValue()))); + assertThat(configuration1.getBus().getState(), anyOf(equalTo(BusState.RUNNING), + equalTo(BusState.INITIALIZING), equalTo(BusState.INITIAL))); + assertThat(wc1.getConfigurationReference().refCount(), equalTo(2L)); + + final ClientConfiguration configuration2 = wc2.getConfiguration(); + assertThat(configuration2, is(not(nullValue()))); + assertThat(configuration2.getBus(), is(configuration1.getBus())); + wc1.close(); + + assertThat(wc.getConfigurationReference(), is(nullValue())); + assertThat(wc2.getConfigurationReference().refCount(), equalTo(1L)); + wc2.close(); + + assertThat(wc2.getConfigurationReference(), is(nullValue())); + assertThat(configuration2.getBus().getState(), equalTo(BusState.SHUTDOWN)); + } + + @Test + public void testWebClientFromShare() { + final WebClient wc = WebClient.create("http://foo").language("en_CA"); + wc.getConfiguration().setShutdownBusOnClose(true); + wc.getConfiguration().getBus().setProperty(WebClient.USE_CONFIGURATION_REFERENCE_WHEN_COPY, false); + + assertThat(wc.getConfigurationReference(), is(not(nullValue()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + + final WebClient wc1 = WebClient.fromClient(wc); + assertThat(wc.getConfigurationReference(), not(equalTo(wc1.getConfigurationReference()))); + assertThat(wc.getConfigurationReference().refCount(), equalTo(1L)); + assertThat(wc1.getConfigurationReference().refCount(), equalTo(1L)); + wc.close(); + + final ClientConfiguration configuration = wc1.getConfiguration(); + assertThat(configuration, is(not(nullValue()))); + assertThat(configuration.getBus().getState(), equalTo(BusState.SHUTDOWN)); + + assertThat(wc.getConfigurationReference(), is(nullValue())); + assertThat(wc1.getConfigurationReference().refCount(), equalTo(1L)); + wc1.close(); + + assertThat(wc1.getConfigurationReference(), is(nullValue())); + assertThat(configuration.getBus().getState(), equalTo(BusState.SHUTDOWN)); + } + private static final class ParamConverterProviderImpl implements ParamConverterProvider { @SuppressWarnings("unchecked")