-
Notifications
You must be signed in to change notification settings - Fork 186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Publisher#concatPropagateCancel(Completable) to force cancel propagation #2372
Conversation
Motivation: In some scenarios concat is used where the right hand side has assocated state that is desirable to always trigger (concat payload body to meta data for http response processing). Regular concat will not subscribe if the Publisher terminates with onError or is cancelled which will prevent terminal event visibility.
test failure attributed to #2069 |
.../servicetalk/concurrent/api/publisher/PublisherConcatWithCompletablePropagateCancelTest.java
Outdated
Show resolved
Hide resolved
.../servicetalk/concurrent/api/publisher/PublisherConcatWithCompletablePropagateCancelTest.java
Outdated
Show resolved
Hide resolved
...ncurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatWithCompletable.java
Outdated
Show resolved
Hide resolved
...ncurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatWithCompletable.java
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java
Outdated
Show resolved
Hide resolved
test failure attributed to #2265 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My comments related to tests are in this patch:
Index: servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java
--- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java (revision bcad9a44fec7c69744d6b54edd87bb5350c3a1b3)
+++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java (date 1664812054953)
@@ -148,16 +148,16 @@
} else {
source.onComplete();
}
- assertThat("Next source not subscribed.", completable.isSubscribed(), is(propagateCancel || !onError));
- subscriber.awaitSubscription().cancel();
- if (propagateCancel) {
- completable.awaitSubscribed();
+ if (propagateCancel || !onError) {
+ assertThat("Next source not subscribed.", completable.isSubscribed(), is(true));
+ subscriber.awaitSubscription().cancel();
+ assertThat("Source subscription unexpectedly cancelled.", subscription.isCancelled(), is(false));
assertThat("Next cancellable not cancelled.", cancellable.isCancelled(), is(true));
} else {
- if (!onError) {
- completable.awaitSubscribed();
- }
- assertThat("Next cancellable not cancelled.", cancellable.isCancelled(), is(!onError));
+ assertThat("Next source unexpectedly subscribed.", completable.isSubscribed(), is(false));
+ subscriber.awaitSubscription().cancel();
+ assertThat("Source subscription not cancelled.", subscription.isCancelled(), is(true));
+ assertThat("Next cancellable unexpectedly cancelled.", cancellable.isCancelled(), is(false));
}
if (onError) {
Index: servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherConcatWithCompletableCancelTckTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherConcatWithCompletableCancelTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherConcatWithCompletablePropagateCancelTckTest.java
rename from servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherConcatWithCompletableCancelTckTest.java
rename to servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherConcatWithCompletablePropagateCancelTckTest.java
--- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherConcatWithCompletableCancelTckTest.java (revision bcad9a44fec7c69744d6b54edd87bb5350c3a1b3)
+++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherConcatWithCompletablePropagateCancelTckTest.java (date 1664810883896)
@@ -21,7 +21,7 @@
import org.testng.annotations.Test;
@Test
-public class PublisherConcatWithCompletableCancelTckTest extends AbstractPublisherOperatorTckTest<Integer> {
+public class PublisherConcatWithCompletablePropagateCancelTckTest extends AbstractPublisherOperatorTckTest<Integer> {
@Override
protected Publisher<Integer> composePublisher(Publisher<Integer> publisher, int elements) {
return publisher.concatPropagateCancel(Completable.completed());
completable.onComplete(); | ||
} | ||
// Cancel before the publisher completes means we don't propagate terminals. | ||
assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, now I see the problem with propagating terminals after cancel
. Thanks for this test!
Motivation:
In some scenarios concat is used where the right hand side has assocated state that is desirable to always trigger (concat payload body to meta data for http response processing). Regular concat will not subscribe if the Publisher terminates with onError or is cancelled which will prevent terminal event visibility.