diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index 6520fb3ab92..49d1263828e 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -811,6 +811,13 @@ export namespace Config { .optional() .describe("Tools that should only be available to primary agents."), continue_loop_on_deny: z.boolean().optional().describe("Continue the agent loop when a tool call is denied"), + backgroundTasks: z + .object({ + timeoutMs: z.number().int().positive().optional().describe("Timeout for background tasks in ms"), + maxConcurrent: z.number().int().positive().optional().describe("Max concurrent background tasks"), + }) + .optional() + .describe("Background task runner settings"), }) .optional(), }) diff --git a/packages/opencode/src/session/runner.ts b/packages/opencode/src/session/runner.ts new file mode 100644 index 00000000000..2beb33dd4dc --- /dev/null +++ b/packages/opencode/src/session/runner.ts @@ -0,0 +1,396 @@ +import z from "zod" +import { Identifier } from "../id/id" +import { Log } from "../util/log" +import { SessionPrompt } from "./prompt" +import { MessageV2 } from "./message-v2" +import { Instance } from "../project/instance" +import { Bus } from "../bus" +import { BusEvent } from "../bus/bus-event" +import { Config } from "../config/config" +import { fn } from "@/util/fn" +import { ulid } from "ulid" + +export namespace SessionRunner { + const log = Log.create({ service: "session.runner" }) + + const DEFAULT_TIMEOUT_MS = 10 * 60 * 1000 // 10 minutes + const DEFAULT_MAX_CONCURRENT = 2 + + export const JobKind = z.enum(["session.loop", "session.prompt_async", "task.child_session"]).meta({ + ref: "SessionRunnerJobKind", + }) + export type JobKind = z.infer + + export const JobStatus = z.enum(["queued", "running", "completed", "failed", "canceled", "timed_out"]).meta({ + ref: "SessionRunnerJobStatus", + }) + export type JobStatus = z.infer + + export const JobError = z + .object({ + name: z.string().optional(), + message: z.string(), + }) + .meta({ ref: "SessionRunnerJobError" }) + export type JobError = z.infer + + export const Job = z + .object({ + id: z.string(), + kind: JobKind, + targetSessionID: z.string(), + parentSessionID: z.string().optional(), + toolCallID: z.string().optional(), + createdAt: z.number(), + startedAt: z.number().optional(), + finishedAt: z.number().optional(), + timeoutMs: z.number().optional(), + status: JobStatus, + error: JobError.optional(), + }) + .meta({ ref: "SessionRunnerJob" }) + export type Job = z.infer + + export const EnqueueOptions = z + .object({ + timeoutMs: z.number().optional(), + parentSessionID: z.string().optional(), + toolCallID: z.string().optional(), + dedupeKey: z.string().optional(), + }) + .meta({ ref: "SessionRunnerEnqueueOptions" }) + export type EnqueueOptions = z.infer + + export const Event = { + Queued: BusEvent.define( + "session.background.queued", + z.object({ + job: Job, + }), + ), + Started: BusEvent.define( + "session.background.started", + z.object({ + job: Job, + }), + ), + Completed: BusEvent.define( + "session.background.completed", + z.object({ + job: Job, + }), + ), + Failed: BusEvent.define( + "session.background.failed", + z.object({ + job: Job, + }), + ), + Canceled: BusEvent.define( + "session.background.canceled", + z.object({ + job: Job, + }), + ), + TimedOut: BusEvent.define( + "session.background.timed_out", + z.object({ + job: Job, + }), + ), + } + + export const Options = z + .object({ + model: z.object({ + providerID: z.string(), + modelID: z.string(), + }), + agent: z.string(), + tools: z.record(z.string(), z.boolean()).optional(), + origin: z + .object({ + parentSessionID: Identifier.schema("session").optional(), + parentMessageID: Identifier.schema("message").optional(), + description: z.string().optional(), + command: z.string().optional(), + }) + .optional(), + timeoutMs: z.number().optional(), + maxSteps: z.number().optional(), + }) + .meta({ ref: "SessionRunnerOptions" }) + export type Options = z.infer + + export const RunResult = z + .object({ + sessionID: Identifier.schema("session"), + message: MessageV2.WithParts, + success: z.boolean(), + error: z.string().optional(), + }) + .meta({ ref: "SessionRunnerResult" }) + export type RunResult = z.infer + + interface QueuedJob { + job: Job + run: (abort: AbortSignal) => Promise + resolve: (job: Job) => void + } + + const MAX_HISTORY = 100 + + const state = Instance.state( + () => ({ + queue: [] as QueuedJob[], + jobsById: {} as Record, + abortById: {} as Record, + dedupeKeys: {} as Record, + completionPromises: {} as Record>, + running: 0, + }), + async (s) => { + for (const queued of s.queue) { + queued.job.status = "canceled" + queued.job.finishedAt = Date.now() + queued.job.error = { message: "Instance disposed" } + queued.resolve(queued.job) + } + s.queue = [] + for (const id of Object.keys(s.abortById)) { + s.abortById[id].abort(new Error("disposed")) + } + }, + ) + + async function getConfig() { + const cfg = await Config.get() + return { + timeoutMs: cfg.experimental?.backgroundTasks?.timeoutMs ?? DEFAULT_TIMEOUT_MS, + maxConcurrent: cfg.experimental?.backgroundTasks?.maxConcurrent ?? DEFAULT_MAX_CONCURRENT, + } + } + + async function processQueue() { + const s = state() + const config = await getConfig() + + while (s.queue.length > 0 && s.running < config.maxConcurrent) { + const next = s.queue.shift() + if (!next) break + + s.running++ + runJob(next).finally(() => { + s.running-- + processQueue() + }) + } + } + + async function runJob(queued: QueuedJob) { + const s = state() + const config = await getConfig() + const job = queued.job + const timeout = job.timeoutMs ?? config.timeoutMs + + const abort = new AbortController() + s.abortById[job.id] = abort + + job.status = "running" + job.startedAt = Date.now() + s.jobsById[job.id] = job + Bus.publish(Event.Started, { job }) + log.info("job started", { id: job.id, kind: job.kind }) + + let timer: ReturnType | undefined + + try { + await Promise.race([ + queued.run(abort.signal), + new Promise((_, reject) => { + timer = setTimeout(() => { + abort.abort(new Error("timeout")) + reject(new Error("timeout")) + }, timeout) + }), + new Promise((_, reject) => { + abort.signal.addEventListener("abort", () => reject(abort.signal.reason), { once: true }) + }), + ]) + job.status = "completed" + job.finishedAt = Date.now() + Bus.publish(Event.Completed, { job }) + log.info("job completed", { id: job.id }) + } catch (err) { + job.finishedAt = Date.now() + const msg = err instanceof Error ? err.message : String(err) + if (msg === "timeout" || abort.signal.reason?.message === "timeout") { + job.status = "timed_out" + job.error = { message: "Job timed out" } + Bus.publish(Event.TimedOut, { job }) + log.warn("job timed out", { id: job.id }) + } else if (abort.signal.aborted) { + job.status = "canceled" + job.error = { message: "Job canceled" } + Bus.publish(Event.Canceled, { job }) + log.info("job canceled", { id: job.id }) + } else { + job.status = "failed" + job.error = { + name: err instanceof Error ? err.name : undefined, + message: msg, + } + Bus.publish(Event.Failed, { job }) + log.error("job failed", { id: job.id, error: job.error }) + } + } finally { + if (timer) clearTimeout(timer) + delete s.abortById[job.id] + s.jobsById[job.id] = job + pruneHistory(s) + queued.resolve(job) + } + } + + function pruneHistory(s: ReturnType) { + const ids = Object.keys(s.jobsById) + if (ids.length <= MAX_HISTORY) return + + const completed = ids + .filter((id) => { + const status = s.jobsById[id].status + return status !== "queued" && status !== "running" + }) + .sort((a, b) => (s.jobsById[a].finishedAt ?? 0) - (s.jobsById[b].finishedAt ?? 0)) + + const toRemove = completed.slice(0, ids.length - MAX_HISTORY) + for (const id of toRemove) { + delete s.jobsById[id] + delete s.completionPromises[id] + } + + for (const [key, jobId] of Object.entries(s.dedupeKeys)) { + if (!s.jobsById[jobId]) delete s.dedupeKeys[key] + } + } + + export async function enqueue( + kind: JobKind, + targetSessionID: string, + run: (abort: AbortSignal) => Promise, + opts?: EnqueueOptions, + ): Promise { + const s = state() + + if (opts?.dedupeKey) { + const existing = s.dedupeKeys[opts.dedupeKey] + if (existing && s.jobsById[existing]) { + const job = s.jobsById[existing] + if (job.status === "queued" || job.status === "running") { + log.info("dedupe hit", { key: opts.dedupeKey, id: existing }) + return s.completionPromises[existing] + } + } + } + + const job: Job = { + id: `job_${ulid()}`, + kind, + targetSessionID, + parentSessionID: opts?.parentSessionID, + toolCallID: opts?.toolCallID, + createdAt: Date.now(), + timeoutMs: opts?.timeoutMs, + status: "queued", + } + + s.jobsById[job.id] = job + if (opts?.dedupeKey) { + s.dedupeKeys[opts.dedupeKey] = job.id + } + + Bus.publish(Event.Queued, { job }) + log.info("job queued", { id: job.id, kind, targetSessionID }) + + const completionPromise = new Promise((resolve) => { + s.queue.push({ job, run, resolve }) + processQueue() + }) + s.completionPromises[job.id] = completionPromise + + return completionPromise + } + + export function cancel(id: string): boolean { + const s = state() + const job = s.jobsById[id] + if (!job) return false + + if (job.status === "queued") { + const idx = s.queue.findIndex((q) => q.job.id === id) + if (idx !== -1) { + const removed = s.queue.splice(idx, 1)[0] + job.status = "canceled" + job.finishedAt = Date.now() + job.error = { message: "Job canceled" } + s.jobsById[id] = job + Bus.publish(Event.Canceled, { job }) + removed.resolve(job) + log.info("job canceled (queued)", { id }) + return true + } + } + + if (job.status === "running") { + const abort = s.abortById[id] + if (abort) { + abort.abort(new Error("canceled")) + log.info("job cancel requested", { id }) + return true + } + } + + return false + } + + export function get(id: string): Job | undefined { + return state().jobsById[id] + } + + export function list(): Job[] { + return Object.values(state().jobsById) + } + + export function listQueued(): Job[] { + return state().queue.map((q) => q.job) + } + + export function listRunning(): Job[] { + return Object.values(state().jobsById).filter((j) => j.status === "running") + } + + export function isRunning(id: string): boolean { + return id in state().abortById + } + + export function listActive(): string[] { + return Object.keys(state().abortById) + } + + export const runOnce = fn(SessionPrompt.PromptInput, async (input): Promise => { + log.info("runOnce", { sessionID: input.sessionID, agent: input.agent }) + return SessionPrompt.prompt(input) + }) + + export function runBackground(_id: string, _options: Options): void { + throw new Error("SessionRunner.runBackground not yet implemented") + } + + export function cancelBackground(_id: string): boolean { + throw new Error("SessionRunner.cancelBackground not yet implemented") + } + + export async function waitFor(_id: string): Promise { + throw new Error("SessionRunner.waitFor not yet implemented") + } +} diff --git a/packages/opencode/test/session/runner.test.ts b/packages/opencode/test/session/runner.test.ts new file mode 100644 index 00000000000..86c3d0d9786 --- /dev/null +++ b/packages/opencode/test/session/runner.test.ts @@ -0,0 +1,303 @@ +import { describe, expect, test } from "bun:test" +import { SessionRunner } from "../../src/session/runner" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +describe("SessionRunner", () => { + describe("Options schema", () => { + test("validates valid options", () => { + const valid = { + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" }, + agent: "code", + } + expect(SessionRunner.Options.safeParse(valid).success).toBe(true) + }) + + test("validates options with tools", () => { + const opts = { + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" }, + agent: "code", + tools: { bash: true, read: true, write: false }, + } + expect(SessionRunner.Options.safeParse(opts).success).toBe(true) + }) + + test("validates options with timeout", () => { + const opts = { + model: { providerID: "openai", modelID: "gpt-4" }, + agent: "general", + timeoutMs: 30000, + maxSteps: 10, + } + expect(SessionRunner.Options.safeParse(opts).success).toBe(true) + }) + + test("rejects missing model", () => { + expect(SessionRunner.Options.safeParse({ agent: "code" }).success).toBe(false) + }) + + test("rejects missing agent", () => { + const opts = { model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" } } + expect(SessionRunner.Options.safeParse(opts).success).toBe(false) + }) + + test("rejects invalid model structure", () => { + const opts = { model: { providerID: "anthropic" }, agent: "code" } + expect(SessionRunner.Options.safeParse(opts).success).toBe(false) + }) + }) + + describe("Job schema", () => { + test("validates job with required fields", () => { + const job = { + id: "job_123", + kind: "session.loop", + targetSessionID: "ses_abc", + createdAt: Date.now(), + status: "queued", + } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + }) + + test("validates job with all fields", () => { + const job = { + id: "job_123", + kind: "task.child_session", + targetSessionID: "ses_abc", + parentSessionID: "ses_parent", + toolCallID: "call_xyz", + createdAt: Date.now(), + startedAt: Date.now(), + finishedAt: Date.now(), + timeoutMs: 30000, + status: "completed", + error: { name: "Error", message: "failed" }, + } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + }) + + test("validates all job kinds", () => { + for (const kind of ["session.loop", "session.prompt_async", "task.child_session"]) { + const job = { id: "j", kind, targetSessionID: "s", createdAt: 0, status: "queued" } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + } + }) + + test("validates all job statuses", () => { + for (const status of ["queued", "running", "completed", "failed", "canceled", "timed_out"]) { + const job = { id: "j", kind: "session.loop", targetSessionID: "s", createdAt: 0, status } + expect(SessionRunner.Job.safeParse(job).success).toBe(true) + } + }) + }) + + describe("EnqueueOptions schema", () => { + test("validates empty options", () => { + expect(SessionRunner.EnqueueOptions.safeParse({}).success).toBe(true) + }) + + test("validates full options", () => { + const opts = { + timeoutMs: 60000, + parentSessionID: "ses_parent", + toolCallID: "call_123", + dedupeKey: "my-key", + } + expect(SessionRunner.EnqueueOptions.safeParse(opts).success).toBe(true) + }) + }) + + describe("stub methods", () => { + test("runBackground throws", () => { + const opts: SessionRunner.Options = { + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" }, + agent: "code", + } + expect(() => SessionRunner.runBackground("session_123", opts)).toThrow("not yet implemented") + }) + + test("cancelBackground throws", () => { + expect(() => SessionRunner.cancelBackground("session_123")).toThrow("not yet implemented") + }) + + test("waitFor throws", async () => { + await expect(SessionRunner.waitFor("session_123")).rejects.toThrow("not yet implemented") + }) + }) + + describe("job management", () => { + test("get returns undefined for unknown job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.get("nonexistent")).toBeUndefined() + }, + }) + }) + + test("list returns empty array initially", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.list()).toEqual([]) + }, + }) + }) + + test("cancel returns false for unknown job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.cancel("nonexistent")).toBe(false) + }, + }) + }) + + test("isRunning returns false for unknown job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: () => { + expect(SessionRunner.isRunning("nonexistent")).toBe(false) + }, + }) + }) + + test("enqueue creates job and runs it", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let executed = false + const job = await SessionRunner.enqueue( + "session.loop", + "ses_test", + async () => { + executed = true + }, + ) + + expect(job.id).toMatch(/^job_/) + expect(job.kind).toBe("session.loop") + expect(job.targetSessionID).toBe("ses_test") + expect(job.status).toBe("completed") + expect(executed).toBe(true) + }, + }) + }) + + test("enqueue with dedupeKey reuses existing job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let count = 0 + const run = async () => { + count++ + await new Promise((r) => setTimeout(r, 50)) + } + + const [job1, job2] = await Promise.all([ + SessionRunner.enqueue("session.loop", "ses_test", run, { dedupeKey: "same" }), + SessionRunner.enqueue("session.loop", "ses_test", run, { dedupeKey: "same" }), + ]) + + expect(job1.id).toBe(job2.id) + expect(count).toBe(1) + }, + }) + }) + + test("cancel stops queued job", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let executed = false + + const blocker = SessionRunner.enqueue("session.loop", "ses_blocker", async () => { + await new Promise((r) => setTimeout(r, 100)) + }) + const blocker2 = SessionRunner.enqueue("session.loop", "ses_blocker2", async () => { + await new Promise((r) => setTimeout(r, 100)) + }) + + const jobPromise = SessionRunner.enqueue("session.loop", "ses_test", async () => { + executed = true + }) + + await new Promise((r) => setTimeout(r, 10)) + + const queued = SessionRunner.listQueued() + if (queued.length > 0) { + const cancelled = SessionRunner.cancel(queued[0].id) + expect(cancelled).toBe(true) + } + + await Promise.all([blocker, blocker2, jobPromise]) + }, + }) + }) + + test("job times out", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const job = await SessionRunner.enqueue( + "session.loop", + "ses_test", + async () => { + await new Promise((r) => setTimeout(r, 500)) + }, + { timeoutMs: 50 }, + ) + + expect(job.status).toBe("timed_out") + expect(job.error?.message).toBe("Job timed out") + }, + }) + }) + + test("job fails on error", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const job = await SessionRunner.enqueue("session.loop", "ses_test", async () => { + throw new Error("test error") + }) + + expect(job.status).toBe("failed") + expect(job.error?.message).toBe("test error") + }, + }) + }) + + test("respects concurrency limit", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + let maxConcurrent = 0 + let current = 0 + + const jobs = Array.from({ length: 5 }, (_, i) => + SessionRunner.enqueue("session.loop", `ses_${i}`, async () => { + current++ + maxConcurrent = Math.max(maxConcurrent, current) + await new Promise((r) => setTimeout(r, 30)) + current-- + }), + ) + + await Promise.all(jobs) + expect(maxConcurrent).toBeLessThanOrEqual(2) + }, + }) + }) + }) +})