From 0b09b9753a2e281d1f2a2daa5de46bfa095980da Mon Sep 17 00:00:00 2001 From: lipandeng Date: Wed, 27 Aug 2025 12:06:29 +0800 Subject: [PATCH 01/12] fix(adk): fix prebuilt.NewPlanExecuteAgent problem refactor: del duplicate fix feat: add GenInputFn to replace instruction --- adk/prebuilt/plan_execute.go | 291 +++++++++++++++++++++--------- adk/prebuilt/plan_execute_test.go | 54 +++--- 2 files changed, 229 insertions(+), 116 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index 044b67b1..b6dfcf18 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -36,6 +36,12 @@ import ( // Plan represents a structured output format with a list of steps to be executed. // This struct is used for JSON serialization/deserialization of the plan output // generated by the model. +type Plan interface { + StepList(ctx context.Context) []any + Unmarshal(ctx context.Context, arguments string) error +} + +// plan the default implementation of Plan. // // JSON Schema: // @@ -52,12 +58,24 @@ import ( // }, // "required": ["steps"] // } -type Plan struct { +type plan struct { // Steps contains the ordered list of actions to be taken. // Each step should be clear, actionable, and arranged in a logical sequence. Steps []string `json:"steps"` } +func (p *plan) StepList(ctx context.Context) []any { + stepList := make([]any, 0, len(p.Steps)) + for _, step := range p.Steps { + stepList = append(stepList, step) + } + return stepList +} + +func (p *plan) Unmarshal(ctx context.Context, arguments string) error { + return sonic.UnmarshalString(arguments, p) +} + // Response represents the final response to the user. // This struct is used for JSON serialization/deserialization of the final response // generated by the model. @@ -102,7 +120,11 @@ var ( } ExecutorUserPrompt = prompt.FromMessages(schema.FString, schema.UserMessage( - "Given the following plan:\n{plan}\nYour task is to execute the first step, which is: {task}.")) + ` +## Given the following plan: +{plan} +## Your task is to execute the first step, which is: +{task}`)) ReplannerUserPrompt = prompt.FromMessages(schema.FString, schema.UserMessage( `You are going to review the progress toward an objective. Analyze the current state and determine the optimal next action. @@ -189,7 +211,7 @@ Each step in your plan must be: PlanSessionKey = "Plan" // ExecuteResultSessionKey is the session key for the execute result. - ExecuteResultSessionKey = "ExecuteResult" + ExecuteResultSessionKey = "ExecutedStep" // ExecuteResultsSessionKey is the session key for the execute results. ExecuteResultsSessionKey = "ExecuteResults" @@ -214,16 +236,35 @@ type PlannerConfig struct { // If not provided, PlanToolInfo will be used as the default. ToolInfo *schema.ToolInfo - // Instruction is the system instruction for the planner. - // It provides context and guidance to the planner on how to generate the Plan. - // If not provided, PlannerInstruction will be used as the default. - Instruction string + GenInputFn GenPlannerInputFn + + NewPlanFn NewPlanFn +} + +type PlannerInput struct { + Input []adk.Message +} + +type GenPlannerInputFn func(ctx context.Context, in *PlannerInput) ([]adk.Message, error) + +type NewPlanFn func(ctx context.Context) Plan + +func defaultNewPlanFn(ctx context.Context) Plan { + return &plan{} +} + +func defaultGenPlannerInputFn(ctx context.Context, in *PlannerInput) ([]adk.Message, error) { + msgs := make([]adk.Message, 0, 1+len(in.Input)) + msgs = append(msgs, schema.SystemMessage(PlannerInstruction)) + msgs = append(msgs, in.Input...) + return msgs, nil } type planner struct { - toolCall bool - chatModel model.BaseChatModel - sysMsg *schema.Message + toolCall bool + chatModel model.BaseChatModel + genInputFn GenPlannerInputFn + newPlanFn NewPlanFn } func (p *planner) Name(_ context.Context) string { @@ -260,9 +301,13 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, generator.Close() }() - msgs := make([]*schema.Message, 0, len(input.Messages)+1) - msgs = append(msgs, p.sysMsg) - msgs = append(msgs, input.Messages...) + msgs, err := p.genInputFn(ctx, &PlannerInput{ + Input: input.Messages, + }) + if err != nil { + generator.Send(&adk.AgentEvent{Err: err}) + return + } var modelCallOptions []model.Option if p.toolCall { modelCallOptions = append(modelCallOptions, model.WithToolChoice(schema.ToolChoiceForced)) @@ -270,9 +315,9 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, var msg adk.Message if input.EnableStreaming { - s, err := p.chatModel.Stream(ctx, msgs, modelCallOptions...) - if err != nil { - generator.Send(&adk.AgentEvent{Err: err}) + s, err_ := p.chatModel.Stream(ctx, msgs, modelCallOptions...) + if err_ != nil { + generator.Send(&adk.AgentEvent{Err: err_}) return } @@ -287,9 +332,9 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, event := adk.EventFromMessage(nil, sOutput, schema.Assistant, "") generator.Send(event) - msg, err = schema.ConcatMessageStream(ss[1]) - if err != nil { - generator.Send(&adk.AgentEvent{Err: err}) + msg, err_ = schema.ConcatMessageStream(ss[1]) + if err_ != nil { + generator.Send(&adk.AgentEvent{Err: err_}) return } @@ -298,10 +343,10 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, return } } else { - var err error - msg, err = p.chatModel.Generate(ctx, msgs, modelCallOptions...) - if err != nil { - generator.Send(&adk.AgentEvent{Err: err}) + var err_ error + msg, err_ = p.chatModel.Generate(ctx, msgs, modelCallOptions...) + if err_ != nil { + generator.Send(&adk.AgentEvent{Err: err_}) return } @@ -320,14 +365,15 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, generator.Send(event) } - var plan Plan + var plan plan var planJSON string if p.toolCall { planJSON = msg.ToolCalls[0].Function.Arguments } else { planJSON = msg.Content } - err := sonic.UnmarshalString(planJSON, &plan) + plan_ := p.newPlanFn(ctx) + err = plan_.Unmarshal(ctx, planJSON) if err != nil { err = fmt.Errorf("unmarshal plan error: %w", err) generator.Send(&adk.AgentEvent{Err: err}) @@ -348,12 +394,6 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, // If ToolCallingChatModel is provided, it will be configured with ToolInfo (or PlanToolInfo by default) // to generate structured Plan output. func NewPlanner(_ context.Context, cfg *PlannerConfig) (adk.Agent, error) { - instruction := cfg.Instruction - if instruction == "" { - instruction = PlannerInstruction - } - sysMsg := schema.SystemMessage(instruction) - var chatModel model.BaseChatModel var toolCall bool if cfg.ChatModelWithFormattedOutput != nil { @@ -372,28 +412,48 @@ func NewPlanner(_ context.Context, cfg *PlannerConfig) (adk.Agent, error) { } } + inputFn := cfg.GenInputFn + if inputFn == nil { + inputFn = defaultGenPlannerInputFn + } + + newPlanFn := cfg.NewPlanFn + if newPlanFn == nil { + newPlanFn = defaultNewPlanFn + } + return &planner{ - toolCall: toolCall, - chatModel: chatModel, - sysMsg: sysMsg, + toolCall: toolCall, + chatModel: chatModel, + genInputFn: inputFn, + newPlanFn: newPlanFn, }, nil } +type PlanExecuteInput struct { + Input []adk.Message + Plan Plan + ExecutedSteps []ExecutedStep +} +type GenPlanExecuteInputFn func(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) + type ExecutorConfig struct { - Instruction string Model model.ToolCallingChatModel ToolsConfig adk.ToolsConfig MaxStep int + + GenInputFn GenPlanExecuteInputFn + Instruction string } -type ExecuteResult struct { - Task string +type ExecutedStep struct { + Step any Result string } -func formatPlan(p *Plan) string { +func formatPlan(ctx context.Context, p Plan) string { var formattedPlan strings.Builder - for i, step := range p.Steps { + for i, step := range p.StepList(ctx) { formattedPlan.WriteString(fmt.Sprintf("%d. %s\n", i+1, step)) } @@ -402,31 +462,42 @@ func formatPlan(p *Plan) string { // NewExecutor creates a new executor agent. func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { - genInput := func(ctx context.Context, instruction string, _ *adk.AgentInput) ([]adk.Message, error) { - msgs := make([]adk.Message, 0, 2) - if instruction != "" { - sp := schema.SystemMessage(instruction) - msgs = append(msgs, sp) - } + genInputFn := cfg.GenInputFn + if genInputFn == nil { + genInputFn = defaultGenExecutorInputFn + } + genInput := func(ctx context.Context, instruction string, _ *adk.AgentInput) ([]adk.Message, error) { plan, ok := adk.GetSessionValue(ctx, PlanSessionKey) if !ok { panic("impossible: plan not found") } - plan_ := plan.(*Plan) - task := plan_.Steps[0] + plan_ := plan.(Plan) - userMsgs, err := ExecutorUserPrompt.Format(ctx, map[string]any{ - "plan": formatPlan(plan_), - "task": task, - }) + userInput, ok := adk.GetSessionValue(ctx, PlanExecuteUserInputSessionKey) + if !ok { + panic("impossible: user input not found") + } + userInput_ := userInput.([]adk.Message) + + var executedSteps_ []ExecutedStep + executedStep, ok := adk.GetSessionValue(ctx, ExecuteResultsSessionKey) + if ok { + executedSteps_ = executedStep.([]ExecutedStep) + } + + in := &PlanExecuteInput{ + Input: userInput_, + Plan: plan_, + ExecutedSteps: executedSteps_, + } + + msgs, err := genInputFn(ctx, in) if err != nil { return nil, err } - msgs = append(msgs, userMsgs...) - return msgs, nil } @@ -447,16 +518,31 @@ func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { return agent, nil } +func defaultGenExecutorInputFn(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { + + step := in.Plan.StepList(ctx)[0] + userMsgs, err := ExecutorUserPrompt.Format(ctx, map[string]any{ + "input": formatInput(in.Input), + "plan": formatPlan(ctx, in.Plan), + "step": step, + }) + if err != nil { + return nil, err + } + + return userMsgs, nil +} + type replanner struct { chatModel model.ToolCallingChatModel - instruction string planTool *schema.ToolInfo respondTool *schema.ToolInfo + + genInputFn GenPlanExecuteInputFn + newPlanFn NewPlanFn } type ReplannerConfig struct { - // Instruction is the system instruction for the replanner. - Instruction string // ChatModel is the model that supports tool calling capabilities. // It will be configured with PlanTool and RespondTool to generate updated plans or responses. @@ -469,6 +555,10 @@ type ReplannerConfig struct { // RespondTool defines the schema for the response tool that can be used with ToolCallingChatModel. // If not provided, the default RespondToolInfo will be used. RespondTool *schema.ToolInfo + + GenInputFn GenPlanExecuteInputFn + + NewPlanFn NewPlanFn } func formatInput(input []adk.Message) string { @@ -481,10 +571,10 @@ func formatInput(input []adk.Message) string { return sb.String() } -func formatExecutedResults(results []ExecuteResult) string { +func formatExecutedSteps(results []ExecutedStep) string { var sb strings.Builder for _, result := range results { - sb.WriteString(fmt.Sprintf("Task: %s\nResult: %s\n\n", result.Task, result.Result)) + sb.WriteString(fmt.Sprintf("Step: %s\nResult: %s\n\n", result.Step, result.Result)) } return sb.String() @@ -499,12 +589,6 @@ func (r *replanner) Description(_ context.Context) string { } func (r *replanner) genInput(ctx context.Context) ([]adk.Message, error) { - msgs := make([]adk.Message, 0, 2) - - if r.instruction != "" { - sp := schema.SystemMessage(r.instruction) - msgs = append(msgs, sp) - } executeResult, ok := adk.GetSessionValue(ctx, ExecuteResultSessionKey) if !ok { @@ -516,20 +600,20 @@ func (r *replanner) genInput(ctx context.Context) ([]adk.Message, error) { if !ok { panic("impossible: plan not found") } - plan_ := plan.(*Plan) - task := plan_.Steps[0] + plan_ := plan.(Plan) + step := plan_.StepList(ctx)[0] - var executeResults_ []ExecuteResult - executeResults, ok := adk.GetSessionValue(ctx, ExecuteResultsSessionKey) + var executedSteps_ []ExecutedStep + executedSteps, ok := adk.GetSessionValue(ctx, ExecuteResultsSessionKey) if ok { - executeResults_ = executeResults.([]ExecuteResult) + executedSteps_ = executedSteps.([]ExecutedStep) } - executeResults_ = append(executeResults_, ExecuteResult{ - Task: task, + executedSteps_ = append(executedSteps_, ExecutedStep{ + Step: step, Result: executeResult_, }) - adk.SetSessionValue(ctx, ExecuteResultsSessionKey, executeResults_) + adk.SetSessionValue(ctx, ExecuteResultsSessionKey, executedSteps_) userInput, ok := adk.GetSessionValue(ctx, PlanExecuteUserInputSessionKey) if !ok { @@ -537,19 +621,20 @@ func (r *replanner) genInput(ctx context.Context) ([]adk.Message, error) { } userInput_ := userInput.([]adk.Message) - userMsgs, err := ReplannerUserPrompt.Format(ctx, map[string]any{ - "plan": formatPlan(plan_), - "input": formatInput(userInput_), - "executed_results": formatExecutedResults(executeResults_), - "plan_tool": r.planTool.Name, - "respond_tool": r.respondTool.Name, - }) + in := &PlanExecuteInput{ + Input: userInput_, + Plan: plan_, + ExecutedSteps: executedSteps_, + } + genInputFn := r.genInputFn + if genInputFn == nil { + genInputFn = buildDefaultReplannerInputFn(in, r.planTool.Name, r.respondTool.Name) + } + msgs, err := genInputFn(ctx, in) if err != nil { return nil, err } - msgs = append(msgs, userMsgs...) - return msgs, nil } @@ -661,20 +746,37 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age return } - var plan Plan - err = sonic.UnmarshalString(planMsg.ToolCalls[0].Function.Arguments, &plan) + plan_ := r.newPlanFn(ctx) + err = plan_.Unmarshal(ctx, planMsg.ToolCalls[0].Function.Arguments) if err != nil { err = fmt.Errorf("unmarshal plan error: %w", err) generator.Send(&adk.AgentEvent{Err: err}) return } - adk.SetSessionValue(ctx, PlanSessionKey, &plan) + adk.SetSessionValue(ctx, PlanSessionKey, plan_) }() return iterator } +func buildDefaultReplannerInputFn(in *PlanExecuteInput, planToolName, respondToolName string) GenPlanExecuteInputFn { + return func(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { + msgs, err := ReplannerUserPrompt.Format(ctx, map[string]any{ + "plan": formatPlan(ctx, in.Plan), + "input": formatInput(in.Input), + "executed_results": formatExecutedSteps(in.ExecutedSteps), + "plan_tool": planToolName, + "respond_tool": respondToolName, + }) + if err != nil { + return nil, err + } + + return msgs, nil + } +} + func NewReplanner(_ context.Context, cfg *ReplannerConfig) (adk.Agent, error) { planTool := cfg.PlanTool if planTool == nil { @@ -691,25 +793,36 @@ func NewReplanner(_ context.Context, cfg *ReplannerConfig) (adk.Agent, error) { return nil, err } + newPlanFn := cfg.NewPlanFn + if newPlanFn == nil { + newPlanFn = defaultNewPlanFn + } + return &replanner{ chatModel: chatModel, - instruction: cfg.Instruction, planTool: planTool, respondTool: respondTool, + genInputFn: cfg.GenInputFn, + newPlanFn: newPlanFn, }, nil } type PlanExecuteConfig struct { - Planner adk.Agent - Executor adk.Agent - Replanner adk.Agent + Planner adk.Agent + Executor adk.Agent + Replanner adk.Agent + MaxReplans int } func NewPlanExecuteAgent(ctx context.Context, cfg *PlanExecuteConfig) (adk.Agent, error) { + maxReplans := cfg.MaxReplans + if maxReplans <= 0 { + maxReplans = 10 + } loop, err := adk.NewLoopAgent(ctx, &adk.LoopAgentConfig{ Name: "execute_replan", SubAgents: []adk.Agent{cfg.Executor, cfg.Replanner}, - MaxIterations: 10, + MaxIterations: maxReplans, }) if err != nil { return nil, err diff --git a/adk/prebuilt/plan_execute_test.go b/adk/prebuilt/plan_execute_test.go index 23862c6d..8bb312dd 100644 --- a/adk/prebuilt/plan_execute_test.go +++ b/adk/prebuilt/plan_execute_test.go @@ -128,13 +128,13 @@ func TestPlannerRunWithFormattedOutput(t *testing.T) { event, ok = iterator.Next() assert.False(t, ok) - var plan Plan + var plan plan err = sonic.UnmarshalString(msg.Content, &plan) assert.NoError(t, err) - assert.Equal(t, 3, len(plan.Steps)) - assert.Equal(t, "Step 1", plan.Steps[0]) - assert.Equal(t, "Step 2", plan.Steps[1]) - assert.Equal(t, "Step 3", plan.Steps[2]) + assert.Equal(t, 3, len(plan.StepList)) + assert.Equal(t, "Step 1", plan.StepList[0]) + assert.Equal(t, "Step 2", plan.StepList[1]) + assert.Equal(t, "Step 3", plan.StepList[2]) } // TestPlannerRunWithToolCalling tests the Run method of a planner created with ToolCallingChatModel @@ -194,13 +194,13 @@ func TestPlannerRunWithToolCalling(t *testing.T) { _, ok = iterator.Next() assert.False(t, ok) - var plan Plan + var plan plan err = sonic.UnmarshalString(msg.Content, &plan) assert.NoError(t, err) - assert.Equal(t, 3, len(plan.Steps)) - assert.Equal(t, "Step 1", plan.Steps[0]) - assert.Equal(t, "Step 2", plan.Steps[1]) - assert.Equal(t, "Step 3", plan.Steps[2]) + assert.Equal(t, 3, len(plan.StepList)) + assert.Equal(t, "Step 1", plan.StepList[0]) + assert.Equal(t, "Step 2", plan.StepList[1]) + assert.Equal(t, "Step 3", plan.StepList[2]) } // TestNewExecutor tests the NewExecutor function @@ -243,7 +243,7 @@ func TestExecutorRun(t *testing.T) { mockToolCallingModel := mockModel.NewMockToolCallingChatModel(ctrl) // Store a plan in the session - plan := &Plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + plan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} adk.SetSessionValue(ctx, PlanSessionKey, plan) // Set up expectations for the mock model @@ -389,7 +389,7 @@ func TestReplannerRunWithPlan(t *testing.T) { assert.NoError(t, err) // Store necessary values in the session - plan := &Plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + plan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} rp, err = AgentWithSessionKVs(ctx, rp, map[string]any{ PlanSessionKey: plan, ExecuteResultSessionKey: "Execution result", @@ -417,19 +417,19 @@ func TestReplannerRunWithPlan(t *testing.T) { // Verify the updated plan was stored in the session planValue, ok := kvs[PlanSessionKey] assert.True(t, ok) - updatedPlan, ok := planValue.(*Plan) + updatedPlan, ok := planValue.(*plan) assert.True(t, ok) - assert.Equal(t, 2, len(updatedPlan.Steps)) - assert.Equal(t, "Updated Step 1", updatedPlan.Steps[0]) - assert.Equal(t, "Updated Step 2", updatedPlan.Steps[1]) + assert.Equal(t, 2, len(updatedPlan.StepList)) + assert.Equal(t, "Updated Step 1", updatedPlan.StepList[0]) + assert.Equal(t, "Updated Step 2", updatedPlan.StepList[1]) // Verify the execute results were updated executeResultsValue, ok := kvs[ExecuteResultsSessionKey] assert.True(t, ok) - executeResults, ok := executeResultsValue.([]ExecuteResult) + executeResults, ok := executeResultsValue.([]ExecutedStep) assert.True(t, ok) assert.Equal(t, 1, len(executeResults)) - assert.Equal(t, "Step 1", executeResults[0].Task) + assert.Equal(t, "Step 1", executeResults[0].Step) assert.Equal(t, "Execution result", executeResults[0].Result) _, ok = iterator.Next() @@ -489,7 +489,7 @@ func TestReplannerRunWithRespond(t *testing.T) { assert.NoError(t, err) // Store necessary values in the session - plan := &Plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + plan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} rp, err = AgentWithSessionKVs(ctx, rp, map[string]any{ PlanSessionKey: plan, ExecuteResultSessionKey: "Execution result", @@ -578,9 +578,9 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { mockReplanner.EXPECT().Description(gomock.Any()).Return("a replanner agent").AnyTimes() // Create a plan - originalPlan := &Plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + originalPlan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} // Create an updated plan with fewer steps (after replanning) - updatedPlan := &Plan{Steps: []string{"Updated Step 2", "Updated Step 3"}} + updatedPlan := &plan{Steps: []string{"Updated Step 2", "Updated Step 3"}} // Create execute result originalExecuteResult := "Execution result for Step 1" updatedExecuteResult := "Execution result for Updated Step 2" @@ -616,10 +616,10 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { iterator, generator := adk.NewAsyncIteratorPair[*adk.AgentEvent]() plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(*Plan) + currentPlan := plan.(*plan) var msg adk.Message // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.Steps) == 3 { + if len(currentPlan.StepList) == 3 { msg = schema.AssistantMessage(originalExecuteResult, nil) adk.SetSessionValue(ctx, ExecuteResultSessionKey, originalExecuteResult) } else { @@ -642,10 +642,10 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // First call: Update the plan // Get the current plan from the session plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(*Plan) + currentPlan := plan.(*plan) // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.Steps) == 3 { + if len(currentPlan.StepList) == 3 { // Send a message event with the updated plan planJSON, _ := sonic.MarshalString(updatedPlan) msg := schema.AssistantMessage(planJSON, nil) @@ -654,8 +654,8 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // Set the updated plan & execute result in the session adk.SetSessionValue(ctx, PlanSessionKey, updatedPlan) - adk.SetSessionValue(ctx, ExecuteResultsSessionKey, []ExecuteResult{{ - Task: currentPlan.Steps[0], + adk.SetSessionValue(ctx, ExecuteResultsSessionKey, []ExecutedStep{{ + Step: currentPlan.StepList[0], Result: originalExecuteResult, }}) } else { From aa796291a51262b48e790878100816e33f59a561 Mon Sep 17 00:00:00 2001 From: lipandeng Date: Fri, 29 Aug 2025 16:35:35 +0800 Subject: [PATCH 02/12] feat: abstract Plan for customized plan struct --- adk/prebuilt/plan_execute.go | 91 +++++++++++++++++-------------- adk/prebuilt/plan_execute_test.go | 81 +++++++++++++-------------- 2 files changed, 88 insertions(+), 84 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index b6dfcf18..0a20186d 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -33,15 +33,20 @@ import ( "github.com/cloudwego/eino/schema" ) +type Step interface { + Description(ctx context.Context) string +} + // Plan represents a structured output format with a list of steps to be executed. -// This struct is used for JSON serialization/deserialization of the plan output -// generated by the model. +// the instance of Plan has two methods: +// - Steps: get the list of steps. the step will be rendered into the model prompt. +// - Unmarshal: used for JSON deserialization of the plan output generated by the model. type Plan interface { - StepList(ctx context.Context) []any + Steps(ctx context.Context) []Step Unmarshal(ctx context.Context, arguments string) error } -// plan the default implementation of Plan. +// executionPlan the default implementation of Plan. // // JSON Schema: // @@ -58,21 +63,27 @@ type Plan interface { // }, // "required": ["steps"] // } -type plan struct { +type executionPlan struct { // Steps contains the ordered list of actions to be taken. // Each step should be clear, actionable, and arranged in a logical sequence. - Steps []string `json:"steps"` + Steps_ []string `json:"steps"` } -func (p *plan) StepList(ctx context.Context) []any { - stepList := make([]any, 0, len(p.Steps)) - for _, step := range p.Steps { - stepList = append(stepList, step) +type executionStep string + +func (s executionStep) Description(ctx context.Context) string { + return string(s) +} + +func (p *executionPlan) Steps(ctx context.Context) []Step { + stepList := make([]Step, 0, len(p.Steps_)) + for _, step := range p.Steps_ { + stepList = append(stepList, executionStep(step)) } return stepList } -func (p *plan) Unmarshal(ctx context.Context, arguments string) error { +func (p *executionPlan) Unmarshal(ctx context.Context, arguments string) error { return sonic.UnmarshalString(arguments, p) } @@ -124,7 +135,7 @@ var ( ## Given the following plan: {plan} ## Your task is to execute the first step, which is: -{task}`)) +{step}`)) ReplannerUserPrompt = prompt.FromMessages(schema.FString, schema.UserMessage( `You are going to review the progress toward an objective. Analyze the current state and determine the optimal next action. @@ -136,7 +147,7 @@ var ( {plan} ## COMPLETED STEPS & RESULTS -{executed_results} +{executed_steps} ## YOUR TASK Based on the progress above, you MUST choose exactly ONE action: @@ -210,11 +221,11 @@ Each step in your plan must be: // PlanSessionKey is the session key for the plan. PlanSessionKey = "Plan" - // ExecuteResultSessionKey is the session key for the execute result. - ExecuteResultSessionKey = "ExecutedStep" + // ExecutedStepSessionKey is the session key for the execute result. + ExecutedStepSessionKey = "ExecutedStep" - // ExecuteResultsSessionKey is the session key for the execute results. - ExecuteResultsSessionKey = "ExecuteResults" + // ExecutedStepsSessionKey is the session key for the execute results. + ExecutedStepsSessionKey = "ExecutedSteps" ) // PlannerConfig provides configuration options for creating a planner agent. @@ -250,7 +261,7 @@ type GenPlannerInputFn func(ctx context.Context, in *PlannerInput) ([]adk.Messag type NewPlanFn func(ctx context.Context) Plan func defaultNewPlanFn(ctx context.Context) Plan { - return &plan{} + return &executionPlan{} } func defaultGenPlannerInputFn(ctx context.Context, in *PlannerInput) ([]adk.Message, error) { @@ -365,7 +376,7 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, generator.Send(event) } - var plan plan + var plan executionPlan var planJSON string if p.toolCall { planJSON = msg.ToolCalls[0].Function.Arguments @@ -442,19 +453,18 @@ type ExecutorConfig struct { ToolsConfig adk.ToolsConfig MaxStep int - GenInputFn GenPlanExecuteInputFn - Instruction string + GenInputFn GenPlanExecuteInputFn } type ExecutedStep struct { - Step any + Step string Result string } func formatPlan(ctx context.Context, p Plan) string { var formattedPlan strings.Builder - for i, step := range p.StepList(ctx) { - formattedPlan.WriteString(fmt.Sprintf("%d. %s\n", i+1, step)) + for i, step := range p.Steps(ctx) { + formattedPlan.WriteString(fmt.Sprintf("%d. %s\n", i+1, step.Description(ctx))) } return formattedPlan.String() @@ -482,7 +492,7 @@ func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { userInput_ := userInput.([]adk.Message) var executedSteps_ []ExecutedStep - executedStep, ok := adk.GetSessionValue(ctx, ExecuteResultsSessionKey) + executedStep, ok := adk.GetSessionValue(ctx, ExecutedStepsSessionKey) if ok { executedSteps_ = executedStep.([]ExecutedStep) } @@ -504,12 +514,11 @@ func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { agent, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ Name: "Executor", Description: "an executor agent", - Instruction: cfg.Instruction, Model: cfg.Model, ToolsConfig: cfg.ToolsConfig, GenModelInput: genInput, MaxStep: cfg.MaxStep, - OutputKey: ExecuteResultSessionKey, + OutputKey: ExecutedStepSessionKey, }) if err != nil { return nil, err @@ -520,11 +529,11 @@ func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { func defaultGenExecutorInputFn(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { - step := in.Plan.StepList(ctx)[0] + step := in.Plan.Steps(ctx)[0] userMsgs, err := ExecutorUserPrompt.Format(ctx, map[string]any{ "input": formatInput(in.Input), "plan": formatPlan(ctx, in.Plan), - "step": step, + "step": step.Description(ctx), }) if err != nil { return nil, err @@ -590,30 +599,30 @@ func (r *replanner) Description(_ context.Context) string { func (r *replanner) genInput(ctx context.Context) ([]adk.Message, error) { - executeResult, ok := adk.GetSessionValue(ctx, ExecuteResultSessionKey) + executedStep, ok := adk.GetSessionValue(ctx, ExecutedStepSessionKey) if !ok { panic("impossible: execute result not found") } - executeResult_ := executeResult.(string) + executeStep_ := executedStep.(string) plan, ok := adk.GetSessionValue(ctx, PlanSessionKey) if !ok { panic("impossible: plan not found") } plan_ := plan.(Plan) - step := plan_.StepList(ctx)[0] + step := plan_.Steps(ctx)[0] var executedSteps_ []ExecutedStep - executedSteps, ok := adk.GetSessionValue(ctx, ExecuteResultsSessionKey) + executedSteps, ok := adk.GetSessionValue(ctx, ExecutedStepsSessionKey) if ok { executedSteps_ = executedSteps.([]ExecutedStep) } executedSteps_ = append(executedSteps_, ExecutedStep{ - Step: step, - Result: executeResult_, + Step: step.Description(ctx), + Result: executeStep_, }) - adk.SetSessionValue(ctx, ExecuteResultsSessionKey, executedSteps_) + adk.SetSessionValue(ctx, ExecutedStepsSessionKey, executedSteps_) userInput, ok := adk.GetSessionValue(ctx, PlanExecuteUserInputSessionKey) if !ok { @@ -763,11 +772,11 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age func buildDefaultReplannerInputFn(in *PlanExecuteInput, planToolName, respondToolName string) GenPlanExecuteInputFn { return func(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { msgs, err := ReplannerUserPrompt.Format(ctx, map[string]any{ - "plan": formatPlan(ctx, in.Plan), - "input": formatInput(in.Input), - "executed_results": formatExecutedSteps(in.ExecutedSteps), - "plan_tool": planToolName, - "respond_tool": respondToolName, + "plan": formatPlan(ctx, in.Plan), + "input": formatInput(in.Input), + "executed_steps": formatExecutedSteps(in.ExecutedSteps), + "plan_tool": planToolName, + "respond_tool": respondToolName, }) if err != nil { return nil, err diff --git a/adk/prebuilt/plan_execute_test.go b/adk/prebuilt/plan_execute_test.go index 8bb312dd..aff5db32 100644 --- a/adk/prebuilt/plan_execute_test.go +++ b/adk/prebuilt/plan_execute_test.go @@ -45,7 +45,6 @@ func TestNewPlannerWithFormattedOutput(t *testing.T) { // Create the PlannerConfig conf := &PlannerConfig{ ChatModelWithFormattedOutput: mockChatModel, - Instruction: "Custom instruction", } // Create the planner @@ -128,13 +127,13 @@ func TestPlannerRunWithFormattedOutput(t *testing.T) { event, ok = iterator.Next() assert.False(t, ok) - var plan plan - err = sonic.UnmarshalString(msg.Content, &plan) + var plan executionPlan + err = plan.Unmarshal(ctx, msg.Content) assert.NoError(t, err) - assert.Equal(t, 3, len(plan.StepList)) - assert.Equal(t, "Step 1", plan.StepList[0]) - assert.Equal(t, "Step 2", plan.StepList[1]) - assert.Equal(t, "Step 3", plan.StepList[2]) + assert.Equal(t, 3, len(plan.Steps(ctx))) + assert.Equal(t, "Step 1", plan.Steps(ctx)[0].Description(ctx)) + assert.Equal(t, "Step 2", plan.Steps(ctx)[1].Description(ctx)) + assert.Equal(t, "Step 3", plan.Steps(ctx)[2].Description(ctx)) } // TestPlannerRunWithToolCalling tests the Run method of a planner created with ToolCallingChatModel @@ -194,13 +193,13 @@ func TestPlannerRunWithToolCalling(t *testing.T) { _, ok = iterator.Next() assert.False(t, ok) - var plan plan - err = sonic.UnmarshalString(msg.Content, &plan) + var plan executionPlan + err = plan.Unmarshal(ctx, msg.Content) assert.NoError(t, err) - assert.Equal(t, 3, len(plan.StepList)) - assert.Equal(t, "Step 1", plan.StepList[0]) - assert.Equal(t, "Step 2", plan.StepList[1]) - assert.Equal(t, "Step 3", plan.StepList[2]) + assert.Equal(t, 3, len(plan.Steps(ctx))) + assert.Equal(t, "Step 1", plan.Steps(ctx)[0].Description(ctx)) + assert.Equal(t, "Step 2", plan.Steps(ctx)[1].Description(ctx)) + assert.Equal(t, "Step 3", plan.Steps(ctx)[2].Description(ctx)) } // TestNewExecutor tests the NewExecutor function @@ -216,9 +215,8 @@ func TestNewExecutor(t *testing.T) { // Create the ExecutorConfig conf := &ExecutorConfig{ - Instruction: "Custom instruction", - Model: mockToolCallingModel, - MaxStep: 5, + Model: mockToolCallingModel, + MaxStep: 5, } // Create the executor @@ -243,7 +241,7 @@ func TestExecutorRun(t *testing.T) { mockToolCallingModel := mockModel.NewMockToolCallingChatModel(ctrl) // Store a plan in the session - plan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + plan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} adk.SetSessionValue(ctx, PlanSessionKey, plan) // Set up expectations for the mock model @@ -263,16 +261,16 @@ func TestExecutorRun(t *testing.T) { // Create the ExecutorConfig conf := &ExecutorConfig{ - Instruction: "Custom instruction", - Model: mockToolCallingModel, - MaxStep: 5, + Model: mockToolCallingModel, + MaxStep: 5, } // Create the executor executor, err := NewExecutor(ctx, conf) assert.NoError(t, err) executor, err = AgentWithSessionKVs(ctx, executor, map[string]any{ - PlanSessionKey: plan, + PlanSessionKey: plan, + PlanExecuteUserInputSessionKey: []adk.Message{schema.UserMessage("no input")}, }) assert.NoError(t, err) @@ -320,7 +318,6 @@ func TestNewReplanner(t *testing.T) { // Create the ReplannerConfig conf := &ReplannerConfig{ - Instruction: "Custom instruction", ChatModel: mockToolCallingModel, PlanTool: planTool, RespondTool: respondTool, @@ -378,7 +375,6 @@ func TestReplannerRunWithPlan(t *testing.T) { // Create the ReplannerConfig conf := &ReplannerConfig{ - Instruction: "Custom instruction", ChatModel: mockToolCallingModel, PlanTool: planTool, RespondTool: respondTool, @@ -389,10 +385,10 @@ func TestReplannerRunWithPlan(t *testing.T) { assert.NoError(t, err) // Store necessary values in the session - plan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + plan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} rp, err = AgentWithSessionKVs(ctx, rp, map[string]any{ PlanSessionKey: plan, - ExecuteResultSessionKey: "Execution result", + ExecutedStepSessionKey: "Execution result", PlanExecuteUserInputSessionKey: []adk.Message{schema.UserMessage("User input")}, }) assert.NoError(t, err) @@ -417,14 +413,14 @@ func TestReplannerRunWithPlan(t *testing.T) { // Verify the updated plan was stored in the session planValue, ok := kvs[PlanSessionKey] assert.True(t, ok) - updatedPlan, ok := planValue.(*plan) + updatedPlan, ok := planValue.(Plan) assert.True(t, ok) - assert.Equal(t, 2, len(updatedPlan.StepList)) - assert.Equal(t, "Updated Step 1", updatedPlan.StepList[0]) - assert.Equal(t, "Updated Step 2", updatedPlan.StepList[1]) + assert.Equal(t, 2, len(updatedPlan.Steps(ctx))) + assert.Equal(t, "Updated Step 1", updatedPlan.Steps(ctx)[0].Description(ctx)) + assert.Equal(t, "Updated Step 2", updatedPlan.Steps(ctx)[1].Description(ctx)) // Verify the execute results were updated - executeResultsValue, ok := kvs[ExecuteResultsSessionKey] + executeResultsValue, ok := kvs[ExecutedStepsSessionKey] assert.True(t, ok) executeResults, ok := executeResultsValue.([]ExecutedStep) assert.True(t, ok) @@ -478,7 +474,6 @@ func TestReplannerRunWithRespond(t *testing.T) { // Create the ReplannerConfig conf := &ReplannerConfig{ - Instruction: "Custom instruction", ChatModel: mockToolCallingModel, PlanTool: planTool, RespondTool: respondTool, @@ -489,10 +484,10 @@ func TestReplannerRunWithRespond(t *testing.T) { assert.NoError(t, err) // Store necessary values in the session - plan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + plan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} rp, err = AgentWithSessionKVs(ctx, rp, map[string]any{ PlanSessionKey: plan, - ExecuteResultSessionKey: "Execution result", + ExecutedStepSessionKey: "Execution result", PlanExecuteUserInputSessionKey: []adk.Message{schema.UserMessage("User input")}, }) assert.NoError(t, err) @@ -578,9 +573,9 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { mockReplanner.EXPECT().Description(gomock.Any()).Return("a replanner agent").AnyTimes() // Create a plan - originalPlan := &plan{Steps: []string{"Step 1", "Step 2", "Step 3"}} + originalPlan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} // Create an updated plan with fewer steps (after replanning) - updatedPlan := &plan{Steps: []string{"Updated Step 2", "Updated Step 3"}} + updatedPlan := &executionPlan{Steps_: []string{"Updated Step 2", "Updated Step 3"}} // Create execute result originalExecuteResult := "Execution result for Step 1" updatedExecuteResult := "Execution result for Updated Step 2" @@ -616,15 +611,15 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { iterator, generator := adk.NewAsyncIteratorPair[*adk.AgentEvent]() plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(*plan) + currentPlan := plan.(Plan) var msg adk.Message // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.StepList) == 3 { + if len(currentPlan.Steps(ctx)) == 3 { msg = schema.AssistantMessage(originalExecuteResult, nil) - adk.SetSessionValue(ctx, ExecuteResultSessionKey, originalExecuteResult) + adk.SetSessionValue(ctx, ExecutedStepSessionKey, originalExecuteResult) } else { msg = schema.AssistantMessage(updatedExecuteResult, nil) - adk.SetSessionValue(ctx, ExecuteResultSessionKey, updatedExecuteResult) + adk.SetSessionValue(ctx, ExecutedStepSessionKey, updatedExecuteResult) } event := adk.EventFromMessage(msg, nil, schema.Assistant, "") generator.Send(event) @@ -642,10 +637,10 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // First call: Update the plan // Get the current plan from the session plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(*plan) + currentPlan := plan.(Plan) // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.StepList) == 3 { + if len(currentPlan.Steps(ctx)) == 3 { // Send a message event with the updated plan planJSON, _ := sonic.MarshalString(updatedPlan) msg := schema.AssistantMessage(planJSON, nil) @@ -654,8 +649,8 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // Set the updated plan & execute result in the session adk.SetSessionValue(ctx, PlanSessionKey, updatedPlan) - adk.SetSessionValue(ctx, ExecuteResultsSessionKey, []ExecutedStep{{ - Step: currentPlan.StepList[0], + adk.SetSessionValue(ctx, ExecutedStepsSessionKey, []ExecutedStep{{ + Step: currentPlan.Steps(ctx)[0].Description(ctx), Result: originalExecuteResult, }}) } else { From 586b307eb0c19051af053f74d187b3ec55d40f86 Mon Sep 17 00:00:00 2001 From: lipandeng Date: Fri, 29 Aug 2025 20:47:14 +0800 Subject: [PATCH 03/12] feat: adjust Plan's abstract --- adk/prebuilt/plan_execute.go | 112 +++++++++++++----------------- adk/prebuilt/plan_execute_test.go | 43 ++++++------ 2 files changed, 71 insertions(+), 84 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index 0a20186d..000d50d8 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -33,18 +33,16 @@ import ( "github.com/cloudwego/eino/schema" ) -type Step interface { +// Plan represents an execution plan with a sequence of actionable steps. +// It provides methods to access the plan's description and retrieve the next step to execute. +type Plan interface { + // Description returns the complete textual description of the plan. Description(ctx context.Context) string + // FirstStep returns the next step to be executed in the plan. + FirstStep(ctx context.Context) string } -// Plan represents a structured output format with a list of steps to be executed. -// the instance of Plan has two methods: -// - Steps: get the list of steps. the step will be rendered into the model prompt. -// - Unmarshal: used for JSON deserialization of the plan output generated by the model. -type Plan interface { - Steps(ctx context.Context) []Step - Unmarshal(ctx context.Context, arguments string) error -} +type PlanParser func(ctx context.Context, arguments string) (Plan, error) // executionPlan the default implementation of Plan. // @@ -69,22 +67,20 @@ type executionPlan struct { Steps_ []string `json:"steps"` } -type executionStep string +func (p *executionPlan) Description(ctx context.Context) string { + var formattedPlan strings.Builder + for i, step := range p.Steps_ { + formattedPlan.WriteString(fmt.Sprintf("%d. %s\n", i+1, step)) + } -func (s executionStep) Description(ctx context.Context) string { - return string(s) + return formattedPlan.String() } -func (p *executionPlan) Steps(ctx context.Context) []Step { - stepList := make([]Step, 0, len(p.Steps_)) - for _, step := range p.Steps_ { - stepList = append(stepList, executionStep(step)) +func (p *executionPlan) FirstStep(ctx context.Context) string { + if len(p.Steps_) == 0 { + return "" } - return stepList -} - -func (p *executionPlan) Unmarshal(ctx context.Context, arguments string) error { - return sonic.UnmarshalString(arguments, p) + return p.Steps_[0] } // Response represents the final response to the user. @@ -249,7 +245,7 @@ type PlannerConfig struct { GenInputFn GenPlannerInputFn - NewPlanFn NewPlanFn + PlanParser PlanParser } type PlannerInput struct { @@ -258,10 +254,13 @@ type PlannerInput struct { type GenPlannerInputFn func(ctx context.Context, in *PlannerInput) ([]adk.Message, error) -type NewPlanFn func(ctx context.Context) Plan - -func defaultNewPlanFn(ctx context.Context) Plan { - return &executionPlan{} +func defaultPlanParserFn(ctx context.Context, arguments string) (Plan, error) { + p := &executionPlan{} + err := sonic.UnmarshalString(arguments, p) + if err != nil { + return nil, err + } + return p, nil } func defaultGenPlannerInputFn(ctx context.Context, in *PlannerInput) ([]adk.Message, error) { @@ -275,7 +274,7 @@ type planner struct { toolCall bool chatModel model.BaseChatModel genInputFn GenPlannerInputFn - newPlanFn NewPlanFn + planParser PlanParser } func (p *planner) Name(_ context.Context) string { @@ -376,22 +375,20 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, generator.Send(event) } - var plan executionPlan var planJSON string if p.toolCall { planJSON = msg.ToolCalls[0].Function.Arguments } else { planJSON = msg.Content } - plan_ := p.newPlanFn(ctx) - err = plan_.Unmarshal(ctx, planJSON) + plan, err := p.planParser(ctx, planJSON) if err != nil { err = fmt.Errorf("unmarshal plan error: %w", err) generator.Send(&adk.AgentEvent{Err: err}) return } - adk.SetSessionValue(ctx, PlanSessionKey, &plan) + adk.SetSessionValue(ctx, PlanSessionKey, plan) }() return iterator @@ -428,16 +425,16 @@ func NewPlanner(_ context.Context, cfg *PlannerConfig) (adk.Agent, error) { inputFn = defaultGenPlannerInputFn } - newPlanFn := cfg.NewPlanFn - if newPlanFn == nil { - newPlanFn = defaultNewPlanFn + planParser := cfg.PlanParser + if planParser == nil { + planParser = defaultPlanParserFn } return &planner{ toolCall: toolCall, chatModel: chatModel, genInputFn: inputFn, - newPlanFn: newPlanFn, + planParser: planParser, }, nil } @@ -461,15 +458,6 @@ type ExecutedStep struct { Result string } -func formatPlan(ctx context.Context, p Plan) string { - var formattedPlan strings.Builder - for i, step := range p.Steps(ctx) { - formattedPlan.WriteString(fmt.Sprintf("%d. %s\n", i+1, step.Description(ctx))) - } - - return formattedPlan.String() -} - // NewExecutor creates a new executor agent. func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { @@ -529,11 +517,10 @@ func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { func defaultGenExecutorInputFn(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { - step := in.Plan.Steps(ctx)[0] userMsgs, err := ExecutorUserPrompt.Format(ctx, map[string]any{ "input": formatInput(in.Input), - "plan": formatPlan(ctx, in.Plan), - "step": step.Description(ctx), + "plan": in.Plan.Description(ctx), + "step": in.Plan.FirstStep(ctx), }) if err != nil { return nil, err @@ -548,7 +535,7 @@ type replanner struct { respondTool *schema.ToolInfo genInputFn GenPlanExecuteInputFn - newPlanFn NewPlanFn + planParser PlanParser } type ReplannerConfig struct { @@ -567,7 +554,7 @@ type ReplannerConfig struct { GenInputFn GenPlanExecuteInputFn - NewPlanFn NewPlanFn + PlanParser PlanParser } func formatInput(input []adk.Message) string { @@ -603,14 +590,14 @@ func (r *replanner) genInput(ctx context.Context) ([]adk.Message, error) { if !ok { panic("impossible: execute result not found") } - executeStep_ := executedStep.(string) + executedStep_ := executedStep.(string) plan, ok := adk.GetSessionValue(ctx, PlanSessionKey) if !ok { panic("impossible: plan not found") } plan_ := plan.(Plan) - step := plan_.Steps(ctx)[0] + step := plan_.FirstStep(ctx) var executedSteps_ []ExecutedStep executedSteps, ok := adk.GetSessionValue(ctx, ExecutedStepsSessionKey) @@ -619,8 +606,8 @@ func (r *replanner) genInput(ctx context.Context) ([]adk.Message, error) { } executedSteps_ = append(executedSteps_, ExecutedStep{ - Step: step.Description(ctx), - Result: executeStep_, + Step: step, + Result: executedStep_, }) adk.SetSessionValue(ctx, ExecutedStepsSessionKey, executedSteps_) @@ -755,11 +742,10 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age return } - plan_ := r.newPlanFn(ctx) - err = plan_.Unmarshal(ctx, planMsg.ToolCalls[0].Function.Arguments) - if err != nil { - err = fmt.Errorf("unmarshal plan error: %w", err) - generator.Send(&adk.AgentEvent{Err: err}) + plan_, err_ := r.planParser(ctx, planMsg.ToolCalls[0].Function.Name) + if err_ != nil { + err_ = fmt.Errorf("unmarshal plan error: %w", err_) + generator.Send(&adk.AgentEvent{Err: err_}) return } @@ -772,7 +758,7 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age func buildDefaultReplannerInputFn(in *PlanExecuteInput, planToolName, respondToolName string) GenPlanExecuteInputFn { return func(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { msgs, err := ReplannerUserPrompt.Format(ctx, map[string]any{ - "plan": formatPlan(ctx, in.Plan), + "plan": in.Plan.Description(ctx), "input": formatInput(in.Input), "executed_steps": formatExecutedSteps(in.ExecutedSteps), "plan_tool": planToolName, @@ -802,9 +788,9 @@ func NewReplanner(_ context.Context, cfg *ReplannerConfig) (adk.Agent, error) { return nil, err } - newPlanFn := cfg.NewPlanFn - if newPlanFn == nil { - newPlanFn = defaultNewPlanFn + planParser := cfg.PlanParser + if planParser == nil { + planParser = defaultPlanParserFn } return &replanner{ @@ -812,7 +798,7 @@ func NewReplanner(_ context.Context, cfg *ReplannerConfig) (adk.Agent, error) { planTool: planTool, respondTool: respondTool, genInputFn: cfg.GenInputFn, - newPlanFn: newPlanFn, + planParser: planParser, }, nil } diff --git a/adk/prebuilt/plan_execute_test.go b/adk/prebuilt/plan_execute_test.go index aff5db32..7586f93c 100644 --- a/adk/prebuilt/plan_execute_test.go +++ b/adk/prebuilt/plan_execute_test.go @@ -127,13 +127,13 @@ func TestPlannerRunWithFormattedOutput(t *testing.T) { event, ok = iterator.Next() assert.False(t, ok) - var plan executionPlan - err = plan.Unmarshal(ctx, msg.Content) + plan, err := defaultPlanParserFn(ctx, msg.Content) assert.NoError(t, err) - assert.Equal(t, 3, len(plan.Steps(ctx))) - assert.Equal(t, "Step 1", plan.Steps(ctx)[0].Description(ctx)) - assert.Equal(t, "Step 2", plan.Steps(ctx)[1].Description(ctx)) - assert.Equal(t, "Step 3", plan.Steps(ctx)[2].Description(ctx)) + plan_ := plan.(*executionPlan) + assert.Equal(t, 3, len(plan_.Steps_)) + assert.Equal(t, "Step 1", plan_.Steps_[0]) + assert.Equal(t, "Step 2", plan_.Steps_[1]) + assert.Equal(t, "Step 3", plan_.Steps_[2]) } // TestPlannerRunWithToolCalling tests the Run method of a planner created with ToolCallingChatModel @@ -193,13 +193,14 @@ func TestPlannerRunWithToolCalling(t *testing.T) { _, ok = iterator.Next() assert.False(t, ok) - var plan executionPlan - err = plan.Unmarshal(ctx, msg.Content) + plan, err := defaultPlanParserFn(ctx, msg.Content) assert.NoError(t, err) - assert.Equal(t, 3, len(plan.Steps(ctx))) - assert.Equal(t, "Step 1", plan.Steps(ctx)[0].Description(ctx)) - assert.Equal(t, "Step 2", plan.Steps(ctx)[1].Description(ctx)) - assert.Equal(t, "Step 3", plan.Steps(ctx)[2].Description(ctx)) + plan_ := plan.(*executionPlan) + assert.NoError(t, err) + assert.Equal(t, 3, len(plan_.Steps_)) + assert.Equal(t, "Step 1", plan_.Steps_[0]) + assert.Equal(t, "Step 2", plan_.Steps_[1]) + assert.Equal(t, "Step 3", plan_.Steps_[2]) } // TestNewExecutor tests the NewExecutor function @@ -413,11 +414,11 @@ func TestReplannerRunWithPlan(t *testing.T) { // Verify the updated plan was stored in the session planValue, ok := kvs[PlanSessionKey] assert.True(t, ok) - updatedPlan, ok := planValue.(Plan) + updatedPlan, ok := planValue.(*executionPlan) assert.True(t, ok) - assert.Equal(t, 2, len(updatedPlan.Steps(ctx))) - assert.Equal(t, "Updated Step 1", updatedPlan.Steps(ctx)[0].Description(ctx)) - assert.Equal(t, "Updated Step 2", updatedPlan.Steps(ctx)[1].Description(ctx)) + assert.Equal(t, 2, len(updatedPlan.Steps_)) + assert.Equal(t, "Updated Step 1", updatedPlan.Steps_[0]) + assert.Equal(t, "Updated Step 2", updatedPlan.Steps_[1]) // Verify the execute results were updated executeResultsValue, ok := kvs[ExecutedStepsSessionKey] @@ -611,10 +612,10 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { iterator, generator := adk.NewAsyncIteratorPair[*adk.AgentEvent]() plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(Plan) + currentPlan := plan.(*executionPlan) var msg adk.Message // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.Steps(ctx)) == 3 { + if len(currentPlan.Steps_) == 3 { msg = schema.AssistantMessage(originalExecuteResult, nil) adk.SetSessionValue(ctx, ExecutedStepSessionKey, originalExecuteResult) } else { @@ -637,10 +638,10 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // First call: Update the plan // Get the current plan from the session plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(Plan) + currentPlan := plan.(*executionPlan) // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.Steps(ctx)) == 3 { + if len(currentPlan.Steps_) == 3 { // Send a message event with the updated plan planJSON, _ := sonic.MarshalString(updatedPlan) msg := schema.AssistantMessage(planJSON, nil) @@ -650,7 +651,7 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // Set the updated plan & execute result in the session adk.SetSessionValue(ctx, PlanSessionKey, updatedPlan) adk.SetSessionValue(ctx, ExecutedStepsSessionKey, []ExecutedStep{{ - Step: currentPlan.Steps(ctx)[0].Description(ctx), + Step: currentPlan.Steps_[0], Result: originalExecuteResult, }}) } else { From 4d015038cc0b26b64082375d1ed5dcd0a3c864de Mon Sep 17 00:00:00 2001 From: lipandeng Date: Fri, 29 Aug 2025 21:09:59 +0800 Subject: [PATCH 04/12] fix: replanner's plan parser --- adk/prebuilt/plan_execute.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index 000d50d8..ff5eb502 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -742,7 +742,7 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age return } - plan_, err_ := r.planParser(ctx, planMsg.ToolCalls[0].Function.Name) + plan_, err_ := r.planParser(ctx, planMsg.ToolCalls[0].Function.Arguments) if err_ != nil { err_ = fmt.Errorf("unmarshal plan error: %w", err_) generator.Send(&adk.AgentEvent{Err: err_}) From 7f2d768f773c1d7eddeb5d6858d6c3df7967d3a3 Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 12:11:57 +0800 Subject: [PATCH 05/12] feat: adjust prompt template --- adk/prebuilt/plan_execute.go | 90 +++++++++++++++++++++++++++--------- 1 file changed, 68 insertions(+), 22 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index ff5eb502..8c0e6d5e 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -38,7 +38,7 @@ import ( type Plan interface { // Description returns the complete textual description of the plan. Description(ctx context.Context) string - // FirstStep returns the next step to be executed in the plan. + // FirstStep returns the first step to be executed in the plan. FirstStep(ctx context.Context) string } @@ -126,24 +126,57 @@ var ( ), } - ExecutorUserPrompt = prompt.FromMessages(schema.FString, schema.UserMessage( - ` -## Given the following plan: -{plan} -## Your task is to execute the first step, which is: -{step}`)) + // PlannerPrompt is the prompt template for the planner. + // It provides context and guidance to the planner on how to generate the Plan. + PlannerPrompt = prompt.FromMessages(schema.FString, + schema.SystemMessage(`You are an expert planning agent. Given an objective, create a comprehensive step-by-step plan to achieve the objective. - ReplannerUserPrompt = prompt.FromMessages(schema.FString, schema.UserMessage( - `You are going to review the progress toward an objective. Analyze the current state and determine the optimal next action. +## YOUR TASK +Analyze the objective and generate a strategic plan that breaks down the goal into manageable, executable steps. -## OBJECTIVE -{input} +## PLANNING REQUIREMENTS +Each step in your plan must be: +- **Specific and actionable**: Clear instructions that can be executed without ambiguity +- **Self-contained**: Include all necessary context, parameters, and requirements +- **Independently executable**: Can be performed by an agent without dependencies on other steps +- **Logically sequenced**: Arranged in optimal order for efficient execution +- **Objective-focused**: Directly contribute to achieving the main goal -## ORIGINAL PLAN -{plan} +## PLANNING GUIDELINES +- Eliminate redundant or unnecessary steps +- Include relevant constraints, parameters, and success criteria for each step +- Ensure the final step produces a complete answer or deliverable +- Anticipate potential challenges and include mitigation strategies +- Structure steps to build upon each other logically +- Provide sufficient detail for successful execution +## QUALITY CRITERIA +- Plan completeness: Does it address all aspects of the objective? +- Step clarity: Can each step be understood and executed independently? +- Logical flow: Do steps follow a sensible progression? +- Efficiency: Is this the most direct path to the objective? +- Adaptability: Can the plan handle unexpected results or changes?`), + schema.MessagesPlaceholder("input", false), + ) + + // ExecutorPrompt is the prompt template for the executor. + // It provides context and guidance to the executor on how to execute the Task. + ExecutorPrompt = prompt.FromMessages(schema.FString, + schema.SystemMessage(`You are a diligent and meticulous executor agent. Follow the given plan and execute your tasks carefully and thoroughly.`), + schema.UserMessage(`## OBJECTIVE +{input} +## Given the following plan: +{plan} ## COMPLETED STEPS & RESULTS {executed_steps} +## Your task is to execute the first step, which is: +{step}`)) + + // ReplannerPrompt is the prompt template for the replanner. + // It provides context and guidance to the replanner on how to regenerate the Plan. + ReplannerPrompt = prompt.FromMessages(schema.FString, + schema.SystemMessage( + `You are going to review the progress toward an objective. Analyze the current state and determine the optimal next action. ## YOUR TASK Based on the progress above, you MUST choose exactly ONE action: @@ -178,7 +211,16 @@ Each step in your plan must be: - Has the original objective been completely satisfied? - Are there any remaining requirements or sub-goals? - Do the results suggest a need for strategy adjustment? -- What specific actions are still required?`)) +- What specific actions are still required?`), + schema.UserMessage(`## OBJECTIVE +{input} + +## ORIGINAL PLAN +{plan} + +## COMPLETED STEPS & RESULTS +{executed_steps}`), + ) ) const ( @@ -264,9 +306,12 @@ func defaultPlanParserFn(ctx context.Context, arguments string) (Plan, error) { } func defaultGenPlannerInputFn(ctx context.Context, in *PlannerInput) ([]adk.Message, error) { - msgs := make([]adk.Message, 0, 1+len(in.Input)) - msgs = append(msgs, schema.SystemMessage(PlannerInstruction)) - msgs = append(msgs, in.Input...) + msgs, err := PlannerPrompt.Format(ctx, map[string]any{ + "input": in.Input, + }) + if err != nil { + return nil, err + } return msgs, nil } @@ -517,10 +562,11 @@ func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { func defaultGenExecutorInputFn(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { - userMsgs, err := ExecutorUserPrompt.Format(ctx, map[string]any{ - "input": formatInput(in.Input), - "plan": in.Plan.Description(ctx), - "step": in.Plan.FirstStep(ctx), + userMsgs, err := ExecutorPrompt.Format(ctx, map[string]any{ + "input": formatInput(in.Input), + "plan": in.Plan.Description(ctx), + "executed_steps": formatExecutedSteps(in.ExecutedSteps), + "step": in.Plan.FirstStep(ctx), }) if err != nil { return nil, err @@ -757,7 +803,7 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age func buildDefaultReplannerInputFn(in *PlanExecuteInput, planToolName, respondToolName string) GenPlanExecuteInputFn { return func(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { - msgs, err := ReplannerUserPrompt.Format(ctx, map[string]any{ + msgs, err := ReplannerPrompt.Format(ctx, map[string]any{ "plan": in.Plan.Description(ctx), "input": formatInput(in.Input), "executed_steps": formatExecutedSteps(in.ExecutedSteps), From 72671f7e5d71b248a00453eed9e4f57a8044f541 Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 13:28:35 +0800 Subject: [PATCH 06/12] feat: adjust comments --- adk/prebuilt/plan_execute.go | 77 +++++++++++++++++------------------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index 8c0e6d5e..16be6ba5 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -34,7 +34,7 @@ import ( ) // Plan represents an execution plan with a sequence of actionable steps. -// It provides methods to access the plan's description and retrieve the next step to execute. +// It provides methods to access the plan's description and retrieve the first step to execute. type Plan interface { // Description returns the complete textual description of the plan. Description(ctx context.Context) string @@ -42,6 +42,7 @@ type Plan interface { FirstStep(ctx context.Context) string } +// PlanParser parses the plan string generated by Model into a Plan instance. type PlanParser func(ctx context.Context, arguments string) (Plan, error) // executionPlan the default implementation of Plan. @@ -224,38 +225,9 @@ Each step in your plan must be: ) const ( + // PlanExecuteUserInputSessionKey is the session key for the user input. PlanExecuteUserInputSessionKey = "UserInput" - // PlannerInstruction is the system instruction for the planner. - // It provides context and guidance to the planner on how to generate the Plan. - PlannerInstruction = `You are an expert planning agent. Given an objective, create a comprehensive step-by-step plan to achieve the objective. - -## YOUR TASK -Analyze the objective and generate a strategic plan that breaks down the goal into manageable, executable steps. - -## PLANNING REQUIREMENTS -Each step in your plan must be: -- **Specific and actionable**: Clear instructions that can be executed without ambiguity -- **Self-contained**: Include all necessary context, parameters, and requirements -- **Independently executable**: Can be performed by an agent without dependencies on other steps -- **Logically sequenced**: Arranged in optimal order for efficient execution -- **Objective-focused**: Directly contribute to achieving the main goal - -## PLANNING GUIDELINES -- Eliminate redundant or unnecessary steps -- Include relevant constraints, parameters, and success criteria for each step -- Ensure the final step produces a complete answer or deliverable -- Anticipate potential challenges and include mitigation strategies -- Structure steps to build upon each other logically -- Provide sufficient detail for successful execution - -## QUALITY CRITERIA -- Plan completeness: Does it address all aspects of the objective? -- Step clarity: Can each step be understood and executed independently? -- Logical flow: Do steps follow a sensible progression? -- Efficiency: Is this the most direct path to the objective? -- Adaptability: Can the plan handle unexpected results or changes?` - // PlanSessionKey is the session key for the plan. PlanSessionKey = "Plan" @@ -285,15 +257,21 @@ type PlannerConfig struct { // If not provided, PlanToolInfo will be used as the default. ToolInfo *schema.ToolInfo + // GenInputFn is a function that generates the input messages for the planner. + // If not provided, defaultGenPlannerInputFn will be used as the default. GenInputFn GenPlannerInputFn + // PlanParser is a function that parses the plan string generated by Model into a Plan instance. + // If not provided, defaultPlanParserFn will be used as the default. PlanParser PlanParser } +// PlannerInput is the input information for the planner. type PlannerInput struct { Input []adk.Message } +// GenPlannerInputFn is a function that generates the input messages for the planner. type GenPlannerInputFn func(ctx context.Context, in *PlannerInput) ([]adk.Message, error) func defaultPlanParserFn(ctx context.Context, arguments string) (Plan, error) { @@ -483,18 +461,28 @@ func NewPlanner(_ context.Context, cfg *PlannerConfig) (adk.Agent, error) { }, nil } +// PlanExecuteInput is the input information for the executor and the planner. type PlanExecuteInput struct { Input []adk.Message Plan Plan ExecutedSteps []ExecutedStep } + +// GenPlanExecuteInputFn is a function that generates the input messages for the executor. type GenPlanExecuteInputFn func(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) +// ExecutorConfig provides configuration options for creating a executor agent. type ExecutorConfig struct { - Model model.ToolCallingChatModel + // Model is the chat model used by the executor. + Model model.ToolCallingChatModel + + // ToolsConfig is the tools configuration used by the executor. ToolsConfig adk.ToolsConfig - MaxStep int + // MaxStep is the maximum number of steps allowed for the executor. + MaxStep int + + // GenInputFn is the function that generates the input messages for the executor. GenInputFn GenPlanExecuteInputFn } @@ -598,11 +586,16 @@ type ReplannerConfig struct { // If not provided, the default RespondToolInfo will be used. RespondTool *schema.ToolInfo + // GenInputFn is the function that generates the input messages for the executor. + // if not provided, defaultGenPlannerInputFn will be used. GenInputFn GenPlanExecuteInputFn + // PlanParser is a function that parses the plan string generated by Model into a Plan instance. + // If not provided, defaultPlanParserFn will be used as the default. PlanParser PlanParser } +// formatInput formats the input messages into a string. func formatInput(input []adk.Message) string { var sb strings.Builder for _, msg := range input { @@ -848,22 +841,24 @@ func NewReplanner(_ context.Context, cfg *ReplannerConfig) (adk.Agent, error) { }, nil } +// PlanExecuteConfig provides configuration options for creating a plan execute agent. type PlanExecuteConfig struct { - Planner adk.Agent - Executor adk.Agent - Replanner adk.Agent - MaxReplans int + Planner adk.Agent + Executor adk.Agent + Replanner adk.Agent + MaxIterations int } +// NewPlanExecuteAgent creates a new plan execute agent with the given configuration. func NewPlanExecuteAgent(ctx context.Context, cfg *PlanExecuteConfig) (adk.Agent, error) { - maxReplans := cfg.MaxReplans - if maxReplans <= 0 { - maxReplans = 10 + maxIterations := cfg.MaxIterations + if maxIterations <= 0 { + maxIterations = 10 } loop, err := adk.NewLoopAgent(ctx, &adk.LoopAgentConfig{ Name: "execute_replan", SubAgents: []adk.Agent{cfg.Executor, cfg.Replanner}, - MaxIterations: maxReplans, + MaxIterations: maxIterations, }) if err != nil { return nil, err From 758b0f9e6942cce6476773bfe5223e71a6ed7695 Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 14:33:53 +0800 Subject: [PATCH 07/12] feat: adjust comments --- adk/prebuilt/plan_execute.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index 16be6ba5..5838d019 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -261,8 +261,11 @@ type PlannerConfig struct { // If not provided, defaultGenPlannerInputFn will be used as the default. GenInputFn GenPlannerInputFn - // PlanParser is a function that parses the plan string generated by Model into a Plan instance. - // If not provided, defaultPlanParserFn will be used as the default. + // PlanParser parses the model-generated plan string into a Plan instance. + // The arguments format of PlanParser depends on the model configuration: + // - ChatModelWithFormattedOutput: expects Plan format output + // - ToolCallingChatModel + ToolInfo: uses ToolInfo schema for arguments structure + // if not provided,defaultPlanParserFn will be used as the default. PlanParser PlanParser } @@ -482,7 +485,7 @@ type ExecutorConfig struct { // MaxStep is the maximum number of steps allowed for the executor. MaxStep int - // GenInputFn is the function that generates the input messages for the executor. + // GenInputFn is the function that generates the input messages for the Executor. GenInputFn GenPlanExecuteInputFn } @@ -586,11 +589,13 @@ type ReplannerConfig struct { // If not provided, the default RespondToolInfo will be used. RespondTool *schema.ToolInfo - // GenInputFn is the function that generates the input messages for the executor. - // if not provided, defaultGenPlannerInputFn will be used. + // GenInputFn is the function that generates the input messages for the Replanner. + // if not provided, buildDefaultReplannerInputFn will be used. GenInputFn GenPlanExecuteInputFn - // PlanParser is a function that parses the plan string generated by Model into a Plan instance. + // PlanParser parses the model-generated plan string into a Plan instance. + // The arguments format of PlanParser depends on the model configuration: + // - ChatModel with PlanTool: uses PlanTool schema for arguments structure // If not provided, defaultPlanParserFn will be used as the default. PlanParser PlanParser } From 1db6dc00985a74784a54dc5befa75eeb22808b7f Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 16:27:27 +0800 Subject: [PATCH 08/12] feat: adjust Plan definition --- adk/prebuilt/plan_execute.go | 106 ++++++++++++++++-------------- adk/prebuilt/plan_execute_test.go | 4 +- 2 files changed, 58 insertions(+), 52 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index 5838d019..e4c30065 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -18,6 +18,7 @@ package prebuilt import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -36,14 +37,15 @@ import ( // Plan represents an execution plan with a sequence of actionable steps. // It provides methods to access the plan's description and retrieve the first step to execute. type Plan interface { - // Description returns the complete textual description of the plan. - Description(ctx context.Context) string // FirstStep returns the first step to be executed in the plan. - FirstStep(ctx context.Context) string + FirstStep() string + + json.Marshaler + json.Unmarshaler } -// PlanParser parses the plan string generated by Model into a Plan instance. -type PlanParser func(ctx context.Context, arguments string) (Plan, error) +// NewPlan parses the plan string generated by Model into a Plan instance. +type NewPlan func(ctx context.Context) Plan // executionPlan the default implementation of Plan. // @@ -68,22 +70,23 @@ type executionPlan struct { Steps_ []string `json:"steps"` } -func (p *executionPlan) Description(ctx context.Context) string { - var formattedPlan strings.Builder - for i, step := range p.Steps_ { - formattedPlan.WriteString(fmt.Sprintf("%d. %s\n", i+1, step)) - } - - return formattedPlan.String() -} - -func (p *executionPlan) FirstStep(ctx context.Context) string { +func (p *executionPlan) FirstStep() string { if len(p.Steps_) == 0 { return "" } return p.Steps_[0] } +func (p *executionPlan) MarshalJSON() ([]byte, error) { + type planTyp executionPlan + return sonic.Marshal((*planTyp)(p)) +} + +func (p *executionPlan) UnmarshalJSON(bytes []byte) error { + type planTyp executionPlan + return sonic.Unmarshal(bytes, (*planTyp)(p)) +} + // Response represents the final response to the user. // This struct is used for JSON serialization/deserialization of the final response // generated by the model. @@ -261,12 +264,10 @@ type PlannerConfig struct { // If not provided, defaultGenPlannerInputFn will be used as the default. GenInputFn GenPlannerInputFn - // PlanParser parses the model-generated plan string into a Plan instance. - // The arguments format of PlanParser depends on the model configuration: - // - ChatModelWithFormattedOutput: expects Plan format output - // - ToolCallingChatModel + ToolInfo: uses ToolInfo schema for arguments structure - // if not provided,defaultPlanParserFn will be used as the default. - PlanParser PlanParser + // NewPlan creates a new Plan instance for JSON. + // The returned Plan will be used to unmarshal the model-generated JSON output. + // If not provided, defaultNewPlan will be used as the default. + NewPlan NewPlan } // PlannerInput is the input information for the planner. @@ -277,13 +278,8 @@ type PlannerInput struct { // GenPlannerInputFn is a function that generates the input messages for the planner. type GenPlannerInputFn func(ctx context.Context, in *PlannerInput) ([]adk.Message, error) -func defaultPlanParserFn(ctx context.Context, arguments string) (Plan, error) { - p := &executionPlan{} - err := sonic.UnmarshalString(arguments, p) - if err != nil { - return nil, err - } - return p, nil +func defaultNewPlan(ctx context.Context) Plan { + return &executionPlan{} } func defaultGenPlannerInputFn(ctx context.Context, in *PlannerInput) ([]adk.Message, error) { @@ -300,7 +296,7 @@ type planner struct { toolCall bool chatModel model.BaseChatModel genInputFn GenPlannerInputFn - planParser PlanParser + newPlan NewPlan } func (p *planner) Name(_ context.Context) string { @@ -407,7 +403,8 @@ func (p *planner) Run(ctx context.Context, input *adk.AgentInput, } else { planJSON = msg.Content } - plan, err := p.planParser(ctx, planJSON) + plan := p.newPlan(ctx) + err = plan.UnmarshalJSON([]byte(planJSON)) if err != nil { err = fmt.Errorf("unmarshal plan error: %w", err) generator.Send(&adk.AgentEvent{Err: err}) @@ -451,16 +448,16 @@ func NewPlanner(_ context.Context, cfg *PlannerConfig) (adk.Agent, error) { inputFn = defaultGenPlannerInputFn } - planParser := cfg.PlanParser + planParser := cfg.NewPlan if planParser == nil { - planParser = defaultPlanParserFn + planParser = defaultNewPlan } return &planner{ toolCall: toolCall, chatModel: chatModel, genInputFn: inputFn, - planParser: planParser, + newPlan: planParser, }, nil } @@ -553,11 +550,16 @@ func NewExecutor(ctx context.Context, cfg *ExecutorConfig) (adk.Agent, error) { func defaultGenExecutorInputFn(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { + planContent, err := in.Plan.MarshalJSON() + if err != nil { + return nil, err + } + userMsgs, err := ExecutorPrompt.Format(ctx, map[string]any{ "input": formatInput(in.Input), - "plan": in.Plan.Description(ctx), + "plan": string(planContent), "executed_steps": formatExecutedSteps(in.ExecutedSteps), - "step": in.Plan.FirstStep(ctx), + "step": in.Plan.FirstStep(), }) if err != nil { return nil, err @@ -572,7 +574,7 @@ type replanner struct { respondTool *schema.ToolInfo genInputFn GenPlanExecuteInputFn - planParser PlanParser + newPlan NewPlan } type ReplannerConfig struct { @@ -593,11 +595,10 @@ type ReplannerConfig struct { // if not provided, buildDefaultReplannerInputFn will be used. GenInputFn GenPlanExecuteInputFn - // PlanParser parses the model-generated plan string into a Plan instance. - // The arguments format of PlanParser depends on the model configuration: - // - ChatModel with PlanTool: uses PlanTool schema for arguments structure - // If not provided, defaultPlanParserFn will be used as the default. - PlanParser PlanParser + // NewPlan creates a new Plan instance. + // The returned Plan will be used to unmarshal the model-generated JSON output from PlanTool. + // If not provided, defaultNewPlan will be used as the default. + NewPlan NewPlan } // formatInput formats the input messages into a string. @@ -641,7 +642,7 @@ func (r *replanner) genInput(ctx context.Context) ([]adk.Message, error) { panic("impossible: plan not found") } plan_ := plan.(Plan) - step := plan_.FirstStep(ctx) + step := plan_.FirstStep() var executedSteps_ []ExecutedStep executedSteps, ok := adk.GetSessionValue(ctx, ExecutedStepsSessionKey) @@ -786,10 +787,11 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age return } - plan_, err_ := r.planParser(ctx, planMsg.ToolCalls[0].Function.Arguments) - if err_ != nil { - err_ = fmt.Errorf("unmarshal plan error: %w", err_) - generator.Send(&adk.AgentEvent{Err: err_}) + plan_ := r.newPlan(ctx) + err = plan_.UnmarshalJSON([]byte(planMsg.ToolCalls[0].Function.Arguments)) + if err != nil { + err = fmt.Errorf("unmarshal plan error: %w", err) + generator.Send(&adk.AgentEvent{Err: err}) return } @@ -801,8 +803,12 @@ func (r *replanner) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.Age func buildDefaultReplannerInputFn(in *PlanExecuteInput, planToolName, respondToolName string) GenPlanExecuteInputFn { return func(ctx context.Context, in *PlanExecuteInput) ([]adk.Message, error) { + planContent, err := in.Plan.MarshalJSON() + if err != nil { + return nil, err + } msgs, err := ReplannerPrompt.Format(ctx, map[string]any{ - "plan": in.Plan.Description(ctx), + "plan": string(planContent), "input": formatInput(in.Input), "executed_steps": formatExecutedSteps(in.ExecutedSteps), "plan_tool": planToolName, @@ -832,9 +838,9 @@ func NewReplanner(_ context.Context, cfg *ReplannerConfig) (adk.Agent, error) { return nil, err } - planParser := cfg.PlanParser + planParser := cfg.NewPlan if planParser == nil { - planParser = defaultPlanParserFn + planParser = defaultNewPlan } return &replanner{ @@ -842,7 +848,7 @@ func NewReplanner(_ context.Context, cfg *ReplannerConfig) (adk.Agent, error) { planTool: planTool, respondTool: respondTool, genInputFn: cfg.GenInputFn, - planParser: planParser, + newPlan: planParser, }, nil } diff --git a/adk/prebuilt/plan_execute_test.go b/adk/prebuilt/plan_execute_test.go index 7586f93c..423afe42 100644 --- a/adk/prebuilt/plan_execute_test.go +++ b/adk/prebuilt/plan_execute_test.go @@ -127,7 +127,7 @@ func TestPlannerRunWithFormattedOutput(t *testing.T) { event, ok = iterator.Next() assert.False(t, ok) - plan, err := defaultPlanParserFn(ctx, msg.Content) + plan, err := defaultNewPlan(ctx, msg.Content) assert.NoError(t, err) plan_ := plan.(*executionPlan) assert.Equal(t, 3, len(plan_.Steps_)) @@ -193,7 +193,7 @@ func TestPlannerRunWithToolCalling(t *testing.T) { _, ok = iterator.Next() assert.False(t, ok) - plan, err := defaultPlanParserFn(ctx, msg.Content) + plan, err := defaultNewPlan(ctx, msg.Content) assert.NoError(t, err) plan_ := plan.(*executionPlan) assert.NoError(t, err) From e329e9eed02f67ea9cadbccc4444782aa98b9212 Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 16:29:17 +0800 Subject: [PATCH 09/12] feat: fix ut --- adk/prebuilt/plan_execute_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/adk/prebuilt/plan_execute_test.go b/adk/prebuilt/plan_execute_test.go index 423afe42..1ac58cd6 100644 --- a/adk/prebuilt/plan_execute_test.go +++ b/adk/prebuilt/plan_execute_test.go @@ -127,7 +127,8 @@ func TestPlannerRunWithFormattedOutput(t *testing.T) { event, ok = iterator.Next() assert.False(t, ok) - plan, err := defaultNewPlan(ctx, msg.Content) + plan := defaultNewPlan(ctx) + err = plan.UnmarshalJSON([]byte(msg.Content)) assert.NoError(t, err) plan_ := plan.(*executionPlan) assert.Equal(t, 3, len(plan_.Steps_)) @@ -193,7 +194,8 @@ func TestPlannerRunWithToolCalling(t *testing.T) { _, ok = iterator.Next() assert.False(t, ok) - plan, err := defaultNewPlan(ctx, msg.Content) + plan := defaultNewPlan(ctx) + err = plan.UnmarshalJSON([]byte(msg.Content)) assert.NoError(t, err) plan_ := plan.(*executionPlan) assert.NoError(t, err) From 7ec2e22ddce964fa61bf769b4c317ba342179c6e Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 16:43:40 +0800 Subject: [PATCH 10/12] feat: adjust comments --- adk/prebuilt/plan_execute.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index e4c30065..aabe5123 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -35,12 +35,14 @@ import ( ) // Plan represents an execution plan with a sequence of actionable steps. -// It provides methods to access the plan's description and retrieve the first step to execute. +// It supports JSON serialization and provides access to the first step. type Plan interface { // FirstStep returns the first step to be executed in the plan. FirstStep() string + // MarshalJSON marshals the Plan into JSON. json.Marshaler + // UnmarshalJSON unmarshals the JSON into the Plan. json.Unmarshaler } From 7ac7ad84e445fcd0c975491dd4b104290e143acc Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 16:53:04 +0800 Subject: [PATCH 11/12] feat: adjust comments --- adk/prebuilt/plan_execute.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index aabe5123..98f854f0 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -46,7 +46,7 @@ type Plan interface { json.Unmarshaler } -// NewPlan parses the plan string generated by Model into a Plan instance. +// NewPlan creates a new Plan instance. type NewPlan func(ctx context.Context) Plan // executionPlan the default implementation of Plan. From 63797058268fb0ab2d2d4633a4533b461c8ccd09 Mon Sep 17 00:00:00 2001 From: lipandeng Date: Mon, 1 Sep 2025 19:23:54 +0800 Subject: [PATCH 12/12] feat: adjust code --- adk/prebuilt/plan_execute.go | 22 +++++++------- adk/prebuilt/plan_execute_test.go | 48 +++++++++++++++---------------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/adk/prebuilt/plan_execute.go b/adk/prebuilt/plan_execute.go index 98f854f0..c6515465 100644 --- a/adk/prebuilt/plan_execute.go +++ b/adk/prebuilt/plan_execute.go @@ -49,7 +49,7 @@ type Plan interface { // NewPlan creates a new Plan instance. type NewPlan func(ctx context.Context) Plan -// executionPlan the default implementation of Plan. +// defaultPlan the default implementation of Plan. // // JSON Schema: // @@ -66,26 +66,26 @@ type NewPlan func(ctx context.Context) Plan // }, // "required": ["steps"] // } -type executionPlan struct { +type defaultPlan struct { // Steps contains the ordered list of actions to be taken. // Each step should be clear, actionable, and arranged in a logical sequence. - Steps_ []string `json:"steps"` + Steps []string `json:"steps"` } -func (p *executionPlan) FirstStep() string { - if len(p.Steps_) == 0 { +func (p *defaultPlan) FirstStep() string { + if len(p.Steps) == 0 { return "" } - return p.Steps_[0] + return p.Steps[0] } -func (p *executionPlan) MarshalJSON() ([]byte, error) { - type planTyp executionPlan +func (p *defaultPlan) MarshalJSON() ([]byte, error) { + type planTyp defaultPlan return sonic.Marshal((*planTyp)(p)) } -func (p *executionPlan) UnmarshalJSON(bytes []byte) error { - type planTyp executionPlan +func (p *defaultPlan) UnmarshalJSON(bytes []byte) error { + type planTyp defaultPlan return sonic.Unmarshal(bytes, (*planTyp)(p)) } @@ -281,7 +281,7 @@ type PlannerInput struct { type GenPlannerInputFn func(ctx context.Context, in *PlannerInput) ([]adk.Message, error) func defaultNewPlan(ctx context.Context) Plan { - return &executionPlan{} + return &defaultPlan{} } func defaultGenPlannerInputFn(ctx context.Context, in *PlannerInput) ([]adk.Message, error) { diff --git a/adk/prebuilt/plan_execute_test.go b/adk/prebuilt/plan_execute_test.go index 1ac58cd6..120fbd47 100644 --- a/adk/prebuilt/plan_execute_test.go +++ b/adk/prebuilt/plan_execute_test.go @@ -130,11 +130,11 @@ func TestPlannerRunWithFormattedOutput(t *testing.T) { plan := defaultNewPlan(ctx) err = plan.UnmarshalJSON([]byte(msg.Content)) assert.NoError(t, err) - plan_ := plan.(*executionPlan) - assert.Equal(t, 3, len(plan_.Steps_)) - assert.Equal(t, "Step 1", plan_.Steps_[0]) - assert.Equal(t, "Step 2", plan_.Steps_[1]) - assert.Equal(t, "Step 3", plan_.Steps_[2]) + plan_ := plan.(*defaultPlan) + assert.Equal(t, 3, len(plan_.Steps)) + assert.Equal(t, "Step 1", plan_.Steps[0]) + assert.Equal(t, "Step 2", plan_.Steps[1]) + assert.Equal(t, "Step 3", plan_.Steps[2]) } // TestPlannerRunWithToolCalling tests the Run method of a planner created with ToolCallingChatModel @@ -197,12 +197,12 @@ func TestPlannerRunWithToolCalling(t *testing.T) { plan := defaultNewPlan(ctx) err = plan.UnmarshalJSON([]byte(msg.Content)) assert.NoError(t, err) - plan_ := plan.(*executionPlan) + plan_ := plan.(*defaultPlan) assert.NoError(t, err) - assert.Equal(t, 3, len(plan_.Steps_)) - assert.Equal(t, "Step 1", plan_.Steps_[0]) - assert.Equal(t, "Step 2", plan_.Steps_[1]) - assert.Equal(t, "Step 3", plan_.Steps_[2]) + assert.Equal(t, 3, len(plan_.Steps)) + assert.Equal(t, "Step 1", plan_.Steps[0]) + assert.Equal(t, "Step 2", plan_.Steps[1]) + assert.Equal(t, "Step 3", plan_.Steps[2]) } // TestNewExecutor tests the NewExecutor function @@ -244,7 +244,7 @@ func TestExecutorRun(t *testing.T) { mockToolCallingModel := mockModel.NewMockToolCallingChatModel(ctrl) // Store a plan in the session - plan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} + plan := &defaultPlan{Steps: []string{"Step 1", "Step 2", "Step 3"}} adk.SetSessionValue(ctx, PlanSessionKey, plan) // Set up expectations for the mock model @@ -388,7 +388,7 @@ func TestReplannerRunWithPlan(t *testing.T) { assert.NoError(t, err) // Store necessary values in the session - plan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} + plan := &defaultPlan{Steps: []string{"Step 1", "Step 2", "Step 3"}} rp, err = AgentWithSessionKVs(ctx, rp, map[string]any{ PlanSessionKey: plan, ExecutedStepSessionKey: "Execution result", @@ -416,11 +416,11 @@ func TestReplannerRunWithPlan(t *testing.T) { // Verify the updated plan was stored in the session planValue, ok := kvs[PlanSessionKey] assert.True(t, ok) - updatedPlan, ok := planValue.(*executionPlan) + updatedPlan, ok := planValue.(*defaultPlan) assert.True(t, ok) - assert.Equal(t, 2, len(updatedPlan.Steps_)) - assert.Equal(t, "Updated Step 1", updatedPlan.Steps_[0]) - assert.Equal(t, "Updated Step 2", updatedPlan.Steps_[1]) + assert.Equal(t, 2, len(updatedPlan.Steps)) + assert.Equal(t, "Updated Step 1", updatedPlan.Steps[0]) + assert.Equal(t, "Updated Step 2", updatedPlan.Steps[1]) // Verify the execute results were updated executeResultsValue, ok := kvs[ExecutedStepsSessionKey] @@ -487,7 +487,7 @@ func TestReplannerRunWithRespond(t *testing.T) { assert.NoError(t, err) // Store necessary values in the session - plan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} + plan := &defaultPlan{Steps: []string{"Step 1", "Step 2", "Step 3"}} rp, err = AgentWithSessionKVs(ctx, rp, map[string]any{ PlanSessionKey: plan, ExecutedStepSessionKey: "Execution result", @@ -576,9 +576,9 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { mockReplanner.EXPECT().Description(gomock.Any()).Return("a replanner agent").AnyTimes() // Create a plan - originalPlan := &executionPlan{Steps_: []string{"Step 1", "Step 2", "Step 3"}} + originalPlan := &defaultPlan{Steps: []string{"Step 1", "Step 2", "Step 3"}} // Create an updated plan with fewer steps (after replanning) - updatedPlan := &executionPlan{Steps_: []string{"Updated Step 2", "Updated Step 3"}} + updatedPlan := &defaultPlan{Steps: []string{"Updated Step 2", "Updated Step 3"}} // Create execute result originalExecuteResult := "Execution result for Step 1" updatedExecuteResult := "Execution result for Updated Step 2" @@ -614,10 +614,10 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { iterator, generator := adk.NewAsyncIteratorPair[*adk.AgentEvent]() plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(*executionPlan) + currentPlan := plan.(*defaultPlan) var msg adk.Message // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.Steps_) == 3 { + if len(currentPlan.Steps) == 3 { msg = schema.AssistantMessage(originalExecuteResult, nil) adk.SetSessionValue(ctx, ExecutedStepSessionKey, originalExecuteResult) } else { @@ -640,10 +640,10 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // First call: Update the plan // Get the current plan from the session plan, _ := adk.GetSessionValue(ctx, PlanSessionKey) - currentPlan := plan.(*executionPlan) + currentPlan := plan.(*defaultPlan) // Check if this is the first replanning (original plan has 3 steps) - if len(currentPlan.Steps_) == 3 { + if len(currentPlan.Steps) == 3 { // Send a message event with the updated plan planJSON, _ := sonic.MarshalString(updatedPlan) msg := schema.AssistantMessage(planJSON, nil) @@ -653,7 +653,7 @@ func TestPlanExecuteAgentWithReplan(t *testing.T) { // Set the updated plan & execute result in the session adk.SetSessionValue(ctx, PlanSessionKey, updatedPlan) adk.SetSessionValue(ctx, ExecutedStepsSessionKey, []ExecutedStep{{ - Step: currentPlan.Steps_[0], + Step: currentPlan.Steps[0], Result: originalExecuteResult, }}) } else {