Skip to content

Commit 5c18870

Browse files
fix(compose): recover state handler panic (#446)
1 parent 562f366 commit 5c18870

File tree

1 file changed

+35
-13
lines changed

1 file changed

+35
-13
lines changed

compose/graph_manager.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,12 +287,9 @@ func (t *taskManager) submit(tasks []*task) error {
287287
// 1. the new task is the only one
288288
// 2. the task manager mode is set to needAll
289289
for _, currentTask := range tasks {
290-
if currentTask.call.preProcessor != nil && !currentTask.skipPreHandler {
291-
nInput, err := t.runWrapper(currentTask.ctx, currentTask.call.preProcessor, currentTask.input, currentTask.option...)
292-
if err != nil {
293-
return fmt.Errorf("run node[%s] pre processor fail: %w", currentTask.nodeKey, err)
294-
}
295-
currentTask.input = nInput
290+
err := runPreHandler(currentTask, t.runWrapper)
291+
if err != nil {
292+
return err
296293
}
297294
}
298295
var syncTask *task
@@ -335,13 +332,7 @@ func (t *taskManager) waitOne() (*task, bool) {
335332
if ta.err != nil {
336333
return ta, true
337334
}
338-
if ta.call.postProcessor != nil {
339-
nOutput, err := t.runWrapper(ta.ctx, ta.call.postProcessor, ta.output, ta.option...)
340-
if err != nil {
341-
ta.err = fmt.Errorf("run node[%s] post processor fail: %w", ta.nodeKey, err)
342-
}
343-
ta.output = nOutput
344-
}
335+
runPostHandler(ta, t.runWrapper)
345336
return ta, true
346337
}
347338

@@ -355,3 +346,34 @@ func (t *taskManager) waitAll() []*task {
355346
result = append(result, ta)
356347
}
357348
}
349+
350+
func runPreHandler(ta *task, runWrapper runnableCallWrapper) (err error) {
351+
defer func() {
352+
if e := recover(); e != nil {
353+
err = safe.NewPanicErr(fmt.Errorf("panic in pre handler: %v", e), debug.Stack())
354+
}
355+
}()
356+
if ta.call.preProcessor != nil && !ta.skipPreHandler {
357+
nInput, err := runWrapper(ta.ctx, ta.call.preProcessor, ta.input, ta.option...)
358+
if err != nil {
359+
return fmt.Errorf("run node[%s] pre processor fail: %w", ta.nodeKey, err)
360+
}
361+
ta.input = nInput
362+
}
363+
return nil
364+
}
365+
366+
func runPostHandler(ta *task, runWrapper runnableCallWrapper) {
367+
defer func() {
368+
if e := recover(); e != nil {
369+
ta.err = safe.NewPanicErr(fmt.Errorf("panic in post handler: %v", e), debug.Stack())
370+
}
371+
}()
372+
if ta.call.postProcessor != nil {
373+
nOutput, err := runWrapper(ta.ctx, ta.call.postProcessor, ta.output, ta.option...)
374+
if err != nil {
375+
ta.err = fmt.Errorf("run node[%s] post processor fail: %w", ta.nodeKey, err)
376+
}
377+
ta.output = nOutput
378+
}
379+
}

0 commit comments

Comments
 (0)