Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6a0cb57
Fix things
siri-varma Feb 10, 2026
f01d1a8
fix things
siri-varma Feb 10, 2026
e216360
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 13, 2026
039cf42
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 13, 2026
3ab2f35
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 16, 2026
4670d07
Update things
siri-varma Feb 16, 2026
718e1dc
Merge branch 'users/sveigraju/add-sub-orche' of https://github.com/si…
siri-varma Feb 16, 2026
cc9459b
Add sub orchestration its
siri-varma Feb 16, 2026
62ea4e3
Add sub orchestration its
siri-varma Feb 16, 2026
5c24d8d
Add sub orchestration its
siri-varma Feb 16, 2026
7b14d3e
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 17, 2026
ac0b082
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 17, 2026
706939d
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 18, 2026
908a1f0
fix things
siri-varma Feb 18, 2026
189cadd
Merge branch 'users/sveigraju/add-sub-orche' of https://github.com/si…
siri-varma Feb 18, 2026
2fa2dc0
fix things
siri-varma Feb 18, 2026
4a43b67
fix things
siri-varma Feb 18, 2026
d1ae3d8
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 18, 2026
6c680cb
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 19, 2026
33a82e2
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 20, 2026
fe78f38
Merge branch 'master' into users/sveigraju/add-sub-orche
siri-varma Feb 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ private void setAppId(String appId) {
this.appId = appId;
}

private boolean hasSourceAppId() {
return this.appId != null && !this.appId.isEmpty();
}

private boolean hasTargetAppId(TaskOptions options) {
return options != null && options.hasAppID();
}

@Override
public Instant getCurrentInstant() {
// TODO: Throw if instant is null
Expand Down Expand Up @@ -345,36 +353,32 @@ public <V> Task<V> callActivity(
}

// Add router information for cross-app routing
// Router always has a source app ID from EXECUTIONSTARTED event
OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);

// Add target app ID if specified in options
if (options != null && options.hasAppID()) {
OrchestratorService.TaskRouter router = null;
if (hasSourceAppId() && hasTargetAppId(options)) {
String targetAppId = options.getAppID();
OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder()
scheduleTaskBuilder.setRouter(OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.setTargetAppID(targetAppId)
.build();
scheduleTaskBuilder.setRouter(router);
.build());
this.logger.fine(() -> String.format(
"cross app routing detected: source=%s, target=%s",
this.appId, targetAppId));
}

// Capture for use inside lambda
final OrchestratorService.TaskRouter actionRouter = router;

TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
OrchestratorService.ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build();
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setScheduleTask(scheduleTaskBuilder);
if (options != null && options.hasAppID()) {
String targetAppId = options.getAppID();
OrchestratorService.TaskRouter actionRouter = OrchestratorService.TaskRouter.newBuilder()
if (hasSourceAppId() && hasTargetAppId(options)) {
actionBuilder.setRouter(OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.setTargetAppID(targetAppId)
.build();
actionBuilder.setRouter(actionRouter);
.setTargetAppID(options.getAppID())
.build());
}
this.pendingActions.put(id, actionBuilder.build());

Expand Down Expand Up @@ -515,13 +519,40 @@ public <V> Task<V> callSubOrchestrator(
}
createSubOrchestrationActionBuilder.setInstanceId(instanceId);

// TODO: @cicoyle - add suborchestration cross app logic here when its supported
// Add router information for cross-app routing of sub-orchestrations
if (hasSourceAppId()) {
OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);

// Add target app ID if specified in options
if (hasTargetAppId(options)) {
routerBuilder.setTargetAppID(options.getAppID());
this.logger.fine(() -> String.format(
"cross app sub-orchestration routing detected: source=%s, target=%s",
this.appId, options.getAppID()));
}

createSubOrchestrationActionBuilder.setRouter(routerBuilder.build());
}

TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
this.pendingActions.put(id, OrchestratorService.OrchestratorAction.newBuilder()
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setCreateSubOrchestration(createSubOrchestrationActionBuilder)
.build());
.setCreateSubOrchestration(createSubOrchestrationActionBuilder);

// Set router on the OrchestratorAction for cross-app routing
if (hasSourceAppId()) {
OrchestratorService.TaskRouter.Builder actionRouterBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);
if (hasTargetAppId(options)) {
actionRouterBuilder.setTargetAppID(options.getAppID());
}
actionBuilder.setRouter(actionRouterBuilder.build());
}

this.pendingActions.put(id, actionBuilder.build());

if (!this.isReplaying) {
this.logger.fine(() -> String.format(
Expand Down Expand Up @@ -957,11 +988,20 @@ private void completeInternal(
}

int id = this.sequenceNumber++;
OrchestratorService.OrchestratorAction action = OrchestratorService.OrchestratorAction.newBuilder()
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setCompleteOrchestration(builder.build())
.build();
this.pendingActions.put(id, action);
.setCompleteOrchestration(builder.build());

// Add router to completion action for cross-app routing back to parent
if (hasSourceAppId()) {
actionBuilder.setRouter(
OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.build());
}

this.pendingActions.put(id, actionBuilder.build());
this.isComplete = true;
}

Expand Down Expand Up @@ -1025,7 +1065,16 @@ private void processEvent(OrchestratorService.HistoryEvent e) {
this.setInput(executionStarted.getInput().getValue());
this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId());
this.logger.fine(() -> this.instanceId + ": Workflow execution started");
this.setAppId(e.getRouter().getSourceAppID());
// For cross-app suborchestrations, if the router has a target, use that as our appID
// since that's where we're actually executing
if (e.hasRouter()) {
OrchestratorService.TaskRouter router = e.getRouter();
if (router.hasTargetAppID()) {
this.setAppId(router.getTargetAppID());
} else {
this.setAppId(router.getSourceAppID());
}
}

var versionName = "";
if (!StringUtils.isEmpty(this.orchestratorVersionName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,138 @@ void subOrchestration() throws TimeoutException {
}
}

@Test
void subOrchestrationWithActivity() throws TimeoutException {
final String parentOrchestratorName = "ParentOrchestrator";
final String childOrchestratorName = "ChildOrchestrator";
final String activityName = "PlusOne";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int childResult = ctx.callSubOrchestrator(childOrchestratorName, input, int.class).await();
ctx.complete(childResult);
})
.addOrchestrator(childOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> ctx.getInput(int.class) + 1)
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 10);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals(11, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationChain() throws TimeoutException {
final String orchestratorName = "ChainOrchestrator";
final String leafOrchestratorName = "LeafOrchestrator";
final String activityName = "Double";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
int input = ctx.getInput(int.class);
// Chain: parent calls child which calls leaf
int result = ctx.callSubOrchestrator(leafOrchestratorName, input, int.class).await();
// Call activity after sub-orchestration completes
result = ctx.callActivity(activityName, result, int.class).await();
ctx.complete(result);
})
.addOrchestrator(leafOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> ctx.getInput(int.class) * 2)
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
// input=3 -> leaf doubles to 6 -> parent doubles to 12
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 3);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals(12, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationFanOut() throws TimeoutException {
final String parentOrchestratorName = "FanOutParent";
final String childOrchestratorName = "FanOutChild";
final String activityName = "Square";
final int childCount = 5;

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
// Fan out: launch multiple sub-orchestrations in parallel
List<Task<Integer>> tasks = IntStream.range(1, childCount + 1)
.mapToObj(i -> ctx.callSubOrchestrator(childOrchestratorName, i, int.class))
.collect(Collectors.toList());

List<Integer> results = ctx.allOf(tasks).await();
int sum = results.stream().mapToInt(Integer::intValue).sum();
ctx.complete(sum);
})
.addOrchestrator(childOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> {
int val = ctx.getInput(int.class);
return val * val;
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 0);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
// 1^2 + 2^2 + 3^2 + 4^2 + 5^2 = 1 + 4 + 9 + 16 + 25 = 55
assertEquals(55, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationWithInstanceId() throws TimeoutException {
final String parentOrchestratorName = "ParentWithInstanceId";
final String childOrchestratorName = "ChildWithInstanceId";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
String childInstanceId = ctx.getInstanceId() + ":child";
String result = ctx.callSubOrchestrator(
childOrchestratorName, "hello", childInstanceId, String.class).await();
ctx.complete(result);
})
.addOrchestrator(childOrchestratorName, ctx -> {
String input = ctx.getInput(String.class);
ctx.complete(input + " world");
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, "test");
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals("hello world", instance.readOutputAs(String.class));
}
}

@Test
void continueAsNew() throws TimeoutException {
final String orchestratorName = "continueAsNew";
Expand Down
Loading
Loading