Skip to content

Commit

Permalink
Fix propagation of interruption from CIO to ZIO (#697)
Browse files Browse the repository at this point in the history
* Fix propagation of interruption from CIO to ZIO

* Wrap method in F.delay

* Allow interruption of interruption

* Fix compiling

* Make sure to complete the interruption CB

* Simplify logic by making interruption un-cancellable
  • Loading branch information
kyri-petrou committed Jul 27, 2024
1 parent 879070e commit e8aa54d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import zio.interop.catz.*
import zio.test.*
import zio.*

import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean

object CatsInteropSpec extends CatsRunnableSpec {
def spec: Spec[Any, Throwable] = suite("Cats interop")(
test("cats fiber wrapped in Resource can be canceled") {
Expand Down Expand Up @@ -203,6 +206,25 @@ object CatsInteropSpec extends CatsRunnableSpec {
!exception.get.getMessage.contains("Boxed Exception") &&
exception.get.getMessage.contains("The fiber was canceled")
)
}
},
test("CIO propagates interruption to ZIO") {
ZIO.succeedBlocking {
val latch = new CompletableFuture[Unit]()
val ref = new AtomicBoolean(false)
val zioF: CIO[Nothing] =
(ZIO.yieldNow.as(latch.complete(())) *> ZIO.never)
.onInterrupt(ZIO.succeed(ref.set(true)))
.toEffect[CIO]

val value = zioF.start
.productL(CIO.fromCompletableFuture(CIO(latch)))
.flatMap { (fib: cats.effect.FiberIO[Nothing]) =>
fib.cancel *> CIO(ref.get())
}
.unsafeRunSync()

assertTrue(value)
}
} @@ TestAspect.nonFlaky(1000)
)
}
24 changes: 15 additions & 9 deletions zio-interop-cats/shared/src/main/scala/zio/interop/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,27 @@ package object interop {
@inline def toEffect[F[_], R, A](rio: RIO[R, A])(implicit R: Runtime[R], F: Async[F], trace: Trace): F[A] =
F.defer {
val interrupted = new AtomicBoolean(true)
F.async[Exit[Throwable, A]] { cb =>
Unsafe.unsafe { implicit unsafe =>
val fiber = R.unsafe.fork {
F.asyncCheckAttempt[Exit[Throwable, A]] { cb =>
F.delay {
implicit val unsafe: Unsafe = Unsafe.unsafe

val out = R.unsafe.runOrFork {
signalOnNoExternalInterrupt {
rio
}(ZIO.succeed(interrupted.set(false)))
}
fiber.unsafe
.addObserver(exit => cb(Right(exit)))
val cancelerEffect = F.delay {
val _ = fiber.interrupt
out match {
case Left(fiber) =>
val completeCb = (exit: Exit[Throwable, A]) => cb(Right(exit))
fiber.unsafe.addObserver(completeCb)
Left(Some(F.async_ { cb =>
fiber.unsafe.addObserver(_ => cb(Right(())))
fiber.unsafe.removeObserver(completeCb)
fiber.tellInterrupt(Cause.interrupt(fiber.id))
}))
case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place
}
F.pure(Some(cancelerEffect))
}

}.flatMap { exit =>
toOutcomeThrowableOtherFiber(interrupted.get())(F.pure(_: A), exit) match {
case Outcome.Succeeded(fa) =>
Expand Down

0 comments on commit e8aa54d

Please sign in to comment.