Skip to content

Commit

Permalink
Add Publisher.switchMap (#2678)
Browse files Browse the repository at this point in the history
Motivation:
Publisher.switchMap can be used to flatten an async stream of
publishers while always taking results from the latest publisher
and cancelling the previous Publisher.
  • Loading branch information
Scottmitch authored Sep 5, 2023
1 parent 670cc93 commit 9f0d920
Show file tree
Hide file tree
Showing 9 changed files with 1,465 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.servicetalk.concurrent.api;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class CompositeExceptionUtils {
/**
Expand All @@ -26,17 +27,37 @@ final class CompositeExceptionUtils {
private CompositeExceptionUtils() {
}

static <T> void addPendingError(AtomicIntegerFieldUpdater<T> updater, T owner, int maxDelayedErrors,
Throwable original, Throwable causeToAdd) {
// optimistically increment, recover after the fact if necessary.
final int newSize = updater.incrementAndGet(owner);
if (newSize < 0) {
updater.set(owner, Integer.MAX_VALUE);
} else if (newSize < maxDelayedErrors && original != causeToAdd) {
// We ensure original is not equal to causeToAdd, safe to add suppressed.
original.addSuppressed(causeToAdd);
static <T> Throwable addPendingError(AtomicReferenceFieldUpdater<T, Throwable> causeUpdater,
AtomicIntegerFieldUpdater<T> countUpdater, T owner, int maxDelayedErrors,
Throwable causeToAdd) {
Throwable currPendingError = causeUpdater.get(owner);
if (currPendingError == null) {
if (causeUpdater.compareAndSet(owner, null, causeToAdd)) {
currPendingError = causeToAdd;
} else {
currPendingError = causeUpdater.get(owner);
assert currPendingError != null;
addPendingError(countUpdater, owner, maxDelayedErrors, currPendingError, causeToAdd);
}
} else {
updater.decrementAndGet(owner);
addPendingError(countUpdater, owner, maxDelayedErrors, currPendingError, causeToAdd);
}
return currPendingError;
}

private static <T> void addPendingError(AtomicIntegerFieldUpdater<T> updater, T owner, int maxDelayedErrors,
Throwable original, Throwable causeToAdd) {
if (original == causeToAdd) {
return;
}
for (;;) {
final int size = updater.get(owner);
if (size >= maxDelayedErrors) {
break;
} else if (updater.compareAndSet(owner, size, size + 1)) {
original.addSuppressed(causeToAdd); // original is not equal to causeToAdd, safe to add suppressed.
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,6 @@ public final Publisher<T> onErrorResume(Predicate<? super Throwable> predicate,
* return results;
* }</pre>
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* each mapped {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A new {@link Publisher} which flattens the emissions from all mapped {@link Publisher}s.
* @see <a href="https://reactivex.io/documentation/operators/flatmap.html">ReactiveX flatMap operator.</a>
Expand Down Expand Up @@ -871,7 +870,6 @@ public final <R> Publisher<R> flatMapMerge(Function<? super T, ? extends Publish
* createAndThrowACompositeException(errors);
* }</pre>
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* each mapped {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A new {@link Publisher} which flattens the emissions from all mapped {@link Publisher}s.
* @see <a href="https://reactivex.io/documentation/operators/flatmap.html">ReactiveX flatMap operator.</a>
Expand Down Expand Up @@ -1618,6 +1616,86 @@ public final <R> Publisher<R> flatMapConcatIterable(Function<? super T, ? extend
return new PublisherConcatMapIterable<>(this, mapper);
}

/**
* Return a {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled. Both upstream and the last switched {@link Publisher} must complete
* before the returned {@link Publisher} completes. If either upstream or the currently active {@link Publisher}
* terminate in error the returned {@link Publisher} is terminated with that error.
* <pre>{@code
* ExecutorService e = ...;
* List<Future<List<R>>> futures = ...; // assume this is thread safe
*
* for (T t : resultOfThisPublisher()) {
* // Note that flatMap process results in parallel.
* futures.add(e.submit(() -> {
* // Approximation: control flow is simplified here but when a later mapper is applied any incomplete
* // results from a previous mapper are cancelled and result in empty results.
* return mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
* }));
* }
* List<R> results = new ArrayList<>(futures.size());
* // This is an approximation, this operator does not provide any ordering guarantees for the results.
* for (Future<List<R>> future : futures) {
* List<R> rList = future.get(); // Throws if the processing for this item failed.
* results.addAll(rList);
* }
* return results;
* }</pre>
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled.
* @see <a href="https://reactivex.io/documentation/operators/switch.html">ReactiveX switch operator.</a>
* @see <a href=
"https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html">
Kotlin flatMapLatest</a>
* @see #switchMapDelayError(Function)
*/
public final <R> Publisher<R> switchMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return new PublisherSwitchMap<>(this, 0, mapper);
}

/**
* Return a {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled. Both upstream and the last switched {@link Publisher} must terminate
* before the returned {@link Publisher} terminates (including errors).
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled.
* @see <a href="https://reactivex.io/documentation/operators/switch.html">ReactiveX switch operator.</a>
* @see <a href=
"https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html">
Kotlin flatMapLatest</a>
* @see #switchMap(Function)
* @see #switchMapDelayError(Function, int)
*/
public final <R> Publisher<R> switchMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return new PublisherSwitchMap<>(this, true, mapper);
}

/**
* Return a {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled. Both upstream and the last switched {@link Publisher} must terminate
* before the returned {@link Publisher} terminates (including errors).
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* @param maxDelayedErrorsHint The maximum amount of errors that will be queued. After this point exceptions maybe
* discarded to reduce memory consumption.
* @param <R> The type of mapped {@link Publisher}.
* @return A {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled.
* @see <a href="https://reactivex.io/documentation/operators/switch.html">ReactiveX switch operator.</a>
* @see <a href=
"https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html">
Kotlin flatMapLatest</a>
* @see #switchMap(Function)
* @see #switchMapDelayError(Function)
*/
public final <R> Publisher<R> switchMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper,
int maxDelayedErrorsHint) {
return new PublisherSwitchMap<>(this, maxDelayedErrorsHint, mapper);
}

/**
* Merge two {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned
* {@link Publisher}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,9 @@ public void onError(final Throwable t) {
SubscriberUtils.logDuplicateTerminal(this, t);
return;
}
Throwable currPendingError = parent.pendingError;

if (parent.source.maxDelayedErrors == 0) {
final Throwable currPendingError = parent.pendingError;
if (currPendingError == null && pendingErrorUpdater.compareAndSet(parent, null, t)) {
try {
parent.doCancel(true);
Expand All @@ -633,19 +634,8 @@ public void onError(final Throwable t) {
}
}
} else {
if (currPendingError == null) {
if (pendingErrorUpdater.compareAndSet(parent, null, t)) {
currPendingError = t;
} else {
currPendingError = parent.pendingError;
assert currPendingError != null;
addPendingError(pendingErrorCountUpdater, parent,
parent.source.maxDelayedErrors, currPendingError, t);
}
} else {
addPendingError(pendingErrorCountUpdater, parent,
parent.source.maxDelayedErrors, currPendingError, t);
}
final Throwable currPendingError = addPendingError(pendingErrorUpdater, pendingErrorCountUpdater,
parent, parent.source.maxDelayedErrors, t);
if (parent.removeSubscriber(this, unusedDemand)) {
parent.enqueueAndDrain(error(currPendingError));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,26 +417,15 @@ public void onError(Throwable t) {
cancellableSet.remove(singleCancellable);
singleCancellable = null;

Throwable currPendingError = pendingError;
if (source.maxDelayedErrors == 0) {
final Throwable currPendingError = pendingError;
if (currPendingError == null &&
pendingErrorUpdater.compareAndSet(FlatMapSubscriber.this, null, t)) {
onError0(t, true);
}
} else {
if (currPendingError == null) {
if (pendingErrorUpdater.compareAndSet(FlatMapSubscriber.this, null, t)) {
currPendingError = t;
} else {
currPendingError = pendingError;
assert currPendingError != null;
addPendingError(pendingErrorCountUpdater, FlatMapSubscriber.this, source.maxDelayedErrors,
currPendingError, t);
}
} else {
addPendingError(pendingErrorCountUpdater, FlatMapSubscriber.this, source.maxDelayedErrors,
currPendingError, t);
}
final Throwable currPendingError = addPendingError(pendingErrorUpdater, pendingErrorCountUpdater,
FlatMapSubscriber.this, source.maxDelayedErrors, t);
if (decrementActiveMappedSources()) {
enqueueAndDrain(error(currPendingError));
} else {
Expand Down
Loading

0 comments on commit 9f0d920

Please sign in to comment.