Auto-cancelled Sink still accepts emissions #3715
Comments
Hi, I'm lucky to come across this issue. I'm currently learning Reactor and have also encountered this problem, which has left me confused. Additionally, I have a question related to the EmitResult code, and perhaps it can be addressed together in this issue. Is it necessary to use a new response code to represent "warm up" and distinguish whether subscribers exist when using tryEmitNext? Issue #2338 mentions: do we really need a separate fail code (vs reusing The solution related to #2338 is to introduce the FAIL_ZERO_SUBSCRIBER status code. However, triggering the FAIL_ZERO_SUBSCRIBER response code requires meeting the following two conditions:
Perhaps adjusting or extending the meaning of the EmitResult code can help us better understand the state of Sinks.Many when using tryEmitNext. I'm not sure if the following points are reasonable:
When the "warm up" response code appears, it can indicate the possibility of FAIL_OVERFLOW risk.
@kaqqao thanks for the report! I reviewed the other
@qnnn glad to hear you're learning Reactor 🚀 Hope the experience is pleasant so far. Regarding the suggestion you made: If you feel this is falling short in some scenarios and there are situations that changing this behaviour would enable new flows, please open a new issue with ideas about the migration path if there's a behaviour change proposed. Let's not mix two concerns in the same issue. Thanks in advance and good luck on your learning journey!
@chemicL I really appreciate your patient response, and it has been very helpful to me. I apologize for bringing it up in this issue. Best wishes!
Nice to see this discussion finally started
This issue is still a valid one and is open for contributions. The attempt in #3725 had some concerns and was closed due to inactivity so there's a chance to build upon that feedback.
Hi @chemicL I am also learning Reactor. I am interested in working on this issue. I saw your concern in the other PR. I am not sure whether it is possible/feasible, but is it a good idea to somehow freeze the queue once the sink is cancelled so that it does not accept any more items?
Hey, @bajibalu! Happy to hear you're learning Reactor and are interested in contributing 🎉 The queues we have do not have a concept of freezing/closing. However, there are other mechanisms to prevent inserting an item. On the other hand, it's also not an issue if the item is inserted, as long as something later notices that a cancellation has happened and takes care of clearing the queue. After looking at the code one more time, I think it is already the case that an item inserted after cancellation should be removed from the queue by the drain operation that follows. Unfortunately, I couldn't get it to work because somehow the WIP (work-in-progress) marker is left in an unclean state. Debug the below code to see what's happening:
If nothing was emitted before, then no queue is created:
Solving this issue would be the first step to a comprehensive solution. One potential issue is that in case of an emit/cancel race we wouldn't communicate the cancellation to the caller if cancellation happens after the checks for cancellation but before the item is inserted into the queue. But perhaps that's not a big deal as long as something discards the item from the queue (so fixing the above nuance is necessary). What I would love to see in a PR is:
@bajibalu in case you'd like to work on this, just leave a note here so if anyone else interested in this wants to take over we can re-assign in case you don't find the time. Thanks!
As explained in the comment to the above PR, fixing this would require a comprehensive approach with guarantees that the existing functionality is not dramatically affected performance-wise. It is open to an external contribution, but it needs to address a lot of concerns, requires care, and should be a comprehensive PR requiring little initial guidance from the team.
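The emit/cancel race and the "drain clears the queue" idea discussed in the comments above can be sketched with a toy model. This is only an illustration under stated assumptions: `ToyDrainingSink`, `insertAndDrain`, and the `discarded` counter are hypothetical names, and the class is not Reactor's `SinkManyEmitterProcessor`. It shows an item that slips in after cancellation being removed by the drain that follows, provided the work-in-progress counter is always brought back to zero.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical toy sink for illustration; NOT Reactor's implementation. */
final class ToyDrainingSink<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final AtomicInteger wip = new AtomicInteger();       // work-in-progress marker
    private final AtomicInteger discarded = new AtomicInteger(); // items dropped after cancel

    /** Returns false if cancellation was observed before inserting. */
    boolean tryEmitNext(T item) {
        if (cancelled.get()) {
            return false;
        }
        // A concurrent cancel() may land exactly here; the caller still sees "true".
        insertAndDrain(item);
        return true;
    }

    /** The part of tryEmitNext that runs after the cancellation check. */
    void insertAndDrain(T item) {
        queue.offer(item);
        drain();
    }

    void cancel() {
        cancelled.set(true);
        drain();
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will loop again for us
        }
        int missed = 1;
        do {
            if (cancelled.get()) {
                while (queue.poll() != null) {
                    discarded.incrementAndGet();
                }
            }
            // Delivery to subscribers is omitted in this toy model.
            missed = wip.addAndGet(-missed); // always leave wip at 0 when done
        } while (missed != 0);
    }

    int queued() { return queue.size(); }
    int discarded() { return discarded.get(); }
}
```

Calling `cancel()` and then `insertAndDrain(...)` directly mimics the window between the cancellation check and the insert: the item is accepted but discarded right away, so the buffer does not grow.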
A sink created via Sinks.many().multicast().onBackpressureBuffer() will keep accepting emissions even after it's cancelled by autoCancel (which is implicitly enabled, and unfortunately not mentioned in the method's Javadoc), until the internal buffer is filled. Javadoc on the overload onBackpressureBuffer(int bufferSize, boolean autoCancel) mentions:
Expected Behavior
After the sink is shut down (cancelled), sink.tryEmitNext(...) should fail.
Actual Behavior
sink.tryEmitNext(...) succeeds and keeps filling the buffer.
Steps to Reproduce
In this state, no subscriber will ever be able to subscribe to the sink and consume "Test2" or any future emissions, yet emissions are accepted and buffered.
Possible Solution
Add
to the beginning of tryEmitNext, tryEmitError and tryEmitComplete in SinkManyEmitterProcessor.java
Your Environment
java -version): 17, 21
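To make the Expected/Actual Behavior and the Possible Solution above concrete, here is a minimal single-threaded sketch. Everything in it is an assumption for illustration: `ToyAutoCancelSink`, `ToyEmitResult`, and the `guardEnabled` flag are hypothetical and only imitate the reported behaviour; they are not Reactor classes, and the real fix would have to handle concurrency. The guard at the top of `tryEmitNext` stands in for the early cancellation check the issue proposes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical result codes, named after Reactor's EmitResult values. */
enum ToyEmitResult { OK, FAIL_OVERFLOW, FAIL_CANCELLED }

/** Hypothetical, NOT thread-safe toy model of a buffering sink with autoCancel. */
final class ToyAutoCancelSink<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final int capacity;
    private final boolean guardEnabled; // true = apply the proposed early check
    private int subscribers;
    private boolean cancelled;

    ToyAutoCancelSink(int capacity, boolean guardEnabled) {
        this.capacity = capacity;
        this.guardEnabled = guardEnabled;
    }

    void subscribe() {
        if (cancelled) {
            return;         // a cancelled sink can never be subscribed to again
        }
        subscribers++;
        buffer.clear();     // pretend the new subscriber consumed the buffered items
    }

    void unsubscribe() {
        if (subscribers > 0 && --subscribers == 0) {
            cancelled = true; // autoCancel: the last subscriber leaving cancels the sink
        }
    }

    ToyEmitResult tryEmitNext(T item) {
        if (guardEnabled && cancelled) {
            return ToyEmitResult.FAIL_CANCELLED; // proposed check at the start
        }
        if (subscribers > 0) {
            return ToyEmitResult.OK;             // delivered directly in this toy
        }
        if (buffer.size() >= capacity) {
            return ToyEmitResult.FAIL_OVERFLOW;
        }
        buffer.add(item);                        // buffered although nobody can ever read it
        return ToyEmitResult.OK;
    }
}
```

With `guardEnabled` set to false the model accepts emissions after cancellation until the buffer is full, which mirrors the behaviour reported here; with it set to true the first emission after cancellation is rejected.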