Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add switchMap operator #1787

Merged
merged 10 commits into from
Mar 8, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/src/main/paradox/stream/operators/Source-or-Flow/switchMap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# switchMap

Transforms each input element into a `Source` of output elements that is then flattened into the output stream until a new input element is received.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and then the old stream is been cancelled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to be more explicit in the docs about the behaviour

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was struggling to find the right trade of between brevety (after all, there is a one-line-limit enforced for this part here) and explicitness. I now added a little bit more to this line and even more in the desciption section below.

(I'd be very open for any specific wording suggestions if you feel the updated version is not clear enough.)


@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)

## Signature

@apidoc[Flow.switchMap](Flow) { scala="#switchMap[T,M](f:Out=%3Eorg.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],M]):FlowOps.this.Repr[T]" java="#switchMap(org.apache.pekko.japi.function.Function)" }

## Description

Transforms each input element into a `Source` of output elements that is then flattened into the output stream until a
new input element is received at which point the current (now previous) substream is cancelled (which is why this
operator is sometimes also called "flatMapLatest").

## Reactive Streams semantics

@@@div { .callout }

**emits** when the current substream has an element available

**backpressures** never

**completes** upstream completes and the current substream completes

@@@


2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ See the @ref:[Substreams](../stream-substream.md) page for more detail and code
|Source/Flow|<a name="prefixandtail"></a>@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.|
|Source/Flow|<a name="splitafter"></a>@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.|
|Source/Flow|<a name="splitwhen"></a>@ref[splitWhen](Source-or-Flow/splitWhen.md)|Split off elements into a new substream whenever a predicate function return `true`.|
|Source/Flow|<a name="switchmap"></a>@ref[switchMap](Source-or-Flow/switchMap.md)|Transforms each input element into a `Source` of output elements that is then flattened into the output stream until a new input element is received.|

## Time aware operators

Expand Down Expand Up @@ -591,6 +592,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [splitWhen](Source-or-Flow/splitWhen.md)
* [statefulMap](Source-or-Flow/statefulMap.md)
* [statefulMapConcat](Source-or-Flow/statefulMapConcat.md)
* [switchMap](Source-or-Flow/switchMap.md)
* [take](Source-or-Flow/take.md)
* [takeLast](Sink/takeLast.md)
* [takeWhile](Source-or-Flow/takeWhile.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,32 @@ public void mustBeAbleToUseFlatMapMerge() throws Exception {
assertEquals(expected, set);
}

@Test
public void mustBeAbleToUseSwitchMap() throws Exception {
final TestKit probe = new TestKit(system);
final Iterable<Integer> mainInputs = Arrays.asList(-1, 0, 1);
final Iterable<Integer> substreamInputs = Arrays.asList(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);

final Flow<Integer, Integer, NotUsed> flow =
Flow.<Integer>create()
.switchMap(
new Function<Integer, Source<Integer, NotUsed>>() {
@Override
public Source<Integer, NotUsed> apply(Integer param) throws Exception {
return param > 0
? Source.fromIterator(substreamInputs::iterator)
: Source.never();
}
});

CompletionStage<List<Integer>> future =
Source.from(mainInputs).via(flow).runWith(Sink.seq(), system);

List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);

assertEquals(substreamInputs, result);
}

@Test
public void mustBeAbleToUseBuffer() throws Exception {
final TestKit probe = new TestKit(system);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,30 @@ public void mustBeAbleToUseFlatMapMerge() throws Exception {
assertEquals(expected, set);
}

@Test
public void mustBeAbleToUseSwitchMap() throws Exception {
final TestKit probe = new TestKit(system);
final Iterable<Integer> mainInputs = Arrays.asList(-1, 0, 1);
final Iterable<Integer> substreamInputs = Arrays.asList(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);

CompletionStage<List<Integer>> future =
Source.from(mainInputs)
.switchMap(
new Function<Integer, Source<Integer, NotUsed>>() {
@Override
public Source<Integer, NotUsed> apply(Integer param) throws Exception {
return param > 0
? Source.fromIterator(substreamInputs::iterator)
: Source.never();
}
})
.runWith(Sink.seq(), system);

List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);

assertEquals(substreamInputs, result);
}

@Test
public void mustBeAbleToUseBuffer() throws Exception {
final TestKit probe = new TestKit(system);
Expand Down
Loading
Loading