feat: Add switchMap operator #1787

Merged
merged 10 commits into from
Mar 8, 2025

Contributor

@MartinHH MartinHH commented Mar 5, 2025

Implements #974


/*
* Copyright (C) 2014-2022 Lightbend Inc. <https://www.lightbend.com>
*/
Contributor Author

I'm not sure if this is needed, but: a good fraction of my test code is a result of copy/paste/modify from org.apache.pekko.stream.scaladsl.FlowFlattenMergeSpec which is code "inherited" from Lightbend. (See various comments // copied (with modifications) from org.apache.pekko.stream.scaladsl.FlowFlattenMergeSpec below.)

Contributor

Yes - if you copy code from a class that has headers like this into a new source file then that source file needs the headers from the original file(s).


package org.apache.pekko.stream.scaladsl

import org.apache.pekko.NotUsed
Contributor

One nit: the project code style is to import org.apache.pekko first, after which subsequent imports can begin with import pekko.stream... (i.e. omit the org.apache part).
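
A small sketch of the import style described in this comment, assuming the pekko-stream module is on the classpath. The object name `ImportStyleExample` is hypothetical and exists only for illustration.

```scala
// Import the root package once ...
import org.apache.pekko
// ... then write the remaining imports relative to it, omitting `org.apache`.
import pekko.NotUsed
import pekko.stream.scaladsl.Source

// Hypothetical holder object, only here so the imports are actually used.
object ImportStyleExample {
  val numbers: Source[Int, NotUsed] = Source(List(1, 2, 3))
}
```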

* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
Contributor

Brand-new source files that copy nothing from existing source files should have the standard Apache source header, not this one.

https://github.com/apache/pekko/blob/main/project/AddMetaInfLicenseFiles.scala#L1-L16

*/
package org.apache.pekko.stream.impl.fusing

import org.apache.pekko.annotation.InternalApi
Contributor

see comment elsewhere about scala imports for import org.apache.pekko

@pjfanning pjfanning added this to the 1.2.0-M2 milestone Mar 5, 2025
MartinHH added 2 commits March 5, 2025 15:57
- adjust import style
- adjust header
(add missing newline)
@pjfanning pjfanning added the t:stream Pekko Streams label Mar 5, 2025
@pjfanning
Contributor

needs

  1. sbt javafmtAll -- some of the Java files are failing the style check
  2. doc link issue
[03-05 15:14:56.422] [error] java.lang.RuntimeException: Unable to extract details from /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source-or-Flow/switchMap.md
[03-05 15:14:56.422] [error] 	at StreamOperatorsIndexGenerator$.getDetails(StreamOperatorsIndexGenerator.scala:281)
[03-05 15:14:56.422] [error] 	at StreamOperatorsIndexGenerator$.$anonfun$generateAlphabeticalIndex$13(StreamOperatorsIndexGenerator.scala:209)
[03-05 15:14:56.422] [error] 	at scala.collection.immutable.List.map(List.scala:297)
[03-05 15:14:56.422] [error] 	at StreamOperatorsIndexGenerator$.$anonfun$generateAlphabeticalIndex$1(StreamOperatorsIndexGenerator.scala:207)
[03-05 15:14:56.423] [error] 	at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[03-05 15:14:56.423] [error] 	at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:63)
[03-05 15:14:56.423] [error] 	at sbt.std.Transform$$anon$4.work(Transform.scala:69)
[03-05 15:14:56.423] [error] 	at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
[03-05 15:14:56.423] [error] 	at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
[03-05 15:14:56.423] [error] 	at sbt.Execute.work(Execute.scala:292)
[03-05 15:14:56.423] [error] 	at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
[03-05 15:14:56.423] [error] 	at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[03-05 15:14:56.423] [error] 	at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
[03-05 15:14:56.423] [error] 	at java.base/java.lang.Thread.run(Thread.java:1583)
[03-05 15:14:56.423] [error] Caused by: java.lang.IllegalArgumentException: requirement failed: category link in /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source-or-Flow/switchMap.md should start with @ref, but saw ""
[03-05 15:14:56.423] [error] 	at scala.Predef$.require(Predef.scala:281)
[03-05 15:14:56.423] [error] 	at StreamOperatorsIndexGenerator$.getDetails(StreamOperatorsIndexGenerator.scala:273)
[03-05 15:14:56.423] [error] 	at StreamOperatorsIndexGenerator$.$anonfun$generateAlphabeticalIndex$13(StreamOperatorsIndexGenerator.scala:209)
[03-05 15:14:56.423] [error] 	at scala.collection.immutable.List.map(List.scala:297)
[03-05 15:14:56.423] [error] 	at StreamOperatorsIndexGenerator$.$anonfun$generateAlphabeticalIndex$1(StreamOperatorsIndexGenerator.scala:207)
[03-05 15:14:56.423] [error] 	at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[03-05 15:14:56.423] [error] 	at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:63)
[03-05 15:14:56.423] [error] 	at sbt.std.Transform$$anon$4.work(Transform.scala:69)
[03-05 15:14:56.423] [error] 	at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
[03-05 15:14:56.423] [error] 	at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
[03-05 15:14:56.423] [error] 	at sbt.Execute.work(Execute.scala:292)
[03-05 15:14:56.423] [error] 	at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
[03-05 15:14:56.423] [error] 	at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[03-05 15:14:56.423] [error] 	at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
[03-05 15:14:56.423] [error] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
[03-05 15:14:56.423] [error] 	at java.base/java.lang.Thread.run(Thread.java:1583)
[03-05 15:14:56.433] [error] (docs / Compile / managedResources) Unable to extract details from /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source-or-Flow/switchMap.md

@MartinHH
Contributor Author

MartinHH commented Mar 5, 2025

> needs
>
> 1. `sbt javafmtAll` -- some of the Java files are failing the style check
> 2. doc link issue

Thanks - and sorry for choosing the "submit your PR straight away ... without reading the rest of this wonderful doc" route.
😉

@@ -0,0 +1,29 @@
# switchMap

Transforms each input element into a `Source` of output elements that is then flattened into the output stream until a new input element is received.
Member

and then the old stream is cancelled.

Contributor

it would be good to be more explicit in the docs about the behaviour

Contributor Author

I was struggling to find the right trade-off between brevity (after all, a one-line limit is enforced for this part) and explicitness. I have now added a little more to this line and even more to the description section below.

(I'd be very open to any specific wording suggestions if you feel the updated version is not clear enough.)

}

setHandler(in,
new InHandler {
Member

Fold the InHandler and the OutHandler into the GraphStageLogic to avoid some allocation.

override def createLogic(enclosingAttributes: Attributes) =
new GraphStageLogic(shape) {

var source = Option.empty[SubSinkInlet[T]]
Member

use OptionVal to avoid boxing.

Contributor

private[pekko] object OptionVal {
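
A rough sketch of the idea behind an `OptionVal`-style wrapper, in plain Scala. `OptVal` and `OptValExample` are hypothetical, simplified stand-ins written for illustration, not Pekko's internal class: the wrapper is a value class that uses `null` to represent absence, so a defined value usually needs no extra `Some` allocation.

```scala
// Hypothetical, simplified stand-in for an OptionVal-style wrapper (not Pekko's actual code).
// As a value class it is normally represented at runtime by the wrapped reference itself.
final class OptVal[+A <: AnyRef](val underlying: A) extends AnyVal {
  def isEmpty: Boolean = underlying eq null
  def isDefined: Boolean = !isEmpty
  def get: A =
    if (isEmpty) throw new NoSuchElementException("OptVal.none.get")
    else underlying
}

object OptVal {
  def some[A <: AnyRef](a: A): OptVal[A] = new OptVal[A](a)
  // Absence is encoded as null instead of a separate None object.
  def none[A <: AnyRef]: OptVal[A] = new OptVal[A](null.asInstanceOf[A])
}

object OptValExample {
  def main(args: Array[String]): Unit = {
    var current: OptVal[String] = OptVal.none[String]
    println(current.isDefined) // nothing stored yet
    current = OptVal.some("substream")
    if (current.isDefined) println(current.get)
  }
}
```

The trade-off is that `null` can no longer be stored as a legitimate value, which is acceptable for a field such as the current substream inlet.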

def setSource(source: Graph[SourceShape[T], M]): Unit = {
cancelCurrentSource()
removeCurrentSource(completeIfClosed = false)
val sinkIn = new SubSinkInlet[T]("SwitchSink")
Member

I think we can avoid some materialization if the source is SingleSource or IterableSource.

Contributor Author

I had a brief shot at that before I submitted the PR but then refrained from it because my initial naive approach led to inconsistencies (my test case "not behave differently for substreams Source.single(x) and Source(List(x))" results from that).

The thing I found challenging: I'd expect that as long as further values are available from upstream, any previous values from upstream are discarded without emitting a single item.

For example, the following should and currently does result in Seq(2, 3) (with the 1 being discarded):

Source(List(Source(List(1)), Source(List(2, 3))))
        .switchMap(identity)
        .runWith(Sink.seq)

I'd expect the same for

Source(List(Source.single(1), Source(List(2, 3))))
        .switchMap(identity)
        .runWith(Sink.seq)

That means that if, for example, I detect a SingleSource as the current substream, I cannot just emit the single value, but need to check whether further items are available from upstream before doing that (and discard the single value if there are).

At the point where I had understood that, I decided to keep the initial implementation simple.

Now that almost everything else seems to be more or less agreed on, I might give it another shot sometime during the coming week and see if I can get that optimization done without breaking the above expectations (maybe one of you has some pointers?). But I might end up at a point where I'd suggest merging this as it is (then hoping for someone else to get that optimization done).
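
For readers who want to try the first snippet from this comment, here is a minimal runnable sketch. It assumes a Pekko version that includes this operator (the scaladoc in this PR says `@since 1.2.0`) with pekko-stream on the classpath; the object name `SwitchMapExample`, the system name and the 3-second timeout are arbitrary choices for illustration. According to the comment above, this input results in `Seq(2, 3)`.

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object around the snippet from the comment.
object SwitchMapExample {
  def main(args: Array[String]): Unit = {
    // The implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("switch-map-example")
    try {
      val result = Source(List(Source(List(1)), Source(List(2, 3))))
        .switchMap(identity) // switch to each new substream, cancelling the previous one
        .runWith(Sink.seq)
      println(Await.result(result, 3.seconds))
    } finally {
      system.terminate()
    }
  }
}
```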

@He-Pin
Member

He-Pin commented Mar 6, 2025

Thank you for this. I have a PR that avoids the materialization of sub-sources, which you can refer to:
#1775

@He-Pin He-Pin changed the title feat: switchMap feat: Add switchMap operator Mar 6, 2025
private val in = Inlet[Graph[SourceShape[T], M]]("switch.in")
private val out = Outlet[T]("switch.out")

override def initialAttributes = DefaultAttributes.switch
Member

better with the lambda line number too.

Contributor

@He-Pin do you still want this change?

Contributor Author

/**
* INTERNAL API
*/
@InternalApi private[pekko] final class Switch[T, M]
Member

Maybe SwitchMap?

Contributor Author

@MartinHH MartinHH Mar 7, 2025

I'd be fine with renaming this, but maybe it's worth explaining my choice of name before we decide whether to rename:

  • This GraphStage does not evaluate the lambda - similar to flatMapMerge and flatMapConcat, the evaluation of f is delegated to the map operator and then only the "flattening" is done in the GraphStage. (My reasons for implementing it that way: consistency with the aforementioned operators and less redundancy in code because I did not need to care about things like f throwing.)
  • For those other operators that delegate evaluation of f to map, the naming scheme seems to be to not have Map in the name of the GraphStage (the stage for flatMapMerge is called FlattenMerge)
  • The transformation that is implemented by this GraphStage ("switching" without the "mapping" part) corresponds to what an operator that is named switch in both ReactiveX and Monix does

So following my train of thought, if this stage were named SwitchMap one should move the evaluation of f into the GraphStage.

Don't get me wrong: I'd be fine with any solution (keep it as it is; rename to SwitchMap and move evaluation of f into the GraphStage or just rename and keep evaluation of f where it is), I just wanted to share my reasoning before I make any changes.

@pjfanning
Contributor

pjfanning commented Mar 6, 2025

@He-Pin @MartinHH I am comfortable with merging this with just maybe the rename of the Switch class to SwitchMap and then following up with an optimisation PR that avoids allocations (but uses longer and more complicated code).

The doc update would be good too and maybe the OptionVal change.

@He-Pin
Member

He-Pin commented Mar 7, 2025

I'm OK with the current shape too; we can polish it later. But let's wait a little longer in case @MartinHH has some more updates, wdyt?

I would also like to have #1702 later, if you have time @pjfanning.

@He-Pin
Member

He-Pin commented Mar 7, 2025

@pjfanning @MartinHH BTW, the Kotlin flow is using flatMapLatest as a switchMap. I'm not sure which name is better; maybe switchMap is good enough?

@MartinHH
Contributor Author

MartinHH commented Mar 7, 2025

> @pjfanning @MartinHH BTW, the Kotlin flow is using flatMapLatest as a switchMap. I'm not sure which name is better; maybe switchMap is good enough?

I'm kind of biased towards the naming from certain libraries (RxScala, Monix) because I learned about all those stream operators back when the official Scala Coursera courses had Erik Meijer teach RxScala, so for me, this is "switchMap". Given that Rx is still very popular in certain ecosystems (e.g. for JS/TS frontends), I'd say one can't go wrong following their naming, but then again: I don't really mind.

@He-Pin
Member

He-Pin commented Mar 7, 2025

@MartinHH Yes, I have seen code in Spring AI that uses Flux#switchMap too. Thanks for the input.

*
* '''Emits when''' the current substream has an element available
*
* '''Backpressures when''' never
Member

this never is really great!

Contributor Author

On second thought: this might be formally correct, as it never backpressures on upstream (and backpressure on upstream is what is to be addressed here, if I'm not mistaken), but maybe it would be more helpful to mention the backpressure on the substream, e.g.:

Suggested change
- * '''Backpressures when''' never
+ * '''Backpressures when''' never on upstream (backpressures on current substream when downstream backpressures)

wdyt?

* @since 1.2.0
*/
def switchMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] =
map(f).via(new Switch[T, M].addAttributes(Attributes(SourceLocation.forLambda(f))))
Member

You implemented it with a map and a switch, cool.

@He-Pin He-Pin linked an issue Mar 7, 2025 that may be closed by this pull request
}

setHandler(in, this)
setHandler(out, this)
Member

setHandlers(in, out, this)
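
A minimal sketch of the pattern suggested here, assuming pekko-stream on the classpath. `PassThrough` is a hypothetical pass-through stage, not the stage from this PR: its logic implements both handler traits itself and registers for both ports with a single `setHandlers` call.

```scala
import org.apache.pekko
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// Hypothetical pass-through stage, used only to illustrate the handler pattern.
final class PassThrough[T] extends GraphStage[FlowShape[T, T]] {
  private val in = Inlet[T]("passThrough.in")
  private val out = Outlet[T]("passThrough.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    // The logic object is its own InHandler and OutHandler, so no separate
    // handler instances are allocated per materialization.
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      // Registers `this` for both the inlet and the outlet in one call.
      setHandlers(in, out, this)
    }
}
```

It can be plugged into a stream with `Source(1 to 3).via(new PassThrough[Int])`.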

}

private def cancelCurrentSource(): Unit = {
if (source.isDefined) {
Member

better with pattern matching

Contributor Author

I stopped being passionate about such things so I just followed your suggestion, although I do not agree 😉 .

}
}

override def postStop(): Unit = cancelCurrentSource()
Member

I think we should cancel the current source in onDownstreamFinish instead of here?

Contributor Author

I'm not sure. I followed the example of FlattenMerge where the cancelling of substreams is done in postStop():

override def postStop(): Unit = sources.foreach(_.cancel())
(and onDownstreamFinish is not overridden).

I'd expect that postStop will eventually be called in any case and don't see a risk of cancelCurrentSource() being called from here when it shouldn't, so doing it in this "catch-all" handler feels safer to me. (But maybe I'm just missing the significant difference?)
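
To make the lifecycle argument above concrete, here is a small sketch, assuming pekko-stream on the classpath. The stage `CleanupProbe` and its counter are hypothetical and only illustrate the expectation stated in the comment: cleanup placed in `postStop()` runs whether the stage stops because upstream completed or because downstream cancelled.

```scala
import org.apache.pekko
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stage that counts how often its cleanup hook runs.
final class CleanupProbe[T](cleanups: AtomicInteger) extends GraphStage[FlowShape[T, T]] {
  private val in = Inlet[T]("cleanupProbe.in")
  private val out = Outlet[T]("cleanupProbe.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      // The default onUpstreamFinish and onDownstreamFinish implementations
      // stop the stage; postStop is the single hook that runs afterwards.
      override def postStop(): Unit = {
        cleanups.incrementAndGet()
        ()
      }

      setHandlers(in, out, this)
    }
}
```

Running `Source(1 to 3).via(new CleanupProbe[Int](counter)).runWith(Sink.ignore)` exercises the upstream-completion path, while appending `.take(1)` after the stage on an infinite source exercises the downstream-cancellation path.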

}
}

def setSource(source: Graph[SourceShape[T], M]): Unit = {
Member

setupCurrentSource maybe a better name?

Member

@He-Pin He-Pin left a comment

overall lgtm, thank you very much for this.

@MartinHH
Contributor Author

MartinHH commented Mar 7, 2025

I submitted another set of suggested changes. Looking at the sunny weather forecast, I doubt that I will get to figure out how to properly avoid materialization for certain sources in the coming days, so I'd suggest merging if my latest changes get approval.

@He-Pin
Member

He-Pin commented Mar 8, 2025

@MartinHH, I think we can merge this. I can prepare a follow-up PR that builds on your excellent work.

Contributor

@pjfanning pjfanning left a comment

lgtm

@MartinHH
Contributor Author

MartinHH commented Mar 8, 2025

Ok, I think I'm done here, so go ahead and merge as you feel.

Btw: thanks for the welcoming experience on here!

@He-Pin He-Pin merged commit 57af84a into apache:main Mar 8, 2025
9 checks passed
@He-Pin
Member

He-Pin commented Mar 8, 2025

Thank you very much, it completes Pekko Streams for AI scenarios.

Labels
t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature request: Add Flow#switchMap operator.
3 participants