From 5d22672b817065a3b5e76bb12e0fa8355a998319 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 26 Oct 2022 23:25:13 -0500 Subject: [PATCH] `BeforeFinallyHttpOperator`: support re-subscribe to the message body (#2409) Motivation: `BeforeFinallyHttpOperator` has a single `state` inside `ResponseCompletionSubscriber` that is shared between `Single` and `Publisher` of the message body. If users re-subscribe to the message body, the state is already in `TERMINATED` position and does not propagate signals. Modifications: - Move message body processing logic into a new `MessageBodySubscriber` class; - Each subscribe operation creates a new `MessageBodySubscriber` and their states are managed independently; - Improve `IdleTimeoutConnectionFilterTest` to always create a new response object (otherwise, the response payload body can be transformed multiple times); Result: Users of `BeforeFinallyOperator` always see all terminal events, even for re-subscribe. Terminal signal will always be delivered to the second subscriber, even when `discardEventsAfterCancel` is set. --- .../http/utils/BeforeFinallyHttpOperator.java | 455 +++++++++--------- .../utils/BeforeFinallyHttpOperatorTest.java | 47 ++ .../IdleTimeoutConnectionFilterTest.java | 37 +- 3 files changed, 299 insertions(+), 240 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index 62e1052b50..68097d9c5f 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -112,9 +112,7 @@ public SingleSource.Subscriber apply( private static final class ResponseCompletionSubscriber implements SingleSource.Subscriber { private static final int IDLE = 0; private static final int PROCESSING_PAYLOAD = 1; - private static final int DELIVERING_PAYLOAD = 2; - private static final int AWAITING_CANCEL = 3; - private static final int TERMINATED = 4; + private static final int TERMINATED = -1; private static final AtomicIntegerFieldUpdater stateUpdater = newUpdater(ResponseCompletionSubscriber.class, "state"); private static final SingleSource.Subscriber NOOP_SUBSCRIBER = @@ -165,228 +163,8 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { sendNullResponse(); } else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) { subscriber.onSuccess(response.transformMessageBody(payload -> - payload.liftSync(subscriber -> - new Subscriber() { - @Nullable - private Subscription subscription; - - @Override - public void onSubscribe(final Subscription subscription) { - this.subscription = subscription; - subscriber.onSubscribe(new Subscription() { - @Override - public void request(final long n) { - subscription.request(n); - } - - @Override - public void cancel() { - if (!discardEventsAfterCancel) { - try { - if (stateUpdater.compareAndSet( - ResponseCompletionSubscriber.this, - PROCESSING_PAYLOAD, TERMINATED)) { - beforeFinally.cancel(); - } - } finally { - subscription.cancel(); - } - return; - } - - for (;;) { - final int state = ResponseCompletionSubscriber.this.state; - assert state != IDLE; - if (state == PROCESSING_PAYLOAD) { - if (stateUpdater.compareAndSet( - ResponseCompletionSubscriber.this, - PROCESSING_PAYLOAD, TERMINATED)) { - try { - beforeFinally.cancel(); - } finally { - subscription.cancel(); - } - break; - } - } else if (state == DELIVERING_PAYLOAD) { - if (stateUpdater.compareAndSet( - ResponseCompletionSubscriber.this, - DELIVERING_PAYLOAD, AWAITING_CANCEL)) { - break; - } - } else if (state == TERMINATED) { - // still propagate cancel to the original subscription: - subscription.cancel(); - break; - } else { - // cancel can be invoked multiple times - assert state == AWAITING_CANCEL; - break; - } - } - } - }); - } - - @Override - public void onNext(@Nullable final Object o) { - if (!discardEventsAfterCancel) { - subscriber.onNext(o); - return; - } - - boolean reentry = false; - for (;;) { - final int state = ResponseCompletionSubscriber.this.state; - assert state != IDLE; - if (state == TERMINATED) { - // We already cancelled and have to discard further events - return; - } - if (state == DELIVERING_PAYLOAD || state == AWAITING_CANCEL) { - reentry = true; - break; - } - if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this, - PROCESSING_PAYLOAD, DELIVERING_PAYLOAD)) { - break; - } - } - - try { - subscriber.onNext(o); - } finally { - // Re-entry -> don't unlock - if (!reentry) { - for (;;) { - final int state = ResponseCompletionSubscriber.this.state; - assert state != IDLE; - assert state != PROCESSING_PAYLOAD; - if (state == TERMINATED) { - break; - } - if (state == DELIVERING_PAYLOAD) { - if (stateUpdater.compareAndSet( - ResponseCompletionSubscriber.this, - DELIVERING_PAYLOAD, PROCESSING_PAYLOAD)) { - break; - } - } else if (stateUpdater.compareAndSet( - ResponseCompletionSubscriber.this, - AWAITING_CANCEL, TERMINATED)) { - try { - beforeFinally.cancel(); - } finally { - assert subscription != null; - subscription.cancel(); - } - break; - } - } - } - } - } - - @Override - public void onError(final Throwable t) { - if (!discardEventsAfterCancel) { - try { - if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this, - PROCESSING_PAYLOAD, TERMINATED)) { - beforeFinally.onError(t); - } - } catch (Throwable cause) { - addSuppressed(t, cause); - } - subscriber.onError(t); - return; - } - - final int prevState = setTerminalState(); - if (prevState == TERMINATED) { - // We already cancelled and have to discard further events - return; - } - // Propagate original cancel to let Subscription observe it - final boolean propagateCancel = prevState == AWAITING_CANCEL; - - try { - beforeFinally.onError(t); - } catch (Throwable cause) { - addSuppressed(t, cause); - } - try { - subscriber.onError(t); - } finally { - cancel0(propagateCancel); - } - } - - @Override - public void onComplete() { - if (!discardEventsAfterCancel) { - try { - if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this, - PROCESSING_PAYLOAD, TERMINATED)) { - beforeFinally.onComplete(); - } - } catch (Throwable cause) { - subscriber.onError(cause); - return; - } - subscriber.onComplete(); - return; - } - - final int prevState = setTerminalState(); - if (prevState == TERMINATED) { - // We already cancelled and have to discard further events - return; - } - // Propagate original cancel to let Subscription observe it - final boolean propagateCancel = prevState == AWAITING_CANCEL; - - try { - try { - beforeFinally.onComplete(); - } catch (Throwable cause) { - subscriber.onError(cause); - return; - } - subscriber.onComplete(); - } finally { - cancel0(propagateCancel); - } - } - - private int setTerminalState() { - for (;;) { - final int state = ResponseCompletionSubscriber.this.state; - assert state != IDLE; - if (state == TERMINATED) { - // We already cancelled and have to discard further events - return state; - } - if (state == PROCESSING_PAYLOAD) { - if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this, - PROCESSING_PAYLOAD, TERMINATED)) { - return state; - } - } else if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this, - state, TERMINATED)) { - // re-entry, but we can terminate because this is a final event: - return state; - } - } - } - - private void cancel0(final boolean propagateCancel) { - if (propagateCancel) { - assert subscription != null; - subscription.cancel(); - } - } - }) + payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(messageBodySubscriber, + beforeFinally, discardEventsAfterCancel)) )); } else { // Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been @@ -444,4 +222,231 @@ private void dereferenceSubscriber() { subscriber = NOOP_SUBSCRIBER; } } + + private static final class MessageBodySubscriber implements Subscriber { + + private static final int PROCESSING_PAYLOAD = 0; + private static final int DELIVERING_PAYLOAD = 1; + private static final int AWAITING_CANCEL = 2; + private static final int TERMINATED = -1; + + private static final AtomicIntegerFieldUpdater stateUpdater = + newUpdater(MessageBodySubscriber.class, "state"); + + private final Subscriber subscriber; + private final TerminalSignalConsumer beforeFinally; + private final boolean discardEventsAfterCancel; + private volatile int state; + @Nullable + private Subscription subscription; + + MessageBodySubscriber(final Subscriber subscriber, + final TerminalSignalConsumer beforeFinally, + final boolean discardEventsAfterCancel) { + this.subscriber = subscriber; + this.beforeFinally = beforeFinally; + this.discardEventsAfterCancel = discardEventsAfterCancel; + } + + @Override + public void onSubscribe(final Subscription subscription) { + this.subscription = subscription; + subscriber.onSubscribe(new Subscription() { + @Override + public void request(final long n) { + subscription.request(n); + } + + @Override + public void cancel() { + if (!discardEventsAfterCancel) { + try { + if (stateUpdater.compareAndSet(MessageBodySubscriber.this, + PROCESSING_PAYLOAD, TERMINATED)) { + beforeFinally.cancel(); + } + } finally { + subscription.cancel(); + } + return; + } + + for (;;) { + final int state = MessageBodySubscriber.this.state; + if (state == PROCESSING_PAYLOAD) { + if (stateUpdater.compareAndSet(MessageBodySubscriber.this, + PROCESSING_PAYLOAD, TERMINATED)) { + try { + beforeFinally.cancel(); + } finally { + subscription.cancel(); + } + break; + } + } else if (state == DELIVERING_PAYLOAD) { + if (stateUpdater.compareAndSet(MessageBodySubscriber.this, + DELIVERING_PAYLOAD, AWAITING_CANCEL)) { + break; + } + } else if (state == TERMINATED) { + // still propagate cancel to the original subscription: + subscription.cancel(); + break; + } else { + // cancel can be invoked multiple times + assert state == AWAITING_CANCEL; + break; + } + } + } + }); + } + + @Override + public void onNext(@Nullable final Object o) { + if (!discardEventsAfterCancel) { + subscriber.onNext(o); + return; + } + + boolean reentry = false; + for (;;) { + final int state = this.state; + if (state == TERMINATED) { + // We already cancelled and have to discard further events + return; + } + if (state == DELIVERING_PAYLOAD || state == AWAITING_CANCEL) { + reentry = true; + break; + } + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, DELIVERING_PAYLOAD)) { + break; + } + } + + try { + subscriber.onNext(o); + } finally { + // Re-entry -> don't unlock + if (!reentry) { + for (;;) { + final int state = this.state; + assert state != PROCESSING_PAYLOAD; + if (state == TERMINATED) { + break; + } + if (state == DELIVERING_PAYLOAD) { + if (stateUpdater.compareAndSet(this, DELIVERING_PAYLOAD, PROCESSING_PAYLOAD)) { + break; + } + } else if (stateUpdater.compareAndSet(this, AWAITING_CANCEL, TERMINATED)) { + try { + beforeFinally.cancel(); + } finally { + assert subscription != null; + subscription.cancel(); + } + break; + } + } + } + } + } + + @Override + public void onError(final Throwable t) { + if (!discardEventsAfterCancel) { + try { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { + beforeFinally.onError(t); + } + } catch (Throwable cause) { + addSuppressed(t, cause); + } + subscriber.onError(t); + return; + } + + final int prevState = setTerminalState(); + if (prevState == TERMINATED) { + // We already cancelled and have to discard further events + return; + } + // Propagate original cancel to let Subscription observe it + final boolean propagateCancel = prevState == AWAITING_CANCEL; + + try { + beforeFinally.onError(t); + } catch (Throwable cause) { + addSuppressed(t, cause); + } + try { + subscriber.onError(t); + } finally { + cancel0(propagateCancel); + } + } + + @Override + public void onComplete() { + if (!discardEventsAfterCancel) { + try { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { + beforeFinally.onComplete(); + } + } catch (Throwable cause) { + subscriber.onError(cause); + return; + } + subscriber.onComplete(); + return; + } + + final int prevState = setTerminalState(); + if (prevState == TERMINATED) { + // We already cancelled and have to discard further events + return; + } + // Propagate original cancel to let Subscription observe it + final boolean propagateCancel = prevState == AWAITING_CANCEL; + + try { + try { + beforeFinally.onComplete(); + } catch (Throwable cause) { + subscriber.onError(cause); + return; + } + subscriber.onComplete(); + } finally { + cancel0(propagateCancel); + } + } + + private int setTerminalState() { + for (;;) { + final int state = this.state; + if (state == TERMINATED) { + // We already cancelled and have to discard further events + return state; + } + if (state == PROCESSING_PAYLOAD) { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { + return state; + } + } else if (stateUpdater.compareAndSet(this, state, TERMINATED)) { + // re-entry, but we can terminate because this is a final event: + return state; + } + } + } + + private void cancel0(final boolean propagateCancel) { + if (propagateCancel) { + assert subscription != null; + subscription.cancel(); + } + } + } } diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index 2d09fc74d9..abdcc4a5db 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -23,10 +23,12 @@ import io.servicetalk.concurrent.SingleSource; import io.servicetalk.concurrent.SingleSource.Subscriber; import io.servicetalk.concurrent.api.LegacyTestSingle; +import io.servicetalk.concurrent.api.Processors; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TerminalSignalConsumer; import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.api.TestSubscription; +import io.servicetalk.concurrent.internal.DuplicateSubscribeException; import io.servicetalk.concurrent.internal.TerminalNotification; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import io.servicetalk.http.api.DefaultHttpHeadersFactory; @@ -38,6 +40,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; @@ -55,6 +58,7 @@ import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL; import static io.servicetalk.concurrent.api.Publisher.never; +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; @@ -69,7 +73,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -544,6 +550,47 @@ void payloadComplete(boolean discardEventsAfterCancel) { verify(beforeFinally).onComplete(); } + @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadError={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void resubscribeToPayloadBody(boolean discardEventsAfterCancel, boolean payloadError) { + LegacyTestSingle responseSingle = new LegacyTestSingle<>(true); + final ResponseSubscriber subscriber = new ResponseSubscriber(); + toSource(responseSingle + .liftSync(new BeforeFinallyHttpOperator(beforeFinally, discardEventsAfterCancel))) + .subscribe(subscriber); + assertThat("onSubscribe not called.", subscriber.cancellable, is(notNullValue())); + + PublisherSource.Processor payload = Processors.newPublisherProcessor(); + final StreamingHttpResponse response = reqRespFactory.ok().payloadBody(fromSource(payload)); + responseSingle.onSuccess(response); + + verifyNoInteractions(beforeFinally); + responseSingle.verifyNotCancelled(); + subscriber.verifyResponseReceived(); + assert subscriber.response != null; + + // Subscribe for the first time. + TestPublisherSubscriber payloadSubscriber1 = new TestPublisherSubscriber<>(); + toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber1); + payloadSubscriber1.awaitSubscription().request(MAX_VALUE); + if (payloadError) { + payload.onError(DELIBERATE_EXCEPTION); + verify(beforeFinally).onError(DELIBERATE_EXCEPTION); + assertThat(payloadSubscriber1.awaitOnError(), is(sameInstance(DELIBERATE_EXCEPTION))); + } else { + payload.onComplete(); + verify(beforeFinally).onComplete(); + payloadSubscriber1.awaitOnComplete(); + } + + // Subscribe for the second time. + TestPublisherSubscriber payloadSubscriber2 = new TestPublisherSubscriber<>(); + toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber2); + payloadSubscriber2.awaitSubscription().request(MAX_VALUE); + assertThat(payloadSubscriber2.awaitOnError(), is(instanceOf(DuplicateSubscribeException.class))); + verify(beforeFinally).onError(any(DuplicateSubscribeException.class)); + } + private static final class ResponseSubscriber implements SingleSource.Subscriber { @Nullable diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/IdleTimeoutConnectionFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/IdleTimeoutConnectionFilterTest.java index 76d3084999..dcf26b6ef3 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/IdleTimeoutConnectionFilterTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/IdleTimeoutConnectionFilterTest.java @@ -25,6 +25,7 @@ import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.api.StreamingHttpResponses; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -43,7 +44,6 @@ import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; import static io.servicetalk.http.api.HttpResponseStatus.OK; -import static io.servicetalk.http.api.StreamingHttpResponses.newResponse; import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -61,8 +61,7 @@ class IdleTimeoutConnectionFilterTest { private static final StreamingHttpRequest REQUEST = mock(StreamingHttpRequest.class); private static final StreamingHttpRequest REQUEST_SUCCESS = mock(StreamingHttpRequest.class); private static final StreamingHttpRequest REQUEST_FAIL = mock(StreamingHttpRequest.class); - private static final StreamingHttpResponse RESPONSE = newResponse(OK, HTTP_1_1, - DefaultHttpHeadersFactory.INSTANCE.newHeaders(), DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE); + private static final StreamingHttpResponse RESPONSE = newResponse(); private static final Throwable DELIBERATE_EXCEPTION = new RuntimeException("DELIBERATE_EXCEPTION"); private static final long TIMEOUT_MILLIS = 60_000; @@ -127,7 +126,7 @@ void closedManually() { } @Test - void hadSuccessfulResponse() { + void hadSuccessfulResponse() throws Exception { StreamingHttpRequester requester = applyTimeout(); executor.advanceTimeByNoExecuteTasks(TIMEOUT_MILLIS / 2, MILLISECONDS); assertSuccessfulResponse(requester); @@ -157,7 +156,7 @@ void hadFailedResponse() { } @Test - void twoConcurrentRequests() { + void twoConcurrentRequests() throws Exception { StreamingHttpRequester requester = applyTimeout(); executor.advanceTimeByNoExecuteTasks(TIMEOUT_MILLIS / 2, MILLISECONDS); @@ -175,8 +174,9 @@ void twoConcurrentRequests() { assertNotClosed(); // we still have the 1st "in-flight" request // Complete the 1st request: - firstResponseProcessor.onSuccess(RESPONSE); - assertResponse(responseSubscriber); + StreamingHttpResponse firstResponse = newResponse(); + firstResponseProcessor.onSuccess(firstResponse); + assertResponse(responseSubscriber, firstResponse); executor.advanceTimeBy(TIMEOUT_MILLIS / 2, MILLISECONDS); assertNotClosed(); // timeout was reset after completion of the 1st request @@ -188,7 +188,7 @@ void twoConcurrentRequests() { } @Test - void inFlightRequest() { + void inFlightRequest() throws Exception { StreamingHttpRequester requester = applyTimeout(); executor.advanceTimeByNoExecuteTasks(TIMEOUT_MILLIS / 2, MILLISECONDS); @@ -201,8 +201,9 @@ void inFlightRequest() { executor.advanceTimeBy(TIMEOUT_MILLIS, MILLISECONDS); assertNotClosed(); - responseProcessor.onSuccess(RESPONSE); - assertResponse(responseSubscriber); + StreamingHttpResponse response = newResponse(); + responseProcessor.onSuccess(response); + assertResponse(responseSubscriber, response); executor.advanceTimeBy(TIMEOUT_MILLIS / 2, MILLISECONDS); assertNotClosed(); @@ -221,17 +222,18 @@ private void assertNotClosed() { assertThat(closedTimes.get(), is(0)); } - private static void assertSuccessfulResponse(StreamingHttpRequester requester) { + private static void assertSuccessfulResponse(StreamingHttpRequester requester) throws Exception { TestSingleSubscriber responseSubscriber = new TestSingleSubscriber<>(); toSource(requester.request(REQUEST_SUCCESS)).subscribe(responseSubscriber); - assertResponse(responseSubscriber); + assertResponse(responseSubscriber, RESPONSE); } - private static void assertResponse(TestSingleSubscriber responseSubscriber) { + private static void assertResponse(TestSingleSubscriber responseSubscriber, + StreamingHttpResponse expectedResponse) throws Exception { StreamingHttpResponse response = responseSubscriber.awaitOnSuccess(); assertThat(response, is(notNullValue())); - assertThat(response, is(RESPONSE)); - response.payloadBody().ignoreElements().subscribe(); + assertThat(response, is(expectedResponse)); + response.payloadBody().ignoreElements().toFuture().get(); } private static void assertFailedResponse(StreamingHttpRequester requester) { @@ -245,4 +247,9 @@ private static void assertClosedChannelException(StreamingHttpRequester requeste toSource(requester.request(REQUEST)).subscribe(responseSubscriber); assertThat(responseSubscriber.awaitOnError(), instanceOf(ClosedChannelException.class)); } + + private static StreamingHttpResponse newResponse() { + return StreamingHttpResponses.newResponse(OK, HTTP_1_1, DefaultHttpHeadersFactory.INSTANCE.newHeaders(), + DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE); + } }