Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
*/
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.api.types.func.TypedPredicate;
import io.serverlessworkflow.impl.WorkflowModel;
import java.util.Optional;
import java.util.function.Predicate;

public class JavaFuncUtils {

static Object safeObject(Object obj) {
return obj instanceof WorkflowModel model ? model.asJavaObject() : obj;
}

@SuppressWarnings({"unchecked", "rawtypes"})
static Object predObject(Predicate<?> pred, Optional<Class<?>> predClass) {
return predClass.isPresent() ? new TypedPredicate(pred, predClass.orElseThrow()) : pred;
}

static <T> T convertT(WorkflowModel model, Optional<Class<T>> inputClass) {
return inputClass
.map(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.serverlessworkflow.impl.executors.func;

import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.predObject;

import io.serverlessworkflow.api.types.ListenTask;
import io.serverlessworkflow.api.types.Until;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.UntilPredicate;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import io.serverlessworkflow.impl.resources.ResourceLoader;

public class JavaListenExecutorBuilder extends ListenExecutorBuilder {

protected JavaListenExecutorBuilder(
WorkflowMutablePosition position,
ListenTask task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader resourceLoader) {
super(position, task, workflow, application, resourceLoader);
}

@Override
protected WorkflowPredicate buildUntilPredicate(Until until) {
return until instanceof UntilPredicate untilPred && untilPred.predicate() != null
? application
.expressionFactory()
.buildPredicate(
ExpressionDescriptor.object(
predObject(untilPred.predicate(), untilPred.predicateClass())))
: super.buildUntilPredicate(until);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

package io.serverlessworkflow.impl.executors.func;

import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.predObject;

import io.serverlessworkflow.api.types.SwitchCase;
import io.serverlessworkflow.api.types.SwitchTask;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
import io.serverlessworkflow.api.types.func.TypedPredicate;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.Optional;
import java.util.function.Predicate;

public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {

Expand All @@ -52,9 +52,4 @@ protected Optional<WorkflowPredicate> buildFilter(SwitchCase switchCase) {
predObject(function.predicate(), function.predicateClass()))))
: super.buildFilter(switchCase);
}

@SuppressWarnings({"unchecked", "rawtypes"})
private Object predObject(Predicate<?> pred, Optional<Class<?>> predClass) {
return predClass.isPresent() ? new TypedPredicate(pred, predClass.orElseThrow()) : pred;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
} else if (task.getSwitchTask() != null) {
return new JavaSwitchExecutorBuilder(
position, task.getSwitchTask(), workflow, application, resourceLoader);
} else if (task.getListenTask() != null) {
return new JavaListenExecutorBuilder(
position, task.getListenTask(), workflow, application, resourceLoader);
} else {
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.api.types.func;

import io.serverlessworkflow.api.types.Until;
import java.util.Optional;
import java.util.function.Predicate;

public class UntilPredicate extends Until {

private Predicate<?> predicate;
private Optional<Class<?>> predicateClass;

public <T> Until withFunction(Predicate<T> predicate) {
this.predicate = predicate;
this.predicateClass = Optional.empty();
return this;
}

public <T> Until withFunction(Predicate<T> predicate, Class<T> clazz) {
this.predicate = predicate;
this.predicateClass = Optional.ofNullable(clazz);
return this;
}

public Predicate<?> predicate() {
return predicate;
}

public Optional<Class<?>> predicateClass() {
return predicateClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ protected ListenExecutorBuilder(
registrations = anyEvents(any);
Until untilDesc = any.getUntil();
if (untilDesc != null) {
if (untilDesc.getAnyEventUntilCondition() != null) {
until =
WorkflowUtils.buildPredicate(application, untilDesc.getAnyEventUntilCondition());
} else if (untilDesc.getAnyEventUntilConsumed() != null) {
EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed();
if (strategy.getAllEventConsumptionStrategy() != null) {
untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy());
} else if (strategy.getAnyEventConsumptionStrategy() != null) {
untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy());
} else if (strategy.getOneEventConsumptionStrategy() != null) {
untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy());
until = buildUntilPredicate(untilDesc);
if (until == null) {
if (untilDesc.getAnyEventUntilConsumed() != null) {
EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed();
if (strategy.getAllEventConsumptionStrategy() != null) {
untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy());
} else if (strategy.getAnyEventConsumptionStrategy() != null) {
untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy());
} else if (strategy.getOneEventConsumptionStrategy() != null) {
untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy());
}
}
}
}
Expand All @@ -136,6 +136,12 @@ protected ListenExecutorBuilder(
}
}

protected WorkflowPredicate buildUntilPredicate(Until until) {
return until.getAnyEventUntilCondition() != null
? WorkflowUtils.buildPredicate(application, until.getAnyEventUntilCondition())
: null;
}

private Collection<EventRegistrationBuilder> registerToAll() {
return application.eventConsumer().listenToAll(application);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void testSuspendResumeWait()
CompletableFuture<WorkflowModel> future = instance.start();
instance.suspend();
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
Thread.sleep(500);
Thread.sleep(550);
assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED);
instance.resume();
assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow())
Expand Down