diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProperties.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProperties.java index 90f6240f33d..b9829da7ce8 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProperties.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProperties.java @@ -35,6 +35,7 @@ public interface ClientProperties { Boolean.parseBoolean(SystemPropertyAction.getPropertyOrNull(THREAD_SAFE_CLIENT_PROP)); Integer THREAD_SAFE_CLIENT_STATE_CLEANUP_PERIOD = getIntValue(SystemPropertyAction.getPropertyOrNull(THREAD_SAFE_CLIENT_STATE_CLEANUP_PROP)); + String SSE_REQUEST_ENTITY = "sse.request.entity"; static Integer getIntValue(Object o) { return o instanceof Integer ? (Integer)o : o instanceof String ? Integer.valueOf(o.toString()) : null; diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java index 7bda05c5cc1..ecda257cef2 100644 --- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java @@ -30,6 +30,7 @@ import java.util.function.Consumer; import java.util.logging.Logger; +import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Configuration; @@ -41,6 +42,7 @@ import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.jaxrs.client.ClientProperties; import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.jaxrs.impl.RetryAfterHeaderProvider; import org.apache.cxf.jaxrs.utils.ExceptionUtils; @@ -195,7 +197,19 @@ private void connect(String lastEventId) { if (lastEventId != null) { builder.header(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId); } - response = builder.get(); + + Object o = target.getConfiguration().getProperty(ClientProperties.SSE_REQUEST_ENTITY); + + if (o == null) { + response = builder.get(); + } else if (o instanceof Entity) { + LOG.fine("Using POST for SSE endpoint " + target.getUri() + " with entity " + o); + response = builder.post((Entity) o); + } else { + throw new IllegalArgumentException("The " + ClientProperties.SSE_REQUEST_ENTITY + + " property is not an entity " + o.getClass()); + } + // A client can be told to stop reconnecting using the HTTP 204 No Content // response code. In this case, we should give up. diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java index d616e137ddb..956e12c0171 100644 --- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java +++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java @@ -18,12 +18,7 @@ */ package org.apache.cxf.systest.jaxrs.sse; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -38,13 +33,13 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; -import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.sse.InboundSseEvent; import javax.ws.rs.sse.SseEventSource; import javax.ws.rs.sse.SseEventSource.Builder; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.jaxrs.client.ClientProperties; import org.junit.Before; import org.junit.Test; @@ -54,9 +49,7 @@ import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public abstract class AbstractSseTest extends AbstractSseBaseTest { @@ -118,49 +111,15 @@ public void testBooksStreamIsReturnedFromInboundSseEvents() throws InterruptedEx @SuppressWarnings("unchecked") @Test public void testBooksStreamIsReturnedFromInboundSseEventsWithPOST() throws InterruptedException, IOException { - final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0"); + final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0") + .property(ClientProperties.SSE_REQUEST_ENTITY, Entity.text("42")); final Collection books = new ArrayList<>(); - @SuppressWarnings("rawtypes") - MessageBodyReader mbr = new JacksonJsonProvider(); - - Response response = target.request(MediaType.SERVER_SENT_EVENTS) - .post(Entity.entity(42, MediaType.TEXT_PLAIN)); - - try (BufferedReader br = new BufferedReader(new InputStreamReader(response.readEntity(InputStream.class)))) { - String s; - Integer id = null; - Book book = null; - - while ((s = br.readLine()) != null) { - if (s.trim().isEmpty()) { - if (id == null && book == null) { - continue; - } else if (id != null && book != null) { - books.add(book); - id = null; - book = null; - continue; - } - fail("The event did not contain both an id " + id + " and a book " + book); - } - if (s.startsWith("event:")) { - assertEquals("Not a book event", "event: book", s.trim()); - continue; - } - if (s.startsWith("id:")) { - assertNull("There was an existing id " + id, id); - id = Integer.parseInt(s.substring(3).trim()); - continue; - } - if (s.startsWith("data:")) { - assertNull("There was an existing book " + book, book); - book = (Book) mbr.readFrom(Book.class, Book.class, null, MediaType.APPLICATION_JSON_TYPE, null, - new ByteArrayInputStream(s.substring(5).trim().getBytes(StandardCharsets.UTF_8))); - continue; - } - fail("Unexpected String content returned by SSE POST " + s); - } + try (SseEventSource eventSource = SseEventSource.target(target).build()) { + eventSource.register(collect(books), System.out::println); + eventSource.open(); + // Give the SSE stream some time to collect all events + awaitEvents(5000, books, 4); } // Easing the test verification here, it does not work well for Atm + Jetty