From 90c8ea8b7aca4ecd0c5e39b94e101174d8ddd617 Mon Sep 17 00:00:00 2001 From: Martin Hansen Date: Fri, 7 Mar 2025 16:21:59 +0100 Subject: [PATCH] chore: more review changes --- .../pekko/stream/impl/fusing/Switch.scala | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Switch.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Switch.scala index 1b154b6a4f..6d446d32c6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Switch.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Switch.scala @@ -57,7 +57,7 @@ import pekko.util.OptionVal override def onPush(): Unit = { val source = grab(in) - setSource(source) + setupCurrentSource(source) tryPull(in) } @@ -67,21 +67,21 @@ import pekko.util.OptionVal if (isAvailable(out)) tryPushOut() } - setHandler(in, this) - setHandler(out, this) + setHandlers(in, out, this) def tryPushOut(): Unit = { - if (source.isDefined) { - val src = source.get - if (src.isAvailable) { - push(out, src.grab()) - if (!src.isClosed) src.pull() - else removeCurrentSource(completeIfClosed = true) - } + source match { + case OptionVal.Some(src) => + if (src.isAvailable) { + push(out, src.grab()) + if (!src.isClosed) src.pull() + else removeCurrentSource(completeIfClosed = true) + } + case OptionVal.None => } } - def setSource(source: Graph[SourceShape[T], M]): Unit = { + def setupCurrentSource(source: Graph[SourceShape[T], M]): Unit = { cancelCurrentSource() removeCurrentSource(completeIfClosed = false) val sinkIn = new SubSinkInlet[T]("SwitchSink") @@ -109,8 +109,10 @@ import pekko.util.OptionVal } private def cancelCurrentSource(): Unit = { - if (source.isDefined) { - source.get.cancel() + source match { + case OptionVal.Some(src) => + src.cancel() + case OptionVal.None => } }