Skip to content

Commit

Permalink
chore: Handle NormalShutdownReason in MergeHub (#1741)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Mar 3, 2025
1 parent 9d1516f commit ba05791
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.KillSwitches
import pekko.stream.ThrottleMode
import pekko.stream.impl.ActorPublisher
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.TestPublisher
import pekko.stream.testkit.TestSubscriber
Expand Down Expand Up @@ -51,6 +52,16 @@ class HubSpec extends StreamSpec {
upstream.expectCancellation()
}

"do not throw exceptions when upstream completes normally" in {
EventFilter.error("Upstream producer failed with exception, removing from MergeHub now",
occurrences = 0).intercept {
val (sink, result) = MergeHub.source[Int](16).take(10).toMat(Sink.seq)(Keep.both).run()
Source.failed(ActorPublisher.NormalShutdownReason).runWith(sink)
Source(1 to 10).runWith(sink)
result.futureValue.sorted should ===(1 to 10)
}
}

"notify existing producers if consumer cancels after a few elements" in {
val (sink, result) = MergeHub.source[Int](16).take(5).toMat(Sink.seq)(Keep.both).run()
val upstream = TestPublisher.probe[Int]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import pekko.annotation.InternalApi
import pekko.dispatch.AbstractNodeQueue
import pekko.stream._
import pekko.stream.Attributes.LogLevels
import pekko.stream.impl.ActorPublisher
import pekko.stream.stage._

/**
Expand Down Expand Up @@ -356,7 +357,8 @@ private[pekko] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Bo

// Make some noise
override def onUpstreamFailure(ex: Throwable): Unit = {
throw new MergeHub.ProducerFailed(
if (ex eq ActorPublisher.NormalShutdownReason) completeStage()
else throw new MergeHub.ProducerFailed(
"Upstream producer failed with exception, " +
"removing from MergeHub now",
ex)
Expand Down

0 comments on commit ba05791

Please sign in to comment.