diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index 6520fb3ab92..49d1263828e 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -811,6 +811,13 @@ export namespace Config { .optional() .describe("Tools that should only be available to primary agents."), continue_loop_on_deny: z.boolean().optional().describe("Continue the agent loop when a tool call is denied"), + backgroundTasks: z + .object({ + timeoutMs: z.number().int().positive().optional().describe("Timeout for background tasks in ms"), + maxConcurrent: z.number().int().positive().optional().describe("Max concurrent background tasks"), + }) + .optional() + .describe("Background task runner settings"), }) .optional(), }) diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index e92c46225d5..b85c755cb95 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -32,6 +32,7 @@ import { ProjectRoute } from "./project" import { ToolRegistry } from "../tool/registry" import { zodToJsonSchema } from "zod-to-json-schema" import { SessionPrompt } from "../session/prompt" +import { SessionRunner } from "../session/runner" import { SessionCompaction } from "../session/compaction" import { SessionRevert } from "../session/revert" import { lazy } from "../util/lazy" @@ -1325,13 +1326,14 @@ export namespace Server { ), validator("json", SessionPrompt.PromptInput.omit({ sessionID: true })), async (c) => { - c.status(204) - c.header("Content-Type", "application/json") - return stream(c, async () => { - const sessionID = c.req.valid("param").sessionID - const body = c.req.valid("json") - SessionPrompt.prompt({ ...body, sessionID }) - }) + const sessionID = c.req.valid("param").sessionID + const body = c.req.valid("json") + void SessionRunner.promptBackground({ + ...body, + sessionID, + kind: "session.prompt_async", + }).catch((err) => log.error("prompt_async failed", { sessionID, error: err })) + return c.body(null, 204) }, ) .post( diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index e393e2fab9f..3d8b1b109a1 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -606,11 +606,13 @@ export namespace SessionPrompt { args, }, ) + const toolPart = input.processor.partFromToolCall(options.toolCallId) const result = await item.execute(args, { sessionID: input.sessionID, abort: options.abortSignal!, messageID: input.processor.message.id, callID: options.toolCallId, + toolPartID: toolPart?.id, extra: { model: input.model }, agent: input.agent.name, metadata: async (val) => { diff --git a/packages/opencode/src/session/runner.ts b/packages/opencode/src/session/runner.ts new file mode 100644 index 00000000000..aaf441fbf02 --- /dev/null +++ b/packages/opencode/src/session/runner.ts @@ -0,0 +1,483 @@ +import z from "zod" +import { Identifier } from "../id/id" +import { Log } from "../util/log" +import { SessionPrompt } from "./prompt" +import { MessageV2 } from "./message-v2" +import { Instance } from "../project/instance" +import { Bus } from "../bus" +import { BusEvent } from "../bus/bus-event" +import { Config } from "../config/config" +import { fn } from "@/util/fn" +import { ulid } from "ulid" + +export namespace SessionRunner { + const log = Log.create({ service: "session.runner" }) + + const DEFAULT_TIMEOUT_MS = 10 * 60 * 1000 // 10 minutes + const DEFAULT_MAX_CONCURRENT = 2 + + export const JobKind = z.enum(["session.loop", "session.prompt_async", "task.child_session"]).meta({ + ref: "SessionRunnerJobKind", + }) + export type JobKind = z.infer + + export const JobStatus = z.enum(["queued", "running", "completed", "failed", "canceled", "timed_out"]).meta({ + ref: "SessionRunnerJobStatus", + }) + export type JobStatus = z.infer + + export const JobError = z + .object({ + name: z.string().optional(), + message: z.string(), + }) + .meta({ ref: "SessionRunnerJobError" }) + export type JobError = z.infer + + export const Job = z + .object({ + id: z.string(), + kind: JobKind, + targetSessionID: z.string(), + parentSessionID: z.string().optional(), + toolCallID: z.string().optional(), + createdAt: z.number(), + startedAt: z.number().optional(), + finishedAt: z.number().optional(), + timeoutMs: z.number().optional(), + status: JobStatus, + error: JobError.optional(), + }) + .meta({ ref: "SessionRunnerJob" }) + export type Job = z.infer + + export const EnqueueOptions = z + .object({ + timeoutMs: z.number().optional(), + parentSessionID: z.string().optional(), + toolCallID: z.string().optional(), + dedupeKey: z.string().optional(), + }) + .meta({ ref: "SessionRunnerEnqueueOptions" }) + export type EnqueueOptions = z.infer + + export const Event = { + Queued: BusEvent.define( + "session.background.queued", + z.object({ + job: Job, + }), + ), + Started: BusEvent.define( + "session.background.started", + z.object({ + job: Job, + }), + ), + Completed: BusEvent.define( + "session.background.completed", + z.object({ + job: Job, + }), + ), + Failed: BusEvent.define( + "session.background.failed", + z.object({ + job: Job, + }), + ), + Canceled: BusEvent.define( + "session.background.canceled", + z.object({ + job: Job, + }), + ), + TimedOut: BusEvent.define( + "session.background.timed_out", + z.object({ + job: Job, + }), + ), + } + + export const Options = z + .object({ + model: z.object({ + providerID: z.string(), + modelID: z.string(), + }), + agent: z.string(), + tools: z.record(z.string(), z.boolean()).optional(), + origin: z + .object({ + parentSessionID: Identifier.schema("session").optional(), + parentMessageID: Identifier.schema("message").optional(), + description: z.string().optional(), + command: z.string().optional(), + }) + .optional(), + timeoutMs: z.number().optional(), + maxSteps: z.number().optional(), + }) + .meta({ ref: "SessionRunnerOptions" }) + export type Options = z.infer + + export const RunResult = z + .object({ + sessionID: Identifier.schema("session"), + message: MessageV2.WithParts, + success: z.boolean(), + error: z.string().optional(), + }) + .meta({ ref: "SessionRunnerResult" }) + export type RunResult = z.infer + + interface QueuedJob { + job: Job + run: (abort: AbortSignal) => Promise + resolve: (job: Job) => void + } + + const MAX_HISTORY = 100 + + const state = Instance.state( + () => ({ + queue: [] as QueuedJob[], + jobsById: {} as Record, + abortById: {} as Record, + dedupeKeys: {} as Record, + completionPromises: {} as Record>, + running: 0, + }), + async (s) => { + for (const queued of s.queue) { + queued.job.status = "canceled" + queued.job.finishedAt = Date.now() + queued.job.error = { message: "Instance disposed" } + queued.resolve(queued.job) + } + s.queue = [] + for (const id of Object.keys(s.abortById)) { + s.abortById[id].abort(new Error("disposed")) + } + }, + ) + + async function getConfig() { + const cfg = await Config.get() + return { + timeoutMs: cfg.experimental?.backgroundTasks?.timeoutMs ?? DEFAULT_TIMEOUT_MS, + maxConcurrent: cfg.experimental?.backgroundTasks?.maxConcurrent ?? DEFAULT_MAX_CONCURRENT, + } + } + + async function processQueue() { + const s = state() + const config = await getConfig() + + while (s.queue.length > 0 && s.running < config.maxConcurrent) { + const next = s.queue.shift() + if (!next) break + + s.running++ + runJob(next).finally(() => { + s.running-- + processQueue() + }) + } + } + + async function runJob(queued: QueuedJob) { + const s = state() + const config = await getConfig() + const job = queued.job + const timeout = job.timeoutMs ?? config.timeoutMs + + const abort = new AbortController() + s.abortById[job.id] = abort + + job.status = "running" + job.startedAt = Date.now() + s.jobsById[job.id] = job + Bus.publish(Event.Started, { job }) + log.info("job started", { id: job.id, kind: job.kind }) + + let timer: ReturnType | undefined + + try { + await Promise.race([ + queued.run(abort.signal), + new Promise((_, reject) => { + timer = setTimeout(() => { + abort.abort(new Error("timeout")) + reject(new Error("timeout")) + }, timeout) + }), + new Promise((_, reject) => { + abort.signal.addEventListener("abort", () => reject(abort.signal.reason), { once: true }) + }), + ]) + job.status = "completed" + job.finishedAt = Date.now() + Bus.publish(Event.Completed, { job }) + log.info("job completed", { id: job.id }) + } catch (err) { + job.finishedAt = Date.now() + const msg = err instanceof Error ? err.message : String(err) + if (msg === "timeout" || abort.signal.reason?.message === "timeout") { + job.status = "timed_out" + job.error = { message: "Job timed out" } + Bus.publish(Event.TimedOut, { job }) + log.warn("job timed out", { id: job.id }) + } else if (abort.signal.aborted) { + job.status = "canceled" + job.error = { message: "Job canceled" } + Bus.publish(Event.Canceled, { job }) + log.info("job canceled", { id: job.id }) + } else { + job.status = "failed" + job.error = { + name: err instanceof Error ? err.name : undefined, + message: msg, + } + Bus.publish(Event.Failed, { job }) + log.error("job failed", { id: job.id, error: job.error }) + } + } finally { + if (timer) clearTimeout(timer) + delete s.abortById[job.id] + s.jobsById[job.id] = job + pruneHistory(s) + queued.resolve(job) + } + } + + function pruneHistory(s: ReturnType) { + const ids = Object.keys(s.jobsById) + if (ids.length <= MAX_HISTORY) return + + const completed = ids + .filter((id) => { + const status = s.jobsById[id].status + return status !== "queued" && status !== "running" + }) + .sort((a, b) => (s.jobsById[a].finishedAt ?? 0) - (s.jobsById[b].finishedAt ?? 0)) + + const toRemove = completed.slice(0, ids.length - MAX_HISTORY) + for (const id of toRemove) { + delete s.jobsById[id] + delete s.completionPromises[id] + } + + for (const [key, jobId] of Object.entries(s.dedupeKeys)) { + if (!s.jobsById[jobId]) delete s.dedupeKeys[key] + } + } + + export async function enqueue( + kind: JobKind, + targetSessionID: string, + run: (abort: AbortSignal) => Promise, + opts?: EnqueueOptions, + ): Promise { + const s = state() + + if (opts?.dedupeKey) { + const existing = s.dedupeKeys[opts.dedupeKey] + if (existing && s.jobsById[existing]) { + const job = s.jobsById[existing] + if (job.status === "queued" || job.status === "running") { + log.info("dedupe hit", { key: opts.dedupeKey, id: existing }) + return s.completionPromises[existing] + } + } + } + + const job: Job = { + id: `job_${ulid()}`, + kind, + targetSessionID, + parentSessionID: opts?.parentSessionID, + toolCallID: opts?.toolCallID, + createdAt: Date.now(), + timeoutMs: opts?.timeoutMs, + status: "queued", + } + + s.jobsById[job.id] = job + if (opts?.dedupeKey) { + s.dedupeKeys[opts.dedupeKey] = job.id + } + + Bus.publish(Event.Queued, { job }) + log.info("job queued", { id: job.id, kind, targetSessionID }) + + const completionPromise = new Promise((resolve) => { + s.queue.push({ job, run, resolve }) + processQueue() + }) + s.completionPromises[job.id] = completionPromise + + return completionPromise + } + + export function cancel(id: string): boolean { + const s = state() + const job = s.jobsById[id] + if (!job) return false + + if (job.status === "queued") { + const idx = s.queue.findIndex((q) => q.job.id === id) + if (idx !== -1) { + const removed = s.queue.splice(idx, 1)[0] + job.status = "canceled" + job.finishedAt = Date.now() + job.error = { message: "Job canceled" } + s.jobsById[id] = job + Bus.publish(Event.Canceled, { job }) + removed.resolve(job) + log.info("job canceled (queued)", { id }) + return true + } + } + + if (job.status === "running") { + const abort = s.abortById[id] + if (abort) { + abort.abort(new Error("canceled")) + log.info("job cancel requested", { id }) + return true + } + } + + return false + } + + export function get(id: string): Job | undefined { + return state().jobsById[id] + } + + export function list(): Job[] { + return Object.values(state().jobsById) + } + + export function listQueued(): Job[] { + return state().queue.map((q) => q.job) + } + + export function listRunning(): Job[] { + return Object.values(state().jobsById).filter((j) => j.status === "running") + } + + export function isRunning(id: string): boolean { + return id in state().abortById + } + + export function listActive(): string[] { + return Object.keys(state().abortById) + } + + export async function runOnce(input: SessionPrompt.PromptInput): Promise { + log.info("runOnce", { sessionID: input.sessionID, agent: input.agent }) + return SessionPrompt.prompt(input) + } + + export const LoopBackgroundInput = z.object({ + sessionID: Identifier.schema("session"), + kind: JobKind.optional(), + parentSessionID: Identifier.schema("session").optional(), + toolCallID: z.string().optional(), + timeoutMs: z.number().optional(), + dedupeKey: z.string().optional(), + }) + export type LoopBackgroundInput = z.infer + + export const loopBackground = fn(LoopBackgroundInput, async (input): Promise => { + log.info("loopBackground", { sessionID: input.sessionID }) + return enqueue( + input.kind ?? "session.loop", + input.sessionID, + async () => { + await SessionPrompt.loop(input.sessionID) + }, + { + parentSessionID: input.parentSessionID, + toolCallID: input.toolCallID, + timeoutMs: input.timeoutMs, + dedupeKey: input.dedupeKey, + }, + ) + }) + + // Note: Defined inline to avoid circular dependency with SessionPrompt + // (prompt.ts -> task.ts -> runner.ts -> prompt.ts) + export const PromptBackgroundInput = z.object({ + sessionID: Identifier.schema("session"), + messageID: Identifier.schema("message").optional(), + model: z + .object({ + providerID: z.string(), + modelID: z.string(), + }) + .optional(), + agent: z.string().optional(), + noReply: z.boolean().optional(), + tools: z.record(z.string(), z.boolean()).optional(), + system: z.string().optional(), + parts: z.array( + z.discriminatedUnion("type", [ + MessageV2.TextPart.omit({ messageID: true, sessionID: true }).partial({ id: true }), + MessageV2.FilePart.omit({ messageID: true, sessionID: true }).partial({ id: true }), + MessageV2.AgentPart.omit({ messageID: true, sessionID: true }).partial({ id: true }), + MessageV2.SubtaskPart.omit({ messageID: true, sessionID: true }).partial({ id: true }), + ]), + ), + kind: JobKind.optional(), + parentSessionID: Identifier.schema("session").optional(), + toolCallID: z.string().optional(), + timeoutMs: z.number().optional(), + dedupeKey: z.string().optional(), + }) + export type PromptBackgroundInput = z.infer + + export const PromptBackgroundResult = z.object({ + job: Job, + message: MessageV2.WithParts, + }) + export type PromptBackgroundResult = z.infer + + export const promptBackground = fn(PromptBackgroundInput, async (input): Promise => { + log.info("promptBackground", { sessionID: input.sessionID, agent: input.agent }) + const message = await SessionPrompt.prompt({ ...input, noReply: true }) + const job = await loopBackground({ + sessionID: input.sessionID, + kind: input.kind ?? "session.prompt_async", + parentSessionID: input.parentSessionID, + toolCallID: input.toolCallID, + timeoutMs: input.timeoutMs, + dedupeKey: input.dedupeKey, + }) + return { job, message } + }) + + export function cancelBySession(sessionID: string): boolean { + const s = state() + for (const job of Object.values(s.jobsById)) { + if (job.targetSessionID === sessionID && (job.status === "queued" || job.status === "running")) { + SessionPrompt.cancel(sessionID) + return cancel(job.id) + } + } + return false + } + + export function getBySession(sessionID: string): Job | undefined { + const s = state() + for (const job of Object.values(s.jobsById)) { + if (job.targetSessionID === sessionID && (job.status === "queued" || job.status === "running")) { + return job + } + } + return undefined + } +} diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index bc93f497a91..44276bdfd36 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -7,9 +7,13 @@ import { MessageV2 } from "../session/message-v2" import { Identifier } from "../id/id" import { Agent } from "../agent/agent" import { SessionPrompt } from "../session/prompt" +import { SessionRunner } from "../session/runner" import { iife } from "@/util/iife" -import { defer } from "@/util/defer" import { Config } from "../config/config" +import { Log } from "../util/log" +import { Storage } from "../storage/storage" + +const log = Log.create({ service: "tool.task" }) export const TaskTool = Tool.define("task", async () => { const agents = await Agent.list().then((x) => x.filter((a) => a.mode !== "primary")) @@ -45,15 +49,50 @@ export const TaskTool = Tool.define("task", async () => { const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }) if (msg.info.role !== "assistant") throw new Error("Not an assistant message") - ctx.metadata({ - title: params.description, - metadata: { - sessionId: session.id, - }, - }) - const messageID = Identifier.ascending("message") const parts: Record = {} + + const model = agent.model ?? { + modelID: msg.info.modelID, + providerID: msg.info.providerID, + } + + const promptParts = await SessionPrompt.resolvePromptParts(params.prompt) + const config = await Config.get() + + const cancelChild = () => { + SessionRunner.cancelBySession(session.id) + SessionPrompt.cancel(session.id) + } + ctx.abort.addEventListener("abort", cancelChild, { once: true }) + + // Helper to update parent tool part metadata (works after execute returns) + const updateParentToolPart = async (metadata: { + summary: typeof parts[string][] + sessionId: string + jobId?: string + status?: string + }) => { + if (!ctx.toolPartID) return + const currentPart = await Storage.read(["part", ctx.messageID, ctx.toolPartID]).catch( + (err) => { + log.warn("failed to read parent tool part", { error: err, partID: ctx.toolPartID }) + return undefined + }, + ) + if (!currentPart || currentPart.type !== "tool") return + // Skip pending (no metadata field) and error (terminal state) + if (currentPart.state.status === "pending" || currentPart.state.status === "error") return + await Session.updatePart({ + ...currentPart, + state: { + ...currentPart.state, + metadata, + }, + }).catch((err) => log.warn("failed to update parent tool part", { error: err })) + } + + // Subscribe to child session part updates for live progress const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { if (evt.properties.part.sessionID !== session.id) return if (evt.properties.part.messageID === messageID) return @@ -67,69 +106,85 @@ export const TaskTool = Tool.define("task", async () => { title: part.state.status === "completed" ? part.state.title : undefined, }, } - ctx.metadata({ - title: params.description, - metadata: { - summary: Object.values(parts).sort((a, b) => a.id.localeCompare(b.id)), - sessionId: session.id, - }, + await updateParentToolPart({ + summary: Object.values(parts).sort((a, b) => a.id.localeCompare(b.id)), + sessionId: session.id, }) }) - const model = agent.model ?? { - modelID: msg.info.modelID, - providerID: msg.info.providerID, + // Cleanup function for all subscriptions + const cleanup = () => { + unsub() + ctx.abort.removeEventListener("abort", cancelChild) } - function cancel() { - SessionPrompt.cancel(session.id) - } - ctx.abort.addEventListener("abort", cancel) - using _ = defer(() => ctx.abort.removeEventListener("abort", cancel)) - const promptParts = await SessionPrompt.resolvePromptParts(params.prompt) + // Subscribe to job lifecycle events for cleanup and status updates + const jobEvents = [ + SessionRunner.Event.Completed, + SessionRunner.Event.Failed, + SessionRunner.Event.Canceled, + SessionRunner.Event.TimedOut, + ] as const + const jobUnsubs = jobEvents.map((event) => + Bus.subscribe(event, async (evt) => { + if (evt.properties.job.targetSessionID !== session.id) return + const job = evt.properties.job + // Update parent metadata with final status + await updateParentToolPart({ + summary: Object.values(parts).sort((a, b) => a.id.localeCompare(b.id)), + sessionId: session.id, + status: job.status, + }) + if (job.status !== "completed") { + log.info("child session job ended", { jobId: job.id, status: job.status, error: job.error }) + } + cleanup() + jobUnsubs.forEach((u) => u()) + }), + ) - const config = await Config.get() - const result = await SessionPrompt.prompt({ - messageID, - sessionID: session.id, - model: { - modelID: model.modelID, - providerID: model.providerID, + // Enqueue the child session work (fire-and-forget) + SessionRunner.enqueue( + "task.child_session", + session.id, + async () => { + await SessionPrompt.prompt({ + messageID, + sessionID: session.id, + model: { + modelID: model.modelID, + providerID: model.providerID, + }, + agent: agent.name, + tools: { + todowrite: false, + todoread: false, + task: false, + ...Object.fromEntries((config.experimental?.primary_tools ?? []).map((t) => [t, false])), + ...agent.tools, + }, + parts: promptParts, + }) }, - agent: agent.name, - tools: { - todowrite: false, - todoread: false, - task: false, - ...Object.fromEntries((config.experimental?.primary_tools ?? []).map((t) => [t, false])), - ...agent.tools, + { + parentSessionID: ctx.sessionID, + toolCallID: ctx.callID ?? ctx.messageID, }, - parts: promptParts, + ).catch((err) => { + log.error("failed to enqueue child session", { error: err, sessionID: session.id }) + cleanup() + jobUnsubs.forEach((u) => u()) }) - unsub() - const messages = await Session.messages({ sessionID: session.id }) - const summary = messages - .filter((x) => x.info.role === "assistant") - .flatMap((msg) => msg.parts.filter((x: any) => x.type === "tool") as MessageV2.ToolPart[]) - .map((part) => ({ - id: part.id, - tool: part.tool, - state: { - status: part.state.status, - title: part.state.status === "completed" ? part.state.title : undefined, - }, - })) - const text = result.parts.findLast((x) => x.type === "text")?.text ?? "" - - const output = text + "\n\n" + ["", `session_id: ${session.id}`, ""].join("\n") + // Return immediately without waiting for job completion return { title: params.description, metadata: { - summary, + summary: [] as (typeof parts)[string][], sessionId: session.id, + status: "running", }, - output, + output: `Task started in background.\n\n\nsession_id: ${session.id}\n`, } }, } diff --git a/packages/opencode/src/tool/tool.ts b/packages/opencode/src/tool/tool.ts index 80b6abe8c74..6b2753d20ed 100644 --- a/packages/opencode/src/tool/tool.ts +++ b/packages/opencode/src/tool/tool.ts @@ -12,6 +12,7 @@ export namespace Tool { agent: string abort: AbortSignal callID?: string + toolPartID?: string extra?: { [key: string]: any } metadata(input: { title?: string; metadata?: M }): void } diff --git a/packages/opencode/test/session/runner.test.ts b/packages/opencode/test/session/runner.test.ts new file mode 100644 index 00000000000..85e7db13606 --- /dev/null +++ b/packages/opencode/test/session/runner.test.ts @@ -0,0 +1,400 @@ +import { describe, expect, test } from "bun:test" +import { SessionRunner } from "../../src/session/runner" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +describe("SessionRunner", () => { + describe("Options schema", () => { + test("validates valid options", () => { + const valid = { + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" }, + agent: "code", + } + expect(SessionRunner.Options.safeParse(valid).success).toBe(true) + }) + + test("validates options with tools", () => { + const opts = { + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" }, + agent: "code", + tools: { bash: true, read: true, write: false }, + } + expect(SessionRunner.Options.safeParse(opts).success).toBe(true) + }) + + test("validates options with timeout", () => { + const opts = { + model: { providerID: "openai", modelID: "gpt-4" }, + agent: "general", + timeoutMs: 30000, + maxSteps: 10, + } + expect(SessionRunner.Options.safeParse(opts).success).toBe(true) + }) + + test("rejects missing model", () => { + expect(SessionRunner.Options.safeParse({ agent: "code" }).success).toBe(false) + }) + + test("rejects missing agent", () => { + const opts = { model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" } } + expect(SessionRunner.Options.safeParse(opts).success).toBe(false) + }) + + test("rejects invalid model structure", () => { + const opts = { model: { providerID: "anthropic" }, agent: "code" } + expect(SessionRunner.Options.safeParse(opts).success).toBe(false) + }) + }) + + describe("Job schema", () => { + test("validates job with required fields", () => { + const job = { + id: "job_123", + kind: "session.loop", + targetSessionID: "ses_abc", + createdAt: Date.now(), + status: "queued", + } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + }) + + test("validates job with all fields", () => { + const job = { + id: "job_123", + kind: "task.child_session", + targetSessionID: "ses_abc", + parentSessionID: "ses_parent", + toolCallID: "call_xyz", + createdAt: Date.now(), + startedAt: Date.now(), + finishedAt: Date.now(), + timeoutMs: 30000, + status: "completed", + error: { name: "Error", message: "failed" }, + } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + }) + + test("validates all job kinds", () => { + for (const kind of ["session.loop", "session.prompt_async", "task.child_session"]) { + const job = { id: "j", kind, targetSessionID: "s", createdAt: 0, status: "queued" } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + } + }) + + test("validates all job statuses", () => { + for (const status of ["queued", "running", "completed", "failed", "canceled", "timed_out"]) { + const job = { id: "j", kind: "session.loop", targetSessionID: "s", createdAt: 0, status } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + } + }) + }) + + describe("EnqueueOptions schema", () => { + test("validates empty options", () => { + expect(SessionRunner.EnqueueOptions.safeParse({}).success).toBe(true) + }) + + test("validates full options", () => { + const opts = { + timeoutMs: 60000, + parentSessionID: "ses_parent", + toolCallID: "call_123", + dedupeKey: "my-key", + } + expect(SessionRunner.EnqueueOptions.safeParse(opts).success).toBe(true) + }) + }) + + describe("LoopBackgroundInput schema", () => { + test("validates minimal input", () => { + const input = { sessionID: "session_123" } + expect(SessionRunner.LoopBackgroundInput.safeParse(input).success).toBe(true) + }) + + test("validates full input", () => { + const input = { + sessionID: "session_123", + kind: "session.loop" as const, + parentSessionID: "session_parent", + toolCallID: "tool_123", + timeoutMs: 30000, + dedupeKey: "my-key", + } + expect(SessionRunner.LoopBackgroundInput.safeParse(input).success).toBe(true) + }) + }) + + describe("PromptBackgroundInput schema", () => { + test("validates minimal input", () => { + const input = { + sessionID: "session_123", + parts: [{ type: "text" as const, text: "hello" }], + } + expect(SessionRunner.PromptBackgroundInput.safeParse(input).success).toBe(true) + }) + + test("validates full input with job options", () => { + const input = { + sessionID: "session_123", + parts: [{ type: "text" as const, text: "hello" }], + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" }, + agent: "build", + kind: "session.prompt_async" as const, + parentSessionID: "session_parent", + timeoutMs: 60000, + } + expect(SessionRunner.PromptBackgroundInput.safeParse(input).success).toBe(true) + }) + }) + + describe("helper functions", () => { + test("cancelBySession returns false for unknown session", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.cancelBySession("unknown_session")).toBe(false) + }, + }) + }) + + test("getBySession returns undefined for unknown session", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.getBySession("unknown_session")).toBeUndefined() + }, + }) + }) + + test("getBySession finds active job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let resolve: () => void + const blocker = new Promise((r) => { + resolve = r + }) + + const jobPromise = SessionRunner.enqueue("session.loop", "ses_target", async () => { + await blocker + }) + + await new Promise((r) => setTimeout(r, 10)) + + const found = SessionRunner.getBySession("ses_target") + expect(found).toBeDefined() + expect(found?.targetSessionID).toBe("ses_target") + expect(["queued", "running"]).toContain(found!.status) + + resolve!() + await jobPromise + }, + }) + }) + + test("cancelBySession cancels running job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let started = false + const jobPromise = SessionRunner.enqueue("session.loop", "ses_cancel", async () => { + started = true + await new Promise((r) => setTimeout(r, 5000)) + }) + + while (!started) { + await new Promise((r) => setTimeout(r, 5)) + } + + const cancelled = SessionRunner.cancelBySession("ses_cancel") + expect(cancelled).toBe(true) + + const job = await jobPromise + expect(job.status).toBe("canceled") + }, + }) + }) + }) + + describe("job management", () => { + test("get returns undefined for unknown job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.get("nonexistent")).toBeUndefined() + }, + }) + }) + + test("list returns empty array initially", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.list()).toEqual([]) + }, + }) + }) + + test("cancel returns false for unknown job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.cancel("nonexistent")).toBe(false) + }, + }) + }) + + test("isRunning returns false for unknown job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.isRunning("nonexistent")).toBe(false) + }, + }) + }) + + test("enqueue creates job and runs it", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let executed = false + const job = await SessionRunner.enqueue( + "session.loop", + "ses_test", + async () => { + executed = true + }, + ) + + expect(job.id).toMatch(/^job_/) + expect(job.kind).toBe("session.loop") + expect(job.targetSessionID).toBe("ses_test") + expect(job.status).toBe("completed") + expect(executed).toBe(true) + }, + }) + }) + + test("enqueue with dedupeKey reuses existing job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let count = 0 + const run = async () => { + count++ + await new Promise((r) => setTimeout(r, 50)) + } + + const [job1, job2] = await Promise.all([ + SessionRunner.enqueue("session.loop", "ses_test", run, { dedupeKey: "same" }), + SessionRunner.enqueue("session.loop", "ses_test", run, { dedupeKey: "same" }), + ]) + + expect(job1.id).toBe(job2.id) + expect(count).toBe(1) + }, + }) + }) + + test("cancel stops queued job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let executed = false + + const blocker = SessionRunner.enqueue("session.loop", "ses_blocker", async () => { + await new Promise((r) => setTimeout(r, 100)) + }) + const blocker2 = SessionRunner.enqueue("session.loop", "ses_blocker2", async () => { + await new Promise((r) => setTimeout(r, 100)) + }) + + const jobPromise = SessionRunner.enqueue("session.loop", "ses_test", async () => { + executed = true + }) + + await new Promise((r) => setTimeout(r, 10)) + + const queued = SessionRunner.listQueued() + if (queued.length > 0) { + const cancelled = SessionRunner.cancel(queued[0].id) + expect(cancelled).toBe(true) + } + + await Promise.all([blocker, blocker2, jobPromise]) + }, + }) + }) + + test("job times out", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const job = await SessionRunner.enqueue( + "session.loop", + "ses_test", + async () => { + await new Promise((r) => setTimeout(r, 500)) + }, + { timeoutMs: 50 }, + ) + + expect(job.status).toBe("timed_out") + expect(job.error?.message).toBe("Job timed out") + }, + }) + }) + + test("job fails on error", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const job = await SessionRunner.enqueue("session.loop", "ses_test", async () => { + throw new Error("test error") + }) + + expect(job.status).toBe("failed") + expect(job.error?.message).toBe("test error") + }, + }) + }) + + test("respects concurrency limit", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let maxConcurrent = 0 + let current = 0 + + const jobs = Array.from({ length: 5 }, (_, i) => + SessionRunner.enqueue("session.loop", `ses_${i}`, async () => { + current++ + maxConcurrent = Math.max(maxConcurrent, current) + await new Promise((r) => setTimeout(r, 30)) + current-- + }), + ) + + await Promise.all(jobs) + expect(maxConcurrent).toBeLessThanOrEqual(2) + }, + }) + }) + }) +})