Document using .subscribe for fire-and-forget scenarios #3358

Open
62mkv opened this issue Feb 18, 2023 · 2 comments
Labels
for/stackoverflow, good first issue, type/documentation

Comments

62mkv commented Feb 18, 2023

Documentation Issue

Sometimes, in various "Reactive" projects, I see a pattern like this:

Mono<Void> initiateSideEffects(T arg) {

    thirdPartyService.sideEffectOperation(arg)
          .subscribe();

    yetAnotherService.sideEffectOperation(arg)
          .subscribe();

    return Mono.empty();
}

I could not find any mention of this usage of .subscribe() in the Reference documentation. All instances of .subscribe() there seem to refer to calls from imperative, blocking code that observe a Publisher's behavior.

How safe is such usage from within Reactive code, though? If the GC collects the discarded Disposable, what happens to the underlying Subscription and/or Publisher?

I've asked this question on SO, and the only answer I've received so far seems to imply that as long as .subscribeOn is used on the Publisher with an appropriate Scheduler, all should be fine.

Can we please have this covered in the Reference documentation too?

Improvement Suggestion

Document the "proper" (idiomatic) implementation of a "fire-and-forget" scenario in Reactive code (the closest analogy in imperative code would be starting a Thread with some Runnable and moving on, or Spring's @Async annotation).

Document the implications of calling .subscribe() and "forgetting" the returned Disposable, both in assembly-phase code and in subscription-phase code.
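
To make the imperative analogy concrete, here is a minimal sketch of fire-and-forget using only the JDK's CompletableFuture, not Reactor. The class name FireAndForgetSketch, the method fire and the onFailure callback are hypothetical names chosen for illustration. The sketch only shows the shape of the problem: the caller gets back a handle it may discard, and failures go nowhere unless a handler is attached.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical helper, not part of any library.
final class FireAndForgetSketch {

    // Starts the side effect on the given executor and returns without waiting for it.
    // The returned future is the only handle for observing the outcome;
    // discarding it is the "forget" part.
    static CompletableFuture<Void> fire(Runnable sideEffect,
                                        Executor executor,
                                        Consumer<Throwable> onFailure) {
        return CompletableFuture.runAsync(sideEffect, executor)
                .exceptionally(error -> {
                    // runAsync wraps the original exception in a CompletionException.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    onFailure.accept(cause);
                    return null;
                });
    }
}
```

A caller could write FireAndForgetSketch.fire(task, executor, failures::add) and ignore the result. Without the onFailure callback, an exception thrown by the task would be stored in a future that nobody inspects.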

Additional context

SO ticket: https://stackoverflow.com/questions/75434622/is-it-safe-to-just-subscribe-to-publishers-in-the-assembly-phase-and-leave-it-at/75445943

reactorbot added the ❓need-triage label Feb 18, 2023
OlegDokuka added the type/documentation, good first issue and for/stackoverflow labels and removed the ❓need-triage label Feb 20, 2023
OlegDokuka (Contributor) commented Feb 20, 2023

Hi, @62mkv!

In general, you should chain all the calls, since the actual downstream in your chain may want to control the lifecycle of all allocated streams. Follow that rule unless you know that the subscription is cheap, or that it should not be canceled or managed by the downstream.

Regarding your suggestions: there is no concept of an "idiomatic" fire-and-forget (FNF) in Reactive Streams. An FNF call assumes no control over the running process, including any possibility of terminating it early. Reactive Streams does not fit well there, since either (A) such a Mono should complete immediately, or (B) if it does not, it should be cancellable, which means you may want to keep the returned Disposable somewhere. Also, don't forget about errors and the concept of supervision: all of them have to be handled somehow.

So, having written all that, I'm not even sure such an FNF scenario should be added to the docs.

My suggestion is to change the underlying implementation to either expose a non-reactive API or make the response complete immediately.

Alternatively, the code you wrote should have a kind of dead-letter queue where all the errors can be delivered later, e.g.

final ErrorSubscriber errorSubs = ...


Mono<Void> initiateSideEffects(T arg) {

    thirdPartyService.sideEffectOperation(arg)
          .subscribe(errorSubs);

    yetAnotherService.sideEffectOperation(arg)
          .subscribe(errorSubs);

    return Mono.empty();
}

where errorSubs can deliver exceptions somewhere (logs, another service, etc.).
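
As a rough illustration of such an error-only subscriber, the sketch below ignores values and forwards failures to a shared queue. It is an assumption-laden sketch, not the ErrorSubscriber from the snippet above. The class name DeadLetterSubscriber and the deadLetters queue are hypothetical. It is written against the JDK's java.util.concurrent.Flow interfaces so that it compiles without dependencies. A Reactor version would implement org.reactivestreams.Subscriber, which declares the same four methods.

```java
import java.util.Queue;
import java.util.concurrent.Flow;

// Hypothetical error-only subscriber: values are discarded, failures are kept.
final class DeadLetterSubscriber<T> implements Flow.Subscriber<T> {

    // Shared sink for failures, e.g. a ConcurrentLinkedQueue drained by a logger.
    private final Queue<Throwable> deadLetters;

    DeadLetterSubscriber(Queue<Throwable> deadLetters) {
        this.deadLetters = deadLetters;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Unbounded demand: only the terminal signal matters here.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        // Fire-and-forget: the result is intentionally ignored.
    }

    @Override
    public void onError(Throwable error) {
        // Deliver the failure to the dead-letter sink instead of dropping it.
        deadLetters.add(error);
    }

    @Override
    public void onComplete() {
        // Nothing to do on successful completion.
    }
}
```

One design choice in this sketch is to create a new subscriber instance per subscription while sharing only the queue, because Reactive Streams expects onSubscribe to be signalled at most once for a given Subscriber.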

P.S. I highly suggest rethinking the API. Returning Mono.empty() while keeping the Mono contract makes no sense unless you know there is another implementation with different Mono behavior, so that you keep the contract only for that purpose.

62mkv (Author) commented Feb 25, 2023

Hi @OlegDokuka, thanks for your comments.

Let me be more specific: we currently use such patterns where we want to do something that is not too critical but potentially time-consuming, in a context where completion speed matters more than the actual outcome. An example is notifying a third party via a WebClient call from a Kafka consumer. If we were to chain all the calls, a single timeout among them might have a detrimental effect on event consumption.

From your comments, I assume that if we make sure every "discarded" subscription is wrapped in a catch-all .onErrorResume or similar handler, it should be relatively safe to use?

What about also applying .subscribeOn(Schedulers.boundedElastic()) - would that make it safer yet?

More to the point: I am not saying that this usage should be encouraged; rather I am trying to collect evidence suggesting otherwise, that I could then show to my co-workers and suggest that we probably should not do this. And a note in reference documentation would certainly be helpful in this respect.

Thanks again!

PS: Concerning the API: the code in my example could indeed be rewritten with a plain void return type. It also has issues if called at an inappropriate time, e.g. during the assembly phase. So it would probably be a tiny bit better if it were written like this instead:

Mono<Void> initiateSideEffects(T arg) {

    return Mono.defer(() -> {

      thirdPartyService.sideEffectOperation(arg)
          .subscribe(errorSubs);

      yetAnotherService.sideEffectOperation(arg)
          .subscribe(errorSubs);

      return Mono.empty();
    });
}

With this code, at least nothing happens before subscribe is called.
