Skip to content

Commit

Permalink
Implement streaming with state changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nightscape committed Sep 23, 2020
1 parent dffd8d0 commit 9e45776
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 46 deletions.
53 changes: 43 additions & 10 deletions actors/src/main/scala/zio/actors/Actor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,51 @@ package zio.actors

import zio.actors.Actor.PendingMessage
import zio.clock.Clock
import zio.{ Supervisor => _, _ }
import zio.stream.ZStream
import zio.{Supervisor => _, _}

object Actor {

private[actors] type PendingMessage[F[_], A] = (F[A], Promise[Throwable, A])
trait ActorResponse[R, S, A] {
type ResponseType

def materialize(state: Ref[S], promise: Promise[Throwable, A], supervisor: Supervisor[R]): URIO[R with Clock, Unit]
}

trait LowPriorityActorResponses {
def oneTimeResponse[R, S, A](response: RIO[R, (S, A)]): ActorResponse[R, S, A] =
new ActorResponse[R, S, A] {
override type ResponseType = RIO[R, (S, A)]

override def materialize(state: Ref[S], promise: Promise[Throwable, A], supervisor: Supervisor[R]): URIO[R with Clock, Unit] = {
val completer = ((s: S, a: A) => (state.set(s) *> promise.succeed(a)).ignore).tupled
response.foldM(
e =>
supervisor
.supervise[R, (S, A)](response, e)
.foldM(_ => promise.fail(e).ignore, completer),
completer
)
}
}
}

object ActorResponse extends LowPriorityActorResponses {
type Aux[R, S, A, T] = ActorResponse[R, S, A] {
type ResponseType = T
}
def streamingResponse[R, S, A](response: ZStream[R, Throwable, (S, A)]): ActorResponse[R, S, ZStream[R, Throwable, A]] =
new ActorResponse[R, S, ZStream[R, Throwable, A]] {
override type ResponseType = ZStream[R, Throwable, (S, A)]

override def materialize(state: Ref[S], promise: Promise[Throwable, ZStream[R, Throwable, A]], supervisor: Supervisor[R]): URIO[R with Clock, Unit] = {
// TODO: Supervision
val outputStream = response.mapM { case (s, a) => state.set(s) *> UIO(a) }
promise.succeed(outputStream).ignore
}
}
}

/**
* Description of actor behavior (can act as FSM)
Expand All @@ -26,7 +66,7 @@ object Actor {
* @tparam A - domain of return entities
* @return effectful result
*/
def receive[A](state: S, msg: F[A], context: Context): RIO[R, (S, A)]
def receive[A](state: S, msg: F[A], context: Context): ActorResponse[R, S, A]

/* INTERNAL API */

Expand All @@ -42,14 +82,7 @@ object Actor {
s <- state.get
(fa, promise) = msg
receiver = receive(s, fa, context)
completer = ((s: S, a: A) => state.set(s) *> promise.succeed(a)).tupled
_ <- receiver.foldM(
e =>
supervisor
.supervise(receiver, e)
.foldM(_ => promise.fail(e), completer),
completer
)
_ <- receiver.materialize(state, promise, supervisor)
} yield ()

for {
Expand Down
2 changes: 1 addition & 1 deletion actors/src/main/scala/zio/actors/Supervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.actors
import zio.clock.Clock
import zio.{ IO, RIO, Schedule, URIO, ZIO }

private[actors] trait Supervisor[-R] {
trait Supervisor[-R] {
def supervise[R0 <: R, A](zio: RIO[R0, A], error: Throwable): ZIO[R0 with Clock, Unit, A]
}

Expand Down
45 changes: 23 additions & 22 deletions actors/src/test/scala/zio/actors/ActorsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package zio.actors

import java.util.concurrent.atomic.AtomicBoolean

import zio.actors.Actor.Stateful
import zio.stream.Stream
import zio.{ Chunk, IO, Ref, Schedule, Task, UIO }
import zio.actors.Actor.ActorResponse._
import zio.actors.Actor.{ActorResponse, Stateful}
import zio.stream.{Stream, ZStream}
import zio.{Chunk, IO, Ref, Schedule, UIO}
import zio.test._
import zio.test.Assertion._

Expand All @@ -13,7 +14,7 @@ object CounterUtils {
case object Reset extends Message[Unit]
case object Increase extends Message[Unit]
case object Get extends Message[Int]
case class IncreaseUpTo(upper: Int) extends Message[Stream[Nothing, Int]]
case class IncreaseUpTo(upper: Int) extends Message[ZStream[Any, Throwable, Int]]
}

object TickUtils {
Expand All @@ -37,12 +38,12 @@ object ActorsSpec extends DefaultRunnableSpec {
state: Int,
msg: Message[A],
context: Context
): UIO[(Int, A)] =
): ActorResponse[Any, Int, A] =
msg match {
case Reset => UIO((0, ()))
case Increase => UIO((state + 1, ()))
case Get => UIO((state, state))
case IncreaseUpTo(upper) => UIO((upper, Stream.fromIterable(state until upper)))
case Reset => oneTimeResponse(UIO((0, ())))
case Increase => oneTimeResponse(UIO((state + 1, ())))
case Get => oneTimeResponse(UIO((state, state)))
case IncreaseUpTo(upper) => streamingResponse[Any, Int, Int](Stream.fromIterable((state to upper).map(i => (i, i)))).asInstanceOf[ActorResponse[Any, Int, A]]
}
}

Expand All @@ -55,11 +56,11 @@ object ActorsSpec extends DefaultRunnableSpec {
_ <- actor ! Reset
c2 <- actor ? Get
c3 <- actor ? IncreaseUpTo(20)
vals <- c3.runCollect
vals <- c3.mapM(i => (actor ? Get).map(i2 => (i, i2))).runCollect
c4 <- actor ? Get
} yield assert(c1)(equalTo(2)) &&
assert(c2)(equalTo(0)) &&
assert(vals)(equalTo(Chunk.apply(0 until 20: _*))) &&
assert(vals)(equalTo(Chunk.apply((0 to 20).map(i => (i, i)): _*))) &&
assert(c4)(equalTo(20))
},
testM("Error recovery by retrying") {
Expand All @@ -73,15 +74,15 @@ object ActorsSpec extends DefaultRunnableSpec {
state: Unit,
msg: Message[A],
context: Context
): Task[(Unit, A)] =
): ActorResponse[Any, Unit, A] =
msg match {
case Tick =>
ref
oneTimeResponse(ref
.updateAndGet(_ + 1)
.flatMap { v =>
if (v < maxRetries) IO.fail(new Exception("fail"))
else IO.succeed((state, state))
}
})
}
}

Expand All @@ -104,9 +105,9 @@ object ActorsSpec extends DefaultRunnableSpec {
state: Unit,
msg: Message[A],
context: Context
): IO[Throwable, (Unit, A)] =
): ActorResponse[Any, Unit, A] =
msg match {
case Tick => IO.fail(new Exception("fail"))
case Tick => oneTimeResponse(IO.fail(new Exception("fail")))
}
}

Expand Down Expand Up @@ -134,9 +135,9 @@ object ActorsSpec extends DefaultRunnableSpec {
state: Unit,
msg: Msg[A],
context: Context
): IO[Throwable, (Unit, A)] =
): ActorResponse[Any, Unit, A] =
msg match {
case Letter => IO.succeed(((), ()))
case Letter => oneTimeResponse(IO.succeed(((), ())))
}
}
for {
Expand All @@ -158,9 +159,9 @@ object ActorsSpec extends DefaultRunnableSpec {
state: Unit,
msg: Message[A],
context: Context
): IO[Throwable, (Unit, A)] =
): ActorResponse[Any, Unit, A] =
msg match {
case Tick => IO.succeed(((), ()))
case Tick => oneTimeResponse(IO.succeed(((), ())))
}
}
for {
Expand All @@ -179,9 +180,9 @@ object ActorsSpec extends DefaultRunnableSpec {
state: Unit,
msg: Message[A],
context: Context
): IO[Throwable, (Unit, A)] =
): ActorResponse[Any, Unit, A] =
msg match {
case Tick => IO.succeed(((), ()))
case Tick => oneTimeResponse(IO.succeed(((), ())))
}
}

Expand Down
27 changes: 14 additions & 13 deletions actors/src/test/scala/zio/actors/RemoteSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package zio.actors
import java.io.File
import java.net.ConnectException

import zio.actors.Actor.Stateful
import zio.{ clock, console, IO }
import zio.actors.Actor.{ActorResponse, Stateful}
import zio.{IO, clock, console}
import zio.test.DefaultRunnableSpec
import zio.test._
import zio.test.Assertion._
import zio.duration._
import zio.test.environment.TestConsole
import SpecUtils._
import zio.actors.Actor.ActorResponse.oneTimeResponse

object SpecUtils {
sealed trait Message[+A]
Expand All @@ -24,10 +25,10 @@ object SpecUtils {
state: Int,
msg: Message[A],
context: Context
): IO[MyErrorDomain, (Int, A)] =
): ActorResponse[Any, Int, A] =
msg match {
case Str(value) =>
IO.effectTotal((state + 1, value + "received plus " + state + 1))
oneTimeResponse(IO.effectTotal((state + 1, value + "received plus " + state + 1)))
}
}

Expand All @@ -41,27 +42,27 @@ object SpecUtils {
state: Unit,
msg: PingPongProto[A],
context: Context
): IO[Throwable, (Unit, A)] =
): ActorResponse[Any, Unit, A] =
msg match {
case Ping(sender) =>
(for {
ActorResponse.oneTimeResponse((for {
path <- sender.path
_ <- console.putStrLn(s"Ping from: $path, sending pong")
_ <- sender ! Pong
} yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]]
} yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]])

case Pong =>
(for {
ActorResponse.oneTimeResponse((for {
_ <- console.putStrLn("Received pong")
_ <- IO.succeed(1)
} yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]]
} yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]])

case GameInit(to) =>
(for {
ActorResponse.oneTimeResponse((for {
_ <- console.putStrLn("The game starts...")
self <- context.self[PingPongProto]
_ <- to ! Ping(self)
} yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]]
} yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]])
}
}

Expand All @@ -73,9 +74,9 @@ object SpecUtils {
state: Unit,
msg: ErrorProto[A],
context: Context
): IO[Throwable, (Unit, A)] =
): ActorResponse[Any, Unit, A] =
msg match {
case UnsafeMessage => IO.fail(new Exception("Error on remote side"))
case UnsafeMessage => oneTimeResponse(IO.fail(new Exception("Error on remote side")))
}
}

Expand Down

0 comments on commit 9e45776

Please sign in to comment.