Skip to content

Commit c106744

Browse files
committed
Add listen tasks
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 7df70a2 commit c106744

File tree

24 files changed

+614
-330
lines changed

24 files changed

+614
-330
lines changed

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@
3131
public class AgenticModelCollection extends JavaModelCollection {
3232

3333
private final AgenticScope agenticScope;
34-
private final ObjectMapper mapper = new ObjectMapper();
34+
private final AgenticScopeCloudEventsHandler ceHandler;
3535

36-
AgenticModelCollection(AgenticScope agenticScope) {
36+
AgenticModelCollection(AgenticScope agenticScope, AgenticScopeCloudEventsHandler ceHandler) {
3737
super(Collections.emptyList());
3838
this.agenticScope = agenticScope;
39+
this.ceHandler = ceHandler;
3940
}
4041

4142
@Override
@@ -47,18 +48,9 @@ public boolean add(WorkflowModel e) {
4748
}
4849

4950
// Update the agenticScope with the event body, so agents can use the event data as input
50-
Object javaObj = e.asJavaObject();
51-
try {
52-
if (javaObj instanceof CloudEvent ce && ce.getData() != null) {
53-
agenticScope.writeStates(
54-
mapper.readValue(ce.getData().toBytes(), new TypeReference<>() {}));
55-
} else if (javaObj instanceof CloudEventData ced) {
56-
agenticScope.writeStates(mapper.readValue(ced.toBytes(), new TypeReference<>() {}));
57-
} else {
58-
agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, javaObj);
59-
}
60-
} catch (IOException ex) {
61-
throw new IllegalArgumentException("Unable to parse CloudEvent data as JSON", ex);
51+
Object value = e.asJavaObject();
52+
if (!ceHandler.writeStateIfCloudEvent(this.agenticScope, value)) {
53+
this.agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
6254
}
6355

6456
// add to the collection

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -30,35 +30,31 @@ class AgenticModelFactory implements WorkflowModelFactory {
3030
static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input";
3131
private final AgenticScopeRegistryAssessor scopeRegistryAssessor =
3232
new AgenticScopeRegistryAssessor();
33+
private final AgenticScopeCloudEventsHandler scopeCloudEventsHandler = new AgenticScopeCloudEventsHandler();
3334

34-
private void updateAgenticScope(Object value) {
35-
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
36-
}
35+
@SuppressWarnings("unchecked")
36+
private AgenticModel newAgenticModel(Object state) {
37+
if (state == null) {
38+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), null);
39+
}
3740

38-
private void updateAgenticScope(Map<String, Object> state) {
39-
this.scopeRegistryAssessor.getAgenticScope().writeStates(state);
40-
}
41+
if (state instanceof Map) {
42+
this.scopeRegistryAssessor.writeStates((Map<String, Object>) state);
43+
} else {
44+
this.scopeRegistryAssessor.writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, state);
45+
}
4146

42-
private AgenticModel asAgenticModel(Object value) {
43-
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value);
47+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), state);
4448
}
4549

4650
@Override
47-
@SuppressWarnings("unchecked")
4851
public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
4952
// TODO: we shouldn't update the state if the previous task was an agent call since under the
5053
// hood, the agent already updated it.
5154
if (prev instanceof AgenticModel agenticModel) {
5255
this.scopeRegistryAssessor.setAgenticScope(agenticModel.getAgenticScope());
5356
}
54-
55-
if (obj instanceof Map) {
56-
this.updateAgenticScope((Map<String, Object>) obj);
57-
} else {
58-
this.updateAgenticScope(obj);
59-
}
60-
61-
return asAgenticModel(obj);
57+
return newAgenticModel(obj);
6258
}
6359

6460
@Override
@@ -71,60 +67,54 @@ public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {
7167

7268
@Override
7369
public WorkflowModelCollection createCollection() {
74-
return new AgenticModelCollection(this.scopeRegistryAssessor.getAgenticScope());
70+
return new AgenticModelCollection(this.scopeRegistryAssessor.getAgenticScope(), scopeCloudEventsHandler);
7571
}
7672

7773
@Override
7874
public WorkflowModel from(boolean value) {
79-
this.updateAgenticScope(value);
80-
return asAgenticModel(value);
75+
return newAgenticModel(value);
8176
}
8277

8378
@Override
8479
public WorkflowModel from(Number value) {
85-
this.updateAgenticScope(value);
86-
return asAgenticModel(value);
80+
return newAgenticModel(value);
8781
}
8882

8983
@Override
9084
public WorkflowModel from(String value) {
91-
this.updateAgenticScope(value);
92-
return asAgenticModel(value);
85+
return newAgenticModel(value);
9386
}
9487

9588
@Override
9689
public WorkflowModel from(CloudEvent ce) {
97-
return asAgenticModel(ce);
90+
return from(scopeCloudEventsHandler.extractDataAsMap(ce));
9891
}
9992

10093
@Override
10194
public WorkflowModel from(CloudEventData ce) {
102-
return asAgenticModel(ce);
95+
return from(scopeCloudEventsHandler.extractDataAsMap(ce));
10396
}
10497

10598
@Override
10699
public WorkflowModel from(OffsetDateTime value) {
107-
this.updateAgenticScope(value);
108-
return asAgenticModel(value);
100+
return newAgenticModel(value);
109101
}
110102

111103
@Override
112104
public WorkflowModel from(Map<String, Object> map) {
113-
this.updateAgenticScope(map);
114-
return asAgenticModel(map);
105+
return newAgenticModel(map);
115106
}
116107

117108
@Override
118109
public WorkflowModel fromNull() {
119-
return asAgenticModel(null);
110+
return newAgenticModel(null);
120111
}
121112

122113
@Override
123114
public WorkflowModel fromOther(Object value) {
124115
if (value instanceof AgenticScope scope) {
125116
return new AgenticModel(scope, scope.state());
126117
}
127-
this.updateAgenticScope(value);
128-
return asAgenticModel(value);
118+
return newAgenticModel(value);
129119
}
130120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.serverlessworkflow.impl.expressions.agentic;
2+
3+
import java.io.IOException;
4+
import java.util.Map;
5+
6+
import com.fasterxml.jackson.core.type.TypeReference;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import dev.langchain4j.agentic.scope.AgenticScope;
9+
import io.cloudevents.CloudEvent;
10+
import io.cloudevents.CloudEventData;
11+
12+
public final class AgenticScopeCloudEventsHandler {
13+
14+
private final ObjectMapper mapper = new ObjectMapper();
15+
16+
AgenticScopeCloudEventsHandler() {}
17+
18+
public void writeState(final AgenticScope scope, final CloudEvent cloudEvent) {
19+
if (cloudEvent != null) {
20+
writeState(scope, cloudEvent.getData());
21+
}
22+
}
23+
24+
public void writeState(final AgenticScope scope, final CloudEventData cloudEvent) {
25+
scope.writeStates(extractDataAsMap(cloudEvent));
26+
}
27+
28+
public boolean writeStateIfCloudEvent(final AgenticScope scope, final Object value) {
29+
if (value instanceof CloudEvent) {
30+
writeState(scope, (CloudEvent) value);
31+
return true;
32+
} else if (value instanceof CloudEventData) {
33+
writeState(scope, (CloudEventData) value);
34+
return true;
35+
}
36+
return false;
37+
}
38+
39+
public Map<String, Object> extractDataAsMap(final CloudEventData ce) {
40+
try {
41+
if (ce != null) {
42+
return mapper.readValue(ce.toBytes(), new TypeReference<>() {
43+
});
44+
}
45+
} catch (IOException e) {
46+
throw new IllegalArgumentException("Unable to parse CloudEvent data as JSON", e);
47+
}
48+
return Map.of();
49+
}
50+
51+
public Map<String, Object> extractDataAsMap(final CloudEvent ce) {
52+
if (ce != null) {
53+
return extractDataAsMap(ce.getData());
54+
}
55+
return Map.of();
56+
}
57+
}

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import dev.langchain4j.agentic.scope.AgenticScope;
2020
import dev.langchain4j.agentic.scope.AgenticScopeRegistry;
2121
import dev.langchain4j.agentic.scope.DefaultAgenticScope;
22+
23+
import java.util.Map;
2224
import java.util.Objects;
2325
import java.util.UUID;
2426
import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +64,14 @@ public void setAgenticScope(AgenticScope agenticScope) {
6264
this.agenticScope = Objects.requireNonNull(agenticScope, "AgenticScope cannot be null");
6365
}
6466

67+
public void writeState(String key, Object value) {
68+
this.getAgenticScope().writeState(key, value);
69+
}
70+
71+
public void writeStates(Map<String, Object> states) {
72+
this.getAgenticScope().writeStates(states);
73+
}
74+
6575
@Override
6676
public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) {
6777
this.setAgenticScope(agenticScope);

fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.fluent.agentic;
1717

18+
import dev.langchain4j.agentic.Agent;
1819
import io.serverlessworkflow.fluent.agentic.spi.AgentDoFluent;
1920
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
2021
import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder;
@@ -83,7 +84,7 @@ public AgentDoTaskBuilder emit(String name, Consumer<FuncEmitTaskBuilder> itemsC
8384
}
8485

8586
@Override
86-
public AgentDoTaskBuilder listen(String name, Consumer<FuncListenTaskBuilder> itemsConfigurer) {
87+
public AgentDoTaskBuilder listen(String name, Consumer<AgentListenTaskBuilder> itemsConfigurer) {
8788
this.listBuilder().listen(name, itemsConfigurer);
8889
return self();
8990
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.serverlessworkflow.fluent.agentic;
2+
3+
import java.util.function.Predicate;
4+
5+
import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy;
6+
import io.serverlessworkflow.api.types.ListenTask;
7+
import io.serverlessworkflow.api.types.func.UntilPredicate;
8+
import io.serverlessworkflow.fluent.spec.ListenTaskBuilder;
9+
10+
public class AgentListenTaskBuilder extends ListenTaskBuilder<AgentTaskItemListBuilder> {
11+
12+
private UntilPredicate untilPredicate;
13+
14+
public AgentListenTaskBuilder() {
15+
super(new AgentTaskItemListBuilder());
16+
}
17+
18+
public <T> AgentListenTaskBuilder until(Predicate<T> predicate, Class<T> predClass) {
19+
untilPredicate = new UntilPredicate().withPredicate(predicate, predClass);
20+
return this;
21+
}
22+
23+
@Override
24+
public ListenTask build() {
25+
ListenTask task = super.build();
26+
AnyEventConsumptionStrategy anyEvent =
27+
task.getListen().getTo().getAnyEventConsumptionStrategy();
28+
if (untilPredicate != null && anyEvent != null) {
29+
anyEvent.withUntil(untilPredicate);
30+
}
31+
return task;
32+
}
33+
34+
}

fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,10 @@ public AgentTaskItemListBuilder emit(String name, Consumer<FuncEmitTaskBuilder>
116116

117117
@Override
118118
public AgentTaskItemListBuilder listen(
119-
String name, Consumer<FuncListenTaskBuilder> itemsConfigurer) {
120-
this.delegate.listen(name, itemsConfigurer);
119+
String name, Consumer<AgentListenTaskBuilder> itemsConfigurer) {
120+
final AgentListenTaskBuilder builder = new AgentListenTaskBuilder();
121+
itemsConfigurer.accept(builder);
122+
this.addTaskItem(new TaskItem(name, new Task().withListenTask(builder.build())));
121123
return self();
122124
}
123125

fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/spi/AgentDoFluent.java

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,61 @@
1515
*/
1616
package io.serverlessworkflow.fluent.agentic.spi;
1717

18-
import io.serverlessworkflow.fluent.agentic.LoopAgentsBuilder;
19-
import io.serverlessworkflow.fluent.func.spi.FuncDoFluent;
2018
import java.util.UUID;
2119
import java.util.function.Consumer;
2220

23-
public interface AgentDoFluent<SELF extends AgentDoFluent<SELF>> extends FuncDoFluent<SELF> {
21+
import io.serverlessworkflow.fluent.agentic.AgentListenTaskBuilder;
22+
import io.serverlessworkflow.fluent.agentic.LoopAgentsBuilder;
23+
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
24+
import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder;
25+
import io.serverlessworkflow.fluent.func.FuncForTaskBuilder;
26+
import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder;
27+
import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder;
28+
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;
29+
import io.serverlessworkflow.fluent.func.spi.CallFnFluent;
30+
import io.serverlessworkflow.fluent.spec.spi.EmitFluent;
31+
import io.serverlessworkflow.fluent.spec.spi.ForEachFluent;
32+
import io.serverlessworkflow.fluent.spec.spi.ForkFluent;
33+
import io.serverlessworkflow.fluent.spec.spi.ListenFluent;
34+
import io.serverlessworkflow.fluent.spec.spi.SetFluent;
35+
import io.serverlessworkflow.fluent.spec.spi.SwitchFluent;
36+
37+
public interface AgentDoFluent<SELF extends AgentDoFluent<SELF>>
38+
extends SetFluent<FuncSetTaskBuilder, SELF>,
39+
EmitFluent<FuncEmitTaskBuilder, SELF>,
40+
ForEachFluent<FuncForTaskBuilder, SELF>,
41+
SwitchFluent<FuncSwitchTaskBuilder, SELF>,
42+
ForkFluent<FuncForkTaskBuilder, SELF>,
43+
ListenFluent<AgentListenTaskBuilder, SELF>,
44+
CallFnFluent<FuncCallTaskBuilder, SELF> {
2445

25-
SELF agent(String name, Object agent);
46+
SELF agent(String name, Object agent);
2647

27-
default SELF agent(Object agent) {
28-
return agent(UUID.randomUUID().toString(), agent);
29-
}
48+
default SELF agent(Object agent) {
49+
return agent(UUID.randomUUID().toString(), agent);
50+
}
3051

31-
SELF sequence(String name, Object... agents);
52+
SELF sequence(String name, Object... agents);
3253

33-
default SELF sequence(Object... agents) {
34-
return sequence("seq-" + UUID.randomUUID(), agents);
35-
}
54+
default SELF sequence(Object... agents) {
55+
return sequence("seq-" + UUID.randomUUID(), agents);
56+
}
3657

37-
SELF loop(String name, Consumer<LoopAgentsBuilder> builder);
58+
SELF loop(String name, Consumer<LoopAgentsBuilder> builder);
3859

39-
default SELF loop(Consumer<LoopAgentsBuilder> builder) {
40-
return loop("loop-" + UUID.randomUUID(), builder);
41-
}
60+
default SELF loop(Consumer<LoopAgentsBuilder> builder) {
61+
return loop("loop-" + UUID.randomUUID(), builder);
62+
}
4263

43-
SELF loop(String name, LoopAgentsBuilder builder);
64+
SELF loop(String name, LoopAgentsBuilder builder);
4465

45-
default SELF loop(LoopAgentsBuilder builder) {
46-
return loop("loop-" + UUID.randomUUID(), builder);
47-
}
66+
default SELF loop(LoopAgentsBuilder builder) {
67+
return loop("loop-" + UUID.randomUUID(), builder);
68+
}
4869

49-
SELF parallel(String name, Object... agents);
70+
SELF parallel(String name, Object... agents);
5071

51-
default SELF parallel(Object... agents) {
52-
return parallel("par-" + UUID.randomUUID(), agents);
53-
}
72+
default SELF parallel(Object... agents) {
73+
return parallel("par-" + UUID.randomUUID(), agents);
74+
}
5475
}

0 commit comments

Comments
 (0)