From 4252382d087cfb241c57e32be0afd9dc1c363f88 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sun, 16 Feb 2025 14:27:51 +0800 Subject: [PATCH] chore: Tweak withAttribuets in Flow (#1658) --- .../stream/scaladsl/FlowStatefulMapSpec.scala | 29 ++++++-- .../stream/MapAsyncPartitionedSpec.scala | 67 ++++++++++++++++++- .../apache/pekko/stream/scaladsl/Flow.scala | 26 +++---- 3 files changed, 103 insertions(+), 19 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala index 2cf80a96d37..0403e62fb92 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala @@ -24,10 +24,7 @@ import scala.util.control.NoStackTrace import org.apache.pekko import pekko.Done -import pekko.stream.AbruptStageTerminationException -import pekko.stream.ActorAttributes -import pekko.stream.ActorMaterializer -import pekko.stream.Supervision +import pekko.stream.{ AbruptStageTerminationException, ActorAttributes, ActorMaterializer, ClosedShape, Supervision } import pekko.stream.testkit.StreamSpec import pekko.stream.testkit.TestSubscriber import pekko.stream.testkit.Utils.TE @@ -434,4 +431,28 @@ class FlowStatefulMapSpec extends StreamSpec { closedCounter.get() shouldBe 1 } } + + "support junction output ports" in { + val source = Source(List((1, 1), (2, 2))) + val g = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink.probe[(Int, Int)]) { implicit b => sink => + import GraphDSL.Implicits._ + val unzip = b.add(Unzip[Int, Int]()) + val zip = b.add(Zip[Int, Int]()) + val s = b.add(source) + // format: OFF + s ~> unzip.in + unzip.out0 ~> zip.in0 + unzip.out1 ~> zip.in1 + zip.out.statefulMap(() => None)((_, elem) => (None, elem), _ => None) ~> sink.in + // format: ON + + ClosedShape + }) + g.run() + .request(2) + .expectNext((1, 1)) + .expectNext((2, 2)) + .expectComplete() + } + } diff --git a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala index 46800577765..a4261c8c26b 100644 --- a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala +++ b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala @@ -17,9 +17,23 @@ package org.apache.pekko.stream -import org.apache.pekko.actor.typed.ActorSystem -import org.apache.pekko.actor.typed.scaladsl.Behaviors -import org.apache.pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, Sink, Source, SourceWithContext } +import org.apache.pekko +import pekko.actor.typed.ActorSystem +import pekko.actor.typed.scaladsl.Behaviors +import pekko.stream.scaladsl.{ + Flow, + FlowWithContext, + GraphDSL, + Keep, + RunnableGraph, + Sink, + Source, + SourceWithContext, + Unzip, + Zip +} +import pekko.stream.testkit.scaladsl.TestSink + import org.scalacheck.{ Arbitrary, Gen } import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures @@ -29,6 +43,7 @@ import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks import java.time.Instant import java.util.concurrent.Executors + import scala.annotation.nowarn import scala.concurrent.duration.{ DurationInt, FiniteDuration } import scala.concurrent.{ blocking, ExecutionContext, Future } @@ -439,6 +454,52 @@ class MapAsyncPartitionedSpec .futureValue shouldBe Seq(1 -> "A") } + it should "support junction output ports with mapAsyncPartitioned" in { + val source = Source(List((1, 1), (2, 2))) + val g = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink.probe[(Int, Int)](system.classicSystem)) { + implicit b => sink => + import GraphDSL.Implicits._ + val unzip = b.add(Unzip[Int, Int]()) + val zip = b.add(Zip[Int, Int]()) + val s = b.add(source) + // format: OFF + s ~> unzip.in + unzip.out0 ~> zip.in0 + unzip.out1 ~> zip.in1 + zip.out.mapAsyncPartitioned(1)(_ => 1)((elem, _) => Future.successful(elem)) ~> sink.in + // format: ON + ClosedShape + }) + g.run() + .request(2) + .expectNext((1, 1)) + .expectNext((2, 2)) + .expectComplete() + } + + it should "support junction output ports with mapAsyncPartitionedUnordered" in { + val source = Source(List((1, 1), (2, 2))) + val g = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink.probe[(Int, Int)](system.classicSystem)) { + implicit b => sink => + import GraphDSL.Implicits._ + val unzip = b.add(Unzip[Int, Int]()) + val zip = b.add(Zip[Int, Int]()) + val s = b.add(source) + // format: OFF + s ~> unzip.in + unzip.out0 ~> zip.in0 + unzip.out1 ~> zip.in1 + zip.out.mapAsyncPartitionedUnordered(1)(_ => 1)((elem, _) => Future.successful(elem)) ~> sink.in + // format: ON + ClosedShape + }) + g.run() + .request(2) + .expectNext((1, 1)) + .expectNext((2, 2)) + .expectComplete() + } + private implicit class MapWrapper[K, V](map: Map[K, V]) { @nowarn("msg=deprecated") def mapValues2[W](f: V => W) = map.mapValues(f) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index e2db2e8daf9..a12a0cd9c1a 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1149,7 +1149,8 @@ trait FlowOps[+Out, +Mat] { * @param onComplete a function that transforms the ongoing state into an optional output element */ def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] = - via(new StatefulMap[S, Out, T](create, f, onComplete).withAttributes(DefaultAttributes.statefulMap)) + via(new StatefulMap[S, Out, T](create, f, onComplete) + .withAttributes(DefaultAttributes.statefulMap and SourceLocation.forLambda(f))) /** * Transform each stream element with the help of a resource. @@ -1358,12 +1359,12 @@ trait FlowOps[+Out, +Mat] { def mapAsyncPartitioned[T, P](parallelism: Int)( partitioner: Out => P)( f: (Out, P) => Future[T]): Repr[T] = { - (if (parallelism == 1) { - via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))) - } else { - via(new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f)) - }) - .withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)) + val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) { + MapAsyncUnordered(1, elem => f(elem, partitioner(elem))) + } else { + new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f) + } + via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))) } /** @@ -1396,11 +1397,12 @@ trait FlowOps[+Out, +Mat] { def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( partitioner: Out => P)( f: (Out, P) => Future[T]): Repr[T] = { - (if (parallelism == 1) { - via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))) - } else { - via(new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f)) - }).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and SourceLocation.forLambda(f)) + val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) { + MapAsyncUnordered(1, elem => f(elem, partitioner(elem))) + } else { + new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f) + } + via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))) } /**