From 977ac05b43f552a62bac9a57877ae97e3c968da4 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Fri, 16 Aug 2024 17:02:01 +0200 Subject: [PATCH] Improve Single (+Completable) amb* documentation (#3044) --- .../concurrent/api/AmbSingles.java | 4 +-- .../concurrent/api/Completable.java | 26 ++++++++------- .../io/servicetalk/concurrent/api/Single.java | 32 +++++++++++-------- .../internal/DelayedCancellable.java | 3 ++ 4 files changed, 39 insertions(+), 26 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AmbSingles.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AmbSingles.java index e28159ab69..ea70a9d8e1 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AmbSingles.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AmbSingles.java @@ -90,13 +90,13 @@ public void onSubscribe(final Cancellable cancellable) { @Override public void onSuccess(@Nullable final T result) { - ignoreCancel(); + ignoreCancel(); // prevents cancel propagation after termination state.trySuccess(result); } @Override public void onError(final Throwable t) { - ignoreCancel(); + ignoreCancel(); // prevents cancel propagation after termination state.tryError(t); } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java index f5136b9037..62e0e2ba21 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java @@ -1647,9 +1647,9 @@ public final Completable setContextOnSubscribe(ContextMap context) { /** * Creates a new {@link Completable} that terminates with the result (either success or error) of either this - * {@link Completable} or the passed {@code other} {@link Completable}, whichever terminates first. Therefore the + * {@link Completable} or the passed {@code other} {@link Completable}, whichever terminates first. Therefore, the * result is said to be ambiguous relative to which source it originated from. After the first - * source terminates the non-terminated source will be cancelled. + * source terminates, only the non-terminated source will be cancelled. *

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -1662,7 +1662,7 @@ public final Completable setContextOnSubscribe(ContextMap context) {
      * @param other {@link Completable} to subscribe to and race with this {@link Completable} to propagate to the
      * return value.
      * @return A new {@link Completable} that terminates with the result (either success or error) of either this
-     * {@link Completable} or the passed {@code other} {@link Completable}, whichever terminates first. Therefore the
+     * {@link Completable} or the passed {@code other} {@link Completable}, whichever terminates first. Therefore, the
      * result is said to be ambiguous relative to which source it originated from.
      * @see ReactiveX amb operator.
      */
@@ -2132,9 +2132,9 @@ public static Completable mergeAllDelayError(int maxConcurrency, Completable...
 
     /**
      * Creates a new {@link Completable} that terminates with the result (either success or error) of whichever amongst
-     * the passed {@code completables} that terminates first. Therefore the result is said to be
-     * ambiguous relative to which source it originated from. After the first source terminates the
-     * non-terminated sources will be cancelled.
+     * the passed {@code completables} that terminates first. Therefore, the result is said to be
+     * ambiguous relative to which source it originated from. After the first source terminates, only
+     * the non-terminated sources will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -2146,7 +2146,7 @@ public static Completable mergeAllDelayError(int maxConcurrency, Completable...
      *
      * @param completables {@link Completable}s to subscribe to and race to propagate to the return value.
      * @return A new {@link Completable} that terminates with the result (either success or error) of whichever amongst
-     * the passed {@code completables} that terminates first. Therefore the result is said to be
+     * the passed {@code completables} that terminates first. Therefore, the result is said to be
      * ambiguous relative to which source it originated from.
      * @see ReactiveX amb operator.
      */
@@ -2157,7 +2157,7 @@ public static Completable amb(final Completable... completables) {
 
     /**
      * Creates a new {@link Completable} that terminates with the result (either success or error) of whichever amongst
-     * the passed {@code completables} that terminates first. After the first source terminates the non-terminated
+     * the passed {@code completables} that terminates first. After the first source terminates, only the non-terminated
      * sources will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: @@ -2170,7 +2170,7 @@ public static Completable amb(final Completable... completables) { * * @param completables {@link Completable}s to subscribe to and race to propagate to the return value. * @return A new {@link Completable} that terminates with the result (either success or error) of whichever amongst - * the passed {@code completables} that terminates first. Therefore the result is said to be + * the passed {@code completables} that terminates first. Therefore, the result is said to be * ambiguous relative to which source it originated from. * @see ReactiveX amb operator. */ @@ -2181,7 +2181,9 @@ public static Completable amb(final Iterable completables) { /** * Creates a new {@link Completable} that terminates with the result (either success or error) of whichever amongst - * the passed {@code completables} that terminates first. + * the passed {@code completables} that terminates first. Therefore, the result is said to be + * ambiguous relative to which source it originated from. After the first source terminates, only + * the non-terminated sources will be cancelled. *

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -2202,7 +2204,9 @@ public static Completable anyOf(final Completable... completables) {
 
     /**
      * Creates a new {@link Completable} that terminates with the result (either success or error) of whichever amongst
-     * the passed {@code completables} that terminates first.
+     * the passed {@code completables} that terminates first. Therefore, the result is said to be
+     * ambiguous relative to which source it originated from. After the first source terminates, only
+     * the non-terminated sources will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java
index f6cf6cccca..cd32d895fd 100644
--- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java
+++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java
@@ -1787,9 +1787,9 @@ public final  Single liftAsync(SingleOperator oper
 
     /**
      * Creates a new {@link Single} that terminates with the result (either success or error) of either this
-     * {@link Single} or the passed {@code other} {@link Single}, whichever terminates first. Therefore the result is
+     * {@link Single} or the passed {@code other} {@link Single}, whichever terminates first. Therefore, the result is
      * said to be ambiguous relative to which source it originated from. After the first source
-     * terminates the non-terminated source will be cancelled.
+     * terminates, only the non-terminated source will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -1801,7 +1801,7 @@ public final  Single liftAsync(SingleOperator oper
      *
      * @param other {@link Single} to subscribe to and race with this {@link Single} to propagate to the return value.
      * @return A new {@link Single} that terminates with the result (either success or error) of either this
-     * {@link Single} or the passed {@code other} {@link Single}, whichever terminates first. Therefore the result is
+     * {@link Single} or the passed {@code other} {@link Single}, whichever terminates first. Therefore, the result is
      * said to be ambiguous relative to which source it originated from.
      * @see ReactiveX amb operator.
      */
@@ -2315,9 +2315,9 @@ public static  Single fromStage(CompletionStage stage) {
 
     /**
      * Creates a new {@link Single} that terminates with the result (either success or error) of whichever amongst the
-     * passed {@code singles} that terminates first. Therefore the result is said to be ambiguous
-     * relative to which source it originated from. After the first source terminates the non-terminated sources will be
-     * cancelled.
+     * passed {@code singles} that terminates first. Therefore, the result is said to be ambiguous
+     * relative to which source it originated from. After the first source terminates, only the non-terminated sources
+     * will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -2330,7 +2330,7 @@ public static  Single fromStage(CompletionStage stage) {
      * @param singles {@link Single}s to subscribe to and race to propagate to the return value.
      * @param  Type of the result of the individual {@link Single}s
      * @return A new {@link Single} that terminates with the result (either success or error) of whichever amongst the
-     * passed {@code singles} that terminates first. Therefore the result is said to be ambiguous
+     * passed {@code singles} that terminates first. Therefore, the result is said to be ambiguous
      * relative to which source it originated from.
      * @see ReactiveX amb operator.
      */
@@ -2341,9 +2341,9 @@ public static  Single amb(final Single... singles) {
 
     /**
      * Creates a new {@link Single} that terminates with the result (either success or error) of whichever amongst the
-     * passed {@code singles} that terminates first. Therefore the result is said to be ambiguous
-     * relative to which source it originated from. After the first source terminates the non-terminated sources will be
-     * cancelled.
+     * passed {@code singles} that terminates first. Therefore, the result is said to be ambiguous
+     * relative to which source it originated from. After the first source terminates, only the non-terminated sources
+     * will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -2356,7 +2356,7 @@ public static  Single amb(final Single... singles) {
      * @param singles {@link Single}s to subscribe to and race to propagate to the return value.
      * @param  Type of the result of the individual {@link Single}s
      * @return A new {@link Single} that terminates with the result (either success or error) of whichever amongst the
-     * passed {@code singles} that terminates first. Therefore the result is said to be ambiguous
+     * passed {@code singles} that terminates first. Therefore, the result is said to be ambiguous
      * relative to which source it originated from.
      * @see ReactiveX amb operator.
      */
@@ -2366,7 +2366,9 @@ public static  Single amb(final Iterable> singles) {
 
     /**
      * Creates a new {@link Single} that terminates with the result (either success or error) of whichever amongst the
-     * passed {@code singles} that terminates first.
+     * passed {@code singles} that terminates first. Therefore, the result is said to be ambiguous
+     * relative to which source it originated from. After the first source terminates, only the non-terminated sources
+     * will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -2380,6 +2382,7 @@ public static  Single amb(final Iterable> singles) {
      * @param  Type of the result of the individual {@link Single}s
      * @return A new {@link Single} that terminates with the result (either success or error) of whichever amongst the
      * passed {@code singles} that terminates first.
+     * @see #amb(Single[])
      * @see ReactiveX amb operator.
      */
     @SafeVarargs
@@ -2389,7 +2392,9 @@ public static  Single anyOf(final Single... singles) {
 
     /**
      * Creates a new {@link Single} that terminates with the result (either success or error) of whichever amongst the
-     * passed {@code singles} that terminates first.
+     * passed {@code singles} that terminates first. Therefore, the result is said to be ambiguous
+     * relative to which source it originated from. After the first source terminates, only the non-terminated sources
+     * will be cancelled.
      * 

* From a sequential programming point of view this method is roughly equivalent to the following: *

{@code
@@ -2403,6 +2408,7 @@ public static  Single anyOf(final Single... singles) {
      * @param  Type of the result of the individual {@link Single}s
      * @return A new {@link Single} that terminates with the result (either success or error) of whichever amongst the
      * passed {@code singles} that terminates first.
+     * @see #amb(Iterable)
      * @see ReactiveX amb operator.
      */
     public static  Single anyOf(final Iterable> singles) {
diff --git a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/DelayedCancellable.java b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/DelayedCancellable.java
index 3d53b1501e..ab39659f3f 100644
--- a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/DelayedCancellable.java
+++ b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/DelayedCancellable.java
@@ -55,6 +55,9 @@ public void cancel() {
 
     /**
      * Ignores any subsequent calls to {@link #cancel()}, preventing propagating the cancellation further up the stream.
+     * 

+ * Note: if {@link #delayedCancellable(Cancellable)} is called after this method, the provided {@link Cancellable} + * will be immediately cancelled to prevent leakage of resources. */ protected final void ignoreCancel() { currentUpdater.set(this, IGNORE_CANCEL);