Skip to content

Commit 020cf5c

Browse files
committed
feat(session): centralize async session work through SessionRunner
- Fix circular import by using regular function for runOnce
1 parent ce3927a commit 020cf5c

File tree

4 files changed

+261
-45
lines changed

4 files changed

+261
-45
lines changed

packages/opencode/src/server/server.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import { ProjectRoute } from "./project"
3232
import { ToolRegistry } from "../tool/registry"
3333
import { zodToJsonSchema } from "zod-to-json-schema"
3434
import { SessionPrompt } from "../session/prompt"
35+
import { SessionRunner } from "../session/runner"
3536
import { SessionCompaction } from "../session/compaction"
3637
import { SessionRevert } from "../session/revert"
3738
import { lazy } from "../util/lazy"
@@ -1325,13 +1326,14 @@ export namespace Server {
13251326
),
13261327
validator("json", SessionPrompt.PromptInput.omit({ sessionID: true })),
13271328
async (c) => {
1328-
c.status(204)
1329-
c.header("Content-Type", "application/json")
1330-
return stream(c, async () => {
1331-
const sessionID = c.req.valid("param").sessionID
1332-
const body = c.req.valid("json")
1333-
SessionPrompt.prompt({ ...body, sessionID })
1334-
})
1329+
const sessionID = c.req.valid("param").sessionID
1330+
const body = c.req.valid("json")
1331+
void SessionRunner.promptBackground({
1332+
...body,
1333+
sessionID,
1334+
kind: "session.prompt_async",
1335+
}).catch((err) => log.error("prompt_async failed", { sessionID, error: err }))
1336+
return c.body(null, 204)
13351337
},
13361338
)
13371339
.post(

packages/opencode/src/session/runner.ts

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -377,20 +377,107 @@ export namespace SessionRunner {
377377
return Object.keys(state().abortById)
378378
}
379379

380-
export const runOnce = fn(SessionPrompt.PromptInput, async (input): Promise<MessageV2.WithParts> => {
380+
export async function runOnce(input: SessionPrompt.PromptInput): Promise<MessageV2.WithParts> {
381381
log.info("runOnce", { sessionID: input.sessionID, agent: input.agent })
382382
return SessionPrompt.prompt(input)
383+
}
384+
385+
export const LoopBackgroundInput = z.object({
386+
sessionID: Identifier.schema("session"),
387+
kind: JobKind.optional(),
388+
parentSessionID: Identifier.schema("session").optional(),
389+
toolCallID: z.string().optional(),
390+
timeoutMs: z.number().optional(),
391+
dedupeKey: z.string().optional(),
392+
})
393+
export type LoopBackgroundInput = z.infer<typeof LoopBackgroundInput>
394+
395+
export const loopBackground = fn(LoopBackgroundInput, async (input): Promise<Job> => {
396+
log.info("loopBackground", { sessionID: input.sessionID })
397+
return enqueue(
398+
input.kind ?? "session.loop",
399+
input.sessionID,
400+
async () => {
401+
await SessionPrompt.loop(input.sessionID)
402+
},
403+
{
404+
parentSessionID: input.parentSessionID,
405+
toolCallID: input.toolCallID,
406+
timeoutMs: input.timeoutMs,
407+
dedupeKey: input.dedupeKey,
408+
},
409+
)
383410
})
384411

385-
export function runBackground(_id: string, _options: Options): void {
386-
throw new Error("SessionRunner.runBackground not yet implemented")
387-
}
412+
// Note: Defined inline to avoid circular dependency with SessionPrompt
413+
// (prompt.ts -> task.ts -> runner.ts -> prompt.ts)
414+
export const PromptBackgroundInput = z.object({
415+
sessionID: Identifier.schema("session"),
416+
messageID: Identifier.schema("message").optional(),
417+
model: z
418+
.object({
419+
providerID: z.string(),
420+
modelID: z.string(),
421+
})
422+
.optional(),
423+
agent: z.string().optional(),
424+
noReply: z.boolean().optional(),
425+
tools: z.record(z.string(), z.boolean()).optional(),
426+
system: z.string().optional(),
427+
parts: z.array(
428+
z.discriminatedUnion("type", [
429+
MessageV2.TextPart.omit({ messageID: true, sessionID: true }).partial({ id: true }),
430+
MessageV2.FilePart.omit({ messageID: true, sessionID: true }).partial({ id: true }),
431+
MessageV2.AgentPart.omit({ messageID: true, sessionID: true }).partial({ id: true }),
432+
MessageV2.SubtaskPart.omit({ messageID: true, sessionID: true }).partial({ id: true }),
433+
]),
434+
),
435+
kind: JobKind.optional(),
436+
parentSessionID: Identifier.schema("session").optional(),
437+
toolCallID: z.string().optional(),
438+
timeoutMs: z.number().optional(),
439+
dedupeKey: z.string().optional(),
440+
})
441+
export type PromptBackgroundInput = z.infer<typeof PromptBackgroundInput>
442+
443+
export const PromptBackgroundResult = z.object({
444+
job: Job,
445+
message: MessageV2.WithParts,
446+
})
447+
export type PromptBackgroundResult = z.infer<typeof PromptBackgroundResult>
448+
449+
export const promptBackground = fn(PromptBackgroundInput, async (input): Promise<PromptBackgroundResult> => {
450+
log.info("promptBackground", { sessionID: input.sessionID, agent: input.agent })
451+
const message = await SessionPrompt.prompt({ ...input, noReply: true })
452+
const job = await loopBackground({
453+
sessionID: input.sessionID,
454+
kind: input.kind ?? "session.prompt_async",
455+
parentSessionID: input.parentSessionID,
456+
toolCallID: input.toolCallID,
457+
timeoutMs: input.timeoutMs,
458+
dedupeKey: input.dedupeKey,
459+
})
460+
return { job, message }
461+
})
388462

389-
export function cancelBackground(_id: string): boolean {
390-
throw new Error("SessionRunner.cancelBackground not yet implemented")
463+
export function cancelBySession(sessionID: string): boolean {
464+
const s = state()
465+
for (const job of Object.values(s.jobsById)) {
466+
if (job.targetSessionID === sessionID && (job.status === "queued" || job.status === "running")) {
467+
SessionPrompt.cancel(sessionID)
468+
return cancel(job.id)
469+
}
470+
}
471+
return false
391472
}
392473

393-
export async function waitFor(_id: string): Promise<RunResult> {
394-
throw new Error("SessionRunner.waitFor not yet implemented")
474+
export function getBySession(sessionID: string): Job | undefined {
475+
const s = state()
476+
for (const job of Object.values(s.jobsById)) {
477+
if (job.targetSessionID === sessionID && (job.status === "queued" || job.status === "running")) {
478+
return job
479+
}
480+
}
481+
return undefined
395482
}
396483
}

packages/opencode/src/tool/task.ts

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import { MessageV2 } from "../session/message-v2"
77
import { Identifier } from "../id/id"
88
import { Agent } from "../agent/agent"
99
import { SessionPrompt } from "../session/prompt"
10+
import { SessionRunner } from "../session/runner"
1011
import { iife } from "@/util/iife"
11-
import { defer } from "@/util/defer"
1212
import { Config } from "../config/config"
1313

1414
export const TaskTool = Tool.define("task", async () => {
@@ -81,32 +81,62 @@ export const TaskTool = Tool.define("task", async () => {
8181
providerID: msg.info.providerID,
8282
}
8383

84-
function cancel() {
84+
const promptParts = await SessionPrompt.resolvePromptParts(params.prompt)
85+
const config = await Config.get()
86+
87+
const cancelChild = () => {
88+
SessionRunner.cancelBySession(session.id)
8589
SessionPrompt.cancel(session.id)
8690
}
87-
ctx.abort.addEventListener("abort", cancel)
88-
using _ = defer(() => ctx.abort.removeEventListener("abort", cancel))
89-
const promptParts = await SessionPrompt.resolvePromptParts(params.prompt)
91+
ctx.abort.addEventListener("abort", cancelChild, { once: true })
9092

91-
const config = await Config.get()
92-
const result = await SessionPrompt.prompt({
93-
messageID,
94-
sessionID: session.id,
95-
model: {
96-
modelID: model.modelID,
97-
providerID: model.providerID,
93+
let result: MessageV2.WithParts | undefined
94+
const job = await SessionRunner.enqueue(
95+
"task.child_session",
96+
session.id,
97+
async () => {
98+
result = await SessionPrompt.prompt({
99+
messageID,
100+
sessionID: session.id,
101+
model: {
102+
modelID: model.modelID,
103+
providerID: model.providerID,
104+
},
105+
agent: agent.name,
106+
tools: {
107+
todowrite: false,
108+
todoread: false,
109+
task: false,
110+
...Object.fromEntries((config.experimental?.primary_tools ?? []).map((t) => [t, false])),
111+
...agent.tools,
112+
},
113+
parts: promptParts,
114+
})
98115
},
99-
agent: agent.name,
100-
tools: {
101-
todowrite: false,
102-
todoread: false,
103-
task: false,
104-
...Object.fromEntries((config.experimental?.primary_tools ?? []).map((t) => [t, false])),
105-
...agent.tools,
116+
{
117+
parentSessionID: ctx.sessionID,
118+
toolCallID: ctx.messageID,
106119
},
107-
parts: promptParts,
108-
})
120+
)
109121
unsub()
122+
ctx.abort.removeEventListener("abort", cancelChild)
123+
124+
if (job.status === "canceled" || ctx.abort.aborted) {
125+
return {
126+
title: params.description,
127+
metadata: { summary: [], sessionId: session.id },
128+
output: "Task was canceled",
129+
}
130+
}
131+
132+
if (job.status !== "completed" || !result) {
133+
const error = job.error?.message ?? "Task failed"
134+
return {
135+
title: params.description,
136+
metadata: { summary: [], sessionId: session.id },
137+
output: `Task failed: ${error}`,
138+
}
139+
}
110140
const messages = await Session.messages({ sessionID: session.id })
111141
const summary = messages
112142
.filter((x) => x.info.role === "assistant")

packages/opencode/test/session/runner.test.ts

Lines changed: 106 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,118 @@ describe("SessionRunner", () => {
107107
})
108108
})
109109

110-
describe("stub methods", () => {
111-
test("runBackground throws", () => {
112-
const opts: SessionRunner.Options = {
110+
describe("LoopBackgroundInput schema", () => {
111+
test("validates minimal input", () => {
112+
const input = { sessionID: "session_123" }
113+
expect(SessionRunner.LoopBackgroundInput.safeParse(input).success).toBe(true)
114+
})
115+
116+
test("validates full input", () => {
117+
const input = {
118+
sessionID: "session_123",
119+
kind: "session.loop" as const,
120+
parentSessionID: "session_parent",
121+
toolCallID: "tool_123",
122+
timeoutMs: 30000,
123+
dedupeKey: "my-key",
124+
}
125+
expect(SessionRunner.LoopBackgroundInput.safeParse(input).success).toBe(true)
126+
})
127+
})
128+
129+
describe("PromptBackgroundInput schema", () => {
130+
test("validates minimal input", () => {
131+
const input = {
132+
sessionID: "session_123",
133+
parts: [{ type: "text" as const, text: "hello" }],
134+
}
135+
expect(SessionRunner.PromptBackgroundInput.safeParse(input).success).toBe(true)
136+
})
137+
138+
test("validates full input with job options", () => {
139+
const input = {
140+
sessionID: "session_123",
141+
parts: [{ type: "text" as const, text: "hello" }],
113142
model: { providerID: "anthropic", modelID: "claude-3-5-sonnet" },
114-
agent: "code",
143+
agent: "build",
144+
kind: "session.prompt_async" as const,
145+
parentSessionID: "session_parent",
146+
timeoutMs: 60000,
115147
}
116-
expect(() => SessionRunner.runBackground("session_123", opts)).toThrow("not yet implemented")
148+
expect(SessionRunner.PromptBackgroundInput.safeParse(input).success).toBe(true)
149+
})
150+
})
151+
152+
describe("helper functions", () => {
153+
test("cancelBySession returns false for unknown session", async () => {
154+
await using tmp = await tmpdir({ git: true })
155+
await Instance.provide({
156+
directory: tmp.path,
157+
fn: () => {
158+
expect(SessionRunner.cancelBySession("unknown_session")).toBe(false)
159+
},
160+
})
161+
})
162+
163+
test("getBySession returns undefined for unknown session", async () => {
164+
await using tmp = await tmpdir({ git: true })
165+
await Instance.provide({
166+
directory: tmp.path,
167+
fn: () => {
168+
expect(SessionRunner.getBySession("unknown_session")).toBeUndefined()
169+
},
170+
})
117171
})
118172

119-
test("cancelBackground throws", () => {
120-
expect(() => SessionRunner.cancelBackground("session_123")).toThrow("not yet implemented")
173+
test("getBySession finds active job", async () => {
174+
await using tmp = await tmpdir({ git: true })
175+
await Instance.provide({
176+
directory: tmp.path,
177+
fn: async () => {
178+
let resolve: () => void
179+
const blocker = new Promise<void>((r) => {
180+
resolve = r
181+
})
182+
183+
const jobPromise = SessionRunner.enqueue("session.loop", "ses_target", async () => {
184+
await blocker
185+
})
186+
187+
await new Promise((r) => setTimeout(r, 10))
188+
189+
const found = SessionRunner.getBySession("ses_target")
190+
expect(found).toBeDefined()
191+
expect(found?.targetSessionID).toBe("ses_target")
192+
expect(["queued", "running"]).toContain(found!.status)
193+
194+
resolve!()
195+
await jobPromise
196+
},
197+
})
121198
})
122199

123-
test("waitFor throws", async () => {
124-
await expect(SessionRunner.waitFor("session_123")).rejects.toThrow("not yet implemented")
200+
test("cancelBySession cancels running job", async () => {
201+
await using tmp = await tmpdir({ git: true })
202+
await Instance.provide({
203+
directory: tmp.path,
204+
fn: async () => {
205+
let started = false
206+
const jobPromise = SessionRunner.enqueue("session.loop", "ses_cancel", async () => {
207+
started = true
208+
await new Promise((r) => setTimeout(r, 5000))
209+
})
210+
211+
while (!started) {
212+
await new Promise((r) => setTimeout(r, 5))
213+
}
214+
215+
const cancelled = SessionRunner.cancelBySession("ses_cancel")
216+
expect(cancelled).toBe(true)
217+
218+
const job = await jobPromise
219+
expect(job.status).toBe("canceled")
220+
},
221+
})
125222
})
126223
})
127224

0 commit comments

Comments
 (0)