Skip to content

Commit c2d3c95

Browse files
committed
[Fix #987] Refactor shell impl
Signed-off-by: fjtirado <[email protected]>
1 parent 51d85a9 commit c2d3c95

File tree

1 file changed

+101
-108
lines changed

1 file changed

+101
-108
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java

Lines changed: 101 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -17,141 +17,134 @@
1717

1818
import io.serverlessworkflow.api.types.RunShell;
1919
import io.serverlessworkflow.api.types.RunTaskConfiguration;
20+
import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType;
2021
import io.serverlessworkflow.api.types.Shell;
2122
import io.serverlessworkflow.impl.TaskContext;
22-
import io.serverlessworkflow.impl.WorkflowApplication;
2323
import io.serverlessworkflow.impl.WorkflowContext;
2424
import io.serverlessworkflow.impl.WorkflowDefinition;
25-
import io.serverlessworkflow.impl.WorkflowError;
26-
import io.serverlessworkflow.impl.WorkflowException;
2725
import io.serverlessworkflow.impl.WorkflowModel;
2826
import io.serverlessworkflow.impl.WorkflowModelFactory;
2927
import io.serverlessworkflow.impl.WorkflowUtils;
30-
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
28+
import io.serverlessworkflow.impl.WorkflowValueResolver;
3129
import java.io.IOException;
30+
import java.io.UncheckedIOException;
3231
import java.nio.charset.StandardCharsets;
32+
import java.util.LinkedHashMap;
3333
import java.util.Map;
34+
import java.util.Optional;
3435
import java.util.concurrent.CompletableFuture;
36+
import java.util.stream.Collectors;
3537

3638
public class RunShellExecutor implements RunnableTask<RunShell> {
3739

38-
private ShellResultSupplier shellResultSupplier;
39-
private ProcessBuilderSupplier processBuilderSupplier;
40+
private WorkflowValueResolver<String> shellCommand;
41+
private Map<WorkflowValueResolver<String>, Optional<WorkflowValueResolver<String>>>
42+
shellArguments;
43+
private Optional<WorkflowValueResolver<Map<String, Object>>> shellEnv;
44+
private Optional<ProcessReturnType> returnType;
4045

41-
@FunctionalInterface
42-
private interface ShellResultSupplier {
43-
WorkflowModel apply(
44-
TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder);
45-
}
46+
@Override
47+
public CompletableFuture<WorkflowModel> apply(
48+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
49+
StringBuilder commandBuilder =
50+
new StringBuilder(shellCommand.apply(workflowContext, taskContext, model));
51+
for (var entry : shellArguments.entrySet()) {
52+
commandBuilder.append(" ").append(entry.getKey().apply(workflowContext, taskContext, model));
53+
entry.getValue().ifPresent(v -> commandBuilder.append("=").append(v));
54+
}
4655

47-
@FunctionalInterface
48-
private interface ProcessBuilderSupplier {
49-
ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext);
56+
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
57+
shellEnv.ifPresent(
58+
map -> {
59+
for (Map.Entry<String, Object> entry :
60+
map.apply(workflowContext, taskContext, model).entrySet()) {
61+
builder.environment().put(entry.getKey(), (String) entry.getValue());
62+
}
63+
});
64+
65+
return returnType
66+
.map(
67+
type ->
68+
CompletableFuture.supplyAsync(
69+
() ->
70+
buildResultFromProcess(
71+
workflowContext.definition().application().modelFactory(),
72+
uncheckedStart(builder),
73+
type)
74+
.orElse(model)))
75+
.orElseGet(
76+
() -> {
77+
uncheckedStart(builder);
78+
return CompletableFuture.completedFuture(model);
79+
});
5080
}
5181

52-
@Override
53-
public CompletableFuture<WorkflowModel> apply(
54-
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
55-
ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext);
56-
return CompletableFuture.supplyAsync(
57-
() -> this.shellResultSupplier.apply(taskContext, input, processBuilder));
82+
private Process uncheckedStart(ProcessBuilder builder) {
83+
try {
84+
return builder.start();
85+
} catch (IOException e) {
86+
throw new UncheckedIOException(e);
87+
}
5888
}
5989

6090
@Override
6191
public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
6292
Shell shell = taskConfiguration.getShell();
63-
final String shellCommand = shell.getCommand();
64-
65-
if (shellCommand == null || shellCommand.isBlank()) {
93+
if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) {
6694
throw new IllegalStateException("Missing shell command in RunShell task configuration");
6795
}
68-
this.processBuilderSupplier =
69-
(workflowContext, taskContext) -> {
70-
WorkflowApplication application = definition.application();
71-
72-
StringBuilder commandBuilder =
73-
new StringBuilder(
74-
ExpressionUtils.isExpr(shellCommand)
75-
? WorkflowUtils.buildStringFilter(application, shellCommand)
76-
.apply(workflowContext, taskContext, taskContext.input())
77-
: shellCommand);
78-
79-
if (shell.getArguments() != null
80-
&& shell.getArguments().getAdditionalProperties() != null) {
81-
for (Map.Entry<String, Object> entry :
82-
shell.getArguments().getAdditionalProperties().entrySet()) {
83-
commandBuilder
84-
.append(" ")
85-
.append(
86-
ExpressionUtils.isExpr(entry.getKey())
87-
? WorkflowUtils.buildStringFilter(application, entry.getKey())
88-
.apply(workflowContext, taskContext, taskContext.input())
89-
: entry.getKey());
90-
if (entry.getValue() != null) {
91-
92-
commandBuilder
93-
.append("=")
94-
.append(
95-
ExpressionUtils.isExpr(entry.getValue())
96-
? WorkflowUtils.buildStringFilter(
97-
application, entry.getValue().toString())
98-
.apply(workflowContext, taskContext, taskContext.input())
99-
: entry.getValue().toString());
100-
}
101-
}
102-
}
103-
104-
// TODO: support Windows cmd.exe
105-
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
106-
if (shell.getEnvironment() != null
107-
&& shell.getEnvironment().getAdditionalProperties() != null) {
108-
for (Map.Entry<String, Object> entry :
109-
shell.getEnvironment().getAdditionalProperties().entrySet()) {
110-
String value =
111-
ExpressionUtils.isExpr(entry.getValue())
112-
? WorkflowUtils.buildStringFilter(application, entry.getValue().toString())
113-
.apply(workflowContext, taskContext, taskContext.input())
114-
: entry.getValue().toString();
115-
116-
// configure environments
117-
builder.environment().put(entry.getKey(), value);
118-
}
119-
}
120-
return builder;
121-
};
122-
123-
this.shellResultSupplier =
124-
(taskContext, input, processBuilder) -> {
125-
try {
126-
Process process = processBuilder.start();
127-
return taskConfiguration.isAwait()
128-
? buildResultFromProcess(taskConfiguration, definition, process)
129-
: input;
130-
} catch (IOException | InterruptedException e) {
131-
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e);
132-
}
133-
};
96+
shellCommand =
97+
WorkflowUtils.buildStringFilter(
98+
definition.application(), taskConfiguration.getShell().getCommand());
99+
100+
shellArguments =
101+
shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null
102+
? shell.getArguments().getAdditionalProperties().entrySet().stream()
103+
.collect(
104+
Collectors.toMap(
105+
e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()),
106+
e ->
107+
e.getValue() != null
108+
? Optional.of(
109+
WorkflowUtils.buildStringFilter(
110+
definition.application(), e.getValue().toString()))
111+
: Optional.empty(),
112+
(x, y) -> y,
113+
LinkedHashMap::new))
114+
: Map.of();
115+
116+
shellEnv =
117+
shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null
118+
? Optional.of(
119+
WorkflowUtils.buildMapResolver(
120+
definition.application(), shell.getEnvironment().getAdditionalProperties()))
121+
: Optional.empty();
122+
123+
returnType =
124+
taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty();
134125
}
135126

136-
/**
137-
* Builds the WorkflowModel result from the executed process. It waits for the process to finish
138-
* and captures the exit code, stdout, and stderr based on the task configuration.
139-
*/
140-
private WorkflowModel buildResultFromProcess(
141-
RunShell taskConfiguration, WorkflowDefinition definition, Process process)
142-
throws IOException, InterruptedException {
143-
int exitCode = process.waitFor();
144-
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
145-
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
146-
147-
WorkflowModelFactory modelFactory = definition.application().modelFactory();
148-
return switch (taskConfiguration.getReturn()) {
149-
case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
150-
case NONE -> modelFactory.fromNull();
151-
case CODE -> modelFactory.from(exitCode);
152-
case STDOUT -> modelFactory.from(stdout.trim());
153-
case STDERR -> modelFactory.from(stderr.trim());
154-
};
127+
private static Optional<WorkflowModel> buildResultFromProcess(
128+
WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) {
129+
try {
130+
int exitCode = process.waitFor();
131+
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
132+
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
133+
return Optional.of(
134+
switch (type) {
135+
case ALL ->
136+
modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
137+
case NONE -> modelFactory.fromNull();
138+
case CODE -> modelFactory.from(exitCode);
139+
case STDOUT -> modelFactory.from(stdout.trim());
140+
case STDERR -> modelFactory.from(stderr.trim());
141+
});
142+
} catch (IOException e) {
143+
throw new UncheckedIOException(e);
144+
} catch (InterruptedException e) {
145+
Thread.currentThread().interrupt();
146+
return Optional.empty();
147+
}
155148
}
156149

157150
@Override

0 commit comments

Comments
 (0)