From 92cbe0733cac11333c0a1e080ded57948e7bdaab Mon Sep 17 00:00:00 2001 From: fjtirado Date: Wed, 13 Aug 2025 12:22:09 +0200 Subject: [PATCH] Add Until listen predicate Signed-off-by: fjtirado --- .../impl/executors/func/JavaFuncUtils.java | 7 +++ .../func/JavaListenExecutorBuilder.java | 53 +++++++++++++++++++ .../func/JavaSwitchExecutorBuilder.java | 9 +--- .../func/JavaTaskExecutorFactory.java | 3 ++ .../api/types/func/UntilPredicate.java | 46 ++++++++++++++++ .../impl/executors/ListenExecutor.java | 28 ++++++---- .../impl/LifeCycleEventsTest.java | 2 +- 7 files changed, 129 insertions(+), 19 deletions(-) create mode 100644 experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java create mode 100644 experimental/types/src/main/java/io/serverlessworkflow/api/types/func/UntilPredicate.java diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFuncUtils.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFuncUtils.java index ed42bf50..608cf8f8 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFuncUtils.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFuncUtils.java @@ -15,8 +15,10 @@ */ package io.serverlessworkflow.impl.executors.func; +import io.serverlessworkflow.api.types.func.TypedPredicate; import io.serverlessworkflow.impl.WorkflowModel; import java.util.Optional; +import java.util.function.Predicate; public class JavaFuncUtils { @@ -24,6 +26,11 @@ static Object safeObject(Object obj) { return obj instanceof WorkflowModel model ? model.asJavaObject() : obj; } + @SuppressWarnings({"unchecked", "rawtypes"}) + static Object predObject(Predicate pred, Optional> predClass) { + return predClass.isPresent() ? new TypedPredicate(pred, predClass.orElseThrow()) : pred; + } + static T convertT(WorkflowModel model, Optional> inputClass) { return inputClass .map( diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java new file mode 100644 index 00000000..3471cee3 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.serverlessworkflow.impl.executors.func; + +import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.predObject; + +import io.serverlessworkflow.api.types.ListenTask; +import io.serverlessworkflow.api.types.Until; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.api.types.func.UntilPredicate; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowPredicate; +import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder; +import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; +import io.serverlessworkflow.impl.resources.ResourceLoader; + +public class JavaListenExecutorBuilder extends ListenExecutorBuilder { + + protected JavaListenExecutorBuilder( + WorkflowMutablePosition position, + ListenTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + } + + @Override + protected WorkflowPredicate buildUntilPredicate(Until until) { + return until instanceof UntilPredicate untilPred && untilPred.predicate() != null + ? application + .expressionFactory() + .buildPredicate( + ExpressionDescriptor.object( + predObject(untilPred.predicate(), untilPred.predicateClass()))) + : super.buildUntilPredicate(until); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java index 4ad24fc3..2fe04b9c 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java @@ -16,11 +16,12 @@ package io.serverlessworkflow.impl.executors.func; +import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.predObject; + import io.serverlessworkflow.api.types.SwitchCase; import io.serverlessworkflow.api.types.SwitchTask; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.SwitchCaseFunction; -import io.serverlessworkflow.api.types.func.TypedPredicate; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; @@ -28,7 +29,6 @@ import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Optional; -import java.util.function.Predicate; public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder { @@ -52,9 +52,4 @@ protected Optional buildFilter(SwitchCase switchCase) { predObject(function.predicate(), function.predicateClass())))) : super.buildFilter(switchCase); } - - @SuppressWarnings({"unchecked", "rawtypes"}) - private Object predObject(Predicate pred, Optional> predClass) { - return predClass.isPresent() ? new TypedPredicate(pred, predClass.orElseThrow()) : pred; - } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java index b9c7fa7b..1025e6f6 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java @@ -38,6 +38,9 @@ public TaskExecutorBuilder getTaskExecutor( } else if (task.getSwitchTask() != null) { return new JavaSwitchExecutorBuilder( position, task.getSwitchTask(), workflow, application, resourceLoader); + } else if (task.getListenTask() != null) { + return new JavaListenExecutorBuilder( + position, task.getListenTask(), workflow, application, resourceLoader); } else { return super.getTaskExecutor(position, task, workflow, application, resourceLoader); } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/UntilPredicate.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/UntilPredicate.java new file mode 100644 index 00000000..3d576fd1 --- /dev/null +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/UntilPredicate.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.api.types.func; + +import io.serverlessworkflow.api.types.Until; +import java.util.Optional; +import java.util.function.Predicate; + +public class UntilPredicate extends Until { + + private Predicate predicate; + private Optional> predicateClass; + + public Until withFunction(Predicate predicate) { + this.predicate = predicate; + this.predicateClass = Optional.empty(); + return this; + } + + public Until withFunction(Predicate predicate, Class clazz) { + this.predicate = predicate; + this.predicateClass = Optional.ofNullable(clazz); + return this; + } + + public Predicate predicate() { + return predicate; + } + + public Optional> predicateClass() { + return predicateClass; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java index 4e842c6d..06772d44 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -100,17 +100,17 @@ protected ListenExecutorBuilder( registrations = anyEvents(any); Until untilDesc = any.getUntil(); if (untilDesc != null) { - if (untilDesc.getAnyEventUntilCondition() != null) { - until = - WorkflowUtils.buildPredicate(application, untilDesc.getAnyEventUntilCondition()); - } else if (untilDesc.getAnyEventUntilConsumed() != null) { - EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed(); - if (strategy.getAllEventConsumptionStrategy() != null) { - untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy()); - } else if (strategy.getAnyEventConsumptionStrategy() != null) { - untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy()); - } else if (strategy.getOneEventConsumptionStrategy() != null) { - untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy()); + until = buildUntilPredicate(untilDesc); + if (until == null) { + if (untilDesc.getAnyEventUntilConsumed() != null) { + EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed(); + if (strategy.getAllEventConsumptionStrategy() != null) { + untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy()); + } else if (strategy.getAnyEventConsumptionStrategy() != null) { + untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy()); + } else if (strategy.getOneEventConsumptionStrategy() != null) { + untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy()); + } } } } @@ -136,6 +136,12 @@ protected ListenExecutorBuilder( } } + protected WorkflowPredicate buildUntilPredicate(Until until) { + return until.getAnyEventUntilCondition() != null + ? WorkflowUtils.buildPredicate(application, until.getAnyEventUntilCondition()) + : null; + } + private Collection registerToAll() { return application.eventConsumer().listenToAll(application); } diff --git a/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java b/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java index 56b7bc5d..47b02e74 100644 --- a/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java +++ b/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java @@ -117,7 +117,7 @@ void testSuspendResumeWait() CompletableFuture future = instance.start(); instance.suspend(); assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); - Thread.sleep(500); + Thread.sleep(550); assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); instance.resume(); assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow())