diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index e63da75343..5b9edb64d4 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -1548,10 +1548,10 @@ public final Publisher flatMapConcatIterable(FunctionReactiveX merge operator - * @see #mergeWithDelayError(Publisher) - * @see #merge(Publisher[]) + * @see #mergeDelayError(Publisher) + * @see #mergeAll(Publisher[]) */ - public final Publisher mergeWith(Publisher other) { + public final Publisher merge(Publisher other) { return from(this, other).flatMapMerge(identity(), 2); } @@ -1587,10 +1587,10 @@ public final Publisher mergeWith(Publisher other) { * @param other The {@link Publisher} to merge with, * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. * @see ReactiveX merge operator - * @see #mergeWith(Publisher) - * @see #mergeDelayError(Publisher[]) + * @see #merge(Publisher) + * @see #mergeAllDelayError(Publisher[]) */ - public final Publisher mergeWithDelayError(Publisher other) { + public final Publisher mergeDelayError(Publisher other) { return from(this, other).flatMapMergeDelayError(identity(), 2, 2); } @@ -4114,11 +4114,90 @@ public static Publisher defer(Supplier> * @param Type of items emitted by the returned {@link Publisher}. * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. * @see ReactiveX merge operator - * @see #mergeDelayError(Publisher[]) + * @see #mergeAll(int, Publisher[]) + * @see #mergeAllDelayError(Publisher[]) */ @SafeVarargs - public static Publisher merge(Publisher... publishers) { - return from(publishers).flatMapMerge(identity(), publishers.length); + public static Publisher mergeAll(Publisher... publishers) { + return from(publishers).flatMapMerge(identity()); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *        future.get(); // Throws if the processing for this item failed.
+     *     }
+     *     return mergedResults;
+     * }
+ * @param maxConcurrency The maximum amount of {@link Publisher}s from {@code publishers} to subscribe to + * concurrently. + * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAllDelayError(int, Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAll(int maxConcurrency, Publisher... publishers) { + return from(publishers).flatMapMerge(identity(), maxConcurrency); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. If any {@link Publisher} terminates in an error, the error propagation will be delayed until + * all terminate. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     *     return mergedResults;
+     * }
+ * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAllDelayError(int, Publisher[]) + * @see #mergeAll(Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAllDelayError(Publisher... publishers) { + return from(publishers).flatMapMergeDelayError(identity()); } /** @@ -4155,11 +4234,11 @@ public static Publisher merge(Publisher... publishers) { * @param Type of items emitted by the returned {@link Publisher}. * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. * @see ReactiveX merge operator - * @see #merge(Publisher[]) + * @see #mergeAll(Publisher[]) */ @SafeVarargs - public static Publisher mergeDelayError(Publisher... publishers) { - return from(publishers).flatMapMergeDelayError(identity(), publishers.length, publishers.length); + public static Publisher mergeAllDelayError(int maxConcurrency, Publisher... publishers) { + return from(publishers).flatMapMergeDelayError(identity(), maxConcurrency); } // diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java index 36761c332a..081df88867 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java @@ -39,6 +39,7 @@ import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock; import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; +import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue; @@ -175,6 +176,10 @@ public void request(final long n) { incMappedDemand(n); } else { subscription.request(n); + // If the upstream source has already sent an onComplete signal, it won't be able to send an error. + // We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to + // ensure we see an error. + enqueueAndDrain(error(newExceptionForInvalidRequestN(n))); } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java index cb9987f98e..5fc1dac72f 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java @@ -43,6 +43,7 @@ import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock; import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; +import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue; @@ -161,6 +162,10 @@ public void request(long n) { } } else { subscription.request(n); + // If the upstream source has already sent an onComplete signal, it won't be able to send an error. + // We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to + // ensure we see an error. + enqueueAndDrain(error(newExceptionForInvalidRequestN(n))); } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java index dc2cf5d30c..6789788e3f 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java @@ -25,8 +25,8 @@ import java.util.ArrayList; import java.util.List; -import static io.servicetalk.concurrent.api.Publisher.merge; -import static io.servicetalk.concurrent.api.Publisher.mergeDelayError; +import static io.servicetalk.concurrent.api.Publisher.mergeAll; +import static io.servicetalk.concurrent.api.Publisher.mergeAllDelayError; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static java.util.Arrays.asList; @@ -66,7 +66,7 @@ private static Iterable completeSource() { @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") @MethodSource("completeSource") void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { - toSource(delayError ? first.mergeWithDelayError(second) : first.mergeWith(second)).subscribe(subscriber); + toSource(delayError ? first.mergeDelayError(second) : first.merge(second)).subscribe(subscriber); subscriber.awaitSubscription().request(2); int i = 3; int j = 4; @@ -115,7 +115,7 @@ void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean first @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") @MethodSource("completeSource") void allComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { - toSource(delayError ? mergeDelayError(first, second, third) : merge(first, second, third)) + toSource(delayError ? mergeAllDelayError(first, second, third) : mergeAll(first, second, third)) .subscribe(subscriber); subscriber.awaitSubscription().request(3); int i = 3; diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java new file mode 100644 index 0000000000..46a5d58402 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Ignore; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherMergeDelayErrorTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher createServiceTalkPublisher(long elements) { + int numElements = TckUtils.requestNToInt(elements); + + if (numElements <= 1) { + return TckUtils.newPublisher(numElements).mergeDelayError(empty()); + } + + int halfElements = numElements / 2; + + // Calculate the number of elements that will not be emitted by the first publisher so we create another + // one in composePublisher(...) that will emit these. The sum of both should be == elements. + return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements); + } + + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.mergeDelayError(TckUtils.newPublisher(elements)); + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestZeroMustSignalIllegalArgumentException() { + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java new file mode 100644 index 0000000000..cdd012ec33 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Ignore; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherMergeTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher createServiceTalkPublisher(long elements) { + int numElements = TckUtils.requestNToInt(elements); + + if (numElements <= 1) { + return TckUtils.newPublisher(numElements).merge(empty()); + } + + int halfElements = numElements / 2; + + // Calculate the number of elements that will not be emitted by the first publisher so we create another + // one in composePublisher(...) that will emit these. The sum of both should be == elements. + return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements); + } + + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.merge(TckUtils.newPublisher(elements)); + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + // merge operator proactively requests from upstream, and will not deliver errors until demand comes. + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestZeroMustSignalIllegalArgumentException() { + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java deleted file mode 100644 index ee98615cb5..0000000000 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright © 2023 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.concurrent.reactivestreams.tck; - -import io.servicetalk.concurrent.api.Publisher; - -import static io.servicetalk.concurrent.api.Publisher.empty; - -public class PublisherZipWithDelayErrorTckTest extends AbstractPublisherOperatorTckTest { - @Override - protected Publisher composePublisher(final Publisher publisher, final int elements) { - return publisher.mergeWithDelayError(empty()); - } -} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java deleted file mode 100644 index b1a525b8b7..0000000000 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright © 2023 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.concurrent.reactivestreams.tck; - -import io.servicetalk.concurrent.api.Publisher; - -import static io.servicetalk.concurrent.api.Publisher.empty; - -public class PublisherZipWithTckTest extends AbstractPublisherOperatorTckTest { - @Override - protected Publisher composePublisher(final Publisher publisher, final int elements) { - return publisher.mergeWith(empty()); - } -} diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java index ce8d98730d..d4d3cc0591 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java @@ -267,7 +267,7 @@ private static void throwOnNextAfterTrailersException(HttpHeaders trailers, @Nul * @param s The {@link Single} to merge. * @return The result of the merge operation. */ - private static Publisher merge(Publisher p, Single s) { + private static Publisher merge(Publisher p, Single s) { // We filter null from the Single in case the publisher completes and we didn't find trailers. return from(p, s.toPublisher().filter(Objects::nonNull)).flatMapMerge(identity(), 2); }