Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add switchMap operator #1787

Merged
merged 10 commits into from
Mar 8, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# switchMap

Transforms each input element into a `Source` of output elements that is then flattened into the output stream until a new input element is received.
Transforms each input element into a `Source` of output elements that is then flattened into the output stream until a new input element is received (at which point the current (now previous) substream is cancelled and the new one is flattend into the output stream).

@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)

Expand All @@ -11,8 +11,9 @@ Transforms each input element into a `Source` of output elements that is then fl
## Description

Transforms each input element into a `Source` of output elements that is then flattened into the output stream until a
new input element is received at which point the current (now previous) substream is cancelled (which is why this
operator is sometimes also called "flatMapLatest").
new input element is received at which point the current (now previous) substream is cancelled while the `Source` resulting
from the input element is subscribed to and flattened into the output stream. So effectively, only the "latest" `Source`
is flattened into the output stream (which is why this operator is sometimes also called "flatMapLatest").

## Reactive Streams semantics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.InHandler
import pekko.stream.stage.OutHandler
import pekko.util.OptionVal

/**
* INTERNAL API
Expand All @@ -45,35 +46,33 @@ import pekko.stream.stage.OutHandler
override val shape = FlowShape(in, out)

override def createLogic(enclosingAttributes: Attributes) =
new GraphStageLogic(shape) {
new GraphStageLogic(shape) with InHandler with OutHandler {

var source = Option.empty[SubSinkInlet[T]]
var source = OptionVal.none[SubSinkInlet[T]]

override def preStart(): Unit = {
pull(in)
super.preStart()
}

setHandler(in,
new InHandler {
override def onPush(): Unit = {
val source = grab(in)
setSource(source)
tryPull(in)
}
override def onPush(): Unit = {
val source = grab(in)
setSource(source)
tryPull(in)
}

override def onUpstreamFinish(): Unit = if (source.isEmpty) completeStage()
})
override def onUpstreamFinish(): Unit = if (source.isEmpty) completeStage()

setHandler(out,
new OutHandler {
override def onPull(): Unit = {
if (isAvailable(out)) tryPushOut()
}
})
override def onPull(): Unit = {
if (isAvailable(out)) tryPushOut()
}

setHandler(in, this)
setHandler(out, this)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setHandlers(in, out, this)


def tryPushOut(): Unit = {
source.foreach { src =>
if (source.isDefined) {
val src = source.get
if (src.isAvailable) {
push(out, src.grab())
if (!src.isClosed) src.pull()
Expand All @@ -99,17 +98,21 @@ import pekko.stream.stage.OutHandler
}
})
sinkIn.pull()
this.source = Some(sinkIn)
this.source = OptionVal.Some(sinkIn)
val graph = Source.fromGraph(source).to(sinkIn.sink)
subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
}

def removeCurrentSource(completeIfClosed: Boolean): Unit = {
source = None
source = OptionVal.none
if (completeIfClosed && isClosed(in)) completeStage()
}

private def cancelCurrentSource(): Unit = source.foreach(_.cancel())
private def cancelCurrentSource(): Unit = {
if (source.isDefined) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better with pattern matching

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I stopped being passionate about such things so I just followed your suggestion, although I do not agree 😉 .

source.get.cancel()
}
}

override def postStop(): Unit = cancelCurrentSource()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should cancel the current source in onDownStreamFinished instead of here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I followed the example of FlattenMerge where the cancelling of substreams is done in postStop():

override def postStop(): Unit = sources.foreach(_.cancel())
(and onDownStreamFinished is not overriden).

I'd expect that postStop will eventually be called in any case and don't see a risk of cancelCurrentSource() being called from here when it shouldn't, so doing it in this "catch-all" handler feels safer to me. (But maybe I'm just missing the significant difference?)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2821,7 +2821,7 @@ trait FlowOps[+Out, +Mat] {
* @since 1.2.0
*/
def switchMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] =
map(f).via(new Switch[T, M])
map(f).via(new Switch[T, M].addAttributes(Attributes(SourceLocation.forLambda(f))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You implemented it with a map and switch cool.


/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
Expand Down