Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single#concatPropagateCancel(Publisher) to force cancel propagation #2381

Merged
merged 2 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ public final Single<T> concat(Completable next) {
* @see #concat(Publisher, boolean)
*/
public final Publisher<T> concat(Publisher<? extends T> next) {
return new SingleConcatWithPublisher<>(this, next, false);
return concat(next, false);
}

/**
Expand All @@ -793,7 +793,29 @@ public final Publisher<T> concat(Publisher<? extends T> next) {
* all elements from {@code next} {@link Publisher}.
*/
public final Publisher<T> concat(Publisher<? extends T> next, boolean deferSubscribe) {
return new SingleConcatWithPublisher<>(this, next, deferSubscribe);
return new SingleConcatWithPublisher<>(this, next, deferSubscribe, false);
}

/**
* Returns a {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits all
* elements from {@code next} {@link Publisher}. Any error emitted by this {@link Single} or {@code next}
* {@link Publisher} is forwarded to the returned {@link Publisher}.
* <p>
* This method provides a means to sequence the execution of two asynchronous sources and in sequential programming
* is similar to:
* <pre>{@code
* List<T> results = new ...;
* results.add(resultOfThisSingle());
* results.addAll(nextStream());
* return results;
* }</pre>
* @param next {@link Publisher} to concat. will be subscribed to and cancelled if this {@link Publisher} is
* cancelled or terminates with {@link Subscriber#onError(Throwable)}.
* @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits
* all elements from {@code next} {@link Publisher}.
*/
public final Publisher<T> concatPropagateCancel(Publisher<? extends T> next) {
return new SingleConcatWithPublisher<>(this, next, false, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
Expand All @@ -31,19 +32,21 @@ final class SingleConcatWithPublisher<T> extends AbstractNoHandleSubscribePublis
private final Single<? extends T> original;
private final Publisher<? extends T> next;
private final boolean deferSubscribe;
private final boolean propagateCancel;

SingleConcatWithPublisher(final Single<? extends T> original, final Publisher<? extends T> next,
final boolean deferSubscribe) {
final boolean deferSubscribe, final boolean propagateCancel) {
this.original = original;
this.next = Objects.requireNonNull(next, "next");
this.deferSubscribe = deferSubscribe;
this.propagateCancel = propagateCancel;
}

@Override
void handleSubscribe(final Subscriber<? super T> subscriber,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
original.delegateSubscribe(deferSubscribe ? new ConcatDeferNextSubscriber<>(subscriber, next) :
new ConcatSubscriber<>(subscriber, next), contextMap, contextProvider);
original.delegateSubscribe(deferSubscribe ? new ConcatDeferNextSubscriber<>(subscriber, next, propagateCancel) :
new ConcatSubscriber<>(subscriber, next, propagateCancel), contextMap, contextProvider);
}

private abstract static class AbstractConcatSubscriber<T> extends DelayedCancellableThenSubscription
Expand All @@ -63,6 +66,7 @@ private abstract static class AbstractConcatSubscriber<T> extends DelayedCancell
static final AtomicReferenceFieldUpdater<AbstractConcatSubscriber, Object> mayBeResultUpdater =
newUpdater(AbstractConcatSubscriber.class, Object.class, "mayBeResult");

private final boolean propagateCancel;
final Subscriber<? super T> target;
final Publisher<? extends T> next;

Expand All @@ -72,9 +76,11 @@ private abstract static class AbstractConcatSubscriber<T> extends DelayedCancell
@Nullable
volatile Object mayBeResult = INITIAL;

AbstractConcatSubscriber(final Subscriber<? super T> target, final Publisher<? extends T> next) {
AbstractConcatSubscriber(final Subscriber<? super T> target, final Publisher<? extends T> next,
final boolean propagateCancel) {
this.target = target;
this.next = next;
this.propagateCancel = propagateCancel;
}

@Override
Expand All @@ -90,28 +96,94 @@ public final void onSubscribe(final Subscription subscription) {

@Override
public final void onNext(@Nullable final T t) {
// propagateCancel - if cancel does subscribe to the Publisher there will be no demand propagated upstream
// so we don't have to worry about concurrency or use-after-terminate here.
target.onNext(t);
}

@Override
public final void onError(final Throwable t) {
target.onError(t);
if (propagateCancel) {
onErrorPropagateCancel(t);
} else {
target.onError(t);
}
}

private void onErrorPropagateCancel(Throwable t) {
for (;;) {
final Object oldValue = mayBeResult;
if (oldValue == CANCELLED) {
// CANCELLED means downstream doesn't care about being notified and internal state tracking also
// uses this state if terminal has been propagated, so avoid duplicate terminal. We may
// subscribe to both sources in parallel if cancellation occurs, so allowing terminal to
// propagate would mean ordering and concurrency needs to be accounted for between Single and
// Publisher, because cancel allows for no more future delivery we avoid future invocation of
// the target subscriber.
break;
} else if (finallyShouldSubscribeToNext(oldValue)) {
if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) {
forceCancelNextOnSubscribe();
try {
target.onError(t);
} finally {
next.subscribeInternal(this);
}
break;
}
} else if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) {
target.onError(t);
break;
}
}
}

@Override
public final void onComplete() {
target.onComplete();
if (propagateCancel) {
onCompletePropagateCancel();
} else {
target.onComplete();
}
}

private void onCompletePropagateCancel() {
for (;;) {
final Object oldValue = mayBeResult;
if (oldValue == CANCELLED) {
// CANCELLED means downstream doesn't care about being notified and internal state tracking also
// uses this state if terminal has been propagated, so avoid duplicate terminal. We may
// subscribe to both sources in parallel if cancellation occurs, so allowing terminal to
// propagate would mean ordering and concurrency needs to be accounted for between Single and
// Publisher, because cancel allows for no more future delivery we avoid future invocation of
// the target subscriber.
break;
} else if (mayBeResultUpdater.compareAndSet(this, oldValue, CANCELLED)) {
// onComplete() can only be called after we subscribe to next Publisher, no need to check if we need
// to subscribe to next.
target.onComplete();
break;
}
}
}

@Override
public final void cancel() {
// We track cancelled here because we need to make sure if cancel() happens subsequent calls to request(n)
// are NOOPs [1].
// [1] https://github.com/reactive-streams/reactive-streams-jvm#3.6
mayBeResult = CANCELLED;
super.cancel();
final Object oldValue = mayBeResultUpdater.getAndSet(this, CANCELLED);
try {
super.cancel(); // call cancel first, so if we do subscribe to next we won't propagate demand.
} finally {
if (propagateCancel && oldValue != CANCELLED && finallyShouldSubscribeToNext(oldValue)) {
next.subscribeInternal(this);
}
}
}

abstract boolean finallyShouldSubscribeToNext(@Nullable Object oldState);

/**
* Helper method to invoke {@link DelayedCancellableThenSubscription#cancel()} from subclasses.
*/
Expand All @@ -120,36 +192,65 @@ final void superCancel() {
}

final boolean tryEmitSingleSuccessToTarget(@Nullable final T result) {
// If we are in this method cancel() is not allowed to subscribe because that may introduce concurrency on
// target if the Publisher terminates without any demand.
try {
target.onNext(result);
return true;
} catch (Throwable cause) {
mayBeResult = CANCELLED;
target.onError(cause);
return false;
return handleOnNextThrowable(cause);
}
}

private boolean handleOnNextThrowable(Throwable cause) {
// Switch state to CANCELLED to prevent any further interaction with target. For example if propagateCancel
// then we will subscribe and the next subscriber may send another terminal without any demand. We don't
// have to explicitly cancel here because the Single has already terminated.
mayBeResult = CANCELLED;
target.onError(cause);
if (propagateCancel) {
forceCancelNextOnSubscribe();
return true;
}
return false;
}

private void forceCancelNextOnSubscribe() {
// When onSubscribe(Subscription) is called this ensures we don't propagate any demand upstream
// and forces cancel() when onSubscribe is called.
delayedSubscription(EMPTY_SUBSCRIPTION);
}
}

private static final class ConcatSubscriber<T> extends AbstractConcatSubscriber<T> {
/**
* If {@link #request(long)} (with a valid n) invoked before {@link #onSuccess(Object)}.
*/
private static final Object REQUESTED = new Object();
private static final Object REQUESTED_SUBSCRIBED = new Object();

ConcatSubscriber(final Subscriber<? super T> target, final Publisher<? extends T> next,
final boolean propagateCancel) {
super(target, next, propagateCancel);
}

ConcatSubscriber(final Subscriber<? super T> target, final Publisher<? extends T> next) {
super(target, next);
@Override
boolean finallyShouldSubscribeToNext(@Nullable final Object oldState) {
return oldState != REQUESTED_SUBSCRIBED;
}

@Override
public void onSuccess(@Nullable final T result) {
for (;;) {
final Object oldValue = mayBeResult;
assert oldValue != REQUESTED_SUBSCRIBED;
if (oldValue == REQUESTED) {
if (tryEmitSingleSuccessToTarget(result)) {
next.subscribeInternal(this);
if (mayBeResultUpdater.compareAndSet(this, REQUESTED, REQUESTED_SUBSCRIBED)) {
if (tryEmitSingleSuccessToTarget(result)) {
next.subscribeInternal(this);
}
break;
}
break;
} else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) {
break;
}
Expand All @@ -162,18 +263,20 @@ public void request(long n) {
final Object oldVal = mayBeResult;
if (oldVal == CANCELLED) {
break;
} else if (oldVal == REQUESTED) {
} else if (oldVal == REQUESTED || oldVal == REQUESTED_SUBSCRIBED) {
super.request(n);
break;
} else if (!isRequestNValid(n)) {
mayBeResult = CANCELLED;
try {
target.onError(newExceptionForInvalidRequestN(n));
} finally {
superCancel();
if (mayBeResultUpdater.compareAndSet(this, oldVal, CANCELLED)) {
try {
superCancel();
} finally {
target.onError(newExceptionForInvalidRequestN(n));
}
break;
}
break;
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED)) {
} else if (mayBeResultUpdater.compareAndSet(this, oldVal,
oldVal != INITIAL ? REQUESTED_SUBSCRIBED : REQUESTED)) {
// We need to ensure that the queued result is delivered in order (first). Upstream demand is
// delayed via DelayedSubscription until onSubscribe which preserves ordering, and there are some
// scenarios where subscribing to the concat Publisher may block on demand (e.g.
Expand Down Expand Up @@ -220,8 +323,14 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
*/
private static final Object PUBLISHER_SUBSCRIBED = new Object();

ConcatDeferNextSubscriber(final Subscriber<? super T> target, final Publisher<? extends T> next) {
super(target, next);
ConcatDeferNextSubscriber(final Subscriber<? super T> target, final Publisher<? extends T> next,
final boolean propagateCancel) {
super(target, next, propagateCancel);
}

@Override
boolean finallyShouldSubscribeToNext(@Nullable final Object oldState) {
return oldState != PUBLISHER_SUBSCRIBED;
}

@Override
Expand All @@ -235,16 +344,16 @@ public void onSuccess(@Nullable final T result) {
if (oldValue == CANCELLED) {
break;
} else if (oldValue == INITIAL) {
if (mayBeResultUpdater.compareAndSet(this, oldValue, result)) {
if (mayBeResultUpdater.compareAndSet(this, INITIAL, result)) {
break;
}
} else if (oldValue == REQUESTED_ONE) {
if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERING)) {
if (mayBeResultUpdater.compareAndSet(this, REQUESTED_ONE, SINGLE_DELIVERING)) {
emitSingleSuccessToTarget(result);
break;
}
} else if (oldValue == REQUESTED_MORE &&
mayBeResultUpdater.compareAndSet(this, oldValue, PUBLISHER_SUBSCRIBED)) {
mayBeResultUpdater.compareAndSet(this, REQUESTED_MORE, PUBLISHER_SUBSCRIBED)) {
if (tryEmitSingleSuccessToTarget(result)) {
next.subscribeInternal(this);
}
Expand All @@ -263,22 +372,23 @@ public void request(long n) {
super.request(n);
break;
} else if (!isRequestNValid(n)) {
mayBeResult = CANCELLED;
try {
target.onError(newExceptionForInvalidRequestN(n));
} finally {
superCancel();
if (mayBeResultUpdater.compareAndSet(this, oldVal, CANCELLED)) {
try {
superCancel();
} finally {
target.onError(newExceptionForInvalidRequestN(n));
}
break;
}
break;
} else if (oldVal == INITIAL) {
if (n > 1) {
if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_MORE)) {
if (mayBeResultUpdater.compareAndSet(this, INITIAL, REQUESTED_MORE)) {
super.request(n - 1);
break;
}
} else {
assert n == 1;
if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_ONE)) {
if (mayBeResultUpdater.compareAndSet(this, INITIAL, REQUESTED_ONE)) {
break;
}
}
Expand All @@ -288,18 +398,24 @@ public void request(long n) {
break;
}
} else if (oldVal == SINGLE_DELIVERED) {
if (mayBeResultUpdater.compareAndSet(this, oldVal, PUBLISHER_SUBSCRIBED)) {
super.request(n);
next.subscribeInternal(this);
if (mayBeResultUpdater.compareAndSet(this, SINGLE_DELIVERED, PUBLISHER_SUBSCRIBED)) {
try {
super.request(n);
} finally {
next.subscribeInternal(this);
}
break;
}
} else if (n > 1) {
if (mayBeResultUpdater.compareAndSet(this, oldVal, PUBLISHER_SUBSCRIBED)) {
@SuppressWarnings("unchecked")
final T tVal = (T) oldVal;
if (tryEmitSingleSuccessToTarget(tVal)) {
super.request(n - 1);
next.subscribeInternal(this);
try {
super.request(n - 1);
} finally {
next.subscribeInternal(this);
}
}
break;
}
Expand Down
Loading