Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
11ec9d7
feat(web): add queue and steer follow-up behavior
leonardoxr Mar 28, 2026
8f0600a
Merge branch 'main' into feat/follow-up-behavior-1462
leonardoxr Mar 28, 2026
3a033b0
feat: move queued follow-ups into orchestration
leonardoxr Mar 28, 2026
81ba2c4
Merge origin/main into feat/follow-up-behavior-1462
leonardoxr Mar 28, 2026
91df7c2
fix: address queued follow-up review feedback
leonardoxr Mar 28, 2026
b263589
fix: revoke queued follow-up preview urls on clear
leonardoxr Mar 28, 2026
775ee42
fix: harden queued follow-up composer flow
leonardoxr Mar 28, 2026
cace8fc
fix: address queued follow-up review regressions
leonardoxr Mar 28, 2026
f4587c9
Merge branch 'main' into feat/follow-up-behavior-1462
leonardoxr Mar 28, 2026
06d68cd
fix: harden queued follow-up dispatch
leonardoxr Mar 28, 2026
46f95da
fix: resolve queued steer review feedback
leonardoxr Mar 28, 2026
d92e28b
Merge origin/main into feat/follow-up-behavior-1462
leonardoxr Mar 29, 2026
1ad44cc
fix: resolve remaining queued follow-up review comments
leonardoxr Mar 29, 2026
f049043
Merge branch 'main' into feat/follow-up-behavior-1462
leonardoxr Mar 29, 2026
52c2d73
Merge origin/main into feat/follow-up-behavior-1462
leonardoxr Mar 29, 2026
8162f86
Merge remote-tracking branch 'leonardoxr/feat/follow-up-behavior-1462…
leonardoxr Mar 29, 2026
711f86d
fix: address remaining follow-up review feedback
leonardoxr Mar 29, 2026
6f39aeb
fix: address remaining follow-up review threads
leonardoxr Mar 29, 2026
a2754fa
fix: address queued follow-up review feedback
leonardoxr Mar 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions apps/server/integration/OrchestrationEngineHarness.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointRea
import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts";
import { QueuedFollowUpReactorLive } from "../src/orchestration/Layers/QueuedFollowUpReactor.ts";
import { RuntimeReceiptBusLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts";
import { OrchestrationReactorLive } from "../src/orchestration/Layers/OrchestrationReactor.ts";
import { ProviderCommandReactorLive } from "../src/orchestration/Layers/ProviderCommandReactor.ts";
Expand Down Expand Up @@ -165,6 +166,7 @@ export interface OrchestrationIntegrationHarness {
readonly dbPath: string;
readonly adapterHarness: TestProviderAdapterHarness | null;
readonly engine: OrchestrationEngineShape;
readonly startReactor: Effect.Effect<void, never>;
readonly snapshotQuery: ProjectionSnapshotQuery["Service"];
readonly providerService: ProviderService["Service"];
readonly checkpointStore: CheckpointStore["Service"];
Expand Down Expand Up @@ -211,6 +213,8 @@ export interface OrchestrationIntegrationHarness {
interface MakeOrchestrationIntegrationHarnessOptions {
readonly provider?: ProviderKind;
readonly realCodex?: boolean;
readonly rootDir?: string;
readonly autoStartReactor?: boolean;
}

export const makeOrchestrationIntegrationHarness = (
Expand All @@ -236,16 +240,25 @@ export const makeOrchestrationIntegrationHarness = (
listProviders: () => Effect.succeed([adapterHarness.provider]),
} as typeof ProviderAdapterRegistry.Service)
: null;
const rootDir = yield* fileSystem.makeTempDirectoryScoped({
prefix: "t3-orchestration-integration-",
});
const rootDir =
options?.rootDir ??
(yield* fileSystem.makeTempDirectoryScoped({
prefix: "t3-orchestration-integration-",
}));
const workspaceDir = path.join(rootDir, "workspace");
const { stateDir, dbPath } = yield* deriveServerPaths(rootDir, undefined).pipe(
Effect.provideService(Path.Path, path),
);
yield* fileSystem.makeDirectory(rootDir, { recursive: true });
yield* fileSystem.makeDirectory(workspaceDir, { recursive: true });
yield* fileSystem.makeDirectory(stateDir, { recursive: true });
yield* initializeGitWorkspace(workspaceDir);
const workspaceGitDir = path.join(workspaceDir, ".git");
const gitDirExists = yield* fileSystem
.exists(workspaceGitDir)
.pipe(Effect.orElseSucceed(() => false));
if (!gitDirExists) {
yield* initializeGitWorkspace(workspaceDir);
}

const persistenceLayer = makeSqlitePersistenceLive(dbPath);
const orchestrationLayer = OrchestrationEngineLive.pipe(
Expand Down Expand Up @@ -318,10 +331,14 @@ export const makeOrchestrationIntegrationHarness = (
const checkpointReactorLayer = CheckpointReactorLive.pipe(
Layer.provideMerge(runtimeServicesLayer),
);
const queuedFollowUpReactorLayer = QueuedFollowUpReactorLive.pipe(
Layer.provideMerge(runtimeServicesLayer),
);
const orchestrationReactorLayer = OrchestrationReactorLive.pipe(
Layer.provideMerge(runtimeIngestionLayer),
Layer.provideMerge(providerCommandReactorLayer),
Layer.provideMerge(checkpointReactorLayer),
Layer.provideMerge(queuedFollowUpReactorLayer),
);
const layer = orchestrationReactorLayer.pipe(
Layer.provide(persistenceLayer),
Expand Down Expand Up @@ -359,9 +376,19 @@ export const makeOrchestrationIntegrationHarness = (
).pipe(Effect.orDie);

const scope = yield* Scope.make("sequential");
yield* tryRuntimePromise("start OrchestrationReactor", () =>
runtime.runPromise(reactor.start.pipe(Scope.provide(scope))),
).pipe(Effect.orDie);
let reactorStarted = false;
const startReactor = Effect.gen(function* () {
if (reactorStarted) {
return;
}
reactorStarted = true;
yield* tryRuntimePromise("start OrchestrationReactor", () =>
runtime.runPromise(reactor.start.pipe(Scope.provide(scope))),
).pipe(Effect.orDie);
}).pipe(Effect.orDie);
if (options?.autoStartReactor !== false) {
yield* startReactor;
}
const receiptHistory = yield* Ref.make<ReadonlyArray<OrchestrationRuntimeReceipt>>([]);
yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) =>
Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid),
Expand Down Expand Up @@ -492,6 +519,7 @@ export const makeOrchestrationIntegrationHarness = (
dbPath,
adapterHarness,
engine,
startReactor,
snapshotQuery,
providerService,
checkpointStore,
Expand Down
95 changes: 95 additions & 0 deletions apps/server/integration/orchestrationEngine.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";

import {
Expand Down Expand Up @@ -94,6 +95,17 @@ function withHarness<A, E>(
).pipe(Effect.provide(NodeServices.layer));
}

function withHarnessOptions<A, E>(
options: Parameters<typeof makeOrchestrationIntegrationHarness>[0],
use: (harness: OrchestrationIntegrationHarness) => Effect.Effect<A, E>,
) {
return Effect.acquireUseRelease(
makeOrchestrationIntegrationHarness(options),
use,
(harness) => harness.dispose,
).pipe(Effect.provide(NodeServices.layer));
}

function withRealCodexHarness<A, E>(
use: (harness: OrchestrationIntegrationHarness) => Effect.Effect<A, E>,
) {
Expand Down Expand Up @@ -252,6 +264,89 @@ it.live("runs a single turn end-to-end and persists checkpoint state in sqlite +
),
);

it.live("replays queued follow-ups after orchestration restarts", () =>
Effect.gen(function* () {
const rootDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-orchestration-queue-restart-"));

yield* withHarnessOptions({ rootDir }, (harness) =>
Effect.gen(function* () {
yield* seedProjectAndThread(harness);

yield* harness.engine.dispatch({
type: "thread.queued-follow-up.enqueue",
commandId: CommandId.makeUnsafe("cmd-queued-follow-up-restart-enqueue"),
threadId: THREAD_ID,
followUp: {
id: "follow-up-restart-1",
createdAt: nowIso(),
prompt: "Resume this after restart",
attachments: [],
terminalContexts: [],
modelSelection: {
provider: "codex",
model: DEFAULT_MODEL_BY_PROVIDER.codex,
},
runtimeMode: "approval-required",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
lastSendError: null,
},
createdAt: nowIso(),
});

const queuedThread = yield* harness.waitForThread(
THREAD_ID,
(thread) => thread.queuedFollowUps.length === 1,
);
assert.equal(queuedThread.queuedFollowUps[0]?.prompt, "Resume this after restart");
}),
);

yield* withHarnessOptions({ rootDir, autoStartReactor: false }, (harness) =>
Effect.gen(function* () {
yield* harness.adapterHarness!.queueTurnResponseForNextSession({
events: [
{
type: "turn.started",
...runtimeBase("evt-queued-restart-1", "2026-03-28T12:05:00.000Z"),
threadId: THREAD_ID,
turnId: FIXTURE_TURN_ID,
},
{
type: "message.delta",
...runtimeBase("evt-queued-restart-2", "2026-03-28T12:05:00.050Z"),
threadId: THREAD_ID,
turnId: FIXTURE_TURN_ID,
delta: "Recovered queued follow-up output.\n",
},
{
type: "turn.completed",
...runtimeBase("evt-queued-restart-3", "2026-03-28T12:05:00.100Z"),
threadId: THREAD_ID,
turnId: FIXTURE_TURN_ID,
status: "completed",
},
],
});

yield* harness.startReactor;

const recoveredThread = yield* harness.waitForThread(
THREAD_ID,
(thread) =>
thread.queuedFollowUps.length === 0 &&
thread.messages.some(
(message) =>
message.role === "assistant" &&
message.text.includes("Recovered queued follow-up output."),
),
);

assert.equal(recoveredThread.queuedFollowUps.length, 0);
}),
);
}).pipe(Effect.provide(NodeServices.layer)),
);

it.live.skipIf(!process.env.CODEX_BINARY_PATH)(
"keeps the same Codex provider thread across runtime mode switches",
() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ function makeSnapshot(input: {
archivedAt: null,
deletedAt: null,
messages: [],
queuedFollowUps: [],
activities: [],
proposedPlans: [],
checkpoints: [
Expand Down
10 changes: 10 additions & 0 deletions apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts";
import { OrchestrationReactor } from "../Services/OrchestrationReactor.ts";
import { QueuedFollowUpReactor } from "../Services/QueuedFollowUpReactor.ts";
import { makeOrchestrationReactor } from "./OrchestrationReactor.ts";

describe("OrchestrationReactor", () => {
Expand Down Expand Up @@ -46,6 +47,14 @@ describe("OrchestrationReactor", () => {
drain: Effect.void,
}),
),
Layer.provideMerge(
Layer.succeed(QueuedFollowUpReactor, {
start: Effect.sync(() => {
started.push("queued-follow-up-reactor");
}),
drain: Effect.void,
}),
),
),
);

Expand All @@ -57,6 +66,7 @@ describe("OrchestrationReactor", () => {
"provider-runtime-ingestion",
"provider-command-reactor",
"checkpoint-reactor",
"queued-follow-up-reactor",
]);

await Effect.runPromise(Scope.close(scope, Exit.void));
Expand Down
3 changes: 3 additions & 0 deletions apps/server/src/orchestration/Layers/OrchestrationReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import {
import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts";
import { QueuedFollowUpReactor } from "../Services/QueuedFollowUpReactor.ts";

export const makeOrchestrationReactor = Effect.gen(function* () {
const providerRuntimeIngestion = yield* ProviderRuntimeIngestionService;
const providerCommandReactor = yield* ProviderCommandReactor;
const checkpointReactor = yield* CheckpointReactor;
const queuedFollowUpReactor = yield* QueuedFollowUpReactor;

const start: OrchestrationReactorShape["start"] = Effect.gen(function* () {
yield* providerRuntimeIngestion.start;
yield* providerCommandReactor.start;
yield* checkpointReactor.start;
yield* queuedFollowUpReactor.start;
});

return {
Expand Down
Loading
Loading