Skip to content

Commit

Permalink
chore: more review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinHH committed Mar 7, 2025
1 parent dbe2998 commit 90c8ea8
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import pekko.util.OptionVal

override def onPush(): Unit = {
val source = grab(in)
setSource(source)
setupCurrentSource(source)
tryPull(in)
}

Expand All @@ -67,21 +67,21 @@ import pekko.util.OptionVal
if (isAvailable(out)) tryPushOut()
}

setHandler(in, this)
setHandler(out, this)
setHandlers(in, out, this)

def tryPushOut(): Unit = {
if (source.isDefined) {
val src = source.get
if (src.isAvailable) {
push(out, src.grab())
if (!src.isClosed) src.pull()
else removeCurrentSource(completeIfClosed = true)
}
source match {
case OptionVal.Some(src) =>
if (src.isAvailable) {
push(out, src.grab())
if (!src.isClosed) src.pull()
else removeCurrentSource(completeIfClosed = true)
}
case OptionVal.None =>
}
}

def setSource(source: Graph[SourceShape[T], M]): Unit = {
def setupCurrentSource(source: Graph[SourceShape[T], M]): Unit = {
cancelCurrentSource()
removeCurrentSource(completeIfClosed = false)
val sinkIn = new SubSinkInlet[T]("SwitchSink")
Expand Down Expand Up @@ -109,8 +109,10 @@ import pekko.util.OptionVal
}

private def cancelCurrentSource(): Unit = {
if (source.isDefined) {
source.get.cancel()
source match {
case OptionVal.Some(src) =>
src.cancel()
case OptionVal.None =>
}
}

Expand Down

0 comments on commit 90c8ea8

Please sign in to comment.