Skip to content

Commit ce3927a

Browse files
committed
feat(session): implement background job runner with queue and lifecycle events
- Add Job, JobKind, JobStatus, EnqueueOptions schemas - Add enqueue(), cancel(), get(), list() APIs - Add lifecycle events (Queued, Started, Completed, Failed, Canceled, TimedOut) - Add configurable timeout and concurrency via experimental.backgroundTasks - Fix: queue now restarts after idle (removed stale started flag) - Fix: dedupe now returns completion promise, not immediate job reference
1 parent c5cd7c4 commit ce3927a

File tree

3 files changed

+573
-11
lines changed

3 files changed

+573
-11
lines changed

packages/opencode/src/config/config.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,13 @@ export namespace Config {
811811
.optional()
812812
.describe("Tools that should only be available to primary agents."),
813813
continue_loop_on_deny: z.boolean().optional().describe("Continue the agent loop when a tool call is denied"),
814+
backgroundTasks: z
815+
.object({
816+
timeoutMs: z.number().int().positive().optional().describe("Timeout for background tasks in ms"),
817+
maxConcurrent: z.number().int().positive().optional().describe("Max concurrent background tasks"),
818+
})
819+
.optional()
820+
.describe("Background task runner settings"),
814821
})
815822
.optional(),
816823
})

packages/opencode/src/session/runner.ts

Lines changed: 328 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,102 @@ import { Log } from "../util/log"
44
import { SessionPrompt } from "./prompt"
55
import { MessageV2 } from "./message-v2"
66
import { Instance } from "../project/instance"
7+
import { Bus } from "../bus"
8+
import { BusEvent } from "../bus/bus-event"
9+
import { Config } from "../config/config"
710
import { fn } from "@/util/fn"
11+
import { ulid } from "ulid"
812

913
export namespace SessionRunner {
1014
const log = Log.create({ service: "session.runner" })
1115

16+
const DEFAULT_TIMEOUT_MS = 10 * 60 * 1000 // 10 minutes
17+
const DEFAULT_MAX_CONCURRENT = 2
18+
19+
export const JobKind = z.enum(["session.loop", "session.prompt_async", "task.child_session"]).meta({
20+
ref: "SessionRunnerJobKind",
21+
})
22+
export type JobKind = z.infer<typeof JobKind>
23+
24+
export const JobStatus = z.enum(["queued", "running", "completed", "failed", "canceled", "timed_out"]).meta({
25+
ref: "SessionRunnerJobStatus",
26+
})
27+
export type JobStatus = z.infer<typeof JobStatus>
28+
29+
export const JobError = z
30+
.object({
31+
name: z.string().optional(),
32+
message: z.string(),
33+
})
34+
.meta({ ref: "SessionRunnerJobError" })
35+
export type JobError = z.infer<typeof JobError>
36+
37+
export const Job = z
38+
.object({
39+
id: z.string(),
40+
kind: JobKind,
41+
targetSessionID: z.string(),
42+
parentSessionID: z.string().optional(),
43+
toolCallID: z.string().optional(),
44+
createdAt: z.number(),
45+
startedAt: z.number().optional(),
46+
finishedAt: z.number().optional(),
47+
timeoutMs: z.number().optional(),
48+
status: JobStatus,
49+
error: JobError.optional(),
50+
})
51+
.meta({ ref: "SessionRunnerJob" })
52+
export type Job = z.infer<typeof Job>
53+
54+
export const EnqueueOptions = z
55+
.object({
56+
timeoutMs: z.number().optional(),
57+
parentSessionID: z.string().optional(),
58+
toolCallID: z.string().optional(),
59+
dedupeKey: z.string().optional(),
60+
})
61+
.meta({ ref: "SessionRunnerEnqueueOptions" })
62+
export type EnqueueOptions = z.infer<typeof EnqueueOptions>
63+
64+
export const Event = {
65+
Queued: BusEvent.define(
66+
"session.background.queued",
67+
z.object({
68+
job: Job,
69+
}),
70+
),
71+
Started: BusEvent.define(
72+
"session.background.started",
73+
z.object({
74+
job: Job,
75+
}),
76+
),
77+
Completed: BusEvent.define(
78+
"session.background.completed",
79+
z.object({
80+
job: Job,
81+
}),
82+
),
83+
Failed: BusEvent.define(
84+
"session.background.failed",
85+
z.object({
86+
job: Job,
87+
}),
88+
),
89+
Canceled: BusEvent.define(
90+
"session.background.canceled",
91+
z.object({
92+
job: Job,
93+
}),
94+
),
95+
TimedOut: BusEvent.define(
96+
"session.background.timed_out",
97+
z.object({
98+
job: Job,
99+
}),
100+
),
101+
}
102+
12103
export const Options = z
13104
.object({
14105
model: z.object({
@@ -41,23 +132,249 @@ export namespace SessionRunner {
41132
.meta({ ref: "SessionRunnerResult" })
42133
export type RunResult = z.infer<typeof RunResult>
43134

44-
const state = Instance.state(() => ({
45-
active: {} as Record<
46-
string,
47-
{
48-
startedAt: number
49-
options: Options
50-
promise: Promise<RunResult>
135+
interface QueuedJob {
136+
job: Job
137+
run: (abort: AbortSignal) => Promise<void>
138+
resolve: (job: Job) => void
139+
}
140+
141+
const MAX_HISTORY = 100
142+
143+
const state = Instance.state(
144+
() => ({
145+
queue: [] as QueuedJob[],
146+
jobsById: {} as Record<string, Job>,
147+
abortById: {} as Record<string, AbortController>,
148+
dedupeKeys: {} as Record<string, string>,
149+
completionPromises: {} as Record<string, Promise<Job>>,
150+
running: 0,
151+
}),
152+
async (s) => {
153+
for (const queued of s.queue) {
154+
queued.job.status = "canceled"
155+
queued.job.finishedAt = Date.now()
156+
queued.job.error = { message: "Instance disposed" }
157+
queued.resolve(queued.job)
51158
}
52-
>,
53-
}))
159+
s.queue = []
160+
for (const id of Object.keys(s.abortById)) {
161+
s.abortById[id].abort(new Error("disposed"))
162+
}
163+
},
164+
)
165+
166+
async function getConfig() {
167+
const cfg = await Config.get()
168+
return {
169+
timeoutMs: cfg.experimental?.backgroundTasks?.timeoutMs ?? DEFAULT_TIMEOUT_MS,
170+
maxConcurrent: cfg.experimental?.backgroundTasks?.maxConcurrent ?? DEFAULT_MAX_CONCURRENT,
171+
}
172+
}
173+
174+
async function processQueue() {
175+
const s = state()
176+
const config = await getConfig()
177+
178+
while (s.queue.length > 0 && s.running < config.maxConcurrent) {
179+
const next = s.queue.shift()
180+
if (!next) break
181+
182+
s.running++
183+
runJob(next).finally(() => {
184+
s.running--
185+
processQueue()
186+
})
187+
}
188+
}
189+
190+
async function runJob(queued: QueuedJob) {
191+
const s = state()
192+
const config = await getConfig()
193+
const job = queued.job
194+
const timeout = job.timeoutMs ?? config.timeoutMs
195+
196+
const abort = new AbortController()
197+
s.abortById[job.id] = abort
198+
199+
job.status = "running"
200+
job.startedAt = Date.now()
201+
s.jobsById[job.id] = job
202+
Bus.publish(Event.Started, { job })
203+
log.info("job started", { id: job.id, kind: job.kind })
204+
205+
let timer: ReturnType<typeof setTimeout> | undefined
206+
207+
try {
208+
await Promise.race([
209+
queued.run(abort.signal),
210+
new Promise<never>((_, reject) => {
211+
timer = setTimeout(() => {
212+
abort.abort(new Error("timeout"))
213+
reject(new Error("timeout"))
214+
}, timeout)
215+
}),
216+
new Promise<never>((_, reject) => {
217+
abort.signal.addEventListener("abort", () => reject(abort.signal.reason), { once: true })
218+
}),
219+
])
220+
job.status = "completed"
221+
job.finishedAt = Date.now()
222+
Bus.publish(Event.Completed, { job })
223+
log.info("job completed", { id: job.id })
224+
} catch (err) {
225+
job.finishedAt = Date.now()
226+
const msg = err instanceof Error ? err.message : String(err)
227+
if (msg === "timeout" || abort.signal.reason?.message === "timeout") {
228+
job.status = "timed_out"
229+
job.error = { message: "Job timed out" }
230+
Bus.publish(Event.TimedOut, { job })
231+
log.warn("job timed out", { id: job.id })
232+
} else if (abort.signal.aborted) {
233+
job.status = "canceled"
234+
job.error = { message: "Job canceled" }
235+
Bus.publish(Event.Canceled, { job })
236+
log.info("job canceled", { id: job.id })
237+
} else {
238+
job.status = "failed"
239+
job.error = {
240+
name: err instanceof Error ? err.name : undefined,
241+
message: msg,
242+
}
243+
Bus.publish(Event.Failed, { job })
244+
log.error("job failed", { id: job.id, error: job.error })
245+
}
246+
} finally {
247+
if (timer) clearTimeout(timer)
248+
delete s.abortById[job.id]
249+
s.jobsById[job.id] = job
250+
pruneHistory(s)
251+
queued.resolve(job)
252+
}
253+
}
254+
255+
function pruneHistory(s: ReturnType<typeof state>) {
256+
const ids = Object.keys(s.jobsById)
257+
if (ids.length <= MAX_HISTORY) return
258+
259+
const completed = ids
260+
.filter((id) => {
261+
const status = s.jobsById[id].status
262+
return status !== "queued" && status !== "running"
263+
})
264+
.sort((a, b) => (s.jobsById[a].finishedAt ?? 0) - (s.jobsById[b].finishedAt ?? 0))
265+
266+
const toRemove = completed.slice(0, ids.length - MAX_HISTORY)
267+
for (const id of toRemove) {
268+
delete s.jobsById[id]
269+
delete s.completionPromises[id]
270+
}
271+
272+
for (const [key, jobId] of Object.entries(s.dedupeKeys)) {
273+
if (!s.jobsById[jobId]) delete s.dedupeKeys[key]
274+
}
275+
}
276+
277+
export async function enqueue(
278+
kind: JobKind,
279+
targetSessionID: string,
280+
run: (abort: AbortSignal) => Promise<void>,
281+
opts?: EnqueueOptions,
282+
): Promise<Job> {
283+
const s = state()
284+
285+
if (opts?.dedupeKey) {
286+
const existing = s.dedupeKeys[opts.dedupeKey]
287+
if (existing && s.jobsById[existing]) {
288+
const job = s.jobsById[existing]
289+
if (job.status === "queued" || job.status === "running") {
290+
log.info("dedupe hit", { key: opts.dedupeKey, id: existing })
291+
return s.completionPromises[existing]
292+
}
293+
}
294+
}
295+
296+
const job: Job = {
297+
id: `job_${ulid()}`,
298+
kind,
299+
targetSessionID,
300+
parentSessionID: opts?.parentSessionID,
301+
toolCallID: opts?.toolCallID,
302+
createdAt: Date.now(),
303+
timeoutMs: opts?.timeoutMs,
304+
status: "queued",
305+
}
306+
307+
s.jobsById[job.id] = job
308+
if (opts?.dedupeKey) {
309+
s.dedupeKeys[opts.dedupeKey] = job.id
310+
}
311+
312+
Bus.publish(Event.Queued, { job })
313+
log.info("job queued", { id: job.id, kind, targetSessionID })
314+
315+
const completionPromise = new Promise<Job>((resolve) => {
316+
s.queue.push({ job, run, resolve })
317+
processQueue()
318+
})
319+
s.completionPromises[job.id] = completionPromise
320+
321+
return completionPromise
322+
}
323+
324+
export function cancel(id: string): boolean {
325+
const s = state()
326+
const job = s.jobsById[id]
327+
if (!job) return false
328+
329+
if (job.status === "queued") {
330+
const idx = s.queue.findIndex((q) => q.job.id === id)
331+
if (idx !== -1) {
332+
const removed = s.queue.splice(idx, 1)[0]
333+
job.status = "canceled"
334+
job.finishedAt = Date.now()
335+
job.error = { message: "Job canceled" }
336+
s.jobsById[id] = job
337+
Bus.publish(Event.Canceled, { job })
338+
removed.resolve(job)
339+
log.info("job canceled (queued)", { id })
340+
return true
341+
}
342+
}
343+
344+
if (job.status === "running") {
345+
const abort = s.abortById[id]
346+
if (abort) {
347+
abort.abort(new Error("canceled"))
348+
log.info("job cancel requested", { id })
349+
return true
350+
}
351+
}
352+
353+
return false
354+
}
355+
356+
export function get(id: string): Job | undefined {
357+
return state().jobsById[id]
358+
}
359+
360+
export function list(): Job[] {
361+
return Object.values(state().jobsById)
362+
}
363+
364+
export function listQueued(): Job[] {
365+
return state().queue.map((q) => q.job)
366+
}
367+
368+
export function listRunning(): Job[] {
369+
return Object.values(state().jobsById).filter((j) => j.status === "running")
370+
}
54371

55372
export function isRunning(id: string): boolean {
56-
return id in state().active
373+
return id in state().abortById
57374
}
58375

59376
export function listActive(): string[] {
60-
return Object.keys(state().active)
377+
return Object.keys(state().abortById)
61378
}
62379

63380
export const runOnce = fn(SessionPrompt.PromptInput, async (input): Promise<MessageV2.WithParts> => {

0 commit comments

Comments
 (0)