@@ -10,6 +10,10 @@ import { SessionPrompt } from "../session/prompt"
1010import { SessionRunner } from "../session/runner"
1111import { iife } from "@/util/iife"
1212import { Config } from "../config/config"
13+ import { Log } from "../util/log"
14+ import { Storage } from "../storage/storage"
15+
16+ const log = Log . create ( { service : "tool.task" } )
1317
1418export const TaskTool = Tool . define ( "task" , async ( ) => {
1519 const agents = await Agent . list ( ) . then ( ( x ) => x . filter ( ( a ) => a . mode !== "primary" ) )
@@ -45,15 +49,50 @@ export const TaskTool = Tool.define("task", async () => {
4549 const msg = await MessageV2 . get ( { sessionID : ctx . sessionID , messageID : ctx . messageID } )
4650 if ( msg . info . role !== "assistant" ) throw new Error ( "Not an assistant message" )
4751
48- ctx . metadata ( {
49- title : params . description ,
50- metadata : {
51- sessionId : session . id ,
52- } ,
53- } )
54-
5552 const messageID = Identifier . ascending ( "message" )
5653 const parts : Record < string , { id : string ; tool : string ; state : { status : string ; title ?: string } } > = { }
54+
55+ const model = agent . model ?? {
56+ modelID : msg . info . modelID ,
57+ providerID : msg . info . providerID ,
58+ }
59+
60+ const promptParts = await SessionPrompt . resolvePromptParts ( params . prompt )
61+ const config = await Config . get ( )
62+
63+ const cancelChild = ( ) => {
64+ SessionRunner . cancelBySession ( session . id )
65+ SessionPrompt . cancel ( session . id )
66+ }
67+ ctx . abort . addEventListener ( "abort" , cancelChild , { once : true } )
68+
69+ // Helper to update parent tool part metadata (works after execute returns)
70+ const updateParentToolPart = async ( metadata : {
71+ summary : typeof parts [ string ] [ ]
72+ sessionId : string
73+ jobId ?: string
74+ status ?: string
75+ } ) => {
76+ if ( ! ctx . toolPartID ) return
77+ const currentPart = await Storage . read < MessageV2 . ToolPart > ( [ "part" , ctx . messageID , ctx . toolPartID ] ) . catch (
78+ ( err ) => {
79+ log . warn ( "failed to read parent tool part" , { error : err , partID : ctx . toolPartID } )
80+ return undefined
81+ } ,
82+ )
83+ if ( ! currentPart || currentPart . type !== "tool" ) return
84+ // Skip pending (no metadata field) and error (terminal state)
85+ if ( currentPart . state . status === "pending" || currentPart . state . status === "error" ) return
86+ await Session . updatePart ( {
87+ ...currentPart ,
88+ state : {
89+ ...currentPart . state ,
90+ metadata,
91+ } ,
92+ } ) . catch ( ( err ) => log . warn ( "failed to update parent tool part" , { error : err } ) )
93+ }
94+
95+ // Subscribe to child session part updates for live progress
5796 const unsub = Bus . subscribe ( MessageV2 . Event . PartUpdated , async ( evt ) => {
5897 if ( evt . properties . part . sessionID !== session . id ) return
5998 if ( evt . properties . part . messageID === messageID ) return
@@ -67,35 +106,49 @@ export const TaskTool = Tool.define("task", async () => {
67106 title : part . state . status === "completed" ? part . state . title : undefined ,
68107 } ,
69108 }
70- ctx . metadata ( {
71- title : params . description ,
72- metadata : {
73- summary : Object . values ( parts ) . sort ( ( a , b ) => a . id . localeCompare ( b . id ) ) ,
74- sessionId : session . id ,
75- } ,
109+ await updateParentToolPart ( {
110+ summary : Object . values ( parts ) . sort ( ( a , b ) => a . id . localeCompare ( b . id ) ) ,
111+ sessionId : session . id ,
76112 } )
77113 } )
78114
79- const model = agent . model ?? {
80- modelID : msg . info . modelID ,
81- providerID : msg . info . providerID ,
115+ // Cleanup function for all subscriptions
116+ const cleanup = ( ) => {
117+ unsub ( )
118+ ctx . abort . removeEventListener ( "abort" , cancelChild )
82119 }
83120
84- const promptParts = await SessionPrompt . resolvePromptParts ( params . prompt )
85- const config = await Config . get ( )
86-
87- const cancelChild = ( ) => {
88- SessionRunner . cancelBySession ( session . id )
89- SessionPrompt . cancel ( session . id )
90- }
91- ctx . abort . addEventListener ( "abort" , cancelChild , { once : true } )
121+ // Subscribe to job lifecycle events for cleanup and status updates
122+ const jobEvents = [
123+ SessionRunner . Event . Completed ,
124+ SessionRunner . Event . Failed ,
125+ SessionRunner . Event . Canceled ,
126+ SessionRunner . Event . TimedOut ,
127+ ] as const
128+ const jobUnsubs = jobEvents . map ( ( event ) =>
129+ Bus . subscribe ( event , async ( evt ) => {
130+ if ( evt . properties . job . targetSessionID !== session . id ) return
131+ const job = evt . properties . job
132+ // Update parent metadata with final status
133+ await updateParentToolPart ( {
134+ summary : Object . values ( parts ) . sort ( ( a , b ) => a . id . localeCompare ( b . id ) ) ,
135+ sessionId : session . id ,
136+ status : job . status ,
137+ } )
138+ if ( job . status !== "completed" ) {
139+ log . info ( "child session job ended" , { jobId : job . id , status : job . status , error : job . error } )
140+ }
141+ cleanup ( )
142+ jobUnsubs . forEach ( ( u ) => u ( ) )
143+ } ) ,
144+ )
92145
93- let result : MessageV2 . WithParts | undefined
94- const job = await SessionRunner . enqueue (
146+ // Enqueue the child session work (fire-and-forget)
147+ SessionRunner . enqueue (
95148 "task.child_session" ,
96149 session . id ,
97150 async ( ) => {
98- result = await SessionPrompt . prompt ( {
151+ await SessionPrompt . prompt ( {
99152 messageID,
100153 sessionID : session . id ,
101154 model : {
@@ -115,51 +168,23 @@ export const TaskTool = Tool.define("task", async () => {
115168 } ,
116169 {
117170 parentSessionID : ctx . sessionID ,
118- toolCallID : ctx . messageID ,
171+ toolCallID : ctx . callID ?? ctx . messageID ,
119172 } ,
120- )
121- unsub ( )
122- ctx . abort . removeEventListener ( "abort" , cancelChild )
123-
124- if ( job . status === "canceled" || ctx . abort . aborted ) {
125- return {
126- title : params . description ,
127- metadata : { summary : [ ] , sessionId : session . id } ,
128- output : "Task was canceled" ,
129- }
130- }
131-
132- if ( job . status !== "completed" || ! result ) {
133- const error = job . error ?. message ?? "Task failed"
134- return {
135- title : params . description ,
136- metadata : { summary : [ ] , sessionId : session . id } ,
137- output : `Task failed: ${ error } ` ,
138- }
139- }
140- const messages = await Session . messages ( { sessionID : session . id } )
141- const summary = messages
142- . filter ( ( x ) => x . info . role === "assistant" )
143- . flatMap ( ( msg ) => msg . parts . filter ( ( x : any ) => x . type === "tool" ) as MessageV2 . ToolPart [ ] )
144- . map ( ( part ) => ( {
145- id : part . id ,
146- tool : part . tool ,
147- state : {
148- status : part . state . status ,
149- title : part . state . status === "completed" ? part . state . title : undefined ,
150- } ,
151- } ) )
152- const text = result . parts . findLast ( ( x ) => x . type === "text" ) ?. text ?? ""
153-
154- const output = text + "\n\n" + [ "<task_metadata>" , `session_id: ${ session . id } ` , "</task_metadata>" ] . join ( "\n" )
173+ ) . catch ( ( err ) => {
174+ log . error ( "failed to enqueue child session" , { error : err , sessionID : session . id } )
175+ cleanup ( )
176+ jobUnsubs . forEach ( ( u ) => u ( ) )
177+ } )
155178
179+ // Return immediately without waiting for job completion
156180 return {
157181 title : params . description ,
158182 metadata : {
159- summary,
183+ summary : [ ] ,
160184 sessionId : session . id ,
185+ status : "running" ,
161186 } ,
162- output,
187+ output : `Task started in background.\n\n<task_metadata>\nsession_id: ${ session . id } \n</task_metadata>` ,
163188 }
164189 } ,
165190 }
0 commit comments