diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java index 946402991e..a35dcd7f45 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java @@ -764,9 +764,10 @@ public final Single concat(Completable next) { * @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits * all elements from {@code next} {@link Publisher}. * @see #concat(Publisher, boolean) + * @see #concatPropagateCancel(Publisher) */ public final Publisher concat(Publisher next) { - return new SingleConcatWithPublisher<>(this, next, false); + return concat(next, false); } /** @@ -793,7 +794,29 @@ public final Publisher concat(Publisher next) { * all elements from {@code next} {@link Publisher}. */ public final Publisher concat(Publisher next, boolean deferSubscribe) { - return new SingleConcatWithPublisher<>(this, next, deferSubscribe); + return new SingleConcatWithPublisher<>(this, next, deferSubscribe, false); + } + + /** + * This method is like {@link #concat(Completable)} except {@code next} will be subscribed to and cancelled if this + * {@link Publisher} is cancelled or terminates with {@link Subscriber#onError(Throwable)}. + *

+ * This method provides a means to sequence the execution of two asynchronous sources and in sequential programming + * is similar to: + *

{@code
+     *     List results = new ...;
+     *     results.add(resultOfThisSingle());
+     *     results.addAll(nextStream());
+     *     return results;
+     * }
+ * @param next {@link Publisher} to concat. Will be subscribed to and cancelled if this {@link Publisher} is + * cancelled or terminates with {@link Subscriber#onError(Throwable)}. + * @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits + * all elements from {@code next} {@link Publisher}. + * @see #concat(Completable) + */ + public final Publisher concatPropagateCancel(Publisher next) { + return new SingleConcatWithPublisher<>(this, next, false, true); } /** diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java index df1e5a536a..1d7450ad49 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; +import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; @@ -31,24 +32,25 @@ final class SingleConcatWithPublisher extends AbstractNoHandleSubscribePublis private final Single original; private final Publisher next; private final boolean deferSubscribe; + private final boolean propagateCancel; SingleConcatWithPublisher(final Single original, final Publisher next, - final boolean deferSubscribe) { + final boolean deferSubscribe, final boolean propagateCancel) { this.original = original; this.next = Objects.requireNonNull(next, "next"); this.deferSubscribe = deferSubscribe; + this.propagateCancel = propagateCancel; } @Override void handleSubscribe(final Subscriber subscriber, final ContextMap contextMap, final AsyncContextProvider contextProvider) { - original.delegateSubscribe(deferSubscribe ? new ConcatDeferNextSubscriber<>(subscriber, next) : - new ConcatSubscriber<>(subscriber, next), contextMap, contextProvider); + original.delegateSubscribe(deferSubscribe ? new ConcatDeferNextSubscriber<>(subscriber, next, propagateCancel) : + new ConcatSubscriber<>(subscriber, next, propagateCancel), contextMap, contextProvider); } private abstract static class AbstractConcatSubscriber extends DelayedCancellableThenSubscription implements SingleSource.Subscriber, Subscriber { - /** * Initial state upon creation. */ @@ -58,11 +60,20 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell * delivered. */ static final Object CANCELLED = new Object(); + /** + * Cancelled after {@link #onSuccess(Object)} or terminal signal received (prevents duplicate terminals). + */ + static final Object TERMINAL = new Object(); + /** + * After {@link #onSuccess(Object)} and {@link #request(long)} but before subscribing to {@link #next}. + */ + static final Object PUBLISHER_SUBSCRIBED = new Object(); @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater mayBeResultUpdater = newUpdater(AbstractConcatSubscriber.class, Object.class, "mayBeResult"); + private final boolean propagateCancel; final Subscriber target; final Publisher next; @@ -72,9 +83,11 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell @Nullable volatile Object mayBeResult = INITIAL; - AbstractConcatSubscriber(final Subscriber target, final Publisher next) { + AbstractConcatSubscriber(final Subscriber target, final Publisher next, + final boolean propagateCancel) { this.target = target; this.next = next; + this.propagateCancel = propagateCancel; } @Override @@ -90,17 +103,66 @@ public final void onSubscribe(final Subscription subscription) { @Override public final void onNext(@Nullable final T t) { + // propagateCancel - if cancel does subscribe to the Publisher there will be no demand propagated upstream + // so we don't have to worry about concurrency or use-after-terminate here. target.onNext(t); } @Override public final void onError(final Throwable t) { - target.onError(t); + if (propagateCancel) { + onErrorPropagateCancel(t); + } else { + target.onError(t); + } + } + + private void onErrorPropagateCancel(Throwable t) { + for (;;) { + final Object oldValue = mayBeResult; + if (oldValue == TERMINAL) { + // Only propagate terminal if we were cancelled after the first source terminated. Otherwise, + // we may deliver items out of order and fail the TCK tests by delivering terminal after cancel. + break; + } else if (mayBeResultUpdater.compareAndSet(this, oldValue, TERMINAL)) { + if (finallyShouldSubscribeToNext(oldValue)) { + forceCancelNextOnSubscribe(); + try { + target.onError(t); + } finally { + next.subscribeInternal(this); + } + } else { + target.onError(t); + } + break; + } + } } @Override public final void onComplete() { - target.onComplete(); + if (propagateCancel) { + onCompletePropagateCancel(); + } else { + target.onComplete(); + } + } + + private void onCompletePropagateCancel() { + for (;;) { + final Object oldValue = mayBeResult; + if (oldValue == TERMINAL) { + // Only propagate terminal if we were cancelled after the first source terminated. Otherwise, + // we may deliver items out of order and fail the TCK tests by delivering terminal after cancel. + break; + } else if (mayBeResultUpdater.compareAndSet(this, oldValue, TERMINAL)) { + // onComplete() can only be called after we subscribe to next Publisher, no need to check if we need + // to subscribe to next. + target.onComplete(); + break; + } + } } @Override @@ -108,8 +170,23 @@ public final void cancel() { // We track cancelled here because we need to make sure if cancel() happens subsequent calls to request(n) // are NOOPs [1]. // [1] https://github.com/reactive-streams/reactive-streams-jvm#3.6 - mayBeResult = CANCELLED; - super.cancel(); + for (;;) { + final Object oldValue = mayBeResult; + if (oldValue == CANCELLED || oldValue == TERMINAL) { + break; + } + final boolean firstCancel = finallyShouldSubscribeToNext(oldValue); + if (mayBeResultUpdater.compareAndSet(this, oldValue, firstCancel ? TERMINAL : CANCELLED)) { + try { + super.cancel(); // call cancel first, so if we do subscribe to next we won't propagate demand. + } finally { + if (propagateCancel && firstCancel) { + next.subscribeInternal(this); + } + } + break; + } + } } /** @@ -120,14 +197,37 @@ final void superCancel() { } final boolean tryEmitSingleSuccessToTarget(@Nullable final T result) { + // If we are in this method cancel() is not allowed to subscribe because that may introduce concurrency on + // target if the Publisher terminates without any demand. try { target.onNext(result); return true; } catch (Throwable cause) { - mayBeResult = CANCELLED; - target.onError(cause); - return false; + return handleOnNextThrowable(cause); + } + } + + private boolean finallyShouldSubscribeToNext(@Nullable Object oldState) { + return oldState != PUBLISHER_SUBSCRIBED; + } + + private boolean handleOnNextThrowable(Throwable cause) { + // Switch state to TERMINAL to prevent any further interaction with target. For example if propagateCancel + // then we will subscribe and the next subscriber may send another terminal without any demand. We don't + // have to explicitly cancel here because the Single has already terminated. + mayBeResult = TERMINAL; + target.onError(cause); + if (propagateCancel) { + forceCancelNextOnSubscribe(); + return true; } + return false; + } + + private void forceCancelNextOnSubscribe() { + // When onSubscribe(Subscription) is called this ensures we don't propagate any demand upstream + // and forces cancel() when onSubscribe is called. + delayedSubscription(EMPTY_SUBSCRIPTION); } } @@ -137,20 +237,25 @@ private static final class ConcatSubscriber extends AbstractConcatSubscriber< */ private static final Object REQUESTED = new Object(); - ConcatSubscriber(final Subscriber target, final Publisher next) { - super(target, next); + ConcatSubscriber(final Subscriber target, final Publisher next, + final boolean propagateCancel) { + super(target, next, propagateCancel); } @Override public void onSuccess(@Nullable final T result) { for (;;) { final Object oldValue = mayBeResult; + assert oldValue != PUBLISHER_SUBSCRIBED; if (oldValue == REQUESTED) { - if (tryEmitSingleSuccessToTarget(result)) { - next.subscribeInternal(this); + if (mayBeResultUpdater.compareAndSet(this, REQUESTED, PUBLISHER_SUBSCRIBED)) { + if (tryEmitSingleSuccessToTarget(result)) { + next.subscribeInternal(this); + } + break; } - break; - } else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { + } else if (oldValue == CANCELLED || oldValue == TERMINAL || + mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } } @@ -160,20 +265,22 @@ public void onSuccess(@Nullable final T result) { public void request(long n) { for (;;) { final Object oldVal = mayBeResult; - if (oldVal == CANCELLED) { + if (oldVal == CANCELLED || oldVal == TERMINAL) { break; - } else if (oldVal == REQUESTED) { + } else if (oldVal == REQUESTED || oldVal == PUBLISHER_SUBSCRIBED) { super.request(n); break; } else if (!isRequestNValid(n)) { - mayBeResult = CANCELLED; - try { - target.onError(newExceptionForInvalidRequestN(n)); - } finally { - superCancel(); + if (mayBeResultUpdater.compareAndSet(this, oldVal, TERMINAL)) { + try { + superCancel(); + } finally { + target.onError(newExceptionForInvalidRequestN(n)); + } + break; } - break; - } else if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED)) { + } else if (mayBeResultUpdater.compareAndSet(this, oldVal, + oldVal != INITIAL ? PUBLISHER_SUBSCRIBED : REQUESTED)) { // We need to ensure that the queued result is delivered in order (first). Upstream demand is // delayed via DelayedSubscription until onSubscribe which preserves ordering, and there are some // scenarios where subscribing to the concat Publisher may block on demand (e.g. @@ -214,14 +321,10 @@ private static final class ConcatDeferNextSubscriber extends AbstractConcatSu * delivered to the target. */ private static final Object SINGLE_DELIVERED = new Object(); - /** - * If more than one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and we - * subscribed to the next {@link Publisher}. - */ - private static final Object PUBLISHER_SUBSCRIBED = new Object(); - ConcatDeferNextSubscriber(final Subscriber target, final Publisher next) { - super(target, next); + ConcatDeferNextSubscriber(final Subscriber target, final Publisher next, + final boolean propagateCancel) { + super(target, next, propagateCancel); } @Override @@ -232,19 +335,19 @@ public void onSuccess(@Nullable final T result) { assert oldValue != SINGLE_DELIVERED; assert oldValue != PUBLISHER_SUBSCRIBED; - if (oldValue == CANCELLED) { + if (oldValue == CANCELLED || oldValue == TERMINAL) { break; } else if (oldValue == INITIAL) { - if (mayBeResultUpdater.compareAndSet(this, oldValue, result)) { + if (mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } } else if (oldValue == REQUESTED_ONE) { - if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERING)) { + if (mayBeResultUpdater.compareAndSet(this, REQUESTED_ONE, SINGLE_DELIVERING)) { emitSingleSuccessToTarget(result); break; } } else if (oldValue == REQUESTED_MORE && - mayBeResultUpdater.compareAndSet(this, oldValue, PUBLISHER_SUBSCRIBED)) { + mayBeResultUpdater.compareAndSet(this, REQUESTED_MORE, PUBLISHER_SUBSCRIBED)) { if (tryEmitSingleSuccessToTarget(result)) { next.subscribeInternal(this); } @@ -257,28 +360,29 @@ public void onSuccess(@Nullable final T result) { public void request(long n) { for (;;) { final Object oldVal = mayBeResult; - if (oldVal == CANCELLED) { + if (oldVal == CANCELLED || oldVal == TERMINAL) { break; } else if (oldVal == PUBLISHER_SUBSCRIBED || oldVal == REQUESTED_MORE) { super.request(n); break; } else if (!isRequestNValid(n)) { - mayBeResult = CANCELLED; - try { - target.onError(newExceptionForInvalidRequestN(n)); - } finally { - superCancel(); + if (mayBeResultUpdater.compareAndSet(this, oldVal, TERMINAL)) { + try { + superCancel(); + } finally { + target.onError(newExceptionForInvalidRequestN(n)); + } + break; } - break; } else if (oldVal == INITIAL) { if (n > 1) { - if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_MORE)) { + if (mayBeResultUpdater.compareAndSet(this, INITIAL, REQUESTED_MORE)) { super.request(n - 1); break; } } else { assert n == 1; - if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_ONE)) { + if (mayBeResultUpdater.compareAndSet(this, INITIAL, REQUESTED_ONE)) { break; } } @@ -288,9 +392,12 @@ public void request(long n) { break; } } else if (oldVal == SINGLE_DELIVERED) { - if (mayBeResultUpdater.compareAndSet(this, oldVal, PUBLISHER_SUBSCRIBED)) { - super.request(n); - next.subscribeInternal(this); + if (mayBeResultUpdater.compareAndSet(this, SINGLE_DELIVERED, PUBLISHER_SUBSCRIBED)) { + try { + super.request(n); + } finally { + next.subscribeInternal(this); + } break; } } else if (n > 1) { @@ -298,8 +405,11 @@ public void request(long n) { @SuppressWarnings("unchecked") final T tVal = (T) oldVal; if (tryEmitSingleSuccessToTarget(tVal)) { - super.request(n - 1); - next.subscribeInternal(this); + try { + super.request(n - 1); + } finally { + next.subscribeInternal(this); + } } break; } @@ -320,7 +430,8 @@ private void emitSingleSuccessToTarget(@Nullable final T result) { // more demand appeared while we were delivering the single result next.subscribeInternal(this); } else { - assert mayBeResult == CANCELLED; + final Object oldValue = mayBeResult; + assert oldValue == CANCELLED || oldValue == TERMINAL; } } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java index cd5bab845b..1e339620bb 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java @@ -21,13 +21,14 @@ import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.api.TestSingle; import io.servicetalk.concurrent.api.TestSubscription; +import io.servicetalk.concurrent.internal.DeliberateException; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import org.hamcrest.Matchers; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Collection; @@ -40,12 +41,16 @@ import static io.servicetalk.concurrent.api.Publisher.never; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.api.single.SingleConcatWithPublisherTest.ConcatMode.CONCAT; +import static io.servicetalk.concurrent.api.single.SingleConcatWithPublisherTest.ConcatMode.DEFER_SUBSCRIBE; +import static io.servicetalk.concurrent.api.single.SingleConcatWithPublisherTest.ConcatMode.PROPAGATE_CANCEL; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; -import static io.servicetalk.utils.internal.PlatformDependent.throwException; +import static io.servicetalk.utils.internal.ThrowableUtils.throwException; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -59,37 +64,62 @@ class SingleConcatWithPublisherTest { private TestSingle source = new TestSingle.Builder().disableAutoOnSubscribe().build(); private TestPublisher next = new TestPublisher.Builder().disableAutoOnSubscribe().build(); - void setUp(boolean deferSubscribe) { - toSource(source.concat(next, deferSubscribe)).subscribe(subscriber); + enum ConcatMode { + CONCAT, DEFER_SUBSCRIBE, PROPAGATE_CANCEL + } + + void setUp(ConcatMode mode) { + toSource(mode == PROPAGATE_CANCEL ? + source.concatPropagateCancel(next) : + source.concat(next, mode == DEFER_SUBSCRIBE)) + .subscribe(subscriber); source.onSubscribe(cancellable); subscriber.awaitSubscription(); } @SuppressWarnings("unused") private static Stream invalidRequestN() { - return Stream.of(Arguments.of(false, -1), - Arguments.of(false, 0), - Arguments.of(true, -1), - Arguments.of(true, 0)); + return Stream.of( + Arguments.of(CONCAT, -1), + Arguments.of(CONCAT, 0), + Arguments.of(DEFER_SUBSCRIBE, -1), + Arguments.of(DEFER_SUBSCRIBE, 0), + Arguments.of(PROPAGATE_CANCEL, -1), + Arguments.of(PROPAGATE_CANCEL, 0)); + } + + @SuppressWarnings("unused") + private static Stream modeAndError() { + return Stream.of( + Arguments.of(CONCAT, true), + Arguments.of(CONCAT, false), + Arguments.of(DEFER_SUBSCRIBE, true), + Arguments.of(DEFER_SUBSCRIBE, false), + Arguments.of(PROPAGATE_CANCEL, true), + Arguments.of(PROPAGATE_CANCEL, false)); } @SuppressWarnings("unused") private static Collection onNextErrorPropagatedParams() { List args = new ArrayList<>(); - for (boolean deferSubscribe : asList(false, true)) { + for (ConcatMode mode : ConcatMode.values()) { for (long requestN : asList(1, 2)) { for (boolean singleCompletesFirst : asList(false, true)) { - args.add(Arguments.of(deferSubscribe, requestN, singleCompletesFirst)); + args.add(Arguments.of(mode, requestN, singleCompletesFirst)); } } } return args; } - @ParameterizedTest(name = "deferSubscribe={0} requestN={1} singleCompletesFirst={2}") + @ParameterizedTest(name = "mode={0} requestN={2} singleCompletesFirst={3}") @MethodSource("onNextErrorPropagatedParams") - void onNextErrorPropagated(boolean deferSubscribe, long n, boolean singleCompletesFirst) { - toSource(source.concat(next, deferSubscribe).map(x -> { + void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst) + throws Exception { + toSource((mode == PROPAGATE_CANCEL ? + source.concatPropagateCancel(next) : + source.concat(next, mode == DEFER_SUBSCRIBE)) + .map(x -> { throw DELIBERATE_EXCEPTION; })).subscribe(subscriber); source.onSubscribe(cancellable); @@ -102,14 +132,20 @@ void onNextErrorPropagated(boolean deferSubscribe, long n, boolean singleComplet source.onSuccess(1); } assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); - assertThat(next.isSubscribed(), is(false)); + if (mode == PROPAGATE_CANCEL) { + next.awaitSubscribed(); + next.onSubscribe(subscription); + subscription.awaitCancelled(); + } else { + assertThat(next.isSubscribed(), is(false)); + } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void bothCompletion(boolean deferSubscribe) { - setUp(deferSubscribe); - long requested = triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void bothCompletion(ConcatMode mode) { + setUp(mode); + long requested = triggerNextSubscribe(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().request(2); assertThat("Unexpected items requested.", subscription.requested(), is(requested - 1 + 2)); @@ -119,124 +155,181 @@ void bothCompletion(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void sourceCompletionNextError(boolean deferSubscribe) { - setUp(deferSubscribe); - triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void sourceCompletionNextError(ConcatMode mode) { + setUp(mode); + triggerNextSubscribe(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); next.onError(DELIBERATE_EXCEPTION); assertThat(subscriber.takeOnNext(), is(1)); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @ParameterizedTest(name = "deferSubscribe={0} invalidRequestN={1}") + @ParameterizedTest(name = "mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") - void invalidRequestNBeforeNextSubscribe(boolean deferSubscribe, long invalidRequestN) { - setUp(deferSubscribe); + void invalidRequestNBeforeNextSubscribe(ConcatMode mode, long invalidRequestN) { + setUp(mode); subscriber.awaitSubscription().request(invalidRequestN); source.onSuccess(1); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void invalidRequestNWithInlineSourceCompletion(boolean deferSubscribe) { - toSource(succeeded(1).concat(empty(), deferSubscribe)).subscribe(subscriber); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void invalidRequestNWithInlineSourceCompletion(ConcatMode mode) { + toSource(mode == PROPAGATE_CANCEL ? + succeeded(1).concatPropagateCancel(empty()) : + succeeded(1).concat(empty(), mode == DEFER_SUBSCRIBE)).subscribe(subscriber); subscriber.awaitSubscription().request(-1); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void invalidRequestAfterNextSubscribe(boolean deferSubscribe) { - setUp(deferSubscribe); - triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void invalidRequestAfterNextSubscribe(ConcatMode mode) { + setUp(mode); + triggerNextSubscribe(mode); subscriber.awaitSubscription().request(-1); assertThat("Invalid request-n not propagated.", subscription.requested(), is(lessThan(0L))); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void multipleInvalidRequest(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void multipleInvalidRequest(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(-1); subscriber.awaitSubscription().request(-10); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0} invalidRequestN={1}") + @ParameterizedTest(name = "mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") - void invalidThenValidRequest(boolean deferSubscribe, long invalidRequestN) { - setUp(deferSubscribe); + void invalidThenValidRequest(ConcatMode mode, long invalidRequestN) { + setUp(mode); subscriber.awaitSubscription().request(invalidRequestN); subscriber.awaitSubscription().request(1); assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); assertThat(cancellable.isCancelled(), is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void request0PropagatedAfterSuccess(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void request0PropagatedAfterSuccess(ConcatMode mode) { + setUp(mode); source.onSuccess(1); - subscriber.awaitSubscription().request(deferSubscribe ? 2 : 1); // get the success from the Single + subscriber.awaitSubscription().request(mode == DEFER_SUBSCRIBE ? 2 : 1); // get the success from the Single assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); next.onSubscribe(subscription); - assertThat(subscription.requested(), is(deferSubscribe ? 1L : 0L)); + assertThat(subscription.requested(), is(mode == DEFER_SUBSCRIBE ? 1L : 0L)); subscriber.awaitSubscription().request(0); assertThat("Invalid request-n not propagated " + subscription, subscription.requestedEquals(0), is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void sourceError(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0} error={1}") + @MethodSource("modeAndError") + void sourceError(ConcatMode mode, boolean error) throws InterruptedException { + setUp(mode); source.onError(DELIBERATE_EXCEPTION); - assertThat("Unexpected subscriber termination.", subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); - assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + if (mode == PROPAGATE_CANCEL) { + next.awaitSubscribed(); + next.onSubscribe(subscription); + subscription.awaitCancelled(); + + // Test that no duplicate terminal events are delivered. + if (error) { + next.onError(new DeliberateException()); + } else { + next.onComplete(); + } + } else { + assertThat(next.isSubscribed(), is(false)); + } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelSource(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0} error={1}") + @MethodSource("modeAndError") + void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { + setUp(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); - subscriber.awaitSubscription().cancel(); + Subscription subscription1 = subscriber.awaitSubscription(); + subscription1.request(2); + subscription1.cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); - assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + + if (error) { + source.onError(DELIBERATE_EXCEPTION); + } else { + source.onSuccess(1); + } + + if (mode == PROPAGATE_CANCEL) { + next.awaitSubscribed(); + next.onSubscribe(subscription); + subscription.awaitCancelled(); + + if (error) { + next.onError(new DeliberateException()); + } else { + next.onComplete(); + } + + // It is not required that no terminal is delivered after cancel but verifies the current implementation for + // thread safety on the subscriber and to avoid duplicate terminals. + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); + } else { + assertThat(next.isSubscribed(), is(false)); + + if (error) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } else { + // It is not required that no terminal is delivered after cancel but verifies the current implementation + // for thread safety on the subscriber and to avoid duplicate terminals. + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); + } + } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelSourcePostRequest(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void cancelSourcePostRequest(ConcatMode mode) { + setUp(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().request(1); subscriber.awaitSubscription().cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); - assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(mode == PROPAGATE_CANCEL)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelNext(boolean deferSubscribe) { - setUp(deferSubscribe); - triggerNextSubscribe(deferSubscribe); + @ParameterizedTest(name = "mode={0} error={1}") + @MethodSource("modeAndError") + void cancelNext(ConcatMode mode, boolean error) { + setUp(mode); + triggerNextSubscribe(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); assertThat("Original single cancelled unexpectedly.", cancellable.isCancelled(), is(false)); assertThat("Next source not cancelled.", subscription.isCancelled(), is(true)); + + assertThat(subscriber.takeOnNext(), equalTo(1)); + if (error) { + next.onError(DELIBERATE_EXCEPTION); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } else { + next.onComplete(); + subscriber.awaitOnComplete(); + } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void zeroIsNotRequestedOnTransitionToSubscription(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void zeroIsNotRequestedOnTransitionToSubscription(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(1); source.onSuccess(1); - if (deferSubscribe) { + if (mode == DEFER_SUBSCRIBE) { assertThat(next.isSubscribed(), is(false)); subscriber.awaitSubscription().request(1); assertThat(next.isSubscribed(), is(true)); @@ -249,14 +342,14 @@ void zeroIsNotRequestedOnTransitionToSubscription(boolean deferSubscribe) { } } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void publisherSubscribeBlockDemandMakesProgress(ConcatMode mode) { source = new TestSingle<>(); next = new TestPublisher.Builder().build(sub1 -> { sub1.onSubscribe(subscription); try { - // Simulate the a blocking operation on demand, like ConnectablePayloadWriter. + // Simulate a blocking operation on demand, like ConnectablePayloadWriter. subscription.awaitRequestN(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -264,7 +357,7 @@ void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { } return sub1; }); - setUp(deferSubscribe); + setUp(mode); source.onSuccess(10); // Give at least 2 demand so there is enough to unblock the awaitRequestN and deliver data below. @@ -276,10 +369,10 @@ void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void onErrorAfterInvalidRequestN(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void onErrorAfterInvalidRequestN(ConcatMode mode) { + setUp(mode); source.onSuccess(1); subscriber.awaitSubscription().request(2L); assertThat("Unexpected next element.", subscriber.takeOnNext(), is(1)); @@ -297,10 +390,10 @@ void onErrorAfterInvalidRequestN(boolean deferSubscribe) { subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void singleCompletesWithNull(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void singleCompletesWithNull(ConcatMode mode) { + setUp(mode); source.onSuccess(null); subscriber.awaitSubscription().request(2); assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); @@ -310,10 +403,10 @@ void singleCompletesWithNull(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void demandAccumulatedBeforeSingleCompletes(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void demandAccumulatedBeforeSingleCompletes(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(3L); assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); assertThat(subscription.requested(), is(0L)); @@ -330,10 +423,10 @@ void demandAccumulatedBeforeSingleCompletes(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void requestOneThenMore(boolean deferSubscribe) { - setUp(deferSubscribe); + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void requestOneThenMore(ConcatMode mode) { + setUp(mode); subscriber.awaitSubscription().request(1L); subscriber.awaitSubscription().request(1L); assertThat(subscription.requested(), is(0L)); @@ -348,13 +441,15 @@ void requestOneThenMore(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void reentryWithMoreDemand(boolean deferSubscribe) { + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void reentryWithMoreDemand(ConcatMode mode) { List emitted = new ArrayList<>(); boolean[] completed = {false}; - toSource(succeeded(1).concat(from(2), deferSubscribe)).subscribe(new Subscriber() { - + toSource(mode == PROPAGATE_CANCEL ? + succeeded(1).concatPropagateCancel(from(2)) : + succeeded(1).concat(from(2), mode == DEFER_SUBSCRIBE) + ).subscribe(new Subscriber() { @Nullable private Subscription subscription; @@ -385,13 +480,14 @@ public void onComplete() { assertThat(completed[0], is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") - @ValueSource(booleans = {false, true}) - void cancelledDuringFirstOnNext(boolean deferSubscribe) { + @ParameterizedTest(name = "mode={0}") + @EnumSource(ConcatMode.class) + void cancelledDuringFirstOnNext(ConcatMode mode) { List emitted = new ArrayList<>(); boolean[] terminated = {false}; - toSource(succeeded(1).concat(never(), deferSubscribe)).subscribe(new Subscriber() { - + toSource(mode == PROPAGATE_CANCEL ? + succeeded(1).concatPropagateCancel(never()) : + succeeded(1).concat(never(), mode == DEFER_SUBSCRIBE)).subscribe(new Subscriber() { @Nullable private Subscription subscription; @@ -423,10 +519,11 @@ public void onComplete() { assertThat(terminated[0], is(false)); } - private long triggerNextSubscribe(boolean deferSubscribe) { - final long n = deferSubscribe ? 2 : 1; + private long triggerNextSubscribe(ConcatMode mode) { + final long n = mode == DEFER_SUBSCRIBE ? 2 : 1; subscriber.awaitSubscription().request(n); source.onSuccess(1); + next.awaitSubscribed(); next.onSubscribe(subscription); return n; } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherPropagateCancelTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherPropagateCancelTckTest.java new file mode 100644 index 0000000000..aea2de3044 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherPropagateCancelTckTest.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import org.testng.annotations.Test; + +@Test +public class SingleConcatWithPublisherPropagateCancelTckTest extends SingleConcatWithPublisherTckTest { + + @Override + boolean propagateCancel() { + return true; + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java index b37e5cac5d..41adbee4c5 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherTckTest.java @@ -20,6 +20,8 @@ import org.testng.annotations.Test; +import static io.servicetalk.concurrent.reactivestreams.tck.TckUtils.newPublisher; + @Test public class SingleConcatWithPublisherTckTest extends AbstractSingleTckTest { @@ -27,13 +29,18 @@ boolean deferSubscribe() { return false; } + boolean propagateCancel() { + return false; + } + @Override protected Publisher createServiceTalkPublisher(long elements) { if (elements < 2) { return Single.succeeded(1).toPublisher(); } - return Single.succeeded(1) - .concat(TckUtils.newPublisher(TckUtils.requestNToInt(elements) - 1), deferSubscribe()); + Single s = Single.succeeded(1); + Publisher p = newPublisher(TckUtils.requestNToInt(elements) - 1); + return propagateCancel() ? s.concatPropagateCancel(p) : s.concat(p, deferSubscribe()); } @Override