Skip to content

Commit 0bebd9c

Browse files
committed
[Fix #987] Refactor shell impl
Signed-off-by: fjtirado <[email protected]>
1 parent 51d85a9 commit 0bebd9c

File tree

1 file changed

+105
-108
lines changed

1 file changed

+105
-108
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java

Lines changed: 105 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -17,141 +17,138 @@
1717

1818
import io.serverlessworkflow.api.types.RunShell;
1919
import io.serverlessworkflow.api.types.RunTaskConfiguration;
20+
import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType;
2021
import io.serverlessworkflow.api.types.Shell;
2122
import io.serverlessworkflow.impl.TaskContext;
22-
import io.serverlessworkflow.impl.WorkflowApplication;
2323
import io.serverlessworkflow.impl.WorkflowContext;
2424
import io.serverlessworkflow.impl.WorkflowDefinition;
25-
import io.serverlessworkflow.impl.WorkflowError;
26-
import io.serverlessworkflow.impl.WorkflowException;
2725
import io.serverlessworkflow.impl.WorkflowModel;
2826
import io.serverlessworkflow.impl.WorkflowModelFactory;
2927
import io.serverlessworkflow.impl.WorkflowUtils;
30-
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
28+
import io.serverlessworkflow.impl.WorkflowValueResolver;
3129
import java.io.IOException;
30+
import java.io.UncheckedIOException;
3231
import java.nio.charset.StandardCharsets;
32+
import java.util.LinkedHashMap;
3333
import java.util.Map;
34+
import java.util.Optional;
3435
import java.util.concurrent.CompletableFuture;
36+
import java.util.stream.Collectors;
3537

3638
public class RunShellExecutor implements RunnableTask<RunShell> {
3739

38-
private ShellResultSupplier shellResultSupplier;
39-
private ProcessBuilderSupplier processBuilderSupplier;
40+
private WorkflowValueResolver<String> shellCommand;
41+
private Map<WorkflowValueResolver<String>, Optional<WorkflowValueResolver<String>>>
42+
shellArguments;
43+
private Optional<WorkflowValueResolver<Map<String, Object>>> shellEnv;
44+
private Optional<ProcessReturnType> returnType;
4045

41-
@FunctionalInterface
42-
private interface ShellResultSupplier {
43-
WorkflowModel apply(
44-
TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder);
45-
}
46+
@Override
47+
public CompletableFuture<WorkflowModel> apply(
48+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
49+
StringBuilder commandBuilder =
50+
new StringBuilder(shellCommand.apply(workflowContext, taskContext, model));
51+
for (var entry : shellArguments.entrySet()) {
52+
commandBuilder.append(" ").append(entry.getKey().apply(workflowContext, taskContext, model));
53+
entry
54+
.getValue()
55+
.ifPresent(
56+
v -> commandBuilder.append("=").append(v.apply(workflowContext, taskContext, model)));
57+
}
4658

47-
@FunctionalInterface
48-
private interface ProcessBuilderSupplier {
49-
ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext);
59+
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
60+
shellEnv.ifPresent(
61+
map -> {
62+
for (Map.Entry<String, Object> entry :
63+
map.apply(workflowContext, taskContext, model).entrySet()) {
64+
builder.environment().put(entry.getKey(), (String) entry.getValue());
65+
}
66+
});
67+
68+
return returnType
69+
.map(
70+
type ->
71+
CompletableFuture.supplyAsync(
72+
() ->
73+
buildResultFromProcess(
74+
workflowContext.definition().application().modelFactory(),
75+
uncheckedStart(builder),
76+
type)
77+
.orElse(model),
78+
workflowContext.definition().application().executorService()))
79+
.orElseGet(
80+
() -> {
81+
uncheckedStart(builder);
82+
return CompletableFuture.completedFuture(model);
83+
});
5084
}
5185

52-
@Override
53-
public CompletableFuture<WorkflowModel> apply(
54-
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
55-
ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext);
56-
return CompletableFuture.supplyAsync(
57-
() -> this.shellResultSupplier.apply(taskContext, input, processBuilder));
86+
private Process uncheckedStart(ProcessBuilder builder) {
87+
try {
88+
return builder.start();
89+
} catch (IOException e) {
90+
throw new UncheckedIOException(e);
91+
}
5892
}
5993

6094
@Override
6195
public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
6296
Shell shell = taskConfiguration.getShell();
63-
final String shellCommand = shell.getCommand();
64-
65-
if (shellCommand == null || shellCommand.isBlank()) {
97+
if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) {
6698
throw new IllegalStateException("Missing shell command in RunShell task configuration");
6799
}
68-
this.processBuilderSupplier =
69-
(workflowContext, taskContext) -> {
70-
WorkflowApplication application = definition.application();
71-
72-
StringBuilder commandBuilder =
73-
new StringBuilder(
74-
ExpressionUtils.isExpr(shellCommand)
75-
? WorkflowUtils.buildStringFilter(application, shellCommand)
76-
.apply(workflowContext, taskContext, taskContext.input())
77-
: shellCommand);
78-
79-
if (shell.getArguments() != null
80-
&& shell.getArguments().getAdditionalProperties() != null) {
81-
for (Map.Entry<String, Object> entry :
82-
shell.getArguments().getAdditionalProperties().entrySet()) {
83-
commandBuilder
84-
.append(" ")
85-
.append(
86-
ExpressionUtils.isExpr(entry.getKey())
87-
? WorkflowUtils.buildStringFilter(application, entry.getKey())
88-
.apply(workflowContext, taskContext, taskContext.input())
89-
: entry.getKey());
90-
if (entry.getValue() != null) {
91-
92-
commandBuilder
93-
.append("=")
94-
.append(
95-
ExpressionUtils.isExpr(entry.getValue())
96-
? WorkflowUtils.buildStringFilter(
97-
application, entry.getValue().toString())
98-
.apply(workflowContext, taskContext, taskContext.input())
99-
: entry.getValue().toString());
100-
}
101-
}
102-
}
103-
104-
// TODO: support Windows cmd.exe
105-
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
106-
if (shell.getEnvironment() != null
107-
&& shell.getEnvironment().getAdditionalProperties() != null) {
108-
for (Map.Entry<String, Object> entry :
109-
shell.getEnvironment().getAdditionalProperties().entrySet()) {
110-
String value =
111-
ExpressionUtils.isExpr(entry.getValue())
112-
? WorkflowUtils.buildStringFilter(application, entry.getValue().toString())
113-
.apply(workflowContext, taskContext, taskContext.input())
114-
: entry.getValue().toString();
115-
116-
// configure environments
117-
builder.environment().put(entry.getKey(), value);
118-
}
119-
}
120-
return builder;
121-
};
122-
123-
this.shellResultSupplier =
124-
(taskContext, input, processBuilder) -> {
125-
try {
126-
Process process = processBuilder.start();
127-
return taskConfiguration.isAwait()
128-
? buildResultFromProcess(taskConfiguration, definition, process)
129-
: input;
130-
} catch (IOException | InterruptedException e) {
131-
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e);
132-
}
133-
};
100+
shellCommand =
101+
WorkflowUtils.buildStringFilter(
102+
definition.application(), taskConfiguration.getShell().getCommand());
103+
104+
shellArguments =
105+
shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null
106+
? shell.getArguments().getAdditionalProperties().entrySet().stream()
107+
.collect(
108+
Collectors.toMap(
109+
e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()),
110+
e ->
111+
e.getValue() != null
112+
? Optional.of(
113+
WorkflowUtils.buildStringFilter(
114+
definition.application(), e.getValue().toString()))
115+
: Optional.empty(),
116+
(x, y) -> y,
117+
LinkedHashMap::new))
118+
: Map.of();
119+
120+
shellEnv =
121+
shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null
122+
? Optional.of(
123+
WorkflowUtils.buildMapResolver(
124+
definition.application(), shell.getEnvironment().getAdditionalProperties()))
125+
: Optional.empty();
126+
127+
returnType =
128+
taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty();
134129
}
135130

136-
/**
137-
* Builds the WorkflowModel result from the executed process. It waits for the process to finish
138-
* and captures the exit code, stdout, and stderr based on the task configuration.
139-
*/
140-
private WorkflowModel buildResultFromProcess(
141-
RunShell taskConfiguration, WorkflowDefinition definition, Process process)
142-
throws IOException, InterruptedException {
143-
int exitCode = process.waitFor();
144-
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
145-
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
146-
147-
WorkflowModelFactory modelFactory = definition.application().modelFactory();
148-
return switch (taskConfiguration.getReturn()) {
149-
case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
150-
case NONE -> modelFactory.fromNull();
151-
case CODE -> modelFactory.from(exitCode);
152-
case STDOUT -> modelFactory.from(stdout.trim());
153-
case STDERR -> modelFactory.from(stderr.trim());
154-
};
131+
private static Optional<WorkflowModel> buildResultFromProcess(
132+
WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) {
133+
try {
134+
int exitCode = process.waitFor();
135+
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
136+
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
137+
return Optional.of(
138+
switch (type) {
139+
case ALL ->
140+
modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
141+
case NONE -> modelFactory.fromNull();
142+
case CODE -> modelFactory.from(exitCode);
143+
case STDOUT -> modelFactory.from(stdout.trim());
144+
case STDERR -> modelFactory.from(stderr.trim());
145+
});
146+
} catch (IOException e) {
147+
throw new UncheckedIOException(e);
148+
} catch (InterruptedException e) {
149+
Thread.currentThread().interrupt();
150+
return Optional.empty();
151+
}
155152
}
156153

157154
@Override

0 commit comments

Comments
 (0)