Skip to content

Commit

Permalink
Publisher#flatMapMerge allow terminal propagation after invalid demand (
Browse files Browse the repository at this point in the history
#2348)

* Publisher#flatMapMerge allow terminal propagation after invalid demand

Motivation:
Publisher#flatMapMerge has internal state that tracks demand. This state
is used to drive the internal state machine and prevents invalid demand
to avoid internal state being corrupted. However this code didn't always
allow for the subsequent terminal (e.g. likely onError) to be propagated
downstream which may result in a hang.

Modifications:
- FlatMapPublisherSubscriber demand tracking shouldn't prevent terminal
  if there is too many items delivered.
- Remove conditionals that are unnecessary for needsDemand because the
  call sites know if demand is required.
  • Loading branch information
Scottmitch authored Sep 12, 2022
1 parent ba7e826 commit 2928a29
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.internal.ArrayUtils;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.context.api.ContextMap;

Expand Down Expand Up @@ -293,7 +294,7 @@ public void onSuccess(@Nullable final T result) {
break;
}
} else {
logDuplicateTerminal(this);
SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
break;
}
}
Expand All @@ -318,7 +319,7 @@ public void onError(final Throwable t) {
break;
}
} else {
logDuplicateTerminal(this);
logDuplicateTerminal(this, t);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

final class CompositeExceptionUtils {
/**
* Default to {@code 1} so {@link Throwable#addSuppressed(Throwable)} will not be used by default.
Expand All @@ -35,7 +33,8 @@ static <T> void addPendingError(AtomicIntegerFieldUpdater<T> updater, T owner, i
if (newSize < 0) {
updater.set(owner, Integer.MAX_VALUE);
} else if (newSize < maxDelayedErrors && original != causeToAdd) {
addSuppressed(original, causeToAdd);
// We ensure original is not equal to causeToAdd, safe to add suppressed.
original.addSuppressed(causeToAdd);
} else {
updater.decrementAndGet(owner);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ private void onError0(Throwable throwable, boolean cancelUpstream) {
}
}

/**
* Process an error while holding the {@link #emitting} lock. We cannot re-throw here because sources have
* already terminated. This means the source cannot deliver another terminal or else it will violate the
* reactive stream spec. The best we can do is cancel upstream and mapped subscribers, and propagate the error
* downstream.
* @param cause The cause that occurred while delivering signals down stream.
*/
private void onErrorHoldingLock(Throwable cause) {
try {
doCancel(true);
Expand Down Expand Up @@ -372,17 +379,31 @@ public void onSubscribe(Cancellable singleCancellable) {

@Override
public void onSuccess(@Nullable R result) {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
return;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;

// First enqueue the result and then decrement active count. Since onComplete() checks for active count,
// if we decrement count before enqueuing, onComplete() may emit the terminal event without emitting
// the result.
tryEmitItem(wrapNull(result));
if (onSingleTerminated()) {
if (decrementActiveMappedSources()) {
enqueueAndDrain(complete());
}
}

@Override
public void onError(Throwable t) {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminal(this, t);
return;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;

Throwable currPendingError = pendingError;
if (source.maxDelayedErrors == 0) {
if (currPendingError == null &&
Expand All @@ -403,24 +424,14 @@ public void onError(Throwable t) {
addPendingError(pendingErrorCountUpdater, FlatMapSubscriber.this, source.maxDelayedErrors,
currPendingError, t);
}
if (onSingleTerminated()) {
if (decrementActiveMappedSources()) {
enqueueAndDrain(error(currPendingError));
} else {
// Queueing/draining may result in requestN more data.
tryEmitItem(SINGLE_ERROR);
}
}
}

private boolean onSingleTerminated() {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminal(this);
return false;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;
return decrementActiveMappedSources();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayDeque;
Expand All @@ -40,8 +41,10 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.IntStream;
import javax.annotation.Nullable;

Expand All @@ -56,7 +59,7 @@
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
import static java.lang.Math.min;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -120,6 +123,61 @@ void singleToPublisherOnNextErrorPropagated(boolean delayError) {
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void subscriptionThrowsFromTerminalHandled(boolean delayError) {
AtomicLong requestCount = new AtomicLong();
LongConsumer requestThrower = n -> {
if (requestCount.accumulateAndGet(n, Long::sum) > 1) {
throw DELIBERATE_EXCEPTION;
}
};
Function<? super Integer, ? extends Publisher<? extends Integer>> func = Publisher::from;
toSource(delayError ?
publisher.whenRequest(requestThrower).flatMapMergeDelayError(func, 1) :
publisher.whenRequest(requestThrower).flatMapMerge(func, 1))
.subscribe(subscriber);
subscriber.awaitSubscription().request(1);
publisher.onNext(1);
publisher.onComplete();
assertThat(subscriber.takeOnNext(), is(1));
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void exceptionHandledWhileConcurrentProcessing(boolean delayError) {
assert executor != null;
TestPublisher<Integer> publisher2 = new TestPublisher<>();
Function<? super Integer, ? extends Publisher<? extends Integer>> func = i -> {
if (i == 1) {
return publisher2;
} else if (i == 2) {
return executor.submit(() -> i).toPublisher();
} else {
return never();
}
};
toSource((delayError ?
publisher.flatMapMergeDelayError(func, 2) :
publisher.flatMapMerge(func, 2))
.map(i -> {
if (i == 2) {
publisher2.onNext(12);
return i;
} else if (i == 12) {
throw DELIBERATE_EXCEPTION;
}
throw new IllegalStateException("unexpected i: " + i);
}))
.subscribe(subscriber);
subscriber.awaitSubscription().request(2);
publisher.onNext(1, 2);
publisher.onComplete();
assertThat(subscriber.takeOnNext(), is(2));
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@Test
void mappedRecoverMakesProgress() throws Exception {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -265,21 +323,121 @@ void singleItemMappedErrorPostSourceComplete() {
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void testDuplicateTerminal() {
@ParameterizedTest(name = "{displayName} [{index}] errorFirst={0} errorSecond={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testDuplicateTerminal(boolean errorFirst, boolean errorSecond) {
PublisherSource<Integer> mappedPublisher = subscriber -> {
subscriber.onSubscribe(EMPTY_SUBSCRIPTION);
subscriber.onComplete();
if (errorFirst) {
subscriber.onError(DELIBERATE_EXCEPTION);
} else {
subscriber.onComplete();
}

// intentionally violate the RS spec to verify the operator's behavior.
// [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7
subscriber.onComplete();
if (errorSecond) {
subscriber.onError(new IllegalStateException("duplicate terminal should be discarded!"));
} else {
subscriber.onComplete();
}
};
@SuppressWarnings("unchecked")
Subscriber<Integer> mockSubscriber = mock(Subscriber.class);
toSource(publisher.flatMapMerge(i -> fromSource(mappedPublisher), 1)).subscribe(mockSubscriber);
publisher.onNext(1);

if (errorFirst) {
verify(mockSubscriber).onError(DELIBERATE_EXCEPTION);
} else {
publisher.onComplete();
verify(mockSubscriber).onComplete();
}
}

@ParameterizedTest(name = "{displayName} [{index}] delayError={0} queuedSignals={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testInvalidDemand(boolean delayError, boolean queuedSignals) throws InterruptedException {
final int firstItem = 1;
Publisher<Integer> publisher = Publisher.range(firstItem, firstItem + 10);
TestSubscription mappedSubscription = new TestSubscription();
TestPublisher<Integer> mappedPublisher = new TestPublisher.Builder<Integer>()
.disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(mappedSubscription);
return subscriber1;
});
Function<Integer, Publisher<Integer>> mapper = i -> i == firstItem ? mappedPublisher : never();
toSource(delayError ? publisher.flatMapMergeDelayError(mapper, 1) : publisher.flatMapMerge(mapper, 1))
.subscribe(subscriber);
Subscription subscription = subscriber.awaitSubscription();
subscription.request(1);

mappedSubscription.awaitRequestN(1);
mappedPublisher.onNext(2);
assertEquals(2, subscriber.takeOnNext());

if (queuedSignals) {
// We issued request(1) on the outer publisher and so the inner publisher is allowed to request more to
// avoid potential deadlocks.
mappedSubscription.awaitRequestN(2);
mappedPublisher.onNext(3);
}

subscription.request(-1);

assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class));
}

@ParameterizedTest(name = "{displayName} [{index}] delayError={0} mapErrorToComplete={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testDemandNotRespectedPropagatesTerminal(boolean delayError, boolean mapErrorToComplete)
throws InterruptedException {
final int firstItem = 1;
TestSubscription upstreamSubscription = new TestSubscription();
publisher = new TestPublisher.Builder<Integer>()
.disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(upstreamSubscription);
return subscriber1;
});
TestSubscription mappedSubscription = new TestSubscription();
TestPublisher<Integer> mappedPublisher = new TestPublisher.Builder<Integer>()
.disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(mappedSubscription);
return subscriber1;
});
Function<Integer, Publisher<Integer>> mapper = i -> i == firstItem ?
(mapErrorToComplete ? mappedPublisher.onErrorComplete() : mappedPublisher) :
never();
toSource(delayError ? publisher.flatMapMergeDelayError(mapper, 1) : publisher.flatMapMerge(mapper, 1))
.subscribe(subscriber);
Subscription subscription = subscriber.awaitSubscription();
subscription.request(1);
upstreamSubscription.awaitRequestN(1);
publisher.onNext(firstItem);

mappedSubscription.awaitRequestN(1);
mappedPublisher.onNext(2);
assertEquals(2, subscriber.takeOnNext());

// We issued request(1) on the outer publisher and so the inner publisher is allowed to request more to avoid
// potential deadlocks.
mappedSubscription.awaitRequestN(2);
mappedPublisher.onNext(3);
assertThat(subscriber.pollOnNext(10, MILLISECONDS), is(nullValue())); // no demand means no delivery.
assertThat(mappedSubscription.requestedEquals(2), is(true));
mappedPublisher.onNext(4); // intentionally deliver an item without demand!
publisher.onComplete();
verify(mockSubscriber).onComplete();

// Drain the item in the queue in order to have the terminal event delivered.
subscription.request(1);
assertEquals(3, subscriber.takeOnNext());

if (mapErrorToComplete) {
subscriber.awaitOnComplete();
} else {
assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class));
}
assertThat(upstreamSubscription.isCancelled(), is(!mapErrorToComplete && !delayError));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -220,21 +221,36 @@ void testSingleErrorPostSourceComplete() {
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void testDuplicateTerminal() {
@ParameterizedTest(name = "{displayName} [{index}] errorFirst={0} errorSecond={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testDuplicateTerminal(boolean errorFirst, boolean errorSecond) {
SingleSource<Integer> single = subscriber -> {
subscriber.onSubscribe(IGNORE_CANCEL);
subscriber.onSuccess(2);
if (errorFirst) {
subscriber.onError(DELIBERATE_EXCEPTION);
} else {
subscriber.onSuccess(2);
}

// intentionally violate the RS spec to verify the operator's behavior.
// [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7
subscriber.onSuccess(3);
if (errorSecond) {
subscriber.onError(new IllegalStateException("duplicate terminal should be discarded!"));
} else {
subscriber.onSuccess(3);
}
};
@SuppressWarnings("unchecked")
Subscriber<Integer> mockSubscriber = mock(Subscriber.class);
toSource(source.flatMapMergeSingle(integer1 -> fromSource(single), 2)).subscribe(mockSubscriber);
source.onNext(1);
source.onComplete();
verify(mockSubscriber).onComplete();

if (errorFirst) {
verify(mockSubscriber).onError(DELIBERATE_EXCEPTION);
} else {
source.onComplete();
verify(mockSubscriber).onComplete();
}
}

@Test
Expand Down
Loading

0 comments on commit 2928a29

Please sign in to comment.