diff --git a/.changeset/social-humans-hammer.md b/.changeset/social-humans-hammer.md new file mode 100644 index 000000000..4daa9646b --- /dev/null +++ b/.changeset/social-humans-hammer.md @@ -0,0 +1,217 @@ +--- +"@voltagent/core": patch +--- + +feat: add workflow control steps (branch, foreach, loop, map, sleep) + +```ts +import { + createWorkflowChain, + andThen, + andBranch, + andForEach, + andDoWhile, + andDoUntil, + andMap, + andSleep, + andSleepUntil, +} from "@voltagent/core"; +import { z } from "zod"; +``` + +Branching: + +```ts +const workflow = createWorkflowChain({ + id: "branching-flow", + input: z.object({ amount: z.number() }), +}).andBranch({ + id: "rules", + branches: [ + { + condition: ({ data }) => data.amount > 1000, + step: andThen({ + id: "flag-large", + execute: async ({ data }) => ({ ...data, large: true }), + }), + }, + { + condition: ({ data }) => data.amount < 0, + step: andThen({ + id: "flag-invalid", + execute: async ({ data }) => ({ ...data, invalid: true }), + }), + }, + ], +}); +``` + +For-each and loops: + +```ts +createWorkflowChain({ + id: "batch-process", + input: z.array(z.number()), +}).andForEach({ + id: "double-each", + concurrency: 2, + step: andThen({ + id: "double", + execute: async ({ data }) => data * 2, + }), +}); + +createWorkflowChain({ + id: "looping-flow", + input: z.number(), +}) + .andDoWhile({ + id: "increment-until-3", + step: andThen({ + id: "increment", + execute: async ({ data }) => data + 1, + }), + condition: ({ data }) => data < 3, + }) + .andDoUntil({ + id: "increment-until-2", + step: andThen({ + id: "increment-until", + execute: async ({ data }) => data + 1, + }), + condition: ({ data }) => data >= 2, + }); +``` + +Data shaping: + +```ts +createWorkflowChain({ + id: "compose-result", + input: z.object({ userId: z.string() }), +}) + .andThen({ + id: "fetch-user", + execute: async ({ data }) => ({ name: "Ada", id: data.userId }), + }) + .andMap({ + id: "shape-output", + map: { + userId: { source: "data", path: "userId" }, + name: { source: "step", stepId: "fetch-user", path: "name" }, + region: { source: "context", key: "region" }, + constant: { source: "value", value: "ok" }, + }, + }); +``` + +Sleep: + +```ts +createWorkflowChain({ + id: "delayed-step", + input: z.object({ id: z.string() }), +}) + .andSleep({ + id: "pause", + duration: 500, + }) + .andSleepUntil({ + id: "wait-until", + date: () => new Date(Date.now() + 60_000), + }) + .andThen({ + id: "continue", + execute: async ({ data }) => ({ ...data, resumed: true }), + }); +``` + +Workflow-level retries: + +```ts +createWorkflowChain({ + id: "retry-defaults", + retryConfig: { attempts: 2, delayMs: 500 }, +}) + .andThen({ + id: "fetch-user", + execute: async ({ data }) => fetchUser(data.userId), + }) + .andThen({ + id: "no-retry-step", + retries: 0, + execute: async ({ data }) => data, + }); +``` + +Workflow hooks (finish/error/suspend): + +```ts +createWorkflowChain({ + id: "hooked-workflow", + hooks: { + onSuspend: async (info) => { + console.log("Suspended:", info.suspension?.reason); + }, + onError: async (info) => { + console.error("Failed:", info.error); + }, + onFinish: async (info) => { + console.log("Done:", info.status); + }, + onEnd: async (state, info) => { + if (info?.status === "completed") { + console.log("Result:", state.result); + console.log("Steps:", Object.keys(info.steps)); + } + }, + }, +}); +``` + +Workflow guardrails (input/output + step-level): + +```ts +import { + andGuardrail, + andThen, + createInputGuardrail, + createOutputGuardrail, + createWorkflowChain, +} from "@voltagent/core"; +import { z } from "zod"; + +const trimInput = createInputGuardrail({ + name: "trim", + handler: async ({ input }) => ({ + pass: true, + action: "modify", + modifiedInput: typeof input === "string" ? input.trim() : input, + }), +}); + +const redactOutput = createOutputGuardrail({ + name: "redact", + handler: async ({ output }) => ({ + pass: true, + action: "modify", + modifiedOutput: output.replace(/[0-9]/g, "*"), + }), +}); + +createWorkflowChain({ + id: "guarded-workflow", + input: z.string(), + result: z.string(), + inputGuardrails: [trimInput], + outputGuardrails: [redactOutput], +}) + .andGuardrail({ + id: "sanitize-step", + outputGuardrails: [redactOutput], + }) + .andThen({ + id: "finish", + execute: async ({ data }) => data, + }); +``` diff --git a/examples/with-workflow/src/index.ts b/examples/with-workflow/src/index.ts index a7af3c036..d836701ac 100644 --- a/examples/with-workflow/src/index.ts +++ b/examples/with-workflow/src/index.ts @@ -1,5 +1,13 @@ import { openai } from "@ai-sdk/openai"; -import { Agent, VoltAgent, createWorkflowChain } from "@voltagent/core"; +import { + Agent, + VoltAgent, + andGuardrail, + andThen, + createInputGuardrail, + createOutputGuardrail, + createWorkflowChain, +} from "@voltagent/core"; import { createPinoLogger } from "@voltagent/logger"; import { honoServer } from "@voltagent/server-hono"; import { z } from "zod"; @@ -357,6 +365,191 @@ const articleSummarizationWorkflow = createWorkflowChain({ }, ); +// ============================================================================== +// Example 5: Timed Reminder Workflow +// Concepts: andSleep, andSleepUntil +// ============================================================================== +const timedReminderWorkflow = createWorkflowChain({ + id: "timed-reminder", + name: "Timed Reminder Workflow", + purpose: "Pause briefly, then align to a scheduled time before completing", + input: z.object({ + userId: z.string(), + waitMs: z.number().default(200), + }), + result: z.object({ + userId: z.string(), + status: z.enum(["sent"]), + resumedAt: z.string(), + }), +}) + .andSleep({ + id: "pause-briefly", + duration: ({ data }) => Math.max(0, data.waitMs), + }) + .andSleepUntil({ + id: "align-to-next-second", + date: () => new Date(Date.now() + 1000), + }) + .andThen({ + id: "complete-reminder", + execute: async ({ data }) => ({ + userId: data.userId, + status: "sent", + resumedAt: new Date().toISOString(), + }), + }); + +// ============================================================================== +// Example 6: Batch Transform Workflow +// Concepts: andForEach, andMap +// ============================================================================== +const batchTransformWorkflow = createWorkflowChain({ + id: "batch-transform", + name: "Batch Transform Workflow", + purpose: "Process arrays with for-each and map steps", + input: z.array(z.number()), + result: z.object({ + original: z.array(z.number()), + doubled: z.array(z.number()), + count: z.number(), + total: z.number(), + }), +}) + .andForEach({ + id: "double-each", + step: andThen({ + id: "double", + execute: async ({ data }) => data * 2, + }), + concurrency: 2, + }) + .andMap({ + id: "summarize-results", + map: { + original: { source: "input" }, + doubled: { source: "data" }, + count: { + source: "fn", + fn: ({ data }) => (Array.isArray(data) ? data.length : 0), + }, + total: { + source: "fn", + fn: ({ data }) => (Array.isArray(data) ? data.reduce((sum, value) => sum + value, 0) : 0), + }, + }, + }); + +// ============================================================================== +// Example 7: Loop + Branch Workflow +// Concepts: andDoWhile, andDoUntil, andBranch +// ============================================================================== +const loopAndBranchWorkflow = createWorkflowChain({ + id: "loop-and-branch", + name: "Loop + Branch Workflow", + purpose: "Demonstrate loops and branching with a simple counter", + input: z.object({ + counter: z.number().default(0), + }), + result: z.object({ + counter: z.number(), + label: z.enum(["ready", "warmup"]), + }), +}) + .andDoWhile({ + id: "warmup-loop", + step: andThen({ + id: "increment-warmup", + execute: async ({ data }) => ({ ...data, counter: data.counter + 1 }), + }), + condition: ({ data }) => data.counter < 1, + }) + .andDoUntil({ + id: "retry-loop", + step: andThen({ + id: "increment-retry", + execute: async ({ data }) => ({ ...data, counter: data.counter + 1 }), + }), + condition: ({ data }) => data.counter >= 3, + }) + .andBranch({ + id: "categorize-counter", + branches: [ + { + condition: ({ data }) => data.counter >= 3, + step: andThen({ + id: "mark-ready", + execute: async ({ data }) => ({ ...data, label: "ready" as const }), + }), + }, + { + condition: ({ data }) => data.counter < 3, + step: andThen({ + id: "mark-warmup", + execute: async ({ data }) => ({ ...data, label: "warmup" as const }), + }), + }, + ], + }) + .andThen({ + id: "select-branch", + execute: async ({ data }) => { + const results = Array.isArray(data) ? data : []; + const selected = results.find((entry) => entry !== undefined) as + | { counter: number; label: "ready" | "warmup" } + | undefined; + + if (!selected) { + return { counter: 0, label: "warmup" }; + } + + return { + counter: selected.counter, + label: selected.label, + }; + }, + }); + +// ============================================================================== +// Example 8: Guardrail Workflow +// Concepts: Workflow-level guardrails, step-level guardrails +// ============================================================================== +const trimInput = createInputGuardrail({ + name: "trim-input", + handler: async ({ input }) => ({ + pass: true, + action: "modify", + modifiedInput: typeof input === "string" ? input.trim() : input, + }), +}); + +const redactNumbers = createOutputGuardrail({ + name: "redact-numbers", + handler: async ({ output }) => ({ + pass: true, + action: "modify", + modifiedOutput: output.replace(/[0-9]/g, "*"), + }), +}); + +const guardrailWorkflow = createWorkflowChain({ + id: "guardrail-workflow", + name: "Guardrail Workflow", + purpose: "Applies guardrails to sanitize inputs and outputs", + input: z.string(), + result: z.string(), + inputGuardrails: [trimInput], + outputGuardrails: [redactNumbers], +}) + .andGuardrail({ + id: "sanitize-step", + outputGuardrails: [redactNumbers], + }) + .andThen({ + id: "finish", + execute: async ({ data }) => data, + }); + // Register workflows with VoltAgent // Create logger @@ -377,5 +570,9 @@ new VoltAgent({ expenseApprovalWorkflow, contentAnalysisWorkflow, articleSummarizationWorkflow, + timedReminderWorkflow, + batchTransformWorkflow, + loopAndBranchWorkflow, + guardrailWorkflow, }, }); diff --git a/packages/core/src/agent/guardrail.ts b/packages/core/src/agent/guardrail.ts index f4738c8a2..dc5c678e5 100644 --- a/packages/core/src/agent/guardrail.ts +++ b/packages/core/src/agent/guardrail.ts @@ -162,8 +162,8 @@ export function normalizeInputGuardrailList( ); } -export function normalizeOutputGuardrailList( - guardrails: OutputGuardrail[], +export function normalizeOutputGuardrailList( + guardrails: OutputGuardrail[], startIndex = 0, ): NormalizedOutputGuardrail[] { return guardrails.map((guardrail, index) => { diff --git a/packages/core/src/agent/types.ts b/packages/core/src/agent/types.ts index 51d1ec32d..c142e850b 100644 --- a/packages/core/src/agent/types.ts +++ b/packages/core/src/agent/types.ts @@ -492,7 +492,8 @@ export type AgentEvalOperationType = | "generateText" | "streamText" | "generateObject" - | "streamObject"; + | "streamObject" + | "workflow"; export interface AgentEvalPayload { operationId: string; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 3f21504ba..1a0c49c94 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -8,6 +8,14 @@ export { andAll, andRace, andTap, + andGuardrail, + andSleep, + andSleepUntil, + andForEach, + andBranch, + andDoWhile, + andDoUntil, + andMap, andWorkflow, } from "./workflow"; export type { diff --git a/packages/core/src/test-utils/mocks/workflows.ts b/packages/core/src/test-utils/mocks/workflows.ts index 6600fae94..bf0ac7208 100644 --- a/packages/core/src/test-utils/mocks/workflows.ts +++ b/packages/core/src/test-utils/mocks/workflows.ts @@ -33,5 +33,9 @@ export function createMockWorkflowExecuteContext( fatal: vi.fn(), child: vi.fn(), }, + writer: overrides.writer ?? { + write: vi.fn(), + pipeFrom: vi.fn(), + }, }; } diff --git a/packages/core/src/utils/node-utils.ts b/packages/core/src/utils/node-utils.ts index 0eb9521f0..3efba84ee 100644 --- a/packages/core/src/utils/node-utils.ts +++ b/packages/core/src/utils/node-utils.ts @@ -17,9 +17,17 @@ export enum NodeType { WORKFLOW_STEP = "workflow_step", WORKFLOW_AGENT_STEP = "workflow_agent_step", WORKFLOW_FUNC_STEP = "workflow_func_step", + WORKFLOW_TAP_STEP = "workflow_tap_step", + WORKFLOW_WORKFLOW_STEP = "workflow_workflow_step", WORKFLOW_CONDITIONAL_STEP = "workflow_conditional_step", WORKFLOW_PARALLEL_ALL_STEP = "workflow_parallel_all_step", WORKFLOW_PARALLEL_RACE_STEP = "workflow_parallel_race_step", + WORKFLOW_SLEEP_STEP = "workflow_sleep_step", + WORKFLOW_SLEEP_UNTIL_STEP = "workflow_sleep_until_step", + WORKFLOW_FOREACH_STEP = "workflow_foreach_step", + WORKFLOW_LOOP_STEP = "workflow_loop_step", + WORKFLOW_BRANCH_STEP = "workflow_branch_step", + WORKFLOW_MAP_STEP = "workflow_map_step", } /** @@ -60,9 +68,17 @@ export const getNodeTypeFromNodeId = (nodeId: string): NodeType | null => { export type WorkflowStepType = | "agent" | "func" + | "tap" + | "workflow" | "conditional-when" | "parallel-all" - | "parallel-race"; + | "parallel-race" + | "sleep" + | "sleep-until" + | "foreach" + | "loop" + | "branch" + | "map"; /** * Create a workflow step node ID with consistent pattern @@ -119,12 +135,28 @@ export const getWorkflowStepNodeType = (stepType: WorkflowStepType): NodeType => return NodeType.WORKFLOW_AGENT_STEP; case "func": return NodeType.WORKFLOW_FUNC_STEP; + case "tap": + return NodeType.WORKFLOW_TAP_STEP; + case "workflow": + return NodeType.WORKFLOW_WORKFLOW_STEP; case "conditional-when": return NodeType.WORKFLOW_CONDITIONAL_STEP; case "parallel-all": return NodeType.WORKFLOW_PARALLEL_ALL_STEP; case "parallel-race": return NodeType.WORKFLOW_PARALLEL_RACE_STEP; + case "sleep": + return NodeType.WORKFLOW_SLEEP_STEP; + case "sleep-until": + return NodeType.WORKFLOW_SLEEP_UNTIL_STEP; + case "foreach": + return NodeType.WORKFLOW_FOREACH_STEP; + case "loop": + return NodeType.WORKFLOW_LOOP_STEP; + case "branch": + return NodeType.WORKFLOW_BRANCH_STEP; + case "map": + return NodeType.WORKFLOW_MAP_STEP; default: return NodeType.WORKFLOW_STEP; } diff --git a/packages/core/src/workflow/chain.ts b/packages/core/src/workflow/chain.ts index 23150b60e..58a90953a 100644 --- a/packages/core/src/workflow/chain.ts +++ b/packages/core/src/workflow/chain.ts @@ -14,12 +14,29 @@ import type { } from "./internal/types"; import { type WorkflowStep, + type WorkflowStepBranchConfig, type WorkflowStepConditionalWhenConfig, + type WorkflowStepForEachConfig, + type WorkflowStepGuardrailConfig, + type WorkflowStepLoopConfig, + type WorkflowStepMapConfig, + type WorkflowStepMapEntry, + type WorkflowStepMapResult, type WorkflowStepParallelAllConfig, type WorkflowStepParallelRaceConfig, + type WorkflowStepSleepConfig, + type WorkflowStepSleepUntilConfig, andAgent, andAll, + andBranch, + andDoUntil, + andDoWhile, + andForEach, + andGuardrail, + andMap, andRace, + andSleep, + andSleepUntil, andTap, andThen, andWhen, @@ -32,6 +49,7 @@ import type { WorkflowExecutionResult, WorkflowInput, WorkflowRunOptions, + WorkflowStepData, WorkflowStepState, WorkflowStreamResult, WorkflowStreamWriter, @@ -193,7 +211,7 @@ export class WorkflowChain< execute: (context: { data: z.infer; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: ( reason?: string, suspendData?: SS extends z.ZodTypeAny ? z.infer : z.infer, @@ -225,7 +243,7 @@ export class WorkflowChain< execute: (context: { data: z.infer; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: ( reason?: string, suspendData?: SS extends z.ZodTypeAny ? z.infer : z.infer, @@ -256,7 +274,7 @@ export class WorkflowChain< execute: (context: { data: CURRENT_DATA; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: ( reason?: string, suspendData?: SS extends z.ZodTypeAny ? z.infer : z.infer, @@ -287,7 +305,7 @@ export class WorkflowChain< execute: (context: { data: CURRENT_DATA; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: ( reason?: string, suspendData?: SS extends z.ZodTypeAny ? z.infer : z.infer, @@ -330,7 +348,7 @@ export class WorkflowChain< execute: (context: { data: CURRENT_DATA; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: (reason?: string, suspendData?: z.infer) => Promise; resumeData?: z.infer; logger: Logger; @@ -376,7 +394,7 @@ export class WorkflowChain< condition: (context: { data: z.infer; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: ( reason?: string, suspendData?: SS extends z.ZodTypeAny ? z.infer : z.infer, @@ -469,7 +487,7 @@ export class WorkflowChain< execute: (context: { data: z.infer; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: ( reason?: string, suspendData?: SS extends z.ZodTypeAny ? z.infer : z.infer, @@ -511,7 +529,7 @@ export class WorkflowChain< execute: (context: { data: CURRENT_DATA; state: WorkflowStepState>; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: (reason?: string, suspendData?: z.infer) => Promise; resumeData?: z.infer; logger: Logger; @@ -531,6 +549,130 @@ export class WorkflowChain< return this; } + /** + * Add a guardrail step to validate or sanitize data + */ + andGuardrail( + config: WorkflowStepGuardrailConfig, CURRENT_DATA>, + ): WorkflowChain { + this.steps.push(andGuardrail(config)); + return this; + } + + /** + * Add a sleep step to the workflow + */ + andSleep( + config: WorkflowStepSleepConfig, CURRENT_DATA>, + ): WorkflowChain { + this.steps.push(andSleep(config)); + return this; + } + + /** + * Add a sleep-until step to the workflow + */ + andSleepUntil( + config: WorkflowStepSleepUntilConfig, CURRENT_DATA>, + ): WorkflowChain { + this.steps.push(andSleepUntil(config)); + return this; + } + + /** + * Add a branching step that runs all matching branches + */ + andBranch( + config: WorkflowStepBranchConfig, CURRENT_DATA, NEW_DATA>, + ): WorkflowChain< + INPUT_SCHEMA, + RESULT_SCHEMA, + Array, + SUSPEND_SCHEMA, + RESUME_SCHEMA + > { + this.steps.push(andBranch(config)); + return this as unknown as WorkflowChain< + INPUT_SCHEMA, + RESULT_SCHEMA, + Array, + SUSPEND_SCHEMA, + RESUME_SCHEMA + >; + } + + /** + * Add a foreach step that runs a step for each item in an array + */ + andForEach( + config: WorkflowStepForEachConfig, ITEM, NEW_DATA>, + ): WorkflowChain { + this.steps.push(andForEach(config)); + return this as unknown as WorkflowChain< + INPUT_SCHEMA, + RESULT_SCHEMA, + NEW_DATA[], + SUSPEND_SCHEMA, + RESUME_SCHEMA + >; + } + + /** + * Add a do-while loop step + */ + andDoWhile( + config: WorkflowStepLoopConfig, CURRENT_DATA, NEW_DATA>, + ): WorkflowChain { + this.steps.push(andDoWhile(config)); + return this as unknown as WorkflowChain< + INPUT_SCHEMA, + RESULT_SCHEMA, + NEW_DATA, + SUSPEND_SCHEMA, + RESUME_SCHEMA + >; + } + + /** + * Add a do-until loop step + */ + andDoUntil( + config: WorkflowStepLoopConfig, CURRENT_DATA, NEW_DATA>, + ): WorkflowChain { + this.steps.push(andDoUntil(config)); + return this as unknown as WorkflowChain< + INPUT_SCHEMA, + RESULT_SCHEMA, + NEW_DATA, + SUSPEND_SCHEMA, + RESUME_SCHEMA + >; + } + + /** + * Add a mapping step to the workflow + */ + andMap< + MAP extends Record, CURRENT_DATA>>, + >( + config: WorkflowStepMapConfig, CURRENT_DATA, MAP>, + ): WorkflowChain< + INPUT_SCHEMA, + RESULT_SCHEMA, + WorkflowStepMapResult, + SUSPEND_SCHEMA, + RESUME_SCHEMA + > { + this.steps.push(andMap(config)); + return this as unknown as WorkflowChain< + INPUT_SCHEMA, + RESULT_SCHEMA, + WorkflowStepMapResult, + SUSPEND_SCHEMA, + RESUME_SCHEMA + >; + } + /** * Add a workflow step to the workflow * diff --git a/packages/core/src/workflow/context.ts b/packages/core/src/workflow/context.ts index b02087c2c..9f632af16 100644 --- a/packages/core/src/workflow/context.ts +++ b/packages/core/src/workflow/context.ts @@ -1,8 +1,9 @@ import type { Span } from "@opentelemetry/api"; import type { Logger } from "@voltagent/internal"; +import type { Agent } from "../agent/agent"; import type { Memory } from "../memory"; import type { WorkflowTraceContext } from "./open-telemetry/trace-context"; -import type { WorkflowStreamWriter } from "./types"; +import type { WorkflowStepData, WorkflowStreamWriter } from "./types"; /** * Context information for a workflow execution @@ -54,7 +55,7 @@ export interface WorkflowExecutionContext { * Map of executed step data (input and output) by step ID * Used for accessing previous step results */ - stepData: Map; + stepData: Map; /** * Current event sequence number for this workflow execution * Used to maintain event ordering even after server restarts @@ -75,6 +76,10 @@ export interface WorkflowExecutionContext { * Manages span hierarchy and attributes for the workflow execution */ traceContext?: WorkflowTraceContext; + /** + * Optional agent instance supplied to workflow guardrails + */ + guardrailAgent?: Agent; /** * Current step span for passing to agents called within workflow steps @@ -89,7 +94,21 @@ export interface WorkflowExecutionContext { export interface WorkflowStepContext { stepId: string; stepIndex: number; - stepType: "agent" | "func" | "conditional-when" | "parallel-all" | "parallel-race"; + stepType: + | "agent" + | "func" + | "conditional-when" + | "parallel-all" + | "parallel-race" + | "tap" + | "workflow" + | "guardrail" + | "sleep" + | "sleep-until" + | "foreach" + | "loop" + | "branch" + | "map"; stepName: string; workflowId: string; executionId: string; diff --git a/packages/core/src/workflow/core.ts b/packages/core/src/workflow/core.ts index 3136511ca..cfe909ba0 100644 --- a/packages/core/src/workflow/core.ts +++ b/packages/core/src/workflow/core.ts @@ -9,6 +9,13 @@ import { type VoltAgentObservability, createVoltAgentObservability } from "../ob import { AgentRegistry } from "../registries/agent-registry"; import { randomUUID } from "../utils/id"; import type { WorkflowExecutionContext } from "./context"; +import { + applyWorkflowInputGuardrails, + applyWorkflowOutputGuardrails, + createWorkflowGuardrailRuntime, + isWorkflowGuardrailInput, + resolveWorkflowGuardrailSets, +} from "./internal/guardrails"; import { createWorkflowStateManager } from "./internal/state"; import type { InternalBaseWorkflowInputSchema } from "./internal/types"; import { @@ -19,6 +26,7 @@ import { import { WorkflowTraceContext } from "./open-telemetry/trace-context"; import { WorkflowRegistry } from "./registry"; import type { WorkflowStep } from "./steps"; +import { waitWithSignal } from "./steps/signal"; import { NoOpWorkflowStreamWriter, @@ -31,6 +39,8 @@ import type { WorkflowCancellationMetadata, WorkflowConfig, WorkflowExecutionResult, + WorkflowHookContext, + WorkflowHookStatus, WorkflowInput, WorkflowResult, WorkflowRunOptions, @@ -627,8 +637,12 @@ export function createWorkflow< result, suspendSchema, resumeSchema, + inputGuardrails: workflowInputGuardrails, + outputGuardrails: workflowOutputGuardrails, + guardrailAgent: workflowGuardrailAgent, memory: workflowMemory, observability: workflowObservability, + retryConfig: workflowRetryConfig, }: WorkflowConfig, ...steps: ReadonlyArray ) { @@ -838,6 +852,11 @@ export function createWorkflow< inputSchema: input, suspendSchema: effectiveSuspendSchema, resumeSchema: effectiveResumeSchema, + retryConfig: workflowRetryConfig, + guardrails: { + inputCount: workflowInputGuardrails?.length ?? 0, + outputCount: workflowOutputGuardrails?.length ?? 0, + }, }; rootSpan.setAttribute("workflow.stateSnapshot", safeStringify(workflowState)); @@ -946,8 +965,35 @@ export function createWorkflow< // Stream writer is always available streamWriter: streamWriter, traceContext: traceContext, + guardrailAgent: options?.guardrailAgent ?? workflowGuardrailAgent, }; + const guardrailSets = resolveWorkflowGuardrailSets({ + inputGuardrails: workflowInputGuardrails, + outputGuardrails: workflowOutputGuardrails, + optionInputGuardrails: options?.inputGuardrails, + optionOutputGuardrails: options?.outputGuardrails, + }); + const hasWorkflowGuardrails = + guardrailSets.input.length > 0 || guardrailSets.output.length > 0; + const workflowGuardrailRuntime = hasWorkflowGuardrails + ? createWorkflowGuardrailRuntime({ + workflowId: id, + workflowName: name, + executionId, + traceContext, + logger: runLogger, + userId: options?.userId, + conversationId: options?.conversationId, + context: contextMap, + guardrailAgent: executionContext.guardrailAgent, + }) + : null; + + if (workflowGuardrailRuntime) { + executionContext.guardrailAgent = workflowGuardrailRuntime.guardrailAgent; + } + // Emit workflow start event emitAndCollectEvent({ type: "workflow-start", @@ -1002,7 +1048,86 @@ export function createWorkflow< executionContext.currentStepIndex = startStepIndex; } + const effectiveRetryConfig = options?.retryConfig ?? workflowRetryConfig; + const workflowRetryLimit = Number.isFinite(effectiveRetryConfig?.attempts) + ? Math.max(0, Math.floor(effectiveRetryConfig?.attempts as number)) + : 0; + const workflowRetryDelayMs = Number.isFinite(effectiveRetryConfig?.delayMs) + ? Math.max(0, Math.floor(effectiveRetryConfig?.delayMs as number)) + : 0; + + const buildHookContext = ( + status: WorkflowHookStatus, + ): WorkflowHookContext, WorkflowResult> => ({ + status, + state: stateManager.state, + result: stateManager.state.result, + error: stateManager.state.error, + suspension: stateManager.state.suspension, + cancellation: stateManager.state.cancellation, + steps: Object.fromEntries( + Array.from(executionContext.stepData.entries()).map(([stepId, data]) => [ + stepId, + { ...data }, + ]), + ), + }); + + const runTerminalHooks = async ( + status: WorkflowHookStatus, + options?: { includeEnd?: boolean }, + ): Promise => { + const hookContext = buildHookContext(status); + const safeHook = async (hookName: string, hook?: () => Promise | void) => { + if (!hook) { + return; + } + + try { + await hook(); + } catch (error) { + runLogger.error("Workflow hook failed", { + hook: hookName, + error: + error instanceof Error ? { message: error.message, stack: error.stack } : error, + }); + } + }; + + if (status === "suspended") { + await safeHook("onSuspend", () => hooks?.onSuspend?.(hookContext)); + } + if (status === "error") { + await safeHook("onError", () => hooks?.onError?.(hookContext)); + } + await safeHook("onFinish", () => hooks?.onFinish?.(hookContext)); + const shouldCallEnd = options?.includeEnd ?? status !== "suspended"; + if (shouldCallEnd) { + await safeHook("onEnd", () => hooks?.onEnd?.(stateManager.state, hookContext)); + } + }; + try { + if (workflowGuardrailRuntime && guardrailSets.input.length > 0) { + if (!isWorkflowGuardrailInput(input)) { + throw new Error( + "Workflow input guardrails require string or message input. Use outputGuardrails or andGuardrail for structured data.", + ); + } + + const guardrailedInput = (await applyWorkflowInputGuardrails( + input, + guardrailSets.input, + workflowGuardrailRuntime, + )) as WorkflowInput; + + if (options?.resumeFrom) { + resumeInputData = guardrailedInput; + } else { + stateManager.update({ data: guardrailedInput }); + } + } + for (const [index, step] of (steps as BaseStep[]).entries()) { // Skip already completed steps when resuming if (index < startStepIndex) { @@ -1013,6 +1138,9 @@ export function createWorkflow< } const stepName = step.name || step.id || `Step ${index + 1}`; + const stepRetryLimit = Number.isFinite(step.retries) + ? Math.max(0, Math.floor(step.retries as number)) + : workflowRetryLimit; executionContext.currentStepIndex = index; @@ -1032,6 +1160,8 @@ export function createWorkflow< const stepData = executionContext.stepData.get(step.id); if (stepData) { stepData.output = stateManager.state.data; + stepData.status = "cancelled"; + stepData.error = null; } emitAndCollectEvent({ @@ -1098,7 +1228,7 @@ export function createWorkflow< }, ); - await hooks?.onEnd?.(stateManager.state); + await runTerminalHooks("cancelled"); return createWorkflowExecutionResult( id, @@ -1253,13 +1383,12 @@ export function createWorkflow< ); } - const stepSpan = traceContext.createStepSpan(index, step.type, stepName, { - stepId: step.id, - input: stateManager.state.data, - attributes: { - "workflow.step.function": step.execute?.name, - }, - }); + const baseStepSpanAttributes = { + "workflow.step.function": step.execute?.name, + ...(stepRetryLimit > 0 && { "workflow.step.retries": stepRetryLimit }), + ...(workflowRetryLimit > 0 && { "workflow.retry.attempts": workflowRetryLimit }), + ...(workflowRetryDelayMs > 0 && { "workflow.retry.delay_ms": workflowRetryDelayMs }), + }; // Create stream writer for this step - real one for streaming, no-op for regular execution const stepWriter = streamController @@ -1295,7 +1424,9 @@ export function createWorkflow< // Store step input data before execution executionContext.stepData.set(step.id, { input: stateManager.state.data, - output: null, + output: undefined, + status: "running", + error: null, }); // Log step start with context @@ -1330,232 +1461,347 @@ export function createWorkflow< throw new Error("WORKFLOW_SUSPENDED"); }; - try { - // Create execution context for the step with typed suspend function - const typedSuspendFn = ( - reason?: string, - suspendData?: z.infer, - ) => suspendFn(reason, suspendData); - - // Only pass resumeData if we're on the step that was suspended and we have resume input - const isResumingThisStep = - options?.resumeFrom && index === startStepIndex && resumeInputData !== undefined; - - // Update stream writer for this specific step - executionContext.streamWriter = streamController - ? new WorkflowStreamWriterImpl( - streamController, - executionId, - step.id, - step.name || step.id, - index, - options?.context, - ) - : new NoOpWorkflowStreamWriter(); - - // Create a modified execution context with the current step span - const stepExecutionContext = { - ...executionContext, - currentStepSpan: stepSpan, // Add the current step span for agent integration - }; + const handleStepSuspension = async ( + span: ReturnType, + suspensionReason: string, + ): Promise> => { + runLogger.debug(`Step ${index} suspended during execution`); - const stepContext = createStepExecutionContext< - WorkflowInput, - typeof stateManager.state.data, - z.infer, - z.infer - >( - stateManager.state.data, - convertWorkflowStateToParam( - stateManager.state, - stepExecutionContext, - options?.suspendController?.signal, - ), - stepExecutionContext, - typedSuspendFn, - isResumingThisStep ? resumeInputData : undefined, - ); - // Execute step within span context with automatic signal checking for immediate suspension - const result = await traceContext.withSpan(stepSpan, async () => { - return await executeWithSignalCheck( - () => step.execute(stepContext), - options?.suspendController?.signal, - options?.suspensionMode === "immediate" ? 50 : 500, // Check more frequently in immediate mode - ); + // End step span as suspended with reason + traceContext.endStepSpan(span, "suspended", { + suspensionReason, }); - // Update step output data after successful execution - const stepData = executionContext.stepData.get(step.id); - if (stepData) { - stepData.output = result; - } + // Get suspend data if provided + const suspendData = executionContext.context.get("suspendData"); - // Check if the step was skipped (for conditional steps) - // For conditional-when steps, if the output equals the input, the condition wasn't met - const isSkipped = - step.type === "conditional-when" && result === stateManager.state.data; + const suspensionMetadata = stateManager.suspend( + suspensionReason, + { + stepExecutionState: stateManager.state.data, + completedStepsData: Array.from({ length: index }, (_, i) => i), + }, + index, // Current step that was suspended + executionContext.eventSequence, // Pass current event sequence + ); - stateManager.update({ - data: result, - result: result, - }); + // Add suspend data to suspension metadata if provided + if (suspendData !== undefined && suspensionMetadata) { + (suspensionMetadata as WorkflowSuspensionMetadata).suspendData = suspendData; + } - // End step span with appropriate status - if (isSkipped) { - traceContext.endStepSpan(stepSpan, "skipped", { - output: result, - skippedReason: "Condition not met", - }); - } else { - traceContext.endStepSpan(stepSpan, "completed", { - output: result, - }); + const stepData = executionContext.stepData.get(step.id); + if (stepData) { + stepData.output = stateManager.state.data; + stepData.status = "suspended"; + stepData.error = null; } - // Log step completion with context - runLogger.debug( - `Step ${index + 1} ${isSkipped ? "skipped" : "completed"}: ${stepName} | type=${step.type}`, - { - stepIndex: index, - stepType: step.type, - stepName, - output: result !== undefined ? result : null, - skipped: isSkipped, - }, - ); + runLogger.debug(`Workflow suspended at step ${index}`, suspensionMetadata); - // Emit step complete event + // Emit suspension event to stream emitAndCollectEvent({ - type: "step-complete", + type: "workflow-suspended", executionId, - from: stepName, + from: step.name || step.id, input: stateManager.state.data, - output: result, - status: "success", + output: undefined, + status: "suspended", context: options?.context, timestamp: new Date().toISOString(), stepIndex: index, - stepType: step.type as any, metadata: { - displayName: `Step ${index + 1}: ${stepName}`, + reason: suspensionReason, + suspendData, + suspension: suspensionMetadata, }, }); - await hooks?.onStepEnd?.(stateManager.state); - } catch (stepError) { - if (stepError instanceof Error && stepError.message === "WORKFLOW_CANCELLED") { - const cancellationReason = resolveCancellationReason(); - return completeCancellation(stepSpan, cancellationReason); + // Record suspension in trace + traceContext.recordSuspension( + index, + suspensionReason, + suspendData, + suspensionMetadata?.checkpoint, + ); + + // End root span as suspended + traceContext.end("suspended"); + + // Ensure spans are flushed (critical for serverless environments) + await observability.flushOnFinish(); + + // Save suspension state to workflow's own Memory V2 + try { + await saveSuspensionState( + suspensionMetadata, + executionId, + effectiveMemory, + runLogger, + collectedEvents, + ); + } catch (_) { + // Error already logged in saveSuspensionState, don't throw } - // Check if this is a suspension, not an error - if (stepError instanceof Error && stepError.message === "WORKFLOW_SUSPENDED") { - runLogger.debug(`Step ${index} suspended during execution`); + runLogger.trace(`Workflow execution suspended: ${executionContext.executionId}`); - // Handle suspension - const suspensionReason = - options?.suspendController?.getReason() || "Step suspended during execution"; + await runTerminalHooks("suspended", { includeEnd: false }); - // End step span as suspended with reason - traceContext.endStepSpan(stepSpan, "suspended", { - suspensionReason, - }); + // Return suspended state without throwing + // Don't close the stream when suspended - it will continue after resume + return createWorkflowExecutionResult( + id, + executionId, + stateManager.state.startAt, + new Date(), + "suspended", + null, + stateManager.state.usage, + stateManager.state.suspension, + stateManager.state.cancellation, + undefined, + effectiveResumeSchema, + ); + }; - // Get suspend data if provided - const suspendData = executionContext.context.get("suspendData"); + let retryCount = 0; + while (true) { + const stepData = executionContext.stepData.get(step.id); + if (stepData) { + stepData.status = "running"; + stepData.error = null; + } - const suspensionMetadata = stateManager.suspend( - suspensionReason, - { - stepExecutionState: stateManager.state.data, - completedStepsData: Array.from({ length: index }, (_, i) => i), - }, - index, // Current step that was suspended - executionContext.eventSequence, // Pass current event sequence + const attemptSpan = traceContext.createStepSpan(index, step.type, stepName, { + stepId: step.id, + input: stateManager.state.data, + attributes: { + ...baseStepSpanAttributes, + ...(stepRetryLimit > 0 && { "workflow.step.retry.count": retryCount }), + }, + }); + try { + // Create execution context for the step with typed suspend function + const typedSuspendFn = ( + reason?: string, + suspendData?: z.infer, + ) => suspendFn(reason, suspendData); + + // Only pass resumeData if we're on the step that was suspended and we have resume input + const isResumingThisStep = + options?.resumeFrom && index === startStepIndex && resumeInputData !== undefined; + + // Update stream writer for this specific step + executionContext.streamWriter = streamController + ? new WorkflowStreamWriterImpl( + streamController, + executionId, + step.id, + step.name || step.id, + index, + options?.context, + ) + : new NoOpWorkflowStreamWriter(); + + // Create a modified execution context with the current step span + const stepExecutionContext = { + ...executionContext, + currentStepSpan: attemptSpan, // Add the current step span for agent integration + }; + + const stepContext = createStepExecutionContext< + WorkflowInput, + typeof stateManager.state.data, + z.infer, + z.infer + >( + stateManager.state.data, + convertWorkflowStateToParam( + stateManager.state, + stepExecutionContext, + options?.suspendController?.signal, + ), + stepExecutionContext, + typedSuspendFn, + isResumingThisStep ? resumeInputData : undefined, + retryCount, ); + // Execute step within span context with automatic signal checking for immediate suspension + const result = await traceContext.withSpan(attemptSpan, async () => { + return await executeWithSignalCheck( + () => step.execute(stepContext), + options?.suspendController?.signal, + options?.suspensionMode === "immediate" ? 50 : 500, // Check more frequently in immediate mode + ); + }); + + // Check if the step was skipped (for conditional steps) + // For conditional-when steps, if the output equals the input, the condition wasn't met + const isSkipped = + step.type === "conditional-when" && result === stateManager.state.data; + + // Update step output data after successful execution + const stepData = executionContext.stepData.get(step.id); + if (stepData) { + stepData.output = result; + stepData.status = isSkipped ? "skipped" : "success"; + stepData.error = null; + } + + stateManager.update({ + data: result, + result: result, + }); - // Add suspend data to suspension metadata if provided - if (suspendData !== undefined && suspensionMetadata) { - (suspensionMetadata as WorkflowSuspensionMetadata).suspendData = suspendData; + // End step span with appropriate status + if (isSkipped) { + traceContext.endStepSpan(attemptSpan, "skipped", { + output: result, + skippedReason: "Condition not met", + }); + } else { + traceContext.endStepSpan(attemptSpan, "completed", { + output: result, + }); } - runLogger.debug(`Workflow suspended at step ${index}`, suspensionMetadata); + // Log step completion with context + runLogger.debug( + `Step ${index + 1} ${isSkipped ? "skipped" : "completed"}: ${stepName} | type=${step.type}`, + { + stepIndex: index, + stepType: step.type, + stepName, + output: result !== undefined ? result : null, + skipped: isSkipped, + }, + ); - // Emit suspension event to stream + // Emit step complete event emitAndCollectEvent({ - type: "workflow-suspended", + type: "step-complete", executionId, - from: step.name || step.id, + from: stepName, input: stateManager.state.data, - output: undefined, - status: "suspended", + output: result, + status: isSkipped ? "skipped" : "success", context: options?.context, timestamp: new Date().toISOString(), stepIndex: index, + stepType: step.type as any, metadata: { - reason: suspensionReason, - suspendData, - suspension: suspensionMetadata, + displayName: `Step ${index + 1}: ${stepName}`, }, }); - // Step suspend event removed - now handled by OpenTelemetry spans - - // Workflow suspend event removed - now handled by OpenTelemetry spans - - // Record suspension in trace - traceContext.recordSuspension( - index, - suspensionReason, - suspendData, - suspensionMetadata?.checkpoint, - ); + await hooks?.onStepEnd?.(stateManager.state); + break; + } catch (stepError) { + if (stepError instanceof Error && stepError.message === "WORKFLOW_CANCELLED") { + const cancellationReason = resolveCancellationReason(); + return completeCancellation(attemptSpan, cancellationReason); + } - // End root span as suspended - traceContext.end("suspended"); + // Check if this is a suspension, not an error + if (stepError instanceof Error && stepError.message === "WORKFLOW_SUSPENDED") { + const suspensionReason = + options?.suspendController?.getReason() || "Step suspended during execution"; + return handleStepSuspension(attemptSpan, suspensionReason); + } - // Ensure spans are flushed (critical for serverless environments) - await observability.flushOnFinish(); + const stepData = executionContext.stepData.get(step.id); + if (stepData) { + stepData.status = "error"; + stepData.error = + stepError instanceof Error ? stepError : new Error(String(stepError)); + } - // Save suspension state to workflow's own Memory V2 - try { - await saveSuspensionState( - suspensionMetadata, - executionId, - effectiveMemory, - runLogger, - collectedEvents, + if (retryCount < stepRetryLimit) { + traceContext.endStepSpan(attemptSpan, "error", { + error: stepError as Error, + }); + retryCount += 1; + runLogger.warn( + `Step ${index + 1} failed, retrying (${retryCount}/${stepRetryLimit}): ${stepName} | type=${step.type}`, + { + stepIndex: index, + stepType: step.type, + stepName, + error: + stepError instanceof Error + ? { message: stepError.message, stack: stepError.stack } + : stepError, + }, ); - } catch (_) { - // Error already logged in saveSuspensionState, don't throw + if (workflowRetryDelayMs > 0) { + try { + await waitWithSignal(workflowRetryDelayMs, options?.suspendController?.signal); + } catch (delayError) { + const interruptionSpan = traceContext.createStepSpan( + index, + step.type, + stepName, + { + stepId: step.id, + input: stateManager.state.data, + attributes: { + ...baseStepSpanAttributes, + ...(stepRetryLimit > 0 && { + "workflow.step.retry.count": retryCount, + }), + }, + }, + ); + if ( + delayError instanceof Error && + delayError.message === "WORKFLOW_CANCELLED" + ) { + const cancellationReason = resolveCancellationReason(); + return completeCancellation(interruptionSpan, cancellationReason); + } + + if ( + delayError instanceof Error && + delayError.message === "WORKFLOW_SUSPENDED" + ) { + const suspensionReason = + options?.suspendController?.getReason() || + "Step suspended during execution"; + return handleStepSuspension(interruptionSpan, suspensionReason); + } + + traceContext.endStepSpan(interruptionSpan, "error", { + error: delayError as Error, + }); + throw delayError; + } + } + continue; } - runLogger.trace(`Workflow execution suspended: ${executionContext.executionId}`); + // End step span with error + traceContext.endStepSpan(attemptSpan, "error", { + error: stepError as Error, + }); - // Return suspended state without throwing - // Don't close the stream when suspended - it will continue after resume - return createWorkflowExecutionResult( - id, - executionId, - stateManager.state.startAt, - new Date(), - "suspended", - null, - stateManager.state.usage, - stateManager.state.suspension, - stateManager.state.cancellation, - undefined, - effectiveResumeSchema, - ); + throw stepError; // Re-throw the original error } + } + } - // End step span with error - traceContext.endStepSpan(stepSpan, "error", { - error: stepError as Error, - }); + if (workflowGuardrailRuntime && guardrailSets.output.length > 0) { + const workflowOutput = stateManager.state.result ?? stateManager.state.data; + const guardrailedOutput = await applyWorkflowOutputGuardrails( + workflowOutput, + guardrailSets.output, + workflowGuardrailRuntime, + ); - throw stepError; // Re-throw the original error - } + stateManager.update({ + data: guardrailedOutput, + result: guardrailedOutput, + }); } const finalState = stateManager.finish(); @@ -1582,7 +1828,7 @@ export function createWorkflow< }); } - await hooks?.onEnd?.(stateManager.state); + await runTerminalHooks("completed"); // Log workflow completion with context const duration = finalState.endAt.getTime() - finalState.startAt.getTime(); @@ -1666,7 +1912,7 @@ export function createWorkflow< }); } - await hooks?.onEnd?.(stateManager.state); + await runTerminalHooks("cancelled"); return createWorkflowExecutionResult( id, @@ -1695,6 +1941,9 @@ export function createWorkflow< // Ensure spans are flushed (critical for serverless environments) await observability.flushOnFinish(); + if (stateManager.state.status === "suspended") { + await runTerminalHooks("suspended", { includeEnd: false }); + } // This case should be handled in the step catch block, // but just in case it bubbles up here streamController?.close(); @@ -1759,7 +2008,7 @@ export function createWorkflow< error: memoryError, }); } - await hooks?.onEnd?.(stateManager.state); + await runTerminalHooks("error"); // Close stream after state update streamController?.close(); @@ -1794,6 +2043,10 @@ export function createWorkflow< // ✅ Always expose memory for registry access memory: effectiveMemory, observability: workflowObservability, + inputGuardrails: workflowInputGuardrails, + outputGuardrails: workflowOutputGuardrails, + guardrailAgent: workflowGuardrailAgent, + retryConfig: workflowRetryConfig, getFullState: () => { // Return workflow state similar to agent.getFullState return { @@ -1806,6 +2059,11 @@ export function createWorkflow< resultSchema: result, suspendSchema: effectiveSuspendSchema, resumeSchema: effectiveResumeSchema, + retryConfig: workflowRetryConfig, + guardrails: { + inputCount: workflowInputGuardrails?.length ?? 0, + outputCount: workflowOutputGuardrails?.length ?? 0, + }, }; }, createSuspendController: () => createDefaultSuspendController(), @@ -2171,9 +2429,21 @@ export interface SerializedWorkflowStep { outputSchema?: unknown; suspendSchema?: unknown; resumeSchema?: unknown; + retries?: number; agentId?: string; + workflowId?: string; executeFunction?: string; conditionFunction?: string; + conditionFunctions?: string[]; + loopType?: "dowhile" | "dountil"; + sleepDurationMs?: number; + sleepDurationFn?: string; + sleepUntil?: string; + sleepUntilFn?: string; + concurrency?: number; + mapConfig?: string; + guardrailInputCount?: number; + guardrailOutputCount?: number; nestedStep?: SerializedWorkflowStep; subSteps?: SerializedWorkflowStep[]; subStepsCount?: number; @@ -2194,6 +2464,7 @@ export function serializeWorkflowStep(step: BaseStep, index: number): Serialized ...(step.outputSchema && { outputSchema: step.outputSchema }), ...(step.suspendSchema && { suspendSchema: step.suspendSchema }), ...(step.resumeSchema && { resumeSchema: step.resumeSchema }), + ...(typeof step.retries === "number" && { retries: step.retries }), }; // Add type-specific data @@ -2258,6 +2529,137 @@ export function serializeWorkflowStep(step: BaseStep, index: number): Serialized }; } + case "sleep": { + const sleepStep = step as WorkflowStep & { + duration?: number | ((...args: any[]) => unknown); + }; + return { + ...baseStep, + ...(typeof sleepStep.duration === "number" && { + sleepDurationMs: sleepStep.duration, + }), + ...(typeof sleepStep.duration === "function" && { + sleepDurationFn: sleepStep.duration.toString(), + }), + }; + } + + case "sleep-until": { + const sleepUntilStep = step as WorkflowStep & { + date?: Date | ((...args: any[]) => unknown); + }; + return { + ...baseStep, + ...(sleepUntilStep.date instanceof Date && { + sleepUntil: sleepUntilStep.date.toISOString(), + }), + ...(typeof sleepUntilStep.date === "function" && { + sleepUntilFn: sleepUntilStep.date.toString(), + }), + }; + } + + case "foreach": { + const forEachStep = step as WorkflowStep & { + step?: BaseStep; + concurrency?: number; + }; + return { + ...baseStep, + ...(forEachStep.step && { + nestedStep: serializeWorkflowStep(forEachStep.step, 0), + }), + ...(typeof forEachStep.concurrency === "number" && { + concurrency: forEachStep.concurrency, + }), + }; + } + + case "loop": { + const loopStep = step as WorkflowStep & { + step?: BaseStep; + condition?: (...args: any[]) => unknown; + loopType?: "dowhile" | "dountil"; + }; + return { + ...baseStep, + ...(loopStep.condition && { + conditionFunction: loopStep.condition.toString(), + }), + ...(loopStep.loopType && { + loopType: loopStep.loopType, + }), + ...(loopStep.step && { + nestedStep: serializeWorkflowStep(loopStep.step, 0), + }), + }; + } + + case "branch": { + const branchStep = step as WorkflowStep & { + branches?: Array<{ step: BaseStep; condition: (...args: any[]) => unknown }>; + }; + return { + ...baseStep, + ...(branchStep.branches && { + subSteps: branchStep.branches.map((branch, index) => + serializeWorkflowStep(branch.step, index), + ), + subStepsCount: branchStep.branches.length, + conditionFunctions: branchStep.branches.map((branch) => branch.condition.toString()), + }), + }; + } + + case "map": { + const mapStep = step as WorkflowStep & { + map?: Record unknown }>; + }; + const mapConfig = mapStep.map + ? Object.fromEntries( + Object.entries(mapStep.map).map(([key, entry]) => { + if (entry?.source === "fn" && entry.fn) { + return [key, { ...entry, fn: entry.fn.toString() }]; + } + return [key, entry]; + }), + ) + : undefined; + + return { + ...baseStep, + ...(mapConfig && { + mapConfig: safeStringify(mapConfig), + }), + }; + } + + case "guardrail": { + const guardrailStep = step as WorkflowStep & { + inputGuardrails?: unknown[]; + outputGuardrails?: unknown[]; + }; + return { + ...baseStep, + ...(guardrailStep.inputGuardrails && { + guardrailInputCount: guardrailStep.inputGuardrails.length, + }), + ...(guardrailStep.outputGuardrails && { + guardrailOutputCount: guardrailStep.outputGuardrails.length, + }), + }; + } + + case "workflow": { + const workflowStep = step as WorkflowStep & { + workflow?: { id?: string }; + }; + return { + ...baseStep, + ...(workflowStep.workflow?.id && { workflowId: workflowStep.workflow.id }), + }; + } + default: { return baseStep; } diff --git a/packages/core/src/workflow/guardrails.spec.ts b/packages/core/src/workflow/guardrails.spec.ts new file mode 100644 index 000000000..0a0605266 --- /dev/null +++ b/packages/core/src/workflow/guardrails.spec.ts @@ -0,0 +1,89 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import { z } from "zod"; +import { createInputGuardrail, createOutputGuardrail } from "../agent/guardrail"; +import { createWorkflowChain } from "./chain"; +import { WorkflowRegistry } from "./registry"; + +describe("workflow guardrails", () => { + beforeEach(() => { + const registry = WorkflowRegistry.getInstance(); + (registry as any).workflows.clear(); + }); + + it("applies input guardrails before execution", async () => { + const trim = createInputGuardrail({ + name: "trim", + handler: async ({ input }) => ({ + pass: true, + action: "modify", + modifiedInput: typeof input === "string" ? input.trim() : input, + }), + }); + + const workflow = createWorkflowChain({ + id: "guardrail-input", + name: "Guardrail Input", + input: z.string(), + result: z.string(), + inputGuardrails: [trim], + }).andThen({ + id: "echo", + execute: async ({ data }) => `${data}-done`, + }); + + const result = await workflow.run(" hello "); + + expect(result.result).toBe("hello-done"); + }); + + it("applies output guardrails after execution", async () => { + const redact = createOutputGuardrail({ + name: "redact", + handler: async ({ output }) => ({ + pass: true, + action: "modify", + modifiedOutput: output.replace(/[0-9]/g, "*"), + }), + }); + + const workflow = createWorkflowChain({ + id: "guardrail-output", + name: "Guardrail Output", + input: z.string(), + result: z.string(), + outputGuardrails: [redact], + }).andThen({ + id: "echo", + execute: async ({ data }) => data, + }); + + const result = await workflow.run("Code 123"); + + expect(result.result).toBe("Code ***"); + }); + + it("returns an error result when input guardrails block execution", async () => { + const block = createInputGuardrail({ + name: "block", + handler: async () => ({ pass: false, message: "Blocked" }), + }); + + const workflow = createWorkflowChain({ + id: "guardrail-block", + name: "Guardrail Block", + input: z.string(), + result: z.string(), + inputGuardrails: [block], + }).andThen({ + id: "echo", + execute: async ({ data }) => data, + }); + + const result = await workflow.run("bad"); + + expect(result.status).toBe("error"); + expect(result.error).toMatchObject({ + code: "GUARDRAIL_INPUT_BLOCKED", + }); + }); +}); diff --git a/packages/core/src/workflow/hooks.spec.ts b/packages/core/src/workflow/hooks.spec.ts new file mode 100644 index 000000000..1a6060db6 --- /dev/null +++ b/packages/core/src/workflow/hooks.spec.ts @@ -0,0 +1,154 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { z } from "zod"; +import { Memory } from "../memory"; +import { InMemoryStorageAdapter } from "../memory/adapters/storage/in-memory"; +import { createWorkflow } from "./core"; +import { WorkflowRegistry } from "./registry"; +import { andThen, andWhen } from "./steps"; + +describe.sequential("workflow hooks", () => { + beforeEach(() => { + const registry = WorkflowRegistry.getInstance(); + (registry as any).workflows.clear(); + }); + + it("provides step snapshots in onFinish and onEnd", async () => { + const hooks = { + onFinish: vi.fn(), + onEnd: vi.fn(), + onError: vi.fn(), + onSuspend: vi.fn(), + }; + + const workflow = createWorkflow( + { + id: "hook-success", + name: "Hook Success", + input: z.object({ value: z.number() }), + result: z.object({ value: z.number() }), + memory: new Memory({ storage: new InMemoryStorageAdapter() }), + hooks, + }, + andThen({ + id: "increment", + execute: async ({ data }) => ({ value: data.value + 1 }), + }), + andWhen({ + id: "maybe-skip", + condition: async () => false, + step: andThen({ + id: "maybe-skip-inner", + execute: async ({ data }) => ({ value: data.value + 100 }), + }), + }), + ); + + const registry = WorkflowRegistry.getInstance(); + registry.registerWorkflow(workflow); + + const result = await workflow.run({ value: 1 }); + + expect(result.status).toBe("completed"); + expect(hooks.onFinish).toHaveBeenCalledTimes(1); + expect(hooks.onEnd).toHaveBeenCalledTimes(1); + expect(hooks.onError).not.toHaveBeenCalled(); + expect(hooks.onSuspend).not.toHaveBeenCalled(); + + const finishInfo = hooks.onFinish.mock.calls[0]?.[0]; + expect(finishInfo.status).toBe("completed"); + expect(finishInfo.steps.increment).toEqual({ + input: { value: 1 }, + output: { value: 2 }, + status: "success", + error: null, + }); + expect(finishInfo.steps["maybe-skip"].status).toBe("skipped"); + + const [endState, endInfo] = hooks.onEnd.mock.calls[0] ?? []; + expect(endState.status).toBe("completed"); + expect(endInfo?.steps.increment?.status).toBe("success"); + }); + + it("calls onError with step error snapshots", async () => { + const hooks = { + onFinish: vi.fn(), + onEnd: vi.fn(), + onError: vi.fn(), + }; + + const workflow = createWorkflow( + { + id: "hook-error", + name: "Hook Error", + input: z.object({ value: z.number() }), + result: z.object({ value: z.number() }), + memory: new Memory({ storage: new InMemoryStorageAdapter() }), + hooks, + }, + andThen({ + id: "explode", + execute: async () => { + throw new Error("boom"); + }, + }), + ); + + const registry = WorkflowRegistry.getInstance(); + registry.registerWorkflow(workflow); + + const result = await workflow.run({ value: 1 }); + + expect(result.status).toBe("error"); + expect(hooks.onError).toHaveBeenCalledTimes(1); + expect(hooks.onFinish).toHaveBeenCalledTimes(1); + expect(hooks.onEnd).toHaveBeenCalledTimes(1); + + const errorInfo = hooks.onError.mock.calls[0]?.[0]; + expect(errorInfo.status).toBe("error"); + expect(errorInfo.steps.explode.status).toBe("error"); + expect(errorInfo.steps.explode.output).toBeUndefined(); + expect(errorInfo.steps.explode.error?.message).toBe("boom"); + }); + + it("calls onSuspend and skips onEnd when suspended", async () => { + const hooks = { + onFinish: vi.fn(), + onEnd: vi.fn(), + onError: vi.fn(), + onSuspend: vi.fn(), + }; + + const workflow = createWorkflow( + { + id: "hook-suspend", + name: "Hook Suspend", + input: z.object({ value: z.number() }), + result: z.object({ value: z.number() }), + memory: new Memory({ storage: new InMemoryStorageAdapter() }), + hooks, + }, + andThen({ + id: "needs-input", + execute: async ({ suspend }) => { + return await suspend("need-approval", { requestedBy: "system" }); + }, + }), + ); + + const registry = WorkflowRegistry.getInstance(); + registry.registerWorkflow(workflow); + + const result = await workflow.run({ value: 42 }); + + expect(result.status).toBe("suspended"); + expect(hooks.onSuspend).toHaveBeenCalledTimes(1); + expect(hooks.onFinish).toHaveBeenCalledTimes(1); + expect(hooks.onError).not.toHaveBeenCalled(); + expect(hooks.onEnd).not.toHaveBeenCalled(); + + const suspendInfo = hooks.onSuspend.mock.calls[0]?.[0]; + expect(suspendInfo.status).toBe("suspended"); + expect(suspendInfo.steps["needs-input"].status).toBe("suspended"); + expect(suspendInfo.steps["needs-input"].output).toEqual({ value: 42 }); + }); +}); diff --git a/packages/core/src/workflow/index.ts b/packages/core/src/workflow/index.ts index 9f62d85a3..9127c3c9e 100644 --- a/packages/core/src/workflow/index.ts +++ b/packages/core/src/workflow/index.ts @@ -1,4 +1,20 @@ -export { andAgent, andThen, andWhen, andAll, andWorkflow, andRace, andTap } from "./steps"; +export { + andAgent, + andThen, + andWhen, + andAll, + andWorkflow, + andRace, + andTap, + andGuardrail, + andSleep, + andSleepUntil, + andForEach, + andBranch, + andDoWhile, + andDoUntil, + andMap, +} from "./steps"; export { createWorkflow, serializeWorkflowStep } from "./core"; export type { SerializedWorkflowStep } from "./core"; export { createWorkflowChain } from "./chain"; @@ -8,11 +24,16 @@ export { createSuspendController } from "./suspend-controller"; export type { WorkflowConfig, Workflow, + WorkflowHookContext, + WorkflowHookStatus, + WorkflowRetryConfig, WorkflowRunOptions, WorkflowResumeOptions, WorkflowSuspensionMetadata, WorkflowSuspendController, WorkflowStats, + WorkflowStepData, + WorkflowStepStatus, WorkflowTimelineEvent, } from "./types"; export type { WorkflowExecuteContext } from "./internal/types"; diff --git a/packages/core/src/workflow/internal/guardrails.ts b/packages/core/src/workflow/internal/guardrails.ts new file mode 100644 index 000000000..3af27bdeb --- /dev/null +++ b/packages/core/src/workflow/internal/guardrails.ts @@ -0,0 +1,235 @@ +import type { Span } from "@opentelemetry/api"; +import type { Logger } from "@voltagent/internal"; +import type { UIMessage } from "ai"; +import type { Agent } from "../../agent/agent"; +import { + type NormalizedInputGuardrail, + type NormalizedOutputGuardrail, + normalizeInputGuardrailList, + normalizeOutputGuardrailList, + runInputGuardrails, + runOutputGuardrails, +} from "../../agent/guardrail"; +import type { BaseMessage } from "../../agent/providers"; +import type { + AgentEvalOperationType, + InputGuardrail, + OperationContext, + OutputGuardrail, +} from "../../agent/types"; +import { randomUUID } from "../../utils/id"; +import type { WorkflowTraceContext } from "../open-telemetry/trace-context"; + +export type WorkflowGuardrailInput = string | UIMessage[] | BaseMessage[]; + +export type WorkflowGuardrailSet = { + input: NormalizedInputGuardrail[]; + output: NormalizedOutputGuardrail[]; +}; + +const DEFAULT_GUARDRAIL_OPERATION: AgentEvalOperationType = "workflow"; + +const createNoopSpan = (): Span => + ({ + setAttribute: () => undefined, + setAttributes: () => undefined, + addEvent: () => undefined, + setStatus: () => undefined, + end: () => undefined, + recordException: () => undefined, + }) as unknown as Span; + +type GuardrailTraceContext = { + createChildSpan: ( + name: string, + type: string, + options?: { label?: string; attributes?: Record }, + ) => Span; + withSpan: (span: Span, fn: () => T | Promise) => Promise; + setInput: (input: unknown) => void; + setOutput: (output: unknown) => void; + end: (status: "completed" | "error", error?: Error) => void; + getRootSpan: () => Span; +}; + +const createNoopGuardrailTraceContext = (): GuardrailTraceContext => ({ + createChildSpan: () => createNoopSpan(), + withSpan: async (_span, fn) => await fn(), + setInput: () => undefined, + setOutput: () => undefined, + end: () => undefined, + getRootSpan: () => createNoopSpan(), +}); + +const createWorkflowGuardrailTraceContext = ( + traceContext?: WorkflowTraceContext, + parentSpan?: Span, +): GuardrailTraceContext => { + if (!traceContext) { + return createNoopGuardrailTraceContext(); + } + + return { + createChildSpan: (name, type, options) => + traceContext.createChildSpan(name, type, { ...options, parentSpan }), + withSpan: (span, fn) => traceContext.withSpan(span, fn), + setInput: (input) => traceContext.setInput(input), + setOutput: (output) => traceContext.setOutput(output), + end: () => undefined, + getRootSpan: () => traceContext.getRootSpan(), + }; +}; + +const createWorkflowGuardrailAgentStub = ({ + workflowId, + workflowName, +}: { + workflowId?: string; + workflowName?: string; +}): Agent => { + const id = workflowId ? `workflow:${workflowId}` : "workflow"; + const name = workflowName || workflowId || "Workflow"; + const baseAgent = { id, name }; + + return new Proxy(baseAgent, { + get(target, prop) { + if (prop in target) { + return target[prop as keyof typeof target]; + } + throw new Error( + "Workflow guardrails do not expose agent methods. Provide guardrailAgent in workflow config or run options.", + ); + }, + }) as unknown as Agent; +}; + +export const isWorkflowGuardrailInput = (value: unknown): value is WorkflowGuardrailInput => + typeof value === "string" || Array.isArray(value); + +export const resolveWorkflowGuardrailSets = ({ + inputGuardrails, + outputGuardrails, + optionInputGuardrails, + optionOutputGuardrails, +}: { + inputGuardrails?: InputGuardrail[]; + outputGuardrails?: OutputGuardrail[]; + optionInputGuardrails?: InputGuardrail[]; + optionOutputGuardrails?: OutputGuardrail[]; +}): WorkflowGuardrailSet => { + const baseInput = inputGuardrails ?? []; + const baseOutput = outputGuardrails ?? []; + const optionInput = optionInputGuardrails ?? []; + const optionOutput = optionOutputGuardrails ?? []; + + const normalizedBaseInput = normalizeInputGuardrailList(baseInput); + const normalizedBaseOutput = normalizeOutputGuardrailList(baseOutput); + const normalizedOptionInput = normalizeInputGuardrailList( + optionInput, + normalizedBaseInput.length, + ); + const normalizedOptionOutput = normalizeOutputGuardrailList( + optionOutput, + normalizedBaseOutput.length, + ); + + return { + input: [...normalizedBaseInput, ...normalizedOptionInput], + output: [...normalizedBaseOutput, ...normalizedOptionOutput], + }; +}; + +export type WorkflowGuardrailRuntime = { + operationContext: OperationContext; + guardrailAgent: Agent; +}; + +export const createWorkflowGuardrailRuntime = ({ + workflowId, + workflowName, + executionId, + traceContext, + logger, + userId, + conversationId, + context, + guardrailAgent, + parentSpan, + operationId, +}: { + workflowId?: string; + workflowName?: string; + executionId?: string; + traceContext?: WorkflowTraceContext; + logger: Logger; + userId?: string; + conversationId?: string; + context?: Map; + guardrailAgent?: Agent; + parentSpan?: Span; + operationId?: string; +}): WorkflowGuardrailRuntime => { + const resolvedGuardrailAgent = + guardrailAgent ?? createWorkflowGuardrailAgentStub({ workflowId, workflowName }); + const guardrailTraceContext = createWorkflowGuardrailTraceContext(traceContext, parentSpan); + const resolvedOperationId = + operationId || [workflowId, executionId, "guardrail", randomUUID()].filter(Boolean).join(":"); + + const operationContext: OperationContext = { + operationId: resolvedOperationId, + userId, + conversationId, + context: context ?? new Map(), + systemContext: new Map(), + isActive: true, + traceContext: guardrailTraceContext as OperationContext["traceContext"], + logger, + abortController: new AbortController(), + startTime: new Date(), + }; + + return { + operationContext, + guardrailAgent: resolvedGuardrailAgent, + }; +}; + +export const applyWorkflowInputGuardrails = async ( + input: WorkflowGuardrailInput, + guardrails: NormalizedInputGuardrail[], + runtime: WorkflowGuardrailRuntime, + operation: AgentEvalOperationType = DEFAULT_GUARDRAIL_OPERATION, +): Promise => { + if (!guardrails.length) { + return input; + } + + runtime.operationContext.input = input; + return runInputGuardrails( + input, + runtime.operationContext, + guardrails, + operation, + runtime.guardrailAgent, + ); +}; + +export const applyWorkflowOutputGuardrails = async ( + output: TOutput, + guardrails: NormalizedOutputGuardrail[], + runtime: WorkflowGuardrailRuntime, + operation: AgentEvalOperationType = DEFAULT_GUARDRAIL_OPERATION, +): Promise => { + if (!guardrails.length) { + return output; + } + + runtime.operationContext.output = output as OperationContext["output"]; + return runOutputGuardrails({ + output, + operationContext: runtime.operationContext, + guardrails, + operation, + agent: runtime.guardrailAgent, + }); +}; diff --git a/packages/core/src/workflow/internal/types.ts b/packages/core/src/workflow/internal/types.ts index 40fceb356..669ff1859 100644 --- a/packages/core/src/workflow/internal/types.ts +++ b/packages/core/src/workflow/internal/types.ts @@ -5,7 +5,7 @@ import type * as TF from "type-fest"; import type { z } from "zod"; import type { BaseMessage } from "../../agent/providers"; import type { WorkflowExecutionContext } from "../context"; -import type { WorkflowStreamWriter } from "../types"; +import type { WorkflowStepData, WorkflowStreamWriter } from "../types"; import type { WorkflowState } from "./state"; /** @@ -39,11 +39,12 @@ export type InternalWorkflowStateParam = Omit< * @private - INTERNAL USE ONLY */ export interface WorkflowExecuteContext { - data: InternalExtractWorkflowInputData; + data: DATA; state: InternalWorkflowStateParam; - getStepData: (stepId: string) => { input: any; output: any } | undefined; + getStepData: (stepId: string) => WorkflowStepData | undefined; suspend: (reason?: string, suspendData?: SUSPEND_DATA) => Promise; resumeData?: RESUME_DATA; + retryCount?: number; /** * Logger instance for this workflow execution. * Provides execution-scoped logging with full context (userId, conversationId, executionId). @@ -79,6 +80,10 @@ export type InternalWorkflowStepConfig = { * Description of what the step does */ purpose?: string; + /** + * Number of retry attempts when the step throws an error + */ + retries?: number; } & T; /** @@ -118,6 +123,10 @@ export interface InternalBaseWorkflowStep(con * @returns The execution context for the step */ export function createStepExecutionContext( - data: InternalExtractWorkflowInputData, + data: DATA, state: InternalWorkflowStateParam, executionContext: WorkflowExecutionContext, suspendFn: (reason?: string, suspendData?: SUSPEND_DATA) => Promise, resumeData?: RESUME_DATA, + retryCount = 0, ): WorkflowExecuteContext { return { data, @@ -81,6 +81,7 @@ export function createStepExecutionContext executionContext?.stepData.get(stepId), suspend: suspendFn, resumeData, + retryCount, logger: executionContext.logger, writer: executionContext.streamWriter, }; diff --git a/packages/core/src/workflow/open-telemetry/trace-context.ts b/packages/core/src/workflow/open-telemetry/trace-context.ts index 01bcba002..bac40678a 100644 --- a/packages/core/src/workflow/open-telemetry/trace-context.ts +++ b/packages/core/src/workflow/open-telemetry/trace-context.ts @@ -253,6 +253,36 @@ export class WorkflowTraceContext { return { parentSpan, childSpans }; } + /** + * Create a generic child span under the workflow root or an optional parent span + */ + createChildSpan( + name: string, + type: string, + options?: { + label?: string; + attributes?: Record; + kind?: SpanKind; + parentSpan?: Span; + }, + ): Span { + const spanOptions: SpanOptions = { + kind: options?.kind || SpanKind.INTERNAL, + attributes: { + ...this.commonAttributes, + "span.type": type, + ...(options?.label && { "span.label": options.label }), + ...(options?.attributes || {}), + }, + }; + + const parentContext = options?.parentSpan + ? trace.setSpan(this.activeContext, options.parentSpan) + : this.activeContext; + + return this.tracer.startSpan(name, spanOptions, parentContext); + } + /** * Record a suspension event on the workflow */ @@ -323,6 +353,14 @@ export class WorkflowTraceContext { return this.rootSpan; } + /** + * Set input on the root span + */ + setInput(input: any): void { + const inputStr = typeof input === "string" ? input : safeStringify(input); + this.rootSpan.setAttribute("input", inputStr); + } + /** * Set output on the root span */ diff --git a/packages/core/src/workflow/steps/and-branch.spec.ts b/packages/core/src/workflow/steps/and-branch.spec.ts new file mode 100644 index 000000000..241852dc8 --- /dev/null +++ b/packages/core/src/workflow/steps/and-branch.spec.ts @@ -0,0 +1,167 @@ +import { describe, expect, it } from "vitest"; +import { createMockWorkflowExecuteContext } from "../../test-utils"; +import { andBranch } from "./and-branch"; +import { andThen } from "./and-then"; + +describe("andBranch", () => { + it("runs matching branches and keeps index alignment", async () => { + const step = andBranch({ + id: "branch", + branches: [ + { + condition: async ({ data }) => data.value > 3, + step: andThen({ + id: "branch-a", + execute: async () => ({ branch: "a" }), + }), + }, + { + condition: async ({ data }) => data.value < 3, + step: andThen({ + id: "branch-b", + execute: async () => ({ branch: "b" }), + }), + }, + { + condition: async ({ data }) => data.value === 5, + step: andThen({ + id: "branch-c", + execute: async () => ({ branch: "c" }), + }), + }, + ], + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: { value: 5 }, + }), + ); + + expect(result).toEqual([{ branch: "a" }, undefined, { branch: "c" }]); + }); + + it("returns undefined for branches that do not match", async () => { + const step = andBranch({ + id: "branch", + branches: [ + { + condition: async () => false, + step: andThen({ + id: "branch-a", + execute: async () => ({ branch: "a" }), + }), + }, + { + condition: async () => false, + step: andThen({ + id: "branch-b", + execute: async () => ({ branch: "b" }), + }), + }, + ], + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: { value: 0 }, + }), + ); + + expect(result).toEqual([undefined, undefined]); + }); + + it("returns an empty array when no branches are provided", async () => { + const step = andBranch({ + id: "branch", + branches: [], + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: { value: 1 }, + }), + ); + + expect(result).toEqual([]); + }); + + it("propagates errors from branch steps", async () => { + const step = andBranch({ + id: "branch", + branches: [ + { + condition: async () => true, + step: andThen({ + id: "boom", + execute: async () => { + throw new Error("boom"); + }, + }), + }, + ], + }); + + await expect( + step.execute( + createMockWorkflowExecuteContext({ + data: { value: 1 }, + }), + ), + ).rejects.toThrow("boom"); + }); + + it("throws when the workflow is cancelled", async () => { + const controller = new AbortController(); + controller.abort({ type: "cancelled" }); + + const step = andBranch({ + id: "branch", + branches: [ + { + condition: async () => true, + step: andThen({ + id: "branch-a", + execute: async () => ({ ok: true }), + }), + }, + ], + }); + + await expect( + step.execute( + createMockWorkflowExecuteContext({ + data: { value: 1 }, + state: { signal: controller.signal } as any, + }), + ), + ).rejects.toThrow("WORKFLOW_CANCELLED"); + }); + + it("throws when the workflow is suspended", async () => { + const controller = new AbortController(); + controller.abort(); + + const step = andBranch({ + id: "branch", + branches: [ + { + condition: async () => true, + step: andThen({ + id: "branch-a", + execute: async () => ({ ok: true }), + }), + }, + ], + }); + + await expect( + step.execute( + createMockWorkflowExecuteContext({ + data: { value: 1 }, + state: { signal: controller.signal } as any, + }), + ), + ).rejects.toThrow("WORKFLOW_SUSPENDED"); + }); +}); diff --git a/packages/core/src/workflow/steps/and-branch.ts b/packages/core/src/workflow/steps/and-branch.ts new file mode 100644 index 000000000..2f39646a2 --- /dev/null +++ b/packages/core/src/workflow/steps/and-branch.ts @@ -0,0 +1,91 @@ +import type { Span } from "@opentelemetry/api"; +import { defaultStepConfig } from "../internal/utils"; +import { matchStep } from "./helpers"; +import { throwIfAborted } from "./signal"; +import type { WorkflowStepBranch, WorkflowStepBranchConfig } from "./types"; + +/** + * Creates a branching step that runs all steps whose conditions match. + */ +export function andBranch({ + branches, + ...config +}: WorkflowStepBranchConfig) { + return { + ...defaultStepConfig(config), + type: "branch", + branches, + execute: async (context) => { + const { state } = context; + const traceContext = state.workflowContext?.traceContext; + + const conditionResults = await Promise.all( + branches.map(async (branch) => { + throwIfAborted(state.signal); + return branch.condition(context); + }), + ); + + const results = await Promise.all( + branches.map(async (branch, index) => { + if (!conditionResults[index]) { + return undefined; + } + + throwIfAborted(state.signal); + + const finalStep = matchStep(branch.step); + let childSpan: Span | undefined; + + if (traceContext) { + childSpan = traceContext.createStepSpan( + index, + finalStep.type, + finalStep.name || finalStep.id || `Branch ${index + 1}`, + { + parentStepId: config.id, + parallelIndex: index, + input: context.data, + attributes: { + "workflow.step.branch": true, + "workflow.step.parent_type": "branch", + }, + }, + ); + } + + const subState = { + ...state, + workflowContext: undefined, + }; + + const executeStep = () => + finalStep.execute({ + ...context, + state: subState, + }); + + try { + const result = + childSpan && traceContext + ? await traceContext.withSpan(childSpan, executeStep) + : await executeStep(); + + if (childSpan && traceContext) { + traceContext.endStepSpan(childSpan, "completed", { output: result }); + } + + return result; + } catch (error) { + if (childSpan && traceContext) { + traceContext.endStepSpan(childSpan, "error", { error }); + } + throw error; + } + }), + ); + + return results as Array; + }, + } satisfies WorkflowStepBranch; +} diff --git a/packages/core/src/workflow/steps/and-foreach.spec.ts b/packages/core/src/workflow/steps/and-foreach.spec.ts new file mode 100644 index 000000000..9767cd5f5 --- /dev/null +++ b/packages/core/src/workflow/steps/and-foreach.spec.ts @@ -0,0 +1,149 @@ +import { describe, expect, it } from "vitest"; +import { createMockWorkflowExecuteContext } from "../../test-utils"; +import { andForEach } from "./and-foreach"; +import { andThen } from "./and-then"; + +describe("andForEach", () => { + it("maps each item with the provided step", async () => { + const step = andForEach({ + id: "foreach", + step: andThen({ + id: "double", + execute: async ({ data }) => data * 2, + }), + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: [1, 2, 3], + }), + ); + + expect(result).toEqual([2, 4, 6]); + }); + + it("returns an empty array for empty input", async () => { + const step = andForEach({ + id: "foreach-empty", + step: andThen({ + id: "noop", + execute: async ({ data }) => data, + }), + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: [], + }), + ); + + expect(result).toEqual([]); + }); + + it("handles single-item arrays", async () => { + const step = andForEach({ + id: "foreach-single", + step: andThen({ + id: "double", + execute: async ({ data }) => data * 2, + }), + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: [4], + }), + ); + + expect(result).toEqual([8]); + }); + + it("preserves order with async steps", async () => { + const step = andForEach({ + id: "foreach-order", + concurrency: 3, + step: andThen({ + id: "delayed", + execute: async ({ data }) => { + const delay = data === 1 ? 30 : data === 2 ? 10 : 20; + await new Promise((resolve) => setTimeout(resolve, delay)); + return data * 2; + }, + }), + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: [1, 2, 3], + }), + ); + + expect(result).toEqual([2, 4, 6]); + }); + + it("respects the concurrency limit", async () => { + let inFlight = 0; + let maxInFlight = 0; + + const step = andForEach({ + id: "foreach-concurrency", + concurrency: 2, + step: andThen({ + id: "track", + execute: async ({ data }) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await new Promise((resolve) => setTimeout(resolve, 20)); + inFlight -= 1; + return data; + }, + }), + }); + + await step.execute( + createMockWorkflowExecuteContext({ + data: [1, 2, 3, 4], + }), + ); + + expect(maxInFlight).toBeLessThanOrEqual(2); + }); + + it("throws when input is not an array", async () => { + const step = andForEach({ + id: "foreach-invalid", + step: andThen({ + id: "noop", + execute: async ({ data }) => data, + }), + }); + + await expect( + step.execute( + createMockWorkflowExecuteContext({ + data: { value: 1 } as any, + }), + ), + ).rejects.toThrow("andForEach expects array input data"); + }); + + it("propagates errors from inner steps", async () => { + const step = andForEach({ + id: "foreach-error", + step: andThen({ + id: "boom", + execute: async () => { + throw new Error("boom"); + }, + }), + }); + + await expect( + step.execute( + createMockWorkflowExecuteContext({ + data: [1, 2], + }), + ), + ).rejects.toThrow("boom"); + }); +}); diff --git a/packages/core/src/workflow/steps/and-foreach.ts b/packages/core/src/workflow/steps/and-foreach.ts new file mode 100644 index 000000000..e841e4147 --- /dev/null +++ b/packages/core/src/workflow/steps/and-foreach.ts @@ -0,0 +1,112 @@ +import type { Span } from "@opentelemetry/api"; +import { defaultStepConfig } from "../internal/utils"; +import { matchStep } from "./helpers"; +import { throwIfAborted } from "./signal"; +import type { WorkflowStepForEach, WorkflowStepForEachConfig } from "./types"; + +/** + * Creates a foreach step that runs a step for each item in an array. + */ +export function andForEach({ + step, + concurrency = 1, + ...config +}: WorkflowStepForEachConfig) { + const finalStep = matchStep(step); + + return { + ...defaultStepConfig(config), + type: "foreach", + step, + concurrency, + execute: async (context) => { + const { data, state } = context; + if (!Array.isArray(data)) { + throw new Error("andForEach expects array input data"); + } + + const items = data as ITEM[]; + if (items.length === 0) { + return []; + } + + const traceContext = state.workflowContext?.traceContext; + const normalizedConcurrency = Number.isFinite(concurrency) ? Math.floor(concurrency) : 1; + const maxConcurrency = Math.max(1, normalizedConcurrency); + + const runItem = async (item: ITEM, index: number) => { + throwIfAborted(state.signal); + + let childSpan: Span | undefined; + if (traceContext) { + childSpan = traceContext.createStepSpan( + index, + finalStep.type, + finalStep.name || finalStep.id || `ForEach ${index + 1}`, + { + parentStepId: config.id, + parallelIndex: index, + input: item, + attributes: { + "workflow.step.foreach": true, + "workflow.step.parent_type": "foreach", + }, + }, + ); + } + + const subState = { + ...state, + workflowContext: undefined, + }; + + const executeStep = () => + finalStep.execute({ + ...context, + data: item, + state: subState, + }); + + try { + const result = + childSpan && traceContext + ? await traceContext.withSpan(childSpan, executeStep) + : await executeStep(); + + if (childSpan && traceContext) { + traceContext.endStepSpan(childSpan, "completed", { output: result }); + } + + return result; + } catch (error) { + if (childSpan && traceContext) { + traceContext.endStepSpan(childSpan, "error", { error }); + } + throw error; + } + }; + + if (maxConcurrency === 1) { + const results: RESULT[] = []; + for (let index = 0; index < items.length; index += 1) { + results.push(await runItem(items[index] as ITEM, index)); + } + return results; + } + + const results = new Array(items.length); + let nextIndex = 0; + + const workers = Array.from({ length: Math.min(maxConcurrency, items.length) }, async () => { + while (nextIndex < items.length) { + const index = nextIndex; + nextIndex += 1; + results[index] = await runItem(items[index] as ITEM, index); + } + }); + + await Promise.all(workers); + return results; + }, + } satisfies WorkflowStepForEach; +} diff --git a/packages/core/src/workflow/steps/and-guardrail.spec.ts b/packages/core/src/workflow/steps/and-guardrail.spec.ts new file mode 100644 index 000000000..038946f47 --- /dev/null +++ b/packages/core/src/workflow/steps/and-guardrail.spec.ts @@ -0,0 +1,54 @@ +import { describe, expect, it } from "vitest"; +import { createInputGuardrail, createOutputGuardrail } from "../../agent/guardrail"; +import { createMockWorkflowExecuteContext } from "../../test-utils"; +import { andGuardrail } from "./and-guardrail"; + +describe("andGuardrail", () => { + it("applies output guardrails to data", async () => { + const redact = createOutputGuardrail({ + name: "redact", + handler: async ({ output }) => ({ + pass: true, + action: "modify", + modifiedOutput: output.replace(/[0-9]/g, "*"), + }), + }); + + const step = andGuardrail({ + id: "guard-output", + outputGuardrails: [redact], + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: "Code 123", + }), + ); + + expect(result).toBe("Code ***"); + }); + + it("applies input guardrails to string data", async () => { + const trim = createInputGuardrail({ + name: "trim", + handler: async ({ input }) => ({ + pass: true, + action: "modify", + modifiedInput: typeof input === "string" ? input.trim() : input, + }), + }); + + const step = andGuardrail({ + id: "guard-input", + inputGuardrails: [trim], + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: " hello ", + }), + ); + + expect(result).toBe("hello"); + }); +}); diff --git a/packages/core/src/workflow/steps/and-guardrail.ts b/packages/core/src/workflow/steps/and-guardrail.ts new file mode 100644 index 000000000..6a9a4e948 --- /dev/null +++ b/packages/core/src/workflow/steps/and-guardrail.ts @@ -0,0 +1,84 @@ +import { normalizeInputGuardrailList, normalizeOutputGuardrailList } from "../../agent/guardrail"; +import { + applyWorkflowInputGuardrails, + applyWorkflowOutputGuardrails, + createWorkflowGuardrailRuntime, + isWorkflowGuardrailInput, +} from "../internal/guardrails"; +import type { WorkflowExecuteContext } from "../internal/types"; +import { defaultStepConfig } from "../internal/utils"; +import type { WorkflowStepGuardrail, WorkflowStepGuardrailConfig } from "./types"; + +/** + * Applies guardrails to the current workflow data. + * Use input guardrails for string/message data and output guardrails for structured data. + */ +export function andGuardrail({ + inputGuardrails, + outputGuardrails, + ...config +}: WorkflowStepGuardrailConfig) { + const normalizedInputGuardrails = inputGuardrails + ? normalizeInputGuardrailList(inputGuardrails) + : []; + const normalizedOutputGuardrails = outputGuardrails + ? normalizeOutputGuardrailList(outputGuardrails) + : []; + + return { + ...defaultStepConfig(config), + type: "guardrail", + inputGuardrails, + outputGuardrails, + execute: async (context: WorkflowExecuteContext) => { + if (normalizedInputGuardrails.length === 0 && normalizedOutputGuardrails.length === 0) { + return context.data; + } + + const workflowContext = context.state.workflowContext; + const guardrailRuntime = createWorkflowGuardrailRuntime({ + workflowId: workflowContext?.workflowId, + workflowName: workflowContext?.workflowName, + executionId: workflowContext?.executionId, + traceContext: workflowContext?.traceContext, + logger: context.logger, + userId: context.state.userId, + conversationId: context.state.conversationId, + context: (workflowContext?.context ?? context.state.context) as + | Map + | undefined, + guardrailAgent: workflowContext?.guardrailAgent, + parentSpan: workflowContext?.currentStepSpan, + operationId: workflowContext?.executionId + ? `${workflowContext.executionId}:guardrail:${config.id}` + : undefined, + }); + + let currentData: DATA = context.data; + + if (normalizedInputGuardrails.length > 0) { + if (!isWorkflowGuardrailInput(currentData)) { + throw new Error( + "andGuardrail input guardrails require string or message input. Use outputGuardrails for structured data.", + ); + } + const guardrailedInput = await applyWorkflowInputGuardrails( + currentData, + normalizedInputGuardrails, + guardrailRuntime, + ); + currentData = guardrailedInput as DATA; + } + + if (normalizedOutputGuardrails.length > 0) { + currentData = (await applyWorkflowOutputGuardrails( + currentData, + normalizedOutputGuardrails, + guardrailRuntime, + )) as DATA; + } + + return currentData; + }, + } satisfies WorkflowStepGuardrail; +} diff --git a/packages/core/src/workflow/steps/and-loop.spec.ts b/packages/core/src/workflow/steps/and-loop.spec.ts new file mode 100644 index 000000000..6da835cb2 --- /dev/null +++ b/packages/core/src/workflow/steps/and-loop.spec.ts @@ -0,0 +1,44 @@ +import { describe, expect, it } from "vitest"; +import { createMockWorkflowExecuteContext } from "../../test-utils"; +import { andDoUntil, andDoWhile } from "./and-loop"; +import { andThen } from "./and-then"; + +describe("andLoop", () => { + it("runs a do-while loop until the condition is false", async () => { + const step = andDoWhile({ + id: "loop", + step: andThen({ + id: "increment", + execute: async ({ data }) => data + 1, + }), + condition: async ({ data }) => data < 3, + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: 0, + }), + ); + + expect(result).toBe(3); + }); + + it("runs a do-until loop until the condition is true", async () => { + const step = andDoUntil({ + id: "loop", + step: andThen({ + id: "increment", + execute: async ({ data }) => data + 1, + }), + condition: async ({ data }) => data >= 2, + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: 0, + }), + ); + + expect(result).toBe(2); + }); +}); diff --git a/packages/core/src/workflow/steps/and-loop.ts b/packages/core/src/workflow/steps/and-loop.ts new file mode 100644 index 000000000..7f5288ef7 --- /dev/null +++ b/packages/core/src/workflow/steps/and-loop.ts @@ -0,0 +1,110 @@ +import type { Span } from "@opentelemetry/api"; +import { defaultStepConfig } from "../internal/utils"; +import { matchStep } from "./helpers"; +import { throwIfAborted } from "./signal"; +import type { WorkflowStepLoop, WorkflowStepLoopConfig } from "./types"; + +type LoopType = "dowhile" | "dountil"; + +const createLoopStep = ( + loopType: LoopType, + { step, condition, ...config }: WorkflowStepLoopConfig, +) => { + const finalStep = matchStep(step); + + return { + ...defaultStepConfig(config), + type: "loop", + loopType, + step, + condition, + execute: async (context) => { + const { state } = context; + const traceContext = state.workflowContext?.traceContext; + let currentData = context.data as DATA | RESULT; + let iteration = 0; + + while (true) { + throwIfAborted(state.signal); + + let childSpan: Span | undefined; + if (traceContext) { + childSpan = traceContext.createStepSpan( + iteration, + finalStep.type, + finalStep.name || finalStep.id || `Loop ${iteration + 1}`, + { + parentStepId: config.id, + parallelIndex: iteration, + input: currentData, + attributes: { + "workflow.step.loop": true, + "workflow.step.parent_type": "loop", + "workflow.step.loop_type": loopType, + }, + }, + ); + } + + const subState = { + ...state, + workflowContext: undefined, + }; + + const executeStep = () => + finalStep.execute({ + ...context, + data: currentData as DATA, + state: subState, + }); + + try { + currentData = + childSpan && traceContext + ? await traceContext.withSpan(childSpan, executeStep) + : await executeStep(); + + if (childSpan && traceContext) { + traceContext.endStepSpan(childSpan, "completed", { output: currentData }); + } + } catch (error) { + if (childSpan && traceContext) { + traceContext.endStepSpan(childSpan, "error", { error }); + } + throw error; + } + + iteration += 1; + throwIfAborted(state.signal); + const shouldContinue = await condition({ + ...context, + data: currentData as RESULT, + }); + + if (loopType === "dowhile" ? !shouldContinue : shouldContinue) { + break; + } + } + + return currentData as RESULT; + }, + } satisfies WorkflowStepLoop; +}; + +/** + * Creates a do-while loop step for the workflow. + */ +export function andDoWhile( + config: WorkflowStepLoopConfig, +) { + return createLoopStep("dowhile", config); +} + +/** + * Creates a do-until loop step for the workflow. + */ +export function andDoUntil( + config: WorkflowStepLoopConfig, +) { + return createLoopStep("dountil", config); +} diff --git a/packages/core/src/workflow/steps/and-map.spec.ts b/packages/core/src/workflow/steps/and-map.spec.ts new file mode 100644 index 000000000..93f296c61 --- /dev/null +++ b/packages/core/src/workflow/steps/and-map.spec.ts @@ -0,0 +1,63 @@ +import { describe, expect, it } from "vitest"; +import { createMockWorkflowExecuteContext } from "../../test-utils"; +import { andMap } from "./and-map"; + +describe("andMap", () => { + it("maps values from data, input, context, and steps", async () => { + const step = andMap({ + id: "map", + map: { + userId: { source: "data", path: "userId" }, + orderId: { source: "input", path: "orderId" }, + region: { source: "context", key: "region", path: "code" }, + name: { source: "step", stepId: "fetch-user", path: "name" }, + constant: { source: "value", value: 42 }, + }, + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: { userId: "u-1" }, + state: { + input: { orderId: "o-9" }, + context: new Map([["region", { code: "eu" }]]), + } as any, + getStepData: (stepId) => + stepId === "fetch-user" + ? { input: null, output: { name: "Ada" }, status: "success", error: null } + : undefined, + }), + ); + + expect(result).toEqual({ + userId: "u-1", + orderId: "o-9", + region: "eu", + name: "Ada", + constant: 42, + }); + }); + + it("awaits async fn map entries", async () => { + const step = andMap({ + id: "map", + map: { + value: { + source: "fn", + fn: async () => { + await new Promise((resolve) => setTimeout(resolve, 5)); + return "ok"; + }, + }, + }, + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: { ok: true }, + }), + ); + + expect(result).toEqual({ value: "ok" }); + }); +}); diff --git a/packages/core/src/workflow/steps/and-map.ts b/packages/core/src/workflow/steps/and-map.ts new file mode 100644 index 000000000..076870ea3 --- /dev/null +++ b/packages/core/src/workflow/steps/and-map.ts @@ -0,0 +1,101 @@ +import type { WorkflowExecuteContext } from "../internal/types"; +import { defaultStepConfig } from "../internal/utils"; +import type { + WorkflowStepMap, + WorkflowStepMapConfig, + WorkflowStepMapEntry, + WorkflowStepMapResult, +} from "./types"; + +const readPath = (value: unknown, path?: string) => { + if (path === undefined || path === ".") { + return value; + } + + const parts = path.split("."); + let current: any = value; + + for (const part of parts) { + if (current && typeof current === "object") { + current = current[part]; + } else { + throw new Error(`Invalid path '${path}'`); + } + } + + return current; +}; + +const getContextValue = (context: WorkflowExecuteContext, key: string) => { + const ctx = context.state.context; + if (!ctx) { + return undefined; + } + + if (ctx instanceof Map) { + return ctx.get(key); + } + + if (typeof ctx === "object") { + return (ctx as Record)[key]; + } + + return undefined; +}; + +const resolveMapEntry = async ( + entry: WorkflowStepMapEntry, + context: WorkflowExecuteContext, +) => { + switch (entry.source) { + case "value": + return entry.value; + case "data": + return readPath(context.data, entry.path); + case "input": + return readPath(context.state.input, entry.path); + case "step": { + const stepData = context.getStepData(entry.stepId); + if (!stepData) { + throw new Error(`Step '${entry.stepId}' not found in map`); + } + const stepValue = stepData.output !== undefined ? stepData.output : stepData.input; + return readPath(stepValue, entry.path); + } + case "context": { + const ctxValue = getContextValue(context, entry.key); + return readPath(ctxValue, entry.path); + } + case "fn": + return await entry.fn(context); + default: + throw new Error("Unsupported map entry"); + } +}; + +/** + * Creates a mapping step that composes data from input, steps, or context. + */ +export function andMap>>({ + map, + ...config +}: WorkflowStepMapConfig) { + return { + ...defaultStepConfig(config), + type: "map", + map, + execute: async (context) => { + const entries = Object.entries(map) as Array<[keyof MAP, MAP[keyof MAP]]>; + const result = {} as WorkflowStepMapResult; + + for (const [key, entry] of entries) { + result[key] = (await resolveMapEntry( + entry, + context, + )) as WorkflowStepMapResult[typeof key]; + } + + return result; + }, + } satisfies WorkflowStepMap; +} diff --git a/packages/core/src/workflow/steps/and-sleep-until.ts b/packages/core/src/workflow/steps/and-sleep-until.ts new file mode 100644 index 000000000..d330e4f7a --- /dev/null +++ b/packages/core/src/workflow/steps/and-sleep-until.ts @@ -0,0 +1,37 @@ +import type { WorkflowExecuteContext } from "../internal/types"; +import { defaultStepConfig } from "../internal/utils"; +import { waitWithSignal } from "./signal"; +import type { WorkflowStepSleepUntil, WorkflowStepSleepUntilConfig } from "./types"; + +/** + * Creates a sleep-until step for the workflow + * + * @example + * ```ts + * const w = createWorkflow( + * andSleepUntil({ id: "pause-until", date: new Date(Date.now() + 1000) }), + * andThen({ id: "next", execute: async ({ data }) => ({ ...data }) }) + * ); + * ``` + */ +export function andSleepUntil({ + date, + ...config +}: WorkflowStepSleepUntilConfig) { + return { + ...defaultStepConfig(config), + type: "sleep-until", + date, + execute: async (context: WorkflowExecuteContext) => { + const target = typeof date === "function" ? await date(context) : date; + + if (!(target instanceof Date) || Number.isNaN(target.getTime())) { + throw new Error("andSleepUntil expected a valid Date"); + } + + const delayMs = target.getTime() - Date.now(); + await waitWithSignal(delayMs, context.state.signal); + return context.data as DATA; + }, + } satisfies WorkflowStepSleepUntil; +} diff --git a/packages/core/src/workflow/steps/and-sleep.spec.ts b/packages/core/src/workflow/steps/and-sleep.spec.ts new file mode 100644 index 000000000..7ad261106 --- /dev/null +++ b/packages/core/src/workflow/steps/and-sleep.spec.ts @@ -0,0 +1,133 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createMockWorkflowExecuteContext } from "../../test-utils"; +import { andSleep } from "./and-sleep"; +import { andSleepUntil } from "./and-sleep-until"; + +afterEach(() => { + vi.useRealTimers(); +}); + +describe("andSleep", () => { + it("returns input data after sleeping", async () => { + const step = andSleep({ + id: "sleep", + duration: 0, + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: { ok: true }, + }), + ); + + expect(result).toEqual({ ok: true }); + }); + + it("waits for the configured duration", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2024-01-01T00:00:00Z")); + + const step = andSleep({ + id: "sleep", + duration: 100, + }); + + let resolved = false; + const resultPromise = step.execute( + createMockWorkflowExecuteContext({ + data: { ok: true }, + }), + ); + resultPromise.then(() => { + resolved = true; + }); + + await vi.advanceTimersByTimeAsync(99); + await Promise.resolve(); + expect(resolved).toBe(false); + + await vi.advanceTimersByTimeAsync(1); + const result = await resultPromise; + expect(result).toEqual({ ok: true }); + }); + + it("treats negative durations as zero", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2024-01-01T00:00:00Z")); + + const step = andSleep({ + id: "sleep", + duration: -10, + }); + + const resultPromise = step.execute( + createMockWorkflowExecuteContext({ + data: { ok: true }, + }), + ); + + await vi.runAllTimersAsync(); + const result = await resultPromise; + expect(result).toEqual({ ok: true }); + }); +}); + +describe("andSleepUntil", () => { + it("returns input data when sleepUntil is in the past", async () => { + const step = andSleepUntil({ + id: "sleep-until", + date: new Date(Date.now() - 1000), + }); + + const result = await step.execute( + createMockWorkflowExecuteContext({ + data: { ok: true }, + }), + ); + + expect(result).toEqual({ ok: true }); + }); + + it("waits until the target date", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2024-01-01T00:00:00Z")); + + const step = andSleepUntil({ + id: "sleep-until", + date: () => new Date(Date.now() + 100), + }); + + let resolved = false; + const resultPromise = step.execute( + createMockWorkflowExecuteContext({ + data: { ok: true }, + }), + ); + resultPromise.then(() => { + resolved = true; + }); + + await vi.advanceTimersByTimeAsync(99); + await Promise.resolve(); + expect(resolved).toBe(false); + + await vi.advanceTimersByTimeAsync(1); + const result = await resultPromise; + expect(result).toEqual({ ok: true }); + }); + + it("throws when the date is invalid", async () => { + const step = andSleepUntil({ + id: "sleep-until", + date: new Date("invalid"), + }); + + await expect( + step.execute( + createMockWorkflowExecuteContext({ + data: { ok: true }, + }), + ), + ).rejects.toThrow("andSleepUntil expected a valid Date"); + }); +}); diff --git a/packages/core/src/workflow/steps/and-sleep.ts b/packages/core/src/workflow/steps/and-sleep.ts new file mode 100644 index 000000000..c60e56b7a --- /dev/null +++ b/packages/core/src/workflow/steps/and-sleep.ts @@ -0,0 +1,31 @@ +import type { WorkflowExecuteContext } from "../internal/types"; +import { defaultStepConfig } from "../internal/utils"; +import { waitWithSignal } from "./signal"; +import type { WorkflowStepSleep, WorkflowStepSleepConfig } from "./types"; + +/** + * Creates a sleep step for the workflow + * + * @example + * ```ts + * const w = createWorkflow( + * andSleep({ id: "pause", duration: 500 }), + * andThen({ id: "next", execute: async ({ data }) => ({ ...data }) }) + * ); + * ``` + */ +export function andSleep({ + duration, + ...config +}: WorkflowStepSleepConfig) { + return { + ...defaultStepConfig(config), + type: "sleep", + duration, + execute: async (context: WorkflowExecuteContext) => { + const durationMs = typeof duration === "function" ? await duration(context) : duration; + await waitWithSignal(durationMs, context.state.signal); + return context.data as DATA; + }, + } satisfies WorkflowStepSleep; +} diff --git a/packages/core/src/workflow/steps/helpers.ts b/packages/core/src/workflow/steps/helpers.ts index b324a7760..f7097f060 100644 --- a/packages/core/src/workflow/steps/helpers.ts +++ b/packages/core/src/workflow/steps/helpers.ts @@ -12,9 +12,18 @@ export function matchStep( return match(stepOrAgent) .with({ type: "agent" }, (agentStep) => agentStep) .with({ type: "func" }, (funcStep) => funcStep) + .with({ type: "tap" }, (tapStep) => tapStep) + .with({ type: "workflow" }, (workflowStep) => workflowStep) + .with({ type: "guardrail" }, (guardrailStep) => guardrailStep) .with({ type: "conditional-when" }, (condStep) => condStep) .with({ type: "parallel-all" }, (allStep) => allStep) .with({ type: "parallel-race" }, (raceStep) => raceStep) + .with({ type: "sleep" }, (sleepStep) => sleepStep) + .with({ type: "sleep-until" }, (sleepUntilStep) => sleepUntilStep) + .with({ type: "foreach" }, (forEachStep) => forEachStep) + .with({ type: "loop" }, (loopStep) => loopStep) + .with({ type: "branch" }, (branchStep) => branchStep) + .with({ type: "map" }, (mapStep) => mapStep) .otherwise(() => { throw new Error("Invalid step or agent"); }); diff --git a/packages/core/src/workflow/steps/index.ts b/packages/core/src/workflow/steps/index.ts index 56f913fce..4c0ed51f5 100644 --- a/packages/core/src/workflow/steps/index.ts +++ b/packages/core/src/workflow/steps/index.ts @@ -4,6 +4,13 @@ export { andWhen } from "./and-when"; export { andAll } from "./and-all"; export { andRace } from "./and-race"; export { andTap } from "./and-tap"; +export { andGuardrail } from "./and-guardrail"; +export { andSleep } from "./and-sleep"; +export { andSleepUntil } from "./and-sleep-until"; +export { andForEach } from "./and-foreach"; +export { andBranch } from "./and-branch"; +export { andDoWhile, andDoUntil } from "./and-loop"; +export { andMap } from "./and-map"; export { andWorkflow } from "./and-workflow"; export { matchStep } from "./helpers"; export type { @@ -16,5 +23,15 @@ export type { WorkflowStepParallelRace, WorkflowStepAgent, WorkflowStepFunc, + WorkflowStepGuardrail, + WorkflowStepGuardrailConfig, WorkflowStepTapConfig, + WorkflowStepSleepConfig, + WorkflowStepSleepUntilConfig, + WorkflowStepForEachConfig, + WorkflowStepLoopConfig, + WorkflowStepBranchConfig, + WorkflowStepMapConfig, + WorkflowStepMapEntry, + WorkflowStepMapResult, } from "./types"; diff --git a/packages/core/src/workflow/steps/signal.ts b/packages/core/src/workflow/steps/signal.ts new file mode 100644 index 000000000..433e86185 --- /dev/null +++ b/packages/core/src/workflow/steps/signal.ts @@ -0,0 +1,56 @@ +const getAbortError = (signal?: AbortSignal): Error | undefined => { + if (!signal?.aborted) { + return undefined; + } + + const reason = (signal as AbortSignal & { reason?: unknown }).reason; + const typedReason = + typeof reason === "object" && reason !== null && "type" in reason + ? (reason as { type?: string }).type + : reason; + + if (typedReason === "cancelled") { + return new Error("WORKFLOW_CANCELLED"); + } + + return new Error("WORKFLOW_SUSPENDED"); +}; + +export const throwIfAborted = (signal?: AbortSignal) => { + const error = getAbortError(signal); + if (error) { + throw error; + } +}; + +export const waitWithSignal = (ms: number, signal?: AbortSignal) => { + const durationMs = Number.isFinite(ms) ? Math.max(0, ms) : 0; + + if (!signal) { + return new Promise((resolve) => { + setTimeout(resolve, durationMs); + }); + } + + return new Promise((resolve, reject) => { + const abortError = () => getAbortError(signal) || new Error("WORKFLOW_SUSPENDED"); + + if (signal.aborted) { + reject(abortError()); + return; + } + + const timer = setTimeout(() => { + signal.removeEventListener("abort", onAbort); + resolve(); + }, durationMs); + + const onAbort = () => { + clearTimeout(timer); + signal.removeEventListener("abort", onAbort); + reject(abortError()); + }; + + signal.addEventListener("abort", onAbort, { once: true }); + }); +}; diff --git a/packages/core/src/workflow/steps/types.ts b/packages/core/src/workflow/steps/types.ts index c1303f73f..4ef4437f7 100644 --- a/packages/core/src/workflow/steps/types.ts +++ b/packages/core/src/workflow/steps/types.ts @@ -1,6 +1,7 @@ import type { DangerouslyAllowAny } from "@voltagent/internal/types"; import type { z } from "zod"; import type { Agent } from "../../agent/agent"; +import type { InputGuardrail, OutputGuardrail } from "../../agent/types"; import type { InternalAnyWorkflowStep, InternalBaseWorkflowStep, @@ -14,9 +15,18 @@ import type { Workflow, WorkflowRunOptions } from "../types"; export type WorkflowStepType = | "agent" | "func" + | "tap" + | "workflow" + | "guardrail" | "conditional-when" | "parallel-all" - | "parallel-race"; + | "parallel-race" + | "sleep" + | "sleep-until" + | "foreach" + | "loop" + | "branch" + | "map"; export interface WorkflowStepAgent extends InternalBaseWorkflowStep { @@ -49,6 +59,18 @@ export interface WorkflowStepWorkflow; } +export type WorkflowStepGuardrailConfig<_INPUT, DATA> = InternalWorkflowStepConfig<{ + inputGuardrails?: InputGuardrail[]; + outputGuardrails?: OutputGuardrail[]; +}>; + +export interface WorkflowStepGuardrail + extends InternalBaseWorkflowStep { + type: "guardrail"; + inputGuardrails?: InputGuardrail[]; + outputGuardrails?: OutputGuardrail[]; +} + export type WorkflowStepTapConfig< INPUT, DATA, @@ -117,6 +139,104 @@ export interface WorkflowStepParallelAll | WorkflowStepParallelDynamicStepsFunc; } +export type WorkflowStepSleepConfig = InternalWorkflowStepConfig<{ + duration: number | InternalWorkflowFunc; +}>; + +export interface WorkflowStepSleep + extends InternalBaseWorkflowStep { + type: "sleep"; + duration: number | InternalWorkflowFunc; +} + +export type WorkflowStepSleepUntilConfig = InternalWorkflowStepConfig<{ + date: Date | InternalWorkflowFunc; +}>; + +export interface WorkflowStepSleepUntil + extends InternalBaseWorkflowStep { + type: "sleep-until"; + date: Date | InternalWorkflowFunc; +} + +export type WorkflowStepForEachConfig = InternalWorkflowStepConfig<{ + step: InternalAnyWorkflowStep; + concurrency?: number; +}>; + +export interface WorkflowStepForEach + extends InternalBaseWorkflowStep { + type: "foreach"; + step: InternalAnyWorkflowStep; + concurrency?: number; +} + +export type WorkflowStepLoopConfig = InternalWorkflowStepConfig<{ + step: InternalAnyWorkflowStep; + condition: InternalWorkflowFunc; +}>; + +export interface WorkflowStepLoop + extends InternalBaseWorkflowStep { + type: "loop"; + loopType: "dowhile" | "dountil"; + step: InternalAnyWorkflowStep; + condition: InternalWorkflowFunc; +} + +export type WorkflowStepBranchConfig = InternalWorkflowStepConfig<{ + branches: ReadonlyArray<{ + condition: InternalWorkflowFunc; + step: InternalAnyWorkflowStep; + }>; +}>; + +export interface WorkflowStepBranch + extends InternalBaseWorkflowStep, any, any> { + type: "branch"; + branches: ReadonlyArray<{ + condition: InternalWorkflowFunc; + step: InternalAnyWorkflowStep; + }>; +} + +export type WorkflowStepMapEntry = + | { source: "value"; value: unknown } + | { source: "data"; path?: string } + | { source: "input"; path?: string } + | { source: "step"; stepId: string; path?: string } + | { source: "context"; key: string; path?: string } + | { source: "fn"; fn: InternalWorkflowFunc }; + +type WorkflowStepMapEntryResult = ENTRY extends { source: "value"; value: infer VALUE } + ? VALUE + : ENTRY extends { source: "fn"; fn: (...args: any[]) => Promise } + ? RESULT + : unknown; + +export type WorkflowStepMapResult>> = { + [KEY in keyof MAP]: WorkflowStepMapEntryResult; +}; + +export type WorkflowStepMapConfig< + INPUT, + DATA, + MAP extends Record>, +> = InternalWorkflowStepConfig<{ + map: MAP; + inputSchema?: z.ZodTypeAny; + outputSchema?: z.ZodTypeAny; +}>; + +export interface WorkflowStepMap< + INPUT, + DATA, + MAP extends Record>, +> extends InternalBaseWorkflowStep, any, any> { + type: "map"; + map: MAP; +} + export type WorkflowStepParallelDynamicStepsFunc = ( context: WorkflowExecuteContext, ) => Promise>; @@ -129,10 +249,17 @@ export type WorkflowStep = | WorkflowStepAgent | WorkflowStepFunc | WorkflowStepConditionalWhen + | WorkflowStepGuardrail | WorkflowStepParallelAll | WorkflowStepTap | WorkflowStepParallelRace - | WorkflowStepWorkflow; + | WorkflowStepWorkflow + | WorkflowStepSleep + | WorkflowStepSleepUntil + | WorkflowStepForEach + | WorkflowStepLoop + | WorkflowStepBranch + | WorkflowStepMap>>; /** * Internal type to allow overriding the run method for the workflow @@ -140,7 +267,7 @@ export type WorkflowStep = export interface InternalWorkflow<_INPUT, DATA, RESULT> extends Omit, "run"> { run: ( - input: InternalExtractWorkflowInputData, + input: DATA, options?: InternalWorkflowRunOptions, ) => Promise<{ executionId: string; diff --git a/packages/core/src/workflow/types.ts b/packages/core/src/workflow/types.ts index f7eed2823..ea4ae069a 100644 --- a/packages/core/src/workflow/types.ts +++ b/packages/core/src/workflow/types.ts @@ -3,9 +3,10 @@ import type { DangerouslyAllowAny } from "@voltagent/internal/types"; import type { StreamTextResult, UIMessage } from "ai"; import type * as TF from "type-fest"; import type { z } from "zod"; +import type { Agent } from "../agent/agent"; import type { BaseMessage } from "../agent/providers"; import type { UsageInfo } from "../agent/providers"; -import type { UserContext } from "../agent/types"; +import type { InputGuardrail, OutputGuardrail, UserContext } from "../agent/types"; import type { Memory } from "../memory"; import type { VoltAgentObservability } from "../observability"; import type { WorkflowExecutionContext } from "./context"; @@ -199,6 +200,19 @@ export interface WorkflowStreamResult< abort: () => void; } +export interface WorkflowRetryConfig { + /** + * Number of retry attempts for a step when it throws an error + * @default 0 + */ + attempts?: number; + /** + * Delay in milliseconds between retry attempts + * @default 0 + */ + delayMs?: number; +} + export interface WorkflowRunOptions { /** * The active step, this can be used to track the current step in a workflow @@ -247,6 +261,22 @@ export interface WorkflowRunOptions { * If not provided, will use the workflow's logger or global logger */ logger?: Logger; + /** + * Override retry settings for this workflow execution + */ + retryConfig?: WorkflowRetryConfig; + /** + * Input guardrails to run before workflow execution + */ + inputGuardrails?: InputGuardrail[]; + /** + * Output guardrails to run after workflow execution + */ + outputGuardrails?: OutputGuardrail[]; + /** + * Optional agent instance to supply to workflow guardrails + */ + guardrailAgent?: Agent; } export interface WorkflowResumeOptions { @@ -280,6 +310,54 @@ export interface WorkflowResumeOptions { * @param DATA - The type of the data * @param RESULT - The type of the result */ +export type WorkflowHookStatus = "completed" | "suspended" | "cancelled" | "error"; + +export type WorkflowStepStatus = + | "running" + | "success" + | "error" + | "suspended" + | "cancelled" + | "skipped"; + +export type WorkflowStepData = { + input: DangerouslyAllowAny; + output?: DangerouslyAllowAny; + status: WorkflowStepStatus; + error?: Error | null; +}; + +export type WorkflowHookContext = { + /** + * Terminal status for the workflow execution + */ + status: WorkflowHookStatus; + /** + * The current workflow state + */ + state: WorkflowState; + /** + * Result of the workflow execution, if available + */ + result: RESULT | null; + /** + * Error from the workflow execution, if any + */ + error: unknown | null; + /** + * Suspension metadata when status is suspended + */ + suspension?: WorkflowSuspensionMetadata; + /** + * Cancellation metadata when status is cancelled + */ + cancellation?: WorkflowCancellationMetadata; + /** + * Step input/output snapshots keyed by step ID + */ + steps: Record; +}; + export type WorkflowHooks = { /** * Called when the workflow starts @@ -300,11 +378,33 @@ export type WorkflowHooks = { */ onStepEnd?: (state: WorkflowState) => Promise; /** - * Called when the workflow ends + * Called when the workflow is suspended + * @param context - The terminal hook context + * @returns void + */ + onSuspend?: (context: WorkflowHookContext) => Promise; + /** + * Called when the workflow ends with an error + * @param context - The terminal hook context + * @returns void + */ + onError?: (context: WorkflowHookContext) => Promise; + /** + * Called when the workflow reaches a terminal state + * @param context - The terminal hook context + * @returns void + */ + onFinish?: (context: WorkflowHookContext) => Promise; + /** + * Called when the workflow ends (completed, cancelled, or error) * @param state - The current state of the workflow + * @param context - The terminal hook context * @returns void */ - onEnd?: (state: WorkflowState) => Promise; + onEnd?: ( + state: WorkflowState, + context?: WorkflowHookContext, + ) => Promise; }; export type WorkflowInput = @@ -371,6 +471,22 @@ export type WorkflowConfig< * If not provided, will use global observability or create a default one */ observability?: VoltAgentObservability; + /** + * Input guardrails to run before workflow execution + */ + inputGuardrails?: InputGuardrail[]; + /** + * Output guardrails to run after workflow execution + */ + outputGuardrails?: OutputGuardrail[]; + /** + * Optional agent instance to supply to workflow guardrails + */ + guardrailAgent?: Agent; + /** + * Default retry configuration for steps in this workflow + */ + retryConfig?: WorkflowRetryConfig; }; /** @@ -423,6 +539,22 @@ export type Workflow< * Observability instance for OpenTelemetry integration */ observability?: VoltAgentObservability; + /** + * Input guardrails configured for this workflow + */ + inputGuardrails?: InputGuardrail[]; + /** + * Output guardrails configured for this workflow + */ + outputGuardrails?: OutputGuardrail[]; + /** + * Optional agent instance supplied to workflow guardrails + */ + guardrailAgent?: Agent; + /** + * Default retry configuration for steps in this workflow + */ + retryConfig?: WorkflowRetryConfig; /** * Get the full state of the workflow including all steps * @returns The serialized workflow state @@ -437,6 +569,11 @@ export type Workflow< resultSchema?: DangerouslyAllowAny; suspendSchema?: DangerouslyAllowAny; resumeSchema?: DangerouslyAllowAny; + retryConfig?: WorkflowRetryConfig; + guardrails?: { + inputCount: number; + outputCount: number; + }; }; /** * Execute the workflow with the given input @@ -487,7 +624,21 @@ export interface BaseWorkflowHistoryEntry { */ export interface BaseWorkflowStepHistoryEntry { stepIndex: number; - stepType: "agent" | "func" | "conditional-when" | "parallel-all" | "parallel-race"; + stepType: + | "agent" + | "func" + | "conditional-when" + | "parallel-all" + | "parallel-race" + | "tap" + | "workflow" + | "guardrail" + | "sleep" + | "sleep-until" + | "foreach" + | "loop" + | "branch" + | "map"; stepName: string; status: "pending" | "running" | "completed" | "error" | "skipped"; // includes all possible statuses startTime?: Date; // optional since pending steps might not have started @@ -603,7 +754,7 @@ export interface WorkflowStreamEvent { /** * Current status of the step/event */ - status: "pending" | "running" | "success" | "error" | "suspended" | "cancelled"; + status: "pending" | "running" | "success" | "skipped" | "error" | "suspended" | "cancelled"; /** * User context passed through the workflow */ @@ -626,7 +777,14 @@ export interface WorkflowStreamEvent { | "parallel-all" | "parallel-race" | "tap" - | "workflow"; + | "workflow" + | "guardrail" + | "sleep" + | "sleep-until" + | "foreach" + | "loop" + | "branch" + | "map"; /** * Additional metadata */ diff --git a/website/docs/workflows/execute-api.md b/website/docs/workflows/execute-api.md index eadd878a8..c3e62c568 100644 --- a/website/docs/workflows/execute-api.md +++ b/website/docs/workflows/execute-api.md @@ -9,7 +9,7 @@ Every workflow step has an `execute` function that receives a context object: ```typescript .andThen({ id: "my-step", - execute: async ({ data, state, getStepData, suspend, resumeData }) => { + execute: async ({ data, state, getStepData, suspend, resumeData, retryCount }) => { // Your logic here return { result: "processed" }; } @@ -147,6 +147,36 @@ When a suspended workflow resumes, this contains the resume data: }) ``` +### 6. `retryCount` - Retry Attempt Number + +If a step is configured with `retries`, this value tells you which attempt is running: + +```typescript +.andThen({ + id: "fetch-user", + retries: 2, + execute: async ({ data, retryCount }) => { + console.log("Attempt:", retryCount); // 0, 1, 2 + return await fetchUser(data.userId); + } +}) +``` + +`retryCount` also increments when retries are enabled at the workflow level: + +```typescript +const workflow = createWorkflowChain({ + id: "retry-defaults", + retryConfig: { attempts: 2, delayMs: 250 }, +}).andThen({ + id: "fetch-user", + execute: async ({ data, retryCount }) => { + console.log("Attempt:", retryCount); + return fetchUser(data.userId); + }, +}); +``` + ## Complete Example Here's a real-world example using all context properties: @@ -398,6 +428,7 @@ interface ExecuteContext { getStepData: (stepId: string) => { input: any; output: any } | undefined; suspend: (reason?: string, data?: TSuspendData) => Promise; resumeData?: TResumeData; + retryCount?: number; } ``` diff --git a/website/docs/workflows/hooks.md b/website/docs/workflows/hooks.md index 31b279377..9866c6ad5 100644 --- a/website/docs/workflows/hooks.md +++ b/website/docs/workflows/hooks.md @@ -19,11 +19,11 @@ const workflow = createWorkflowChain({ onStart: async (state) => { console.log(`Processing order ${state.data.orderId}`); }, - onEnd: async (state) => { - if (state.status === "completed") { - console.log(`Order ${state.data.orderId} completed!`); - } else { - console.error(`Order failed: ${state.error}`); + onFinish: async (info) => { + if (info.status === "completed") { + console.log(`Order ${info.state.data.orderId} completed!`); + } else if (info.status === "error") { + console.error(`Order failed: ${info.error}`); } }, }, @@ -43,7 +43,7 @@ await workflow.run({ orderId: "123", amount: 99.99 }); // Order 123 completed! ``` -## The Four Hooks +## Hook Overview ### 1. onStart @@ -54,29 +54,12 @@ onStart: async (state) => { // state.data = initial input // state.executionId = unique run ID await logger.info("Workflow started", { - workflowId: state.workflowId, executionId: state.executionId, }); }; ``` -### 2. onEnd - -Runs once when workflow finishes: - -```typescript -onEnd: async (state) => { - // state.status = "completed" or "error" - // state.result = final data (if completed) - // state.error = error details (if failed) - - if (state.status === "error") { - await alertTeam(`Workflow failed: ${state.error}`); - } -}; -``` - -### 3. onStepStart +### 2. onStepStart Runs before each step: @@ -89,7 +72,7 @@ onStepStart: async (state) => { }; ``` -### 4. onStepEnd +### 3. onStepEnd Runs after each step succeeds: @@ -102,6 +85,58 @@ onStepEnd: async (state) => { }; ``` +### 4. onSuspend + +Runs when the workflow suspends: + +```typescript +onSuspend: async (info) => { + // info.status === "suspended" + // info.suspension?.reason + // info.suspension?.suspendData + await notifyTeam(`Workflow suspended: ${info.suspension?.reason}`); +}; +``` + +### 5. onError + +Runs when the workflow ends with an error: + +```typescript +onError: async (info) => { + // info.status === "error" + // info.error = error details + await alertTeam(`Workflow failed: ${info.error}`); +}; +``` + +### 6. onFinish + +Runs when the workflow reaches a terminal state: + +```typescript +onFinish: async (info) => { + // info.status = "completed" | "cancelled" | "suspended" | "error" + // info.steps["fetch-user"]?.output + await metrics.recordWorkflowEnd(info.status); +}; +``` + +`info.steps` includes `{ input, output, status, error }` snapshots keyed by step ID. + +### 7. onEnd (extended) + +Runs when the workflow ends (completed, cancelled, or error). It receives the state plus a +structured context: + +```typescript +onEnd: async (state, info) => { + if (info?.status === "error") { + await alertTeam(`Workflow failed: ${info.error}`); + } +}; +``` + ## Common Patterns ### Performance Monitoring @@ -123,15 +158,12 @@ const performanceHooks = { ```typescript const errorHooks = { - onEnd: async (state) => { - if (state.status === "error") { - await errorTracker.report({ - workflowId: state.workflowId, - executionId: state.executionId, - error: state.error, - input: state.data, - }); - } + onError: async (info) => { + await errorTracker.report({ + executionId: info.state.executionId, + error: info.error, + input: info.state.data, + }); }, }; ``` @@ -143,17 +175,15 @@ const auditHooks = { onStart: async (state) => { await auditLog.create({ action: "workflow.started", - workflowId: state.workflowId, userId: state.context?.get("userId"), timestamp: new Date(), }); }, - onEnd: async (state) => { + onFinish: async (info) => { await auditLog.create({ - action: "workflow.completed", - workflowId: state.workflowId, - status: state.status, - duration: Date.now() - state.startTime, + action: "workflow.ended", + status: info.status, + duration: Date.now() - info.state.startAt.getTime(), }); }, }; @@ -169,11 +199,9 @@ const debugHooks = { onStepEnd: async (state) => { console.log(`← ${state.stepId}`, state.data); }, - onEnd: async (state) => { - if (state.status === "error") { - console.error("Workflow failed:", state.error); - console.error("Last data:", state.data); - } + onError: async (info) => { + console.error("Workflow failed:", info.error); + console.error("Last data:", info.state.data); }, }; ``` @@ -190,7 +218,8 @@ Here's what happens when you run a workflow: 5. onStepStart (step 2) 6. [Step 2 executes] 7. onStepEnd (step 2) -8. onEnd +8. onFinish +9. onEnd ``` If a step fails: @@ -199,7 +228,19 @@ If a step fails: 1. onStart 2. onStepStart (step 1) 3. [Step 1 fails with error] -4. onEnd (with error status) +4. onError +5. onFinish +6. onEnd +``` + +If a step suspends: + +``` +1. onStart +2. onStepStart (step 1) +3. [Step 1 suspends] +4. onSuspend +5. onFinish ``` Note: `onStepEnd` is skipped for failed steps. @@ -231,21 +272,21 @@ const productionWorkflow = createWorkflowChain({ step: state.stepId, }); }, - onEnd: async (state) => { - if (state.status === "completed") { + onFinish: async (info) => { + if (info.status === "completed") { // Send welcome email await emailService.send({ - to: state.data.email, + to: info.state.data.email, template: "welcome", }); // Track success await analytics.track("onboarding.completed", { - userId: state.data.userId, + userId: info.state.data.userId, }); - } else { + } else if (info.status === "error") { // Alert team about failure - await slack.alert(`Onboarding failed for ${state.data.userId}`); + await slack.alert(`Onboarding failed for ${info.state.data.userId}`); } }, }, diff --git a/website/docs/workflows/overview.md b/website/docs/workflows/overview.md index 3fe65142b..cdc4eedda 100644 --- a/website/docs/workflows/overview.md +++ b/website/docs/workflows/overview.md @@ -404,6 +404,79 @@ This allows the agent to maintain a persistent, contextual conversation with eac // used by the agent's memory to provide context-aware responses. ``` +### Workflow Retry Policies + +Set a workflow-wide default retry policy with `retryConfig`. Steps inherit it unless they define `retries` (use `retries: 0` to opt out). `delayMs` waits between retry attempts. + +```typescript +const workflow = createWorkflowChain({ + id: "ingest", + name: "Ingest", + retryConfig: { attempts: 2, delayMs: 500 }, +}) + .andThen({ + id: "fetch-user", + execute: async ({ data }) => fetchUser(data.userId), + }) + .andThen({ + id: "no-retry-step", + retries: 0, + execute: async ({ data }) => data, + }); + +await workflow.run({ userId: "123" }, { retryConfig: { attempts: 1 } }); +``` + +### Workflow Guardrails + +Guardrails let you validate or sanitize workflow inputs and outputs. Configure them at the workflow level or use `andGuardrail` inside a chain. + +```typescript +import { + createWorkflowChain, + andGuardrail, + createInputGuardrail, + createOutputGuardrail, +} from "@voltagent/core"; + +const trimInput = createInputGuardrail({ + name: "trim-input", + handler: async ({ input }) => ({ + pass: true, + action: "modify", + modifiedInput: typeof input === "string" ? input.trim() : input, + }), +}); + +const redactOutput = createOutputGuardrail({ + name: "redact-output", + handler: async ({ output }) => ({ + pass: true, + action: "modify", + modifiedOutput: output.replace(/[0-9]/g, "*"), + }), +}); + +const workflow = createWorkflowChain({ + id: "guarded", + input: z.string(), + result: z.string(), + inputGuardrails: [trimInput], + outputGuardrails: [redactOutput], +}) + .andGuardrail({ + id: "sanitize", + outputGuardrails: [redactOutput], + }) + .andThen({ + id: "finish", + execute: async ({ data }) => data, + }); +``` + +Input guardrails only accept string or message inputs. For structured data, use output guardrails. +If your guardrails rely on agent APIs or metadata, pass `guardrailAgent` in the workflow config or run options. + ### Workflow History & Observability ![VoltOps Workflow Observability](https://cdn.voltagent.dev/docs/workflow-observability-demo.gif) @@ -516,12 +589,15 @@ The workflow can also suspend during execution and be resumed later. For detaile Workflows provide hooks that allow you to tap into the lifecycle of a workflow run. You can execute custom logic at key points, such as before and after a step or at the beginning and end of the entire workflow. This is useful for logging, metrics, or any other side effects you want to perform. -| Hook | Trigger | -| ------------- | ----------------------------------------------- | -| `onStart` | Before the workflow begins execution. | -| `onEnd` | After the workflow finishes (success or error). | -| `onStepStart` | Before each individual step starts. | -| `onStepEnd` | After each individual step completes. | +| Hook | Trigger | +| ------------- | --------------------------------------------------------------------------------- | +| `onStart` | Before the workflow begins execution. | +| `onStepStart` | Before each individual step starts. | +| `onStepEnd` | After each individual step completes successfully. | +| `onSuspend` | When the workflow suspends. | +| `onError` | When the workflow ends with an error. | +| `onFinish` | When the workflow reaches a terminal state (completed/cancelled/suspended/error). | +| `onEnd` | After the workflow ends (completed, cancelled, or error). | For more details, see the [Workflow Hooks](./hooks.md) documentation. diff --git a/website/docs/workflows/steps/and-all.md b/website/docs/workflows/steps/and-all.md index 62cc585d4..9c9fac9ee 100644 --- a/website/docs/workflows/steps/and-all.md +++ b/website/docs/workflows/steps/and-all.md @@ -62,11 +62,43 @@ Think of it like ordering from multiple restaurants at once - you wait for all d .andAll({ id: string, steps: Array, + retries?: number, name?: string, // Optional purpose?: string // Optional }) ``` +### Retries + +```typescript +.andAll({ + id: "parallel-with-retries", + retries: 2, + steps: [ + andThen({ + id: "unstable-call", + execute: async ({ retryCount }) => { + // retryCount starts at 0 and increments on each retry attempt + if (retryCount < 2) { + throw new Error(`Transient failure (attempt ${retryCount})`); + } + return { ok: true }; + }, + }), + andThen({ + id: "stable-call", + execute: async () => ({ ready: true }), + }), + ], +}) +``` + +- `retryCount` starts at 0 and increments on each retry attempt after a thrown error (0 -> 1 -> 2). +- Retries only apply to thrown errors; suspend or cancel does not trigger retries. +- `retries` overrides workflow-wide `retryConfig.attempts` for this step. +- If `retries` is omitted, the workflow `retryConfig.attempts` value is used. +- When `retries` is set on `andAll`, all parallel steps rerun together and see the same `retryCount`. + ## Common Patterns ### Parallel API Calls diff --git a/website/docs/workflows/steps/and-branch.md b/website/docs/workflows/steps/and-branch.md new file mode 100644 index 000000000..eeacbaeec --- /dev/null +++ b/website/docs/workflows/steps/and-branch.md @@ -0,0 +1,60 @@ +# andBranch + +> Run multiple conditional branches. All branches whose condition is true will execute. + +## Quick Start + +```typescript +import { createWorkflowChain, andBranch, andThen } from "@voltagent/core"; +import { z } from "zod"; + +const workflow = createWorkflowChain({ + id: "branching-flow", + input: z.object({ amount: z.number() }), +}).andBranch({ + id: "rules", + branches: [ + { + condition: ({ data }) => data.amount > 1000, + step: andThen({ + id: "flag-large", + execute: async ({ data }) => ({ ...data, large: true }), + }), + }, + { + condition: ({ data }) => data.amount > 500, + step: andThen({ + id: "flag-review", + execute: async ({ data }) => ({ ...data, review: true }), + }), + }, + { + condition: ({ data }) => data.amount < 0, + step: andThen({ + id: "flag-invalid", + execute: async ({ data }) => ({ ...data, invalid: true }), + }), + }, + ], +}); +``` + +## Function Signature + +```typescript +.andBranch({ + id: string, + branches: Array<{ + condition: (ctx) => boolean | Promise, + step: Step + }>, + retries?: number, + name?: string, + purpose?: string +}) +``` + +## Notes + +- Results are returned as an array aligned to the `branches` order. +- Branches that do not run return `undefined`. diff --git a/website/docs/workflows/steps/and-foreach.md b/website/docs/workflows/steps/and-foreach.md new file mode 100644 index 000000000..27f9210f6 --- /dev/null +++ b/website/docs/workflows/steps/and-foreach.md @@ -0,0 +1,40 @@ +# andForEach + +> Run a step for each item in an array and return all results. + +## Quick Start + +```typescript +import { createWorkflowChain, andForEach, andThen } from "@voltagent/core"; +import { z } from "zod"; + +const workflow = createWorkflowChain({ + id: "batch-process", + input: z.array(z.number()), +}).andForEach({ + id: "double-each", + step: andThen({ + id: "double", + execute: async ({ data }) => data * 2, + }), +}); +``` + +## Function Signature + +```typescript +.andForEach({ + id: string, + step: Step, + concurrency?: number, + retries?: number, + name?: string, + purpose?: string +}) +``` + +## Notes + +- The current workflow data must be an array. +- Results preserve the original order. +- Use `concurrency` to control parallelism. diff --git a/website/docs/workflows/steps/and-guardrail.md b/website/docs/workflows/steps/and-guardrail.md new file mode 100644 index 000000000..04cefcb08 --- /dev/null +++ b/website/docs/workflows/steps/and-guardrail.md @@ -0,0 +1,112 @@ +# andGuardrail + +> Apply guardrails to the current workflow data to validate or sanitize it. + +## Quick Start + +```typescript +import { createWorkflowChain, andGuardrail, createOutputGuardrail } from "@voltagent/core"; +import { z } from "zod"; + +const redactNumbers = createOutputGuardrail({ + name: "redact-numbers", + handler: async ({ output }) => ({ + pass: true, + action: "modify", + modifiedOutput: output.replace(/\d/g, "*"), + }), +}); + +const workflow = createWorkflowChain({ + id: "guarded-flow", + input: z.string(), + result: z.string(), +}) + .andGuardrail({ + id: "sanitize-input", + outputGuardrails: [redactNumbers], + }) + .andThen({ + id: "finish", + execute: async ({ data }) => data, + }); + +const result = await workflow.run("Order #123"); +// Result: "Order ###" +``` + +## How It Works + +- `inputGuardrails` run first and only accept string or message inputs. +- `outputGuardrails` run after and can validate/modify structured data. +- If a guardrail blocks, the workflow throws an error (like agent guardrails). +- If your guardrails need agent APIs or metadata, pass `guardrailAgent` on the workflow config or run options. + +```typescript +.andGuardrail({ + id: "check-text", + inputGuardrails: [myInputGuardrail], + outputGuardrails: [myOutputGuardrail], +}) +``` + +## Input Guardrails (string/messages only) + +```typescript +import { createInputGuardrail } from "@voltagent/core"; + +const trimGuardrail = createInputGuardrail({ + name: "trim-input", + handler: async ({ input }) => ({ + pass: true, + action: "modify", + modifiedInput: typeof input === "string" ? input.trim() : input, + }), +}); + +createWorkflowChain({ + id: "trim-flow", + input: z.string(), + result: z.string(), +}) + .andGuardrail({ + id: "trim", + inputGuardrails: [trimGuardrail], + }) + .andThen({ + id: "echo", + execute: async ({ data }) => data, + }); +``` + +If your data is an object, prefer `outputGuardrails` to inspect or mutate fields. + +## Structured Data Guardrails + +```typescript +const maskEmail = createOutputGuardrail<{ email: string }>({ + name: "mask-email", + handler: async ({ output }) => ({ + pass: true, + action: "modify", + modifiedOutput: { + ...output, + email: output.email.replace(/@.*/, "@***"), + }, + }), +}); + +createWorkflowChain({ + id: "profile-guard", + input: z.object({ email: z.string() }), + result: z.object({ email: z.string() }), +}) + .andGuardrail({ + id: "mask", + outputGuardrails: [maskEmail], + }) + .andThen({ + id: "finish", + execute: async ({ data }) => data, + }); +``` diff --git a/website/docs/workflows/steps/and-loop.md b/website/docs/workflows/steps/and-loop.md new file mode 100644 index 000000000..f6b5d7a60 --- /dev/null +++ b/website/docs/workflows/steps/and-loop.md @@ -0,0 +1,70 @@ +# andLoop + +> Repeat a step with do-while or do-until semantics. + +## Quick Start + +### Do-While + +```typescript +import { createWorkflowChain, andDoWhile, andThen } from "@voltagent/core"; +import { z } from "zod"; + +const workflow = createWorkflowChain({ + id: "retry-loop", + input: z.number(), +}).andDoWhile({ + id: "increment-until-3", + step: andThen({ + id: "increment", + execute: async ({ data }) => data + 1, + }), + condition: ({ data }) => data < 3, +}); +``` + +### Do-Until + +```typescript +import { createWorkflowChain, andDoUntil, andThen } from "@voltagent/core"; +import { z } from "zod"; + +const workflow = createWorkflowChain({ + id: "until-loop", + input: z.number(), +}).andDoUntil({ + id: "increment-until-2", + step: andThen({ + id: "increment", + execute: async ({ data }) => data + 1, + }), + condition: ({ data }) => data >= 2, +}); +``` + +## Function Signature + +```typescript +.andDoWhile({ + id: string, + step: Step, + condition: (ctx) => boolean | Promise, + retries?: number, + name?: string, + purpose?: string +}) + +.andDoUntil({ + id: string, + step: Step, + condition: (ctx) => boolean | Promise, + retries?: number, + name?: string, + purpose?: string +}) +``` + +## Notes + +- The step runs at least once. +- The loop continues until the condition fails (do-while) or succeeds (do-until). diff --git a/website/docs/workflows/steps/and-map.md b/website/docs/workflows/steps/and-map.md new file mode 100644 index 000000000..0700d3f67 --- /dev/null +++ b/website/docs/workflows/steps/and-map.md @@ -0,0 +1,54 @@ +# andMap + +> Compose a new object from workflow data, input, context, or step outputs. + +## Quick Start + +```typescript +import { createWorkflowChain, andMap, andThen } from "@voltagent/core"; +import { z } from "zod"; + +const workflow = createWorkflowChain({ + id: "compose-result", + input: z.object({ userId: z.string() }), +}) + .andThen({ + id: "fetch-user", + execute: async ({ data }) => ({ name: "Ada", id: data.userId }), + }) + .andMap({ + id: "shape-output", + map: { + userId: { source: "data", path: "userId" }, + name: { source: "step", stepId: "fetch-user", path: "name" }, + region: { source: "context", key: "region" }, + constant: { source: "value", value: "ok" }, + }, + }); +``` + +## Function Signature + +```typescript +.andMap({ + id: string, + map: { + [key: string]: + | { source: "value", value: any } + | { source: "data", path?: string } + | { source: "input", path?: string } + | { source: "context", key: string, path?: string } + | { source: "step", stepId: string, path?: string } + | { source: "fn", fn: (ctx) => any } + }, + retries?: number, + name?: string, + purpose?: string +}) +``` + +## Notes + +- `data` refers to the current step input. +- `input` refers to the original workflow input. +- `context` reads from `WorkflowRunOptions.context`. diff --git a/website/docs/workflows/steps/and-race.md b/website/docs/workflows/steps/and-race.md index 4adeded83..037825a98 100644 --- a/website/docs/workflows/steps/and-race.md +++ b/website/docs/workflows/steps/and-race.md @@ -13,35 +13,38 @@ import { z } from "zod"; const workflow = createWorkflowChain({ id: "get-user-data", input: z.object({ userId: z.string() }), -}).andRace([ - // Fast: Check cache (100ms) - andThen({ - id: "check-cache", - execute: async ({ data }) => { - const cached = await checkCache(data.userId); - if (cached) return { data: cached, source: "cache" }; - throw new Error("Not in cache"); - }, - }), - - // Medium: Database (300ms) - andThen({ - id: "check-database", - execute: async ({ data }) => { - const user = await database.getUser(data.userId); - return { data: user, source: "database" }; - }, - }), - - // Slow: External API (1000ms) - andThen({ - id: "fetch-from-api", - execute: async ({ data }) => { - const response = await fetch(`/api/users/${data.userId}`); - return { data: await response.json(), source: "api" }; - }, - }), -]); +}).andRace({ + id: "race-user-data", + steps: [ + // Fast: Check cache (100ms) + andThen({ + id: "check-cache", + execute: async ({ data }) => { + const cached = await checkCache(data.userId); + if (cached) return { data: cached, source: "cache" }; + throw new Error("Not in cache"); + }, + }), + + // Medium: Database (300ms) + andThen({ + id: "check-database", + execute: async ({ data }) => { + const user = await database.getUser(data.userId); + return { data: user, source: "database" }; + }, + }), + + // Slow: External API (1000ms) + andThen({ + id: "fetch-from-api", + execute: async ({ data }) => { + const response = await fetch(`/api/users/${data.userId}`); + return { data: await response.json(), source: "api" }; + }, + }), + ], +}); const result = await workflow.run({ userId: "123" }); // If cache has data: returns in ~100ms from cache @@ -62,7 +65,13 @@ Think of it like a race - whoever crosses the finish line first wins, regardless ## Function Signature ```typescript -.andRace([step1, step2, step3]) // Array of steps to race +.andRace({ + id: string, + steps: Array, + retries?: number, + name?: string, + purpose?: string +}) ``` ## Common Patterns @@ -72,24 +81,27 @@ Think of it like a race - whoever crosses the finish line first wins, regardless Add a timeout to any operation: ```typescript -.andRace([ - // Main operation - andThen({ - id: "slow-api", - execute: async ({ data }) => { - const result = await slowAPICall(data); - return { result, timedOut: false }; - } - }), - // Timeout after 5 seconds - andThen({ - id: "timeout", - execute: async () => { - await new Promise(resolve => setTimeout(resolve, 5000)); - return { result: "Timeout", timedOut: true }; - } - }) -]) +.andRace({ + id: "timeout-race", + steps: [ + // Main operation + andThen({ + id: "slow-api", + execute: async ({ data }) => { + const result = await slowAPICall(data); + return { result, timedOut: false }; + } + }), + // Timeout after 5 seconds + andThen({ + id: "timeout", + execute: async () => { + await new Promise(resolve => setTimeout(resolve, 5000)); + return { result: "Timeout", timedOut: true }; + } + }) + ] +}) ``` ### Multiple AI Providers @@ -97,23 +109,26 @@ Add a timeout to any operation: Get response from fastest AI: ```typescript -.andRace([ - andAgent( - ({ data }) => data.prompt, - openaiAgent, - { schema: z.object({ response: z.string(), ai: z.literal("openai") }) } - ), - andAgent( - ({ data }) => data.prompt, - claudeAgent, - { schema: z.object({ response: z.string(), ai: z.literal("claude") }) } - ), - andAgent( - ({ data }) => data.prompt, - geminiAgent, - { schema: z.object({ response: z.string(), ai: z.literal("gemini") }) } - ) -]) +.andRace({ + id: "ai-provider-race", + steps: [ + andAgent( + ({ data }) => data.prompt, + openaiAgent, + { schema: z.object({ response: z.string(), ai: z.literal("openai") }) } + ), + andAgent( + ({ data }) => data.prompt, + claudeAgent, + { schema: z.object({ response: z.string(), ai: z.literal("claude") }) } + ), + andAgent( + ({ data }) => data.prompt, + geminiAgent, + { schema: z.object({ response: z.string(), ai: z.literal("gemini") }) } + ) + ] +}) ``` ### Cache vs Database @@ -121,26 +136,29 @@ Get response from fastest AI: Try cache first, fall back to database: ```typescript -.andRace([ - // Try cache (fast) - andThen({ - id: "cache-lookup", - execute: async ({ data }) => { - const cached = await cache.get(data.key); - if (!cached) throw new Error("Cache miss"); - return { value: cached, from: "cache" }; - } - }), - // Fall back to database (slower) - andThen({ - id: "db-lookup", - execute: async ({ data }) => { - const value = await db.find(data.key); - await cache.set(data.key, value); // Update cache - return { value, from: "database" }; - } - }) -]) +.andRace({ + id: "cache-db-race", + steps: [ + // Try cache (fast) + andThen({ + id: "cache-lookup", + execute: async ({ data }) => { + const cached = await cache.get(data.key); + if (!cached) throw new Error("Cache miss"); + return { value: cached, from: "cache" }; + } + }), + // Fall back to database (slower) + andThen({ + id: "db-lookup", + execute: async ({ data }) => { + const value = await db.find(data.key); + await cache.set(data.key, value); // Update cache + return { value, from: "database" }; + } + }) + ] +}) ``` ## Error Handling @@ -148,24 +166,27 @@ Try cache first, fall back to database: If the fastest step fails, the race continues: ```typescript -.andRace([ - andThen({ - id: "unreliable-fast", - execute: async () => { - if (Math.random() > 0.5) { - throw new Error("Failed!"); +.andRace({ + id: "error-race", + steps: [ + andThen({ + id: "unreliable-fast", + execute: async () => { + if (Math.random() > 0.5) { + throw new Error("Failed!"); + } + return { result: "fast" }; } - return { result: "fast" }; - } - }), - andThen({ - id: "reliable-slow", - execute: async () => { - await sleep(1000); - return { result: "slow but reliable" }; - } - }) -]) + }), + andThen({ + id: "reliable-slow", + execute: async () => { + await sleep(1000); + return { result: "slow but reliable" }; + } + }) + ] +}) // If fast fails, you get slow result // If fast succeeds, you get fast result ``` @@ -177,11 +198,14 @@ If the fastest step fails, the race continues: .andThen({ execute: async () => await slowAPI() }) // With race: Usually fast (50ms) -.andRace([ - andThen({ execute: async () => await cache() }), // 50ms - andThen({ execute: async () => await database() }), // 500ms - andThen({ execute: async () => await slowAPI() }) // 2000ms -]) +.andRace({ + id: "perf-race", + steps: [ + andThen({ execute: async () => await cache() }), // 50ms + andThen({ execute: async () => await database() }), // 500ms + andThen({ execute: async () => await slowAPI() }) // 2000ms + ] +}) ``` ## Best Practices @@ -190,17 +214,20 @@ If the fastest step fails, the race continues: ```typescript // Good: Fastest first -.andRace([ - cacheStep, // 10ms - databaseStep, // 100ms - apiStep // 1000ms -]) +.andRace({ + id: "fastest-first", + steps: [ + cacheStep, // 10ms + databaseStep, // 100ms + apiStep // 1000ms + ] +}) ``` ### 2. Handle Different Results ```typescript -.andRace([...steps]) +.andRace({ id: "race-results", steps: [...steps] }) .andThen({ execute: async ({ data }) => { // Check which source won @@ -216,11 +243,14 @@ If the fastest step fails, the race continues: ```typescript // Multiple APIs for reliability -.andRace([ - primaryAPI, - backupAPI, - fallbackAPI -]) +.andRace({ + id: "redundancy-race", + steps: [ + primaryAPI, + backupAPI, + fallbackAPI + ] +}) ``` ## Comparison with andAll diff --git a/website/docs/workflows/steps/and-sleep-until.md b/website/docs/workflows/steps/and-sleep-until.md new file mode 100644 index 000000000..2531d6c67 --- /dev/null +++ b/website/docs/workflows/steps/and-sleep-until.md @@ -0,0 +1,40 @@ +# andSleepUntil + +> Pause the workflow until a specific Date. + +## Quick Start + +```typescript +import { createWorkflowChain, andSleepUntil, andThen } from "@voltagent/core"; +import { z } from "zod"; + +const workflow = createWorkflowChain({ + id: "scheduled-step", + input: z.object({ id: z.string() }), +}) + .andSleepUntil({ + id: "wait-until", + date: new Date(Date.now() + 60_000), + }) + .andThen({ + id: "continue", + execute: async ({ data }) => ({ ...data, resumed: true }), + }); +``` + +## Function Signature + +```typescript +.andSleepUntil({ + id: string, + date: Date | ((ctx) => Date | Promise), + retries?: number, + name?: string, + purpose?: string +}) +``` + +## Notes + +- The input data is returned unchanged. +- If the date is in the past, the step continues immediately. diff --git a/website/docs/workflows/steps/and-sleep.md b/website/docs/workflows/steps/and-sleep.md new file mode 100644 index 000000000..d6390724f --- /dev/null +++ b/website/docs/workflows/steps/and-sleep.md @@ -0,0 +1,40 @@ +# andSleep + +> Pause the workflow for a fixed or computed duration (milliseconds). + +## Quick Start + +```typescript +import { createWorkflowChain, andSleep, andThen } from "@voltagent/core"; +import { z } from "zod"; + +const workflow = createWorkflowChain({ + id: "delayed-step", + input: z.object({ id: z.string() }), +}) + .andSleep({ + id: "pause", + duration: 500, + }) + .andThen({ + id: "continue", + execute: async ({ data }) => ({ ...data, resumed: true }), + }); +``` + +## Function Signature + +```typescript +.andSleep({ + id: string, + duration: number | ((ctx) => number | Promise), + retries?: number, + name?: string, + purpose?: string +}) +``` + +## Notes + +- The input data is returned unchanged. +- If a suspend or cancel signal is triggered, the sleep ends early. diff --git a/website/docs/workflows/steps/and-then.md b/website/docs/workflows/steps/and-then.md index e215e779a..2b0b5e48a 100644 --- a/website/docs/workflows/steps/and-then.md +++ b/website/docs/workflows/steps/and-then.md @@ -42,13 +42,34 @@ const result = await workflow.run({ text: "hello" }); ### Available Parameters ```typescript -execute: async ({ data, suspend, resumeData }) => { +execute: async ({ data, suspend, resumeData, retryCount }) => { // data: All accumulated data from previous steps // suspend: Function to pause workflow // resumeData: Data provided when resuming + // retryCount: 0 for the initial attempt, increments by 1 for each retry }; ``` +### Retries + +```typescript +.andThen({ + id: "fetch-user", + retries: 2, + execute: async ({ data, retryCount }) => { + // retryCount starts at 0, then 1/2 for each retry attempt + const user = await fetchUser(data.userId); + return { user }; + } +}) +``` + +Retries only apply to thrown errors; suspend or cancel does not retry. + +`retries: 2` means up to 2 retry attempts (3 total tries including the first). + +You can also set workflow-wide defaults with `retryConfig` on `createWorkflowChain`; `retries` overrides it. + ## Data Flow Example Each step builds on the previous: diff --git a/website/docs/workflows/steps/and-when.md b/website/docs/workflows/steps/and-when.md index 4f7f81eda..99954bacc 100644 --- a/website/docs/workflows/steps/and-when.md +++ b/website/docs/workflows/steps/and-when.md @@ -46,7 +46,8 @@ const workflow = createWorkflowChain({ .andWhen({ id: "step-name", condition: ({ data }) => data.someField > 100, - step: andThen({ execute: async () => {...} }) + step: andThen({ execute: async () => {...} }), + retries?: number }) ``` diff --git a/website/recipes/workflows.md b/website/recipes/workflows.md index 6c1e95b73..fcd279eb2 100644 --- a/website/recipes/workflows.md +++ b/website/recipes/workflows.md @@ -75,12 +75,22 @@ new VoltAgent({ ## Workflow Methods -| Method | Purpose | -| ------------- | ----------------------- | -| `.andThen()` | Execute custom logic | -| `.andAgent()` | Use AI agent for a step | -| `.andTap()` | Side effects (logging) | -| `.andWhen()` | Conditional branching | +| Method | Purpose | +| ------------------ | ------------------------- | +| `.andThen()` | Execute custom logic | +| `.andAgent()` | Use AI agent for a step | +| `.andTap()` | Side effects (logging) | +| `.andWhen()` | Conditional branching | +| `.andBranch()` | Multi-branch conditions | +| `.andAll()` | Parallel execution | +| `.andRace()` | First-result wins | +| `.andForEach()` | Run a step per item | +| `.andDoWhile()` | Loop while condition | +| `.andDoUntil()` | Loop until condition | +| `.andSleep()` | Pause for a duration | +| `.andSleepUntil()` | Pause until a date | +| `.andMap()` | Compose data from sources | +| `.andWorkflow()` | Nested workflow step | ## Human-in-the-Loop (Suspend/Resume) diff --git a/website/sidebars.ts b/website/sidebars.ts index d0c70a95e..16ccd3952 100644 --- a/website/sidebars.ts +++ b/website/sidebars.ts @@ -117,9 +117,16 @@ const sidebars: SidebarsConfig = { "workflows/steps/and-then", "workflows/steps/and-agent", "workflows/steps/and-when", + "workflows/steps/and-branch", "workflows/steps/and-tap", + "workflows/steps/and-guardrail", "workflows/steps/and-all", "workflows/steps/and-race", + "workflows/steps/and-foreach", + "workflows/steps/and-loop", + "workflows/steps/and-sleep", + "workflows/steps/and-sleep-until", + "workflows/steps/and-map", ], }, {