From 5e5b2a57d99d11155d662e6362654b881669cc63 Mon Sep 17 00:00:00 2001 From: "Andy.Chen" Date: Fri, 30 Aug 2024 01:12:14 +0800 Subject: [PATCH 1/3] feat: avoid nesting in Behavior Factories --- .../pekko/actor/typed/javadsl/Behaviors.scala | 23 ++++++++++++++++++ .../actor/typed/scaladsl/Behaviors.scala | 24 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala index c9d4facd35..4ab2efff2c 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala @@ -58,6 +58,17 @@ object Behaviors { factory(StashBufferImpl[T](ctx.asScala, capacity)) }) + /** + * Support for stashing messages to unstash at a later time. + * + * @since 1.1.0 + */ + def withStashSetup[T](capacity: Int, + factory: java.util.function.BiFunction[StashBuffer[T], ActorContext[T], Behavior[T]]): Behavior[T] = + setup(ctx => { + factory(StashBufferImpl[T](ctx.asScala, capacity), ctx.asJava) + }) + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to @@ -345,6 +356,18 @@ object Behaviors { def withTimers[T](factory: pekko.japi.function.Function[TimerScheduler[T], Behavior[T]]): Behavior[T] = TimerSchedulerImpl.withTimers(timers => factory.apply(timers)) + /** + * Support for scheduled `self` messages in an actor. + * It takes care of the lifecycle of the timers such as cancelling them when the actor + * is restarted or stopped. + * + * @see [[TimerScheduler]] + * @since 1.1.0 + */ + def withTimersSetup[T]( + factory: pekko.japi.function.Function2[TimerScheduler[T], ActorContext[T], Behavior[T]]): Behavior[T] = + setup(ctx => TimerSchedulerImpl.withTimers(timers => factory.apply(timers, ctx))) + /** * Per message MDC (Mapped Diagnostic Context) logging. * diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala index 93c71616c7..2d15fb31cc 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala @@ -48,6 +48,17 @@ object Behaviors { factory(stash) }) + /** + * Support for stashing messages to unstash at a later time. + * + * @since 1.1.0 + */ + def withStashSetup[T](capacity: Int)(factory: (StashBuffer[T], ActorContext[T]) => Behavior[T]): Behavior[T] = + setup(ctx => { + val stash = StashBuffer[T](ctx, capacity) + factory(stash, ctx) + }) + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to @@ -269,6 +280,19 @@ object Behaviors { def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T] = TimerSchedulerImpl.withTimers(factory) + /** + * Support for scheduled `self` messages in an actor. + * It takes care of the lifecycle of the timers such as cancelling them when the actor + * is restarted or stopped. + * + * @see [[TimerScheduler]] + * @since 1.1.0 + */ + def withTimersSetup[T](factory: (TimerScheduler[T], ActorContext[T]) => Behavior[T]): Behavior[T] = + setup(ctx => { + TimerSchedulerImpl.withTimers(factory(_, ctx)) + }) + /** * Per message MDC (Mapped Diagnostic Context) logging. * From 980333fc2094c5cd01986ab1dafbd6e7a5f96f55 Mon Sep 17 00:00:00 2001 From: "Andy.Chen" Date: Mon, 2 Sep 2024 15:43:04 +0800 Subject: [PATCH 2/3] update --- .../scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala | 2 +- .../scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala index 4ab2efff2c..f520d436e0 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala @@ -366,7 +366,7 @@ object Behaviors { */ def withTimersSetup[T]( factory: pekko.japi.function.Function2[TimerScheduler[T], ActorContext[T], Behavior[T]]): Behavior[T] = - setup(ctx => TimerSchedulerImpl.withTimers(timers => factory.apply(timers, ctx))) + setup(ctx => TimerSchedulerImpl.wrapWithTimers(timer => factory.apply(timer, ctx.asJava))(ctx.asScala)) /** * Per message MDC (Mapped Diagnostic Context) logging. diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala index 2d15fb31cc..2439079d09 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala @@ -290,7 +290,7 @@ object Behaviors { */ def withTimersSetup[T](factory: (TimerScheduler[T], ActorContext[T]) => Behavior[T]): Behavior[T] = setup(ctx => { - TimerSchedulerImpl.withTimers(factory(_, ctx)) + TimerSchedulerImpl.wrapWithTimers(factory(_, ctx))(ctx) }) /** From 6b4310bc6e5ed86a116ce8ded52cd9e9a76f4a76 Mon Sep 17 00:00:00 2001 From: "Andy.Chen" Date: Tue, 3 Sep 2024 10:40:12 +0800 Subject: [PATCH 3/3] add compile tests --- .../actor/typed/javadsl/ActorCompile.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java index 847ed5aba5..81126cf52c 100644 --- a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java +++ b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java @@ -151,6 +151,16 @@ public Behavior aroundSignal( }); } + { + Behavior b = + Behaviors.withTimersSetup( + (timers, ctx) -> { + timers.startSingleTimer("key", new MyMsgB("tick"), Duration.ofSeconds(1)); + ctx.scheduleOnce(Duration.ofSeconds(1), ctx.getSelf(), new MyMsgB("tick")); + return Behaviors.ignore(); + }); + } + static class MyBehavior extends ExtensibleBehavior { @Override @@ -233,4 +243,20 @@ public Behavior receive(TypedActorContext context, MyMsg message) return Behaviors.empty(); }); } + // stash buffer with setup + { + Behavior behavior = + Behaviors.withStashSetup( + 5, + (stash, ctx) -> { + stash.forEach( + msg -> { + // checked is ok + throw new Exception("checked"); + }); + + ctx.scheduleOnce(Duration.ofSeconds(1), ctx.getSelf(), "tick"); + return Behaviors.empty(); + }); + } }