From ed2c50b86800dc50853ab002daf247d42d6dedb2 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 30 Sep 2022 00:18:20 -0700 Subject: [PATCH 1/2] Single#concatPropagateCancel(Publisher) to force cancel propagation Motivation: In some scenarios concat is used where the right hand side has assocated state that is desirable to always trigger (concat payload body to meta data for http response processing). Regular concat will not subscribe if the Single terminates with onError or is cancelled which will prevent terminal event visibility. --- .../io/servicetalk/concurrent/api/Single.java | 26 ++- .../api/SingleConcatWithPublisher.java | 200 ++++++++++++++---- .../single/SingleConcatWithPublisherTest.java | 95 +++++++-- ...rDeferSubscribePropagateCancelTckTest.java | 27 +++ .../tck/SingleConcatWithPublisherTckTest.java | 11 +- 5 files changed, 293 insertions(+), 66 deletions(-) create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java index 946402991e..67d45f481a 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java @@ -766,7 +766,7 @@ public final Single concat(Completable next) { * @see #concat(Publisher, boolean) */ public final Publisher concat(Publisher next) { - return new SingleConcatWithPublisher<>(this, next, false); + return concat(next, false); } /** @@ -793,7 +793,29 @@ public final Publisher concat(Publisher next) { * all elements from {@code next} {@link Publisher}. */ public final Publisher concat(Publisher next, boolean deferSubscribe) { - return new SingleConcatWithPublisher<>(this, next, deferSubscribe); + return new SingleConcatWithPublisher<>(this, next, deferSubscribe, false); + } + + /** + * Returns a {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits all + * elements from {@code next} {@link Publisher}. Any error emitted by this {@link Single} or {@code next} + * {@link Publisher} is forwarded to the returned {@link Publisher}. + *

+ * This method provides a means to sequence the execution of two asynchronous sources and in sequential programming + * is similar to: + *

{@code
+     *     List results = new ...;
+     *     results.add(resultOfThisSingle());
+     *     results.addAll(nextStream());
+     *     return results;
+     * }
+ * @param next {@link Publisher} to concat. will be subscribed to and cancelled if this {@link Publisher} is + * cancelled or terminates with {@link Subscriber#onError(Throwable)}. + * @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits + * all elements from {@code next} {@link Publisher}. + */ + public final Publisher concatPropagateCancel(Publisher next) { + return new SingleConcatWithPublisher<>(this, next, false, true); } /** diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java index df1e5a536a..89cfde9553 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; +import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; @@ -31,19 +32,21 @@ final class SingleConcatWithPublisher extends AbstractNoHandleSubscribePublis private final Single original; private final Publisher next; private final boolean deferSubscribe; + private final boolean propagateCancel; SingleConcatWithPublisher(final Single original, final Publisher next, - final boolean deferSubscribe) { + final boolean deferSubscribe, final boolean propagateCancel) { this.original = original; this.next = Objects.requireNonNull(next, "next"); this.deferSubscribe = deferSubscribe; + this.propagateCancel = propagateCancel; } @Override void handleSubscribe(final Subscriber subscriber, final ContextMap contextMap, final AsyncContextProvider contextProvider) { - original.delegateSubscribe(deferSubscribe ? new ConcatDeferNextSubscriber<>(subscriber, next) : - new ConcatSubscriber<>(subscriber, next), contextMap, contextProvider); + original.delegateSubscribe(deferSubscribe ? new ConcatDeferNextSubscriber<>(subscriber, next, propagateCancel) : + new ConcatSubscriber<>(subscriber, next, propagateCancel), contextMap, contextProvider); } private abstract static class AbstractConcatSubscriber extends DelayedCancellableThenSubscription @@ -63,6 +66,7 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell static final AtomicReferenceFieldUpdater mayBeResultUpdater = newUpdater(AbstractConcatSubscriber.class, Object.class, "mayBeResult"); + private final boolean propagateCancel; final Subscriber target; final Publisher next; @@ -72,9 +76,11 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell @Nullable volatile Object mayBeResult = INITIAL; - AbstractConcatSubscriber(final Subscriber target, final Publisher next) { + AbstractConcatSubscriber(final Subscriber target, final Publisher next, + final boolean propagateCancel) { this.target = target; this.next = next; + this.propagateCancel = propagateCancel; } @Override @@ -90,17 +96,75 @@ public final void onSubscribe(final Subscription subscription) { @Override public final void onNext(@Nullable final T t) { + // propagateCancel - if cancel does subscribe to the Publisher there will be no demand propagated upstream + // so we don't have to worry about concurrency or use-after-terminate here. target.onNext(t); } @Override public final void onError(final Throwable t) { - target.onError(t); + if (propagateCancel) { + onErrorPropagateCancel(t); + } else { + target.onError(t); + } + } + + private void onErrorPropagateCancel(Throwable t) { + for (;;) { + final Object oldValue = mayBeResult; + if (oldValue == CANCELLED) { + // CANCELLED means downstream doesn't care about being notified and internal state tracking also + // uses this state if terminal has been propagated, so avoid duplicate terminal. We may + // subscribe to both sources in parallel if cancellation occurs, so allowing terminal to + // propagate would mean ordering and concurrency needs to be accounted for between Single and + // Publisher, because cancel allows for no more future delivery we avoid future invocation of + // the target subscriber. + break; + } else if (finallyShouldSubscribeToNext(oldValue)) { + if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) { + forceCancelNextOnSubscribe(); + try { + target.onError(t); + } finally { + next.subscribeInternal(this); + } + break; + } + } else if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) { + target.onError(t); + break; + } + } } @Override public final void onComplete() { - target.onComplete(); + if (propagateCancel) { + onCompletePropagateCancel(); + } else { + target.onComplete(); + } + } + + private void onCompletePropagateCancel() { + for (;;) { + final Object oldValue = mayBeResult; + if (oldValue == CANCELLED) { + // CANCELLED means downstream doesn't care about being notified and internal state tracking also + // uses this state if terminal has been propagated, so avoid duplicate terminal. We may + // subscribe to both sources in parallel if cancellation occurs, so allowing terminal to + // propagate would mean ordering and concurrency needs to be accounted for between Single and + // Publisher, because cancel allows for no more future delivery we avoid future invocation of + // the target subscriber. + break; + } else if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) { + // onComplete() can only be called after we subscribe to next Publisher, no need to check if we need + // to subscribe to next. + target.onComplete(); + break; + } + } } @Override @@ -108,10 +172,18 @@ public final void cancel() { // We track cancelled here because we need to make sure if cancel() happens subsequent calls to request(n) // are NOOPs [1]. // [1] https://github.com/reactive-streams/reactive-streams-jvm#3.6 - mayBeResult = CANCELLED; - super.cancel(); + final Object oldValue = mayBeResultUpdater.getAndSet(this, CANCELLED); + try { + super.cancel(); // call cancel first, so if we do subscribe to next we won't propagate demand. + } finally { + if (propagateCancel && oldValue != CANCELLED && finallyShouldSubscribeToNext(oldValue)) { + next.subscribeInternal(this); + } + } } + abstract boolean finallyShouldSubscribeToNext(@Nullable Object oldState); + /** * Helper method to invoke {@link DelayedCancellableThenSubscription#cancel()} from subclasses. */ @@ -120,15 +192,34 @@ final void superCancel() { } final boolean tryEmitSingleSuccessToTarget(@Nullable final T result) { + // If we are in this method cancel() is not allowed to subscribe because that may introduce concurrency on + // target if the Publisher terminates without any demand. try { target.onNext(result); return true; } catch (Throwable cause) { - mayBeResult = CANCELLED; - target.onError(cause); - return false; + return handleOnNextThrowable(cause); } } + + private boolean handleOnNextThrowable(Throwable cause) { + // Switch state to CANCELLED to prevent any further interaction with target. For example if propagateCancel + // then we will subscribe and the next subscriber may send another terminal without any demand. We don't + // have to explicitly cancel here because the Single has already terminated. + mayBeResult = CANCELLED; + target.onError(cause); + if (propagateCancel) { + forceCancelNextOnSubscribe(); + return true; + } + return false; + } + + private void forceCancelNextOnSubscribe() { + // When onSubscribe(Subscription) is called this ensures we don't propagate any demand upstream + // and forces cancel() when onSubscribe is called. + delayedSubscription(EMPTY_SUBSCRIPTION); + } } private static final class ConcatSubscriber extends AbstractConcatSubscriber { @@ -136,20 +227,30 @@ private static final class ConcatSubscriber extends AbstractConcatSubscriber< * If {@link #request(long)} (with a valid n) invoked before {@link #onSuccess(Object)}. */ private static final Object REQUESTED = new Object(); + private static final Object REQUESTED_SUBSCRIBED = new Object(); + + ConcatSubscriber(final Subscriber target, final Publisher next, + final boolean propagateCancel) { + super(target, next, propagateCancel); + } - ConcatSubscriber(final Subscriber target, final Publisher next) { - super(target, next); + @Override + boolean finallyShouldSubscribeToNext(@Nullable final Object oldState) { + return oldState != REQUESTED_SUBSCRIBED; } @Override public void onSuccess(@Nullable final T result) { for (;;) { final Object oldValue = mayBeResult; + assert oldValue != REQUESTED_SUBSCRIBED; if (oldValue == REQUESTED) { - if (tryEmitSingleSuccessToTarget(result)) { - next.subscribeInternal(this); + if (mayBeResultUpdater.compareAndSet(this, REQUESTED, REQUESTED_SUBSCRIBED)) { + if (tryEmitSingleSuccessToTarget(result)) { + next.subscribeInternal(this); + } + break; } - break; } else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } @@ -162,18 +263,20 @@ public void request(long n) { final Object oldVal = mayBeResult; if (oldVal == CANCELLED) { break; - } else if (oldVal == REQUESTED) { + } else if (oldVal == REQUESTED || oldVal == REQUESTED_SUBSCRIBED) { super.request(n); break; } else if (!isRequestNValid(n)) { - mayBeResult = CANCELLED; - try { - target.onError(newExceptionForInvalidRequestN(n)); - } finally { - superCancel(); + if (mayBeResultUpdater.compareAndSet(this, oldVal, CANCELLED)) { + try { + superCancel(); + } finally { + target.onError(newExceptionForInvalidRequestN(n)); + } + break; } - break; - } else if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED)) { + } else if (mayBeResultUpdater.compareAndSet(this, oldVal, + oldVal != INITIAL ? REQUESTED_SUBSCRIBED : REQUESTED)) { // We need to ensure that the queued result is delivered in order (first). Upstream demand is // delayed via DelayedSubscription until onSubscribe which preserves ordering, and there are some // scenarios where subscribing to the concat Publisher may block on demand (e.g. @@ -220,8 +323,14 @@ private static final class ConcatDeferNextSubscriber extends AbstractConcatSu */ private static final Object PUBLISHER_SUBSCRIBED = new Object(); - ConcatDeferNextSubscriber(final Subscriber target, final Publisher next) { - super(target, next); + ConcatDeferNextSubscriber(final Subscriber target, final Publisher next, + final boolean propagateCancel) { + super(target, next, propagateCancel); + } + + @Override + boolean finallyShouldSubscribeToNext(@Nullable final Object oldState) { + return oldState != PUBLISHER_SUBSCRIBED; } @Override @@ -235,16 +344,16 @@ public void onSuccess(@Nullable final T result) { if (oldValue == CANCELLED) { break; } else if (oldValue == INITIAL) { - if (mayBeResultUpdater.compareAndSet(this, oldValue, result)) { + if (mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } } else if (oldValue == REQUESTED_ONE) { - if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERING)) { + if (mayBeResultUpdater.compareAndSet(this, REQUESTED_ONE, SINGLE_DELIVERING)) { emitSingleSuccessToTarget(result); break; } } else if (oldValue == REQUESTED_MORE && - mayBeResultUpdater.compareAndSet(this, oldValue, PUBLISHER_SUBSCRIBED)) { + mayBeResultUpdater.compareAndSet(this, REQUESTED_MORE, PUBLISHER_SUBSCRIBED)) { if (tryEmitSingleSuccessToTarget(result)) { next.subscribeInternal(this); } @@ -263,22 +372,23 @@ public void request(long n) { super.request(n); break; } else if (!isRequestNValid(n)) { - mayBeResult = CANCELLED; - try { - target.onError(newExceptionForInvalidRequestN(n)); - } finally { - superCancel(); + if (mayBeResultUpdater.compareAndSet(this, oldVal, CANCELLED)) { + try { + superCancel(); + } finally { + target.onError(newExceptionForInvalidRequestN(n)); + } + break; } - break; } else if (oldVal == INITIAL) { if (n > 1) { - if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_MORE)) { + if (mayBeResultUpdater.compareAndSet(this, INITIAL, REQUESTED_MORE)) { super.request(n - 1); break; } } else { assert n == 1; - if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_ONE)) { + if (mayBeResultUpdater.compareAndSet(this, INITIAL, REQUESTED_ONE)) { break; } } @@ -288,9 +398,12 @@ public void request(long n) { break; } } else if (oldVal == SINGLE_DELIVERED) { - if (mayBeResultUpdater.compareAndSet(this, oldVal, PUBLISHER_SUBSCRIBED)) { - super.request(n); - next.subscribeInternal(this); + if (mayBeResultUpdater.compareAndSet(this, SINGLE_DELIVERED, PUBLISHER_SUBSCRIBED)) { + try { + super.request(n); + } finally { + next.subscribeInternal(this); + } break; } } else if (n > 1) { @@ -298,8 +411,11 @@ public void request(long n) { @SuppressWarnings("unchecked") final T tVal = (T) oldVal; if (tryEmitSingleSuccessToTarget(tVal)) { - super.request(n - 1); - next.subscribeInternal(this); + try { + super.request(n - 1); + } finally { + next.subscribeInternal(this); + } } break; } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java index cd5bab845b..b66ae16238 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java @@ -21,11 +21,13 @@ import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.api.TestSingle; import io.servicetalk.concurrent.api.TestSubscription; +import io.servicetalk.concurrent.internal.DeliberateException; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import org.hamcrest.Matchers; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; @@ -60,7 +62,12 @@ class SingleConcatWithPublisherTest { private TestPublisher next = new TestPublisher.Builder().disableAutoOnSubscribe().build(); void setUp(boolean deferSubscribe) { - toSource(source.concat(next, deferSubscribe)).subscribe(subscriber); + setUp(deferSubscribe, false); + } + + void setUp(boolean deferSubscribe, boolean propagateCancel) { + toSource(propagateCancel ? source.concatPropagateCancel(next) : source.concat(next, deferSubscribe)) + .subscribe(subscriber); source.onSubscribe(cancellable); subscriber.awaitSubscription(); } @@ -77,19 +84,23 @@ private static Stream invalidRequestN() { private static Collection onNextErrorPropagatedParams() { List args = new ArrayList<>(); for (boolean deferSubscribe : asList(false, true)) { - for (long requestN : asList(1, 2)) { - for (boolean singleCompletesFirst : asList(false, true)) { - args.add(Arguments.of(deferSubscribe, requestN, singleCompletesFirst)); + for (boolean propagateCancel : asList(false, true)) { + for (long requestN : asList(1, 2)) { + for (boolean singleCompletesFirst : asList(false, true)) { + args.add(Arguments.of(deferSubscribe, propagateCancel, requestN, singleCompletesFirst)); + } } } } return args; } - @ParameterizedTest(name = "deferSubscribe={0} requestN={1} singleCompletesFirst={2}") + @ParameterizedTest(name = "deferSubscribe={0} propagateCancel={1} requestN={2} singleCompletesFirst={3}") @MethodSource("onNextErrorPropagatedParams") - void onNextErrorPropagated(boolean deferSubscribe, long n, boolean singleCompletesFirst) { - toSource(source.concat(next, deferSubscribe).map(x -> { + void onNextErrorPropagated(boolean deferSubscribe, boolean propagateCancel, long n, boolean singleCompletesFirst) + throws Exception { + toSource((propagateCancel ? source.concatPropagateCancel(next) : source.concat(next, deferSubscribe)) + .map(x -> { throw DELIBERATE_EXCEPTION; })).subscribe(subscriber); source.onSubscribe(cancellable); @@ -102,7 +113,13 @@ void onNextErrorPropagated(boolean deferSubscribe, long n, boolean singleComplet source.onSuccess(1); } assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); - assertThat(next.isSubscribed(), is(false)); + if (propagateCancel) { + next.awaitSubscribed(); + next.onSubscribe(subscription); + subscription.awaitCancelled(); + } else { + assertThat(next.isSubscribed(), is(false)); + } } @ParameterizedTest(name = "deferSubscribe={0}") @@ -189,23 +206,61 @@ void request0PropagatedAfterSuccess(boolean deferSubscribe) { is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void sourceError(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "deferSubscribe={0} propagateCancel={1} error={2}") + @CsvSource(value = {"false,false,false", "false,true,false", "false,true,true", "true,false,false", + "true,true,false", "true,true,true"}) + void sourceError(boolean deferSubscribe, boolean propagateCancel, boolean error) throws InterruptedException { + setUp(deferSubscribe, propagateCancel); source.onError(DELIBERATE_EXCEPTION); - assertThat("Unexpected subscriber termination.", subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); - assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + if (propagateCancel) { + next.awaitSubscribed(); + next.onSubscribe(subscription); + subscription.awaitCancelled(); + + // Test that no duplicate terminal events are delivered. + if (error) { + next.onError(new DeliberateException()); + } else { + next.onComplete(); + } + } else { + assertThat(next.isSubscribed(), is(false)); + } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelSource(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "deferSubscribe={0} propagateCancel={1} error={2}") + @CsvSource(value = {"false,false,false", "false,true,false", "false,true,true", "true,false,false", + "true,true,false", "true,true,true"}) + void cancelSource(boolean deferSubscribe, boolean propagateCancel, boolean error) throws InterruptedException { + setUp(deferSubscribe, propagateCancel); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); - subscriber.awaitSubscription().cancel(); + Subscription subscription1 = subscriber.awaitSubscription(); + subscription1.request(2); + subscription1.cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); - assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + if (propagateCancel) { + next.awaitSubscribed(); + next.onSubscribe(subscription); + subscription.awaitCancelled(); + + if (error) { + next.onError(new DeliberateException()); + } else { + next.onComplete(); + } + } else { + assertThat(next.isSubscribed(), is(false)); + if (error) { + source.onError(new DeliberateException()); + } else { + source.onSuccess(1); + } + assertThat(next.isSubscribed(), is(false)); + } + // It is not required that no terminal is delivered after cancel but verifies the current implementation for + // thread safety on the subscriber and to avoid duplicate terminals. + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); } @ParameterizedTest(name = "deferSubscribe={0}") diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java new file mode 100644 index 0000000000..4e3bb43916 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import org.testng.annotations.Test; + +@Test +public class SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest extends SingleConcatWithPublisherTckTest { + + @Override + boolean propagateCancel() { + return true; + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java index b37e5cac5d..41adbee4c5 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java @@ -20,6 +20,8 @@ import org.testng.annotations.Test; +import static io.servicetalk.concurrent.reactivestreams.tck.TckUtils.newPublisher; + @Test public class SingleConcatWithPublisherTckTest extends AbstractSingleTckTest { @@ -27,13 +29,18 @@ boolean deferSubscribe() { return false; } + boolean propagateCancel() { + return false; + } + @Override protected Publisher createServiceTalkPublisher(long elements) { if (elements < 2) { return Single.succeeded(1).toPublisher(); } - return Single.succeeded(1) - .concat(TckUtils.newPublisher(TckUtils.requestNToInt(elements) - 1), deferSubscribe()); + Single s = Single.succeeded(1); + Publisher p = newPublisher(TckUtils.requestNToInt(elements) - 1); + return propagateCancel() ? s.concatPropagateCancel(p) : s.concat(p, deferSubscribe()); } @Override From a78ad63254d19ee2e75d8d0ab5d85b044b38901c Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 30 Sep 2022 22:37:52 -0700 Subject: [PATCH 2/2] review comments --- .../io/servicetalk/concurrent/api/Single.java | 9 +- .../api/SingleConcatWithPublisher.java | 111 ++++--- .../single/SingleConcatWithPublisherTest.java | 290 ++++++++++-------- ...tWithPublisherPropagateCancelTckTest.java} | 2 +- 4 files changed, 225 insertions(+), 187 deletions(-) rename servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/{SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java => SingleConcatWithPublisherPropagateCancelTckTest.java} (87%) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java index 67d45f481a..a35dcd7f45 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java @@ -764,6 +764,7 @@ public final Single concat(Completable next) { * @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits * all elements from {@code next} {@link Publisher}. * @see #concat(Publisher, boolean) + * @see #concatPropagateCancel(Publisher) */ public final Publisher concat(Publisher next) { return concat(next, false); @@ -797,9 +798,8 @@ public final Publisher concat(Publisher next, boolean deferSubsc } /** - * Returns a {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits all - * elements from {@code next} {@link Publisher}. Any error emitted by this {@link Single} or {@code next} - * {@link Publisher} is forwarded to the returned {@link Publisher}. + * This method is like {@link #concat(Completable)} except {@code next} will be subscribed to and cancelled if this + * {@link Publisher} is cancelled or terminates with {@link Subscriber#onError(Throwable)}. *

* This method provides a means to sequence the execution of two asynchronous sources and in sequential programming * is similar to: @@ -809,10 +809,11 @@ public final Publisher concat(Publisher next, boolean deferSubsc * results.addAll(nextStream()); * return results; * } - * @param next {@link Publisher} to concat. will be subscribed to and cancelled if this {@link Publisher} is + * @param next {@link Publisher} to concat. Will be subscribed to and cancelled if this {@link Publisher} is * cancelled or terminates with {@link Subscriber#onError(Throwable)}. * @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits * all elements from {@code next} {@link Publisher}. + * @see #concat(Completable) */ public final Publisher concatPropagateCancel(Publisher next) { return new SingleConcatWithPublisher<>(this, next, false, true); diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java index 89cfde9553..1d7450ad49 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java @@ -51,7 +51,6 @@ void handleSubscribe(final Subscriber subscriber, private abstract static class AbstractConcatSubscriber extends DelayedCancellableThenSubscription implements SingleSource.Subscriber, Subscriber { - /** * Initial state upon creation. */ @@ -61,6 +60,14 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell * delivered. */ static final Object CANCELLED = new Object(); + /** + * Cancelled after {@link #onSuccess(Object)} or terminal signal received (prevents duplicate terminals). + */ + static final Object TERMINAL = new Object(); + /** + * After {@link #onSuccess(Object)} and {@link #request(long)} but before subscribing to {@link #next}. + */ + static final Object PUBLISHER_SUBSCRIBED = new Object(); @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater mayBeResultUpdater = @@ -113,26 +120,21 @@ public final void onError(final Throwable t) { private void onErrorPropagateCancel(Throwable t) { for (;;) { final Object oldValue = mayBeResult; - if (oldValue == CANCELLED) { - // CANCELLED means downstream doesn't care about being notified and internal state tracking also - // uses this state if terminal has been propagated, so avoid duplicate terminal. We may - // subscribe to both sources in parallel if cancellation occurs, so allowing terminal to - // propagate would mean ordering and concurrency needs to be accounted for between Single and - // Publisher, because cancel allows for no more future delivery we avoid future invocation of - // the target subscriber. + if (oldValue == TERMINAL) { + // Only propagate terminal if we were cancelled after the first source terminated. Otherwise, + // we may deliver items out of order and fail the TCK tests by delivering terminal after cancel. break; - } else if (finallyShouldSubscribeToNext(oldValue)) { - if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) { + } else if (mayBeResultUpdater.compareAndSet(this, oldValue, TERMINAL)) { + if (finallyShouldSubscribeToNext(oldValue)) { forceCancelNextOnSubscribe(); try { target.onError(t); } finally { next.subscribeInternal(this); } - break; + } else { + target.onError(t); } - } else if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) { - target.onError(t); break; } } @@ -150,15 +152,11 @@ public final void onComplete() { private void onCompletePropagateCancel() { for (;;) { final Object oldValue = mayBeResult; - if (oldValue == CANCELLED) { - // CANCELLED means downstream doesn't care about being notified and internal state tracking also - // uses this state if terminal has been propagated, so avoid duplicate terminal. We may - // subscribe to both sources in parallel if cancellation occurs, so allowing terminal to - // propagate would mean ordering and concurrency needs to be accounted for between Single and - // Publisher, because cancel allows for no more future delivery we avoid future invocation of - // the target subscriber. + if (oldValue == TERMINAL) { + // Only propagate terminal if we were cancelled after the first source terminated. Otherwise, + // we may deliver items out of order and fail the TCK tests by delivering terminal after cancel. break; - } else if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) { + } else if (mayBeResultUpdater.compareAndSet(this, oldValue, TERMINAL)) { // onComplete() can only be called after we subscribe to next Publisher, no need to check if we need // to subscribe to next. target.onComplete(); @@ -172,18 +170,25 @@ public final void cancel() { // We track cancelled here because we need to make sure if cancel() happens subsequent calls to request(n) // are NOOPs [1]. // [1] https://github.com/reactive-streams/reactive-streams-jvm#3.6 - final Object oldValue = mayBeResultUpdater.getAndSet(this, CANCELLED); - try { - super.cancel(); // call cancel first, so if we do subscribe to next we won't propagate demand. - } finally { - if (propagateCancel && oldValue != CANCELLED && finallyShouldSubscribeToNext(oldValue)) { - next.subscribeInternal(this); + for (;;) { + final Object oldValue = mayBeResult; + if (oldValue == CANCELLED || oldValue == TERMINAL) { + break; + } + final boolean firstCancel = finallyShouldSubscribeToNext(oldValue); + if (mayBeResultUpdater.compareAndSet(this, oldValue, firstCancel ? TERMINAL : CANCELLED)) { + try { + super.cancel(); // call cancel first, so if we do subscribe to next we won't propagate demand. + } finally { + if (propagateCancel && firstCancel) { + next.subscribeInternal(this); + } + } + break; } } } - abstract boolean finallyShouldSubscribeToNext(@Nullable Object oldState); - /** * Helper method to invoke {@link DelayedCancellableThenSubscription#cancel()} from subclasses. */ @@ -202,11 +207,15 @@ final boolean tryEmitSingleSuccessToTarget(@Nullable final T result) { } } + private boolean finallyShouldSubscribeToNext(@Nullable Object oldState) { + return oldState != PUBLISHER_SUBSCRIBED; + } + private boolean handleOnNextThrowable(Throwable cause) { - // Switch state to CANCELLED to prevent any further interaction with target. For example if propagateCancel + // Switch state to TERMINAL to prevent any further interaction with target. For example if propagateCancel // then we will subscribe and the next subscriber may send another terminal without any demand. We don't // have to explicitly cancel here because the Single has already terminated. - mayBeResult = CANCELLED; + mayBeResult = TERMINAL; target.onError(cause); if (propagateCancel) { forceCancelNextOnSubscribe(); @@ -227,31 +236,26 @@ private static final class ConcatSubscriber extends AbstractConcatSubscriber< * If {@link #request(long)} (with a valid n) invoked before {@link #onSuccess(Object)}. */ private static final Object REQUESTED = new Object(); - private static final Object REQUESTED_SUBSCRIBED = new Object(); ConcatSubscriber(final Subscriber target, final Publisher next, final boolean propagateCancel) { super(target, next, propagateCancel); } - @Override - boolean finallyShouldSubscribeToNext(@Nullable final Object oldState) { - return oldState != REQUESTED_SUBSCRIBED; - } - @Override public void onSuccess(@Nullable final T result) { for (;;) { final Object oldValue = mayBeResult; - assert oldValue != REQUESTED_SUBSCRIBED; + assert oldValue != PUBLISHER_SUBSCRIBED; if (oldValue == REQUESTED) { - if (mayBeResultUpdater.compareAndSet(this, REQUESTED, REQUESTED_SUBSCRIBED)) { + if (mayBeResultUpdater.compareAndSet(this, REQUESTED, PUBLISHER_SUBSCRIBED)) { if (tryEmitSingleSuccessToTarget(result)) { next.subscribeInternal(this); } break; } - } else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { + } else if (oldValue == CANCELLED || oldValue == TERMINAL || + mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } } @@ -261,13 +265,13 @@ public void onSuccess(@Nullable final T result) { public void request(long n) { for (;;) { final Object oldVal = mayBeResult; - if (oldVal == CANCELLED) { + if (oldVal == CANCELLED || oldVal == TERMINAL) { break; - } else if (oldVal == REQUESTED || oldVal == REQUESTED_SUBSCRIBED) { + } else if (oldVal == REQUESTED || oldVal == PUBLISHER_SUBSCRIBED) { super.request(n); break; } else if (!isRequestNValid(n)) { - if (mayBeResultUpdater.compareAndSet(this, oldVal, CANCELLED)) { + if (mayBeResultUpdater.compareAndSet(this, oldVal, TERMINAL)) { try { superCancel(); } finally { @@ -276,7 +280,7 @@ public void request(long n) { break; } } else if (mayBeResultUpdater.compareAndSet(this, oldVal, - oldVal != INITIAL ? REQUESTED_SUBSCRIBED : REQUESTED)) { + oldVal != INITIAL ? PUBLISHER_SUBSCRIBED : REQUESTED)) { // We need to ensure that the queued result is delivered in order (first). Upstream demand is // delayed via DelayedSubscription until onSubscribe which preserves ordering, and there are some // scenarios where subscribing to the concat Publisher may block on demand (e.g. @@ -317,22 +321,12 @@ private static final class ConcatDeferNextSubscriber extends AbstractConcatSu * delivered to the target. */ private static final Object SINGLE_DELIVERED = new Object(); - /** - * If more than one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and we - * subscribed to the next {@link Publisher}. - */ - private static final Object PUBLISHER_SUBSCRIBED = new Object(); ConcatDeferNextSubscriber(final Subscriber target, final Publisher next, final boolean propagateCancel) { super(target, next, propagateCancel); } - @Override - boolean finallyShouldSubscribeToNext(@Nullable final Object oldState) { - return oldState != PUBLISHER_SUBSCRIBED; - } - @Override public void onSuccess(@Nullable final T result) { for (;;) { @@ -341,7 +335,7 @@ public void onSuccess(@Nullable final T result) { assert oldValue != SINGLE_DELIVERED; assert oldValue != PUBLISHER_SUBSCRIBED; - if (oldValue == CANCELLED) { + if (oldValue == CANCELLED || oldValue == TERMINAL) { break; } else if (oldValue == INITIAL) { if (mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { @@ -366,13 +360,13 @@ public void onSuccess(@Nullable final T result) { public void request(long n) { for (;;) { final Object oldVal = mayBeResult; - if (oldVal == CANCELLED) { + if (oldVal == CANCELLED || oldVal == TERMINAL) { break; } else if (oldVal == PUBLISHER_SUBSCRIBED || oldVal == REQUESTED_MORE) { super.request(n); break; } else if (!isRequestNValid(n)) { - if (mayBeResultUpdater.compareAndSet(this, oldVal, CANCELLED)) { + if (mayBeResultUpdater.compareAndSet(this, oldVal, TERMINAL)) { try { superCancel(); } finally { @@ -436,7 +430,8 @@ private void emitSingleSuccessToTarget(@Nullable final T result) { // more demand appeared while we were delivering the single result next.subscribeInternal(this); } else { - assert mayBeResult == CANCELLED; + final Object oldValue = mayBeResult; + assert oldValue == CANCELLED || oldValue == TERMINAL; } } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java index b66ae16238..1e339620bb 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java @@ -27,9 +27,8 @@ import org.hamcrest.Matchers; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Collection; @@ -42,12 +41,16 @@ import static io.servicetalk.concurrent.api.Publisher.never; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.api.single.SingleConcatWithPublisherTest.ConcatMode.CONCAT; +import static io.servicetalk.concurrent.api.single.SingleConcatWithPublisherTest.ConcatMode.DEFER_SUBSCRIBE; +import static io.servicetalk.concurrent.api.single.SingleConcatWithPublisherTest.ConcatMode.PROPAGATE_CANCEL; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; -import static io.servicetalk.utils.internal.PlatformDependent.throwException; +import static io.servicetalk.utils.internal.ThrowableUtils.throwException; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -61,46 +64,62 @@ class SingleConcatWithPublisherTest { private TestSingle source = new TestSingle.Builder().disableAutoOnSubscribe().build(); private TestPublisher next = new TestPublisher.Builder().disableAutoOnSubscribe().build(); - void setUp(boolean deferSubscribe) { - setUp(deferSubscribe, false); + enum ConcatMode { + CONCAT, DEFER_SUBSCRIBE, PROPAGATE_CANCEL } - void setUp(boolean deferSubscribe, boolean propagateCancel) { - toSource(propagateCancel ? source.concatPropagateCancel(next) : source.concat(next, deferSubscribe)) - .subscribe(subscriber); + void setUp(ConcatMode mode) { + toSource(mode == PROPAGATE_CANCEL ? + source.concatPropagateCancel(next) : + source.concat(next, mode == DEFER_SUBSCRIBE)) + .subscribe(subscriber); source.onSubscribe(cancellable); subscriber.awaitSubscription(); } @SuppressWarnings("unused") private static Stream invalidRequestN() { - return Stream.of(Arguments.of(false, -1), - Arguments.of(false, 0), - Arguments.of(true, -1), - Arguments.of(true, 0)); + return Stream.of( + Arguments.of(CONCAT, -1), + Arguments.of(CONCAT, 0), + Arguments.of(DEFER_SUBSCRIBE, -1), + Arguments.of(DEFER_SUBSCRIBE, 0), + Arguments.of(PROPAGATE_CANCEL, -1), + Arguments.of(PROPAGATE_CANCEL, 0)); + } + + @SuppressWarnings("unused") + private static Stream modeAndError() { + return Stream.of( + Arguments.of(CONCAT, true), + Arguments.of(CONCAT, false), + Arguments.of(DEFER_SUBSCRIBE, true), + Arguments.of(DEFER_SUBSCRIBE, false), + Arguments.of(PROPAGATE_CANCEL, true), + Arguments.of(PROPAGATE_CANCEL, false)); } @SuppressWarnings("unused") private static Collection onNextErrorPropagatedParams() { List args = new ArrayList<>(); - for (boolean deferSubscribe : asList(false, true)) { - for (boolean propagateCancel : asList(false, true)) { - for (long requestN : asList(1, 2)) { - for (boolean singleCompletesFirst : asList(false, true)) { - args.add(Arguments.of(deferSubscribe, propagateCancel, requestN, singleCompletesFirst)); - } + for (ConcatMode mode : ConcatMode.values()) { + for (long requestN : asList(1, 2)) { + for (boolean singleCompletesFirst : asList(false, true)) { + args.add(Arguments.of(mode, requestN, singleCompletesFirst)); } } } return args; } - @ParameterizedTest(name = "deferSubscribe={0} propagateCancel={1} requestN={2} singleCompletesFirst={3}") + @ParameterizedTest(name = "mode={0} requestN={2} singleCompletesFirst={3}") @MethodSource("onNextErrorPropagatedParams") - void onNextErrorPropagated(boolean deferSubscribe, boolean propagateCancel, long n, boolean singleCompletesFirst) + void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst) throws Exception { - toSource((propagateCancel ? source.concatPropagateCancel(next) : source.concat(next, deferSubscribe)) - .map(x -> { + toSource((mode == PROPAGATE_CANCEL ? + source.concatPropagateCancel(next) : + source.concat(next, mode == DEFER_SUBSCRIBE)) + .map(x -> { throw DELIBERATE_EXCEPTION; })).subscribe(subscriber); source.onSubscribe(cancellable); @@ -113,7 +132,7 @@ void onNextErrorPropagated(boolean deferSubscribe, boolean propagateCancel, long source.onSuccess(1); } assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); - if (propagateCancel) { + if (mode == PROPAGATE_CANCEL) { next.awaitSubscribed(); next.onSubscribe(subscription); subscription.awaitCancelled(); @@ -122,11 +141,11 @@ void onNextErrorPropagated(boolean deferSubscribe, boolean propagateCancel, long } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void bothCompletion(boolean deferSubscribe) { - setUp(deferSubscribe); - long requested = triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void bothCompletion(ConcatMode mode) { + setUp(mode); + long requested = triggerNextSubscribe(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().request(2); assertThat("Unexpected items requested.", subscription.requested(), is(requested - 1 + 2)); @@ -136,84 +155,85 @@ void bothCompletion(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void sourceCompletionNextError(boolean deferSubscribe) { - setUp(deferSubscribe); - triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void sourceCompletionNextError(ConcatMode mode) { + setUp(mode); + triggerNextSubscribe(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); next.onError(DELIBERATE_EXCEPTION); assertThat(subscriber.takeOnNext(), is(1)); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @ParameterizedTest(name = "deferSubscribe={0} invalidRequestN={1}") + @ParameterizedTest(name = "mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") - void invalidRequestNBeforeNextSubscribe(boolean deferSubscribe, long invalidRequestN) { - setUp(deferSubscribe); + void invalidRequestNBeforeNextSubscribe(ConcatMode mode, long invalidRequestN) { + setUp(mode); subscriber.awaitSubscription().request(invalidRequestN); source.onSuccess(1); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void invalidRequestNWithInlineSourceCompletion(boolean deferSubscribe) { - toSource(succeeded(1).concat(empty(), deferSubscribe)).subscribe(subscriber); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void invalidRequestNWithInlineSourceCompletion(ConcatMode mode) { + toSource(mode == PROPAGATE_CANCEL ? + succeeded(1).concatPropagateCancel(empty()) : + succeeded(1).concat(empty(), mode == DEFER_SUBSCRIBE)).subscribe(subscriber); subscriber.awaitSubscription().request(-1); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void invalidRequestAfterNextSubscribe(boolean deferSubscribe) { - setUp(deferSubscribe); - triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void invalidRequestAfterNextSubscribe(ConcatMode mode) { + setUp(mode); + triggerNextSubscribe(mode); subscriber.awaitSubscription().request(-1); assertThat("Invalid request-n not propagated.", subscription.requested(), is(lessThan(0L))); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void multipleInvalidRequest(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void multipleInvalidRequest(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(-1); subscriber.awaitSubscription().request(-10); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0} invalidRequestN={1}") + @ParameterizedTest(name = "mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") - void invalidThenValidRequest(boolean deferSubscribe, long invalidRequestN) { - setUp(deferSubscribe); + void invalidThenValidRequest(ConcatMode mode, long invalidRequestN) { + setUp(mode); subscriber.awaitSubscription().request(invalidRequestN); subscriber.awaitSubscription().request(1); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); assertThat(cancellable.isCancelled(), is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void request0PropagatedAfterSuccess(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void request0PropagatedAfterSuccess(ConcatMode mode) { + setUp(mode); source.onSuccess(1); - subscriber.awaitSubscription().request(deferSubscribe ? 2 : 1); // get the success from the Single + subscriber.awaitSubscription().request(mode == DEFER_SUBSCRIBE ? 2 : 1); // get the success from the Single assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); next.onSubscribe(subscription); - assertThat(subscription.requested(), is(deferSubscribe ? 1L : 0L)); + assertThat(subscription.requested(), is(mode == DEFER_SUBSCRIBE ? 1L : 0L)); subscriber.awaitSubscription().request(0); assertThat("Invalid request-n not propagated " + subscription, subscription.requestedEquals(0), is(true)); } - @ParameterizedTest(name = "deferSubscribe={0} propagateCancel={1} error={2}") - @CsvSource(value = {"false,false,false", "false,true,false", "false,true,true", "true,false,false", - "true,true,false", "true,true,true"}) - void sourceError(boolean deferSubscribe, boolean propagateCancel, boolean error) throws InterruptedException { - setUp(deferSubscribe, propagateCancel); + @ParameterizedTest(name = "mode={0} error={1}") + @MethodSource("modeAndError") + void sourceError(ConcatMode mode, boolean error) throws InterruptedException { + setUp(mode); source.onError(DELIBERATE_EXCEPTION); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); - if (propagateCancel) { + if (mode == PROPAGATE_CANCEL) { next.awaitSubscribed(); next.onSubscribe(subscription); subscription.awaitCancelled(); @@ -229,17 +249,23 @@ void sourceError(boolean deferSubscribe, boolean propagateCancel, boolean error) } } - @ParameterizedTest(name = "deferSubscribe={0} propagateCancel={1} error={2}") - @CsvSource(value = {"false,false,false", "false,true,false", "false,true,true", "true,false,false", - "true,true,false", "true,true,true"}) - void cancelSource(boolean deferSubscribe, boolean propagateCancel, boolean error) throws InterruptedException { - setUp(deferSubscribe, propagateCancel); + @ParameterizedTest(name = "mode={0} error={1}") + @MethodSource("modeAndError") + void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { + setUp(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); Subscription subscription1 = subscriber.awaitSubscription(); subscription1.request(2); subscription1.cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); - if (propagateCancel) { + + if (error) { + source.onError(DELIBERATE_EXCEPTION); + } else { + source.onSuccess(1); + } + + if (mode == PROPAGATE_CANCEL) { next.awaitSubscribed(); next.onSubscribe(subscription); subscription.awaitCancelled(); @@ -249,49 +275,61 @@ void cancelSource(boolean deferSubscribe, boolean propagateCancel, boolean error } else { next.onComplete(); } + + // It is not required that no terminal is delivered after cancel but verifies the current implementation for + // thread safety on the subscriber and to avoid duplicate terminals. + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); } else { assertThat(next.isSubscribed(), is(false)); + if (error) { - source.onError(new DeliberateException()); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } else { - source.onSuccess(1); + // It is not required that no terminal is delivered after cancel but verifies the current implementation + // for thread safety on the subscriber and to avoid duplicate terminals. + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); } - assertThat(next.isSubscribed(), is(false)); } - // It is not required that no terminal is delivered after cancel but verifies the current implementation for - // thread safety on the subscriber and to avoid duplicate terminals. - assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelSourcePostRequest(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void cancelSourcePostRequest(ConcatMode mode) { + setUp(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().request(1); subscriber.awaitSubscription().cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); - assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(mode == PROPAGATE_CANCEL)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelNext(boolean deferSubscribe) { - setUp(deferSubscribe); - triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0} error={1}") + @MethodSource("modeAndError") + void cancelNext(ConcatMode mode, boolean error) { + setUp(mode); + triggerNextSubscribe(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); assertThat("Original single cancelled unexpectedly.", cancellable.isCancelled(), is(false)); assertThat("Next source not cancelled.", subscription.isCancelled(), is(true)); + + assertThat(subscriber.takeOnNext(), equalTo(1)); + if (error) { + next.onError(DELIBERATE_EXCEPTION); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } else { + next.onComplete(); + subscriber.awaitOnComplete(); + } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void zeroIsNotRequestedOnTransitionToSubscription(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void zeroIsNotRequestedOnTransitionToSubscription(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(1); source.onSuccess(1); - if (deferSubscribe) { + if (mode == DEFER_SUBSCRIBE) { assertThat(next.isSubscribed(), is(false)); subscriber.awaitSubscription().request(1); assertThat(next.isSubscribed(), is(true)); @@ -304,14 +342,14 @@ void zeroIsNotRequestedOnTransitionToSubscription(boolean deferSubscribe) { } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void publisherSubscribeBlockDemandMakesProgress(ConcatMode mode) { source = new TestSingle<>(); next = new TestPublisher.Builder().build(sub1 -> { sub1.onSubscribe(subscription); try { - // Simulate the a blocking operation on demand, like ConnectablePayloadWriter. + // Simulate a blocking operation on demand, like ConnectablePayloadWriter. subscription.awaitRequestN(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -319,7 +357,7 @@ void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { } return sub1; }); - setUp(deferSubscribe); + setUp(mode); source.onSuccess(10); // Give at least 2 demand so there is enough to unblock the awaitRequestN and deliver data below. @@ -331,10 +369,10 @@ void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void onErrorAfterInvalidRequestN(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void onErrorAfterInvalidRequestN(ConcatMode mode) { + setUp(mode); source.onSuccess(1); subscriber.awaitSubscription().request(2L); assertThat("Unexpected next element.", subscriber.takeOnNext(), is(1)); @@ -352,10 +390,10 @@ void onErrorAfterInvalidRequestN(boolean deferSubscribe) { subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void singleCompletesWithNull(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void singleCompletesWithNull(ConcatMode mode) { + setUp(mode); source.onSuccess(null); subscriber.awaitSubscription().request(2); assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); @@ -365,10 +403,10 @@ void singleCompletesWithNull(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void demandAccumulatedBeforeSingleCompletes(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void demandAccumulatedBeforeSingleCompletes(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(3L); assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); assertThat(subscription.requested(), is(0L)); @@ -385,10 +423,10 @@ void demandAccumulatedBeforeSingleCompletes(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void requestOneThenMore(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void requestOneThenMore(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(1L); subscriber.awaitSubscription().request(1L); assertThat(subscription.requested(), is(0L)); @@ -403,13 +441,15 @@ void requestOneThenMore(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void reentryWithMoreDemand(boolean deferSubscribe) { + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void reentryWithMoreDemand(ConcatMode mode) { List emitted = new ArrayList<>(); boolean[] completed = {false}; - toSource(succeeded(1).concat(from(2), deferSubscribe)).subscribe(new Subscriber() { - + toSource(mode == PROPAGATE_CANCEL ? + succeeded(1).concatPropagateCancel(from(2)) : + succeeded(1).concat(from(2), mode == DEFER_SUBSCRIBE) + ).subscribe(new Subscriber() { @Nullable private Subscription subscription; @@ -440,13 +480,14 @@ public void onComplete() { assertThat(completed[0], is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelledDuringFirstOnNext(boolean deferSubscribe) { + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void cancelledDuringFirstOnNext(ConcatMode mode) { List emitted = new ArrayList<>(); boolean[] terminated = {false}; - toSource(succeeded(1).concat(never(), deferSubscribe)).subscribe(new Subscriber() { - + toSource(mode == PROPAGATE_CANCEL ? + succeeded(1).concatPropagateCancel(never()) : + succeeded(1).concat(never(), mode == DEFER_SUBSCRIBE)).subscribe(new Subscriber() { @Nullable private Subscription subscription; @@ -478,10 +519,11 @@ public void onComplete() { assertThat(terminated[0], is(false)); } - private long triggerNextSubscribe(boolean deferSubscribe) { - final long n = deferSubscribe ? 2 : 1; + private long triggerNextSubscribe(ConcatMode mode) { + final long n = mode == DEFER_SUBSCRIBE ? 2 : 1; subscriber.awaitSubscription().request(n); source.onSuccess(1); + next.awaitSubscribed(); next.onSubscribe(subscription); return n; } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherPropagateCancelTckTest.java similarity index 87% rename from servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java rename to servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherPropagateCancelTckTest.java index 4e3bb43916..aea2de3044 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherPropagateCancelTckTest.java @@ -18,7 +18,7 @@ import org.testng.annotations.Test; @Test -public class SingleConcatWithPublisherDeferSubscribePropagateCancelTckTest extends SingleConcatWithPublisherTckTest { +public class SingleConcatWithPublisherPropagateCancelTckTest extends SingleConcatWithPublisherTckTest { @Override boolean propagateCancel() {