diff --git a/examples/with-live-evals/src/index.ts b/examples/with-live-evals/src/index.ts index ad745dbb1..4549ac439 100644 --- a/examples/with-live-evals/src/index.ts +++ b/examples/with-live-evals/src/index.ts @@ -1,5 +1,15 @@ import { openai } from "@ai-sdk/openai"; -import VoltAgent, { Agent, VoltAgentObservability, buildScorer } from "@voltagent/core"; +import VoltAgent, { + Agent, + VoltAgentObservability, + andAgent, + andThen, + buildScorer, + createWorkflow, + InMemoryStorageAdapter, + Memory, + type WorkflowEvalScorerConfig, +} from "@voltagent/core"; import { createAnswerCorrectnessScorer, createAnswerRelevancyScorer, @@ -148,6 +158,198 @@ Provide a score from 0 to 1 and explain your reasoning.`; }) .build(); +const contextAssemblerScorer = buildScorer({ + id: "context-assembler", + label: "Context Assembly", +}) + .score(({ payload, results }) => { + const rawOutput = typeof payload.output === "string" ? payload.output : ""; + let parsed: { question?: unknown; context?: unknown } = {}; + try { + parsed = rawOutput ? (JSON.parse(rawOutput) as typeof parsed) : {}; + } catch { + parsed = {}; + } + + const question = typeof parsed.question === "string" ? parsed.question.trim() : ""; + const contextList = Array.isArray(parsed.context) ? parsed.context : []; + const sanitizedContext = contextList.filter( + (item): item is string => typeof item === "string" && item.trim().length > 0, + ); + + const hasQuestion = question.length > 0; + const hasContext = sanitizedContext.length > 0; + + const raw = results.raw; + raw.contextAssembler = { + hasQuestion, + contextCount: sanitizedContext.length, + }; + results.raw = raw; + + const score = hasQuestion && hasContext ? 1 : 0; + + return { + score, + metadata: { + hasQuestion, + contextCount: sanitizedContext.length, + }, + }; + }) + .reason(({ results, score }) => { + const snapshot = results.raw.contextAssembler as + | { hasQuestion?: boolean; contextCount?: number } + | undefined; + + if (!snapshot) { + return { + reason: "No context assembly data recorded for this step.", + }; + } + + if (!score || score < 1) { + const missing: string[] = []; + if (!snapshot.hasQuestion) { + missing.push("question"); + } + if (!snapshot.contextCount) { + missing.push("context snippets"); + } + return { + reason: `Missing ${missing.join(" and ")} in step output.`, + }; + } + + return { + reason: `Captured ${snapshot.contextCount ?? 0} context snippet(s) for downstream steps.`, + }; + }) + .build(); + +const createLiveEvalScorers = (): Record => ({ + keyword: { + scorer: keywordMatchScorer, + params: { + keyword: "voltagent", + }, + }, + exactMatch: { + scorer: scorers.exactMatch, + params: { + expected: referenceAnswer, + }, + }, + factuality: { + scorer: factualityScorer, + buildPayload: (context) => ({ + input: context.input, + output: context.output, + expected: referenceAnswer, + }), + }, + answerCorrectness: { + scorer: answerCorrectnessScorer, + buildPayload: () => ({ + expected: referenceAnswer, + }), + }, + answerRelevancy: { + scorer: answerRelevancyScorer, + buildPayload: () => ({ + context: referenceAnswer, + }), + }, + summary: { + scorer: summaryScorer, + buildPayload: () => ({ + input: referenceSummarySource, + expected: referenceSummary, + }), + }, + translation: { + scorer: translationScorer, + buildPayload: () => ({ + input: referenceTranslationSource, + expected: referenceTranslationExpected, + }), + buildParams: () => ({ + language: "Spanish", + }), + }, + humor: { + scorer: humorScorer, + }, + possible: { + scorer: possibleScorer, + }, + contextPrecision: { + scorer: contextPrecisionScorer, + buildPayload: () => ({ + context: referenceContextSnippets, + expected: referenceAnswer, + }), + }, + contextRecall: { + scorer: contextRecallScorer, + buildPayload: () => ({ + expected: referenceAnswer, + context: referenceContextSnippets, + }), + }, + contextRelevancy: { + scorer: contextRelevancyScorer, + buildPayload: () => ({ + context: referenceContextSnippets, + }), + }, + moderation: { + scorer: createModerationScorer({ + model: moderationModel, + threshold: 0.5, + }), + }, + helpfulness: { + scorer: helpfulnessJudgeScorer, + params: { + criteria: "Reward answers that are specific to VoltAgent features and actionable guidance.", + }, + }, + levenshtein: { + scorer: scorers.levenshtein, + params: { + expected: referenceAnswer, + }, + }, + numericDiff: { + scorer: scorers.numericDiff, + params: { + expected: numericBaseline.expected, + output: numericBaseline.output, + }, + }, + jsonDiff: { + scorer: scorers.jsonDiff, + params: { + expected: referenceJson, + output: referenceJson, + }, + }, + listContains: { + scorer: scorers.listContains, + params: { + expected: referenceEntities, + output: [...referenceEntities, "extra-note"], + }, + }, +}); + +const createGatherStepScorers = (): Record => ({ + assembler: { + scorer: contextAssemblerScorer, + }, +}); + const supportAgent = new Agent({ name: "live-scorer-demo", instructions: @@ -155,136 +357,86 @@ const supportAgent = new Agent({ model: openai("gpt-4o-mini"), eval: { sampling: { type: "ratio", rate: 1 }, - scorers: { - keyword: { - scorer: keywordMatchScorer, - params: { - keyword: "voltagent", - }, - }, - exactMatch: { - scorer: scorers.exactMatch, - params: { - expected: referenceAnswer, - }, - }, - factuality: { - scorer: factualityScorer, - buildPayload: (context) => ({ - input: context.input, - output: context.output, - expected: referenceAnswer, - }), - }, - answerCorrectness: { - scorer: answerCorrectnessScorer, - buildPayload: () => ({ - expected: referenceAnswer, - }), - }, - answerRelevancy: { - scorer: answerRelevancyScorer, - buildPayload: () => ({ - context: referenceAnswer, - }), - }, - summary: { - scorer: summaryScorer, - buildPayload: () => ({ - input: referenceSummarySource, - expected: referenceSummary, - }), - }, - translation: { - scorer: translationScorer, - buildPayload: () => ({ - input: referenceTranslationSource, - expected: referenceTranslationExpected, - }), - buildParams: () => ({ - language: "Spanish", - }), - }, - humor: { - scorer: humorScorer, - }, - possible: { - scorer: possibleScorer, - }, - contextPrecision: { - scorer: contextPrecisionScorer, - buildPayload: () => ({ - context: referenceContextSnippets, - expected: referenceAnswer, - }), - }, - contextRecall: { - scorer: contextRecallScorer, - buildPayload: () => ({ - expected: referenceAnswer, - context: referenceContextSnippets, - }), - }, - contextRelevancy: { - scorer: contextRelevancyScorer, - buildPayload: () => ({ - context: referenceContextSnippets, - }), - }, - moderation: { - scorer: createModerationScorer({ - model: moderationModel, - threshold: 0.5, - }), - }, - helpfulness: { - scorer: helpfulnessJudgeScorer, - params: { - criteria: - "Reward answers that are specific to VoltAgent features and actionable guidance.", - }, - }, - levenshtein: { - scorer: scorers.levenshtein, - params: { - expected: referenceAnswer, - }, - }, - numericDiff: { - scorer: scorers.numericDiff, - params: { - expected: numericBaseline.expected, - output: numericBaseline.output, - }, - }, - jsonDiff: { - scorer: scorers.jsonDiff, - params: { - expected: referenceJson, - output: referenceJson, - }, - }, - listContains: { - scorer: scorers.listContains, - params: { - expected: referenceEntities, - output: [...referenceEntities, "extra-note"], - }, - }, - }, + scorers: createLiveEvalScorers(), }, }); +const workflowResponder = new Agent({ + id: "workflow-responder", + name: "workflow-responder", + instructions: + "You are a support assistant that only answers using VoltAgent documentation provided in the workflow context.", + model: openai("gpt-4o-mini"), +}); + +const supportWorkflow = createWorkflow( + { + id: "support-live-evals", + name: "Support Workflow with Live Evals", + purpose: "Answer VoltAgent support questions and score results in real time.", + input: z.object({ question: z.string() }), + result: z.string(), + memory: new Memory({ storage: new InMemoryStorageAdapter() }), + observability, + /* eval: { + sampling: { type: "ratio", rate: 1 }, + scorers: createLiveEvalScorers(), + }, */ + }, + andThen({ + id: "gather-context", + execute: async ({ data }) => ({ + question: data.question, + context: referenceContextSnippets, + }), + eval: { + scorers: createGatherStepScorers(), + }, + }), + andAgent( + async ({ data }) => { + const context = data.context as string[]; + return `You are a VoltAgent support specialist. + +Use the provided context snippets to answer the user's question accurately. + +Context: +${context.map((snippet) => `- ${snippet}`).join("\n")} + +Question: ${data.question} + +Provide a concise answer.`; + }, + workflowResponder, + { + schema: z.object({ + answer: z.string().describe("Support answer for the user question."), + }), + /* eval: { + sampling: { type: "ratio", rate: 1 }, + scorers: createLiveEvalScorers(), + }, */ + }, + ), + andThen({ + id: "finalize-answer", + execute: async ({ data }) => data.answer, + }), +); + new VoltAgent({ agents: { support: supportAgent }, + workflows: { support: supportWorkflow }, server: honoServer(), observability, }); (async () => { const question = "How can I enable live eval scorers in VoltAgent?"; - const result = await supportAgent.generateText(question); + /* const result = await supportAgent.generateText(question); */ + const workflowRun = await supportWorkflow.run({ question }); console.log("Question:\n", question, "\n"); - console.log("Agent response:\n", result.text, "\n"); + /* console.log("Agent response:\n", result.text, "\n"); */ + console.log("Workflow response:\n", workflowRun.result, "\n"); })(); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f8278cea8..bb51ed4c0 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -20,6 +20,16 @@ export type { WorkflowStats, WorkflowTimelineEvent, RegisteredWorkflow, + WorkflowEvalConfig, + WorkflowEvalScorerConfig, + WorkflowEvalScorerFactory, + WorkflowEvalScorerReference, + WorkflowEvalResult, + WorkflowEvalSamplingPolicy, + WorkflowEvalOperationType, + WorkflowEvalPayload, + WorkflowEvalContext, + WorkflowEvalStepConfig, } from "./workflow"; // Export new Agent from agent.ts export { diff --git a/packages/core/src/workflow/core.spec.ts b/packages/core/src/workflow/core.spec.ts index 5b36d70c4..5d897beee 100644 --- a/packages/core/src/workflow/core.spec.ts +++ b/packages/core/src/workflow/core.spec.ts @@ -1,9 +1,11 @@ import type { UIMessageChunk } from "ai"; -import { beforeEach, describe, expect, it } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { z } from "zod"; +import type { LocalScorerDefinition } from "../eval/runtime"; import { Memory } from "../memory"; import { InMemoryStorageAdapter } from "../memory/adapters/storage/in-memory"; import { createWorkflow } from "./core"; +import type { WorkflowEvalContext } from "./eval/types"; import { WorkflowRegistry } from "./registry"; import { andThen } from "./steps"; @@ -206,3 +208,74 @@ describe.sequential("workflow streaming", () => { } }); }); + +describe.sequential("workflow eval scoring", () => { + beforeEach(() => { + const registry = WorkflowRegistry.getInstance(); + (registry as any).workflows.clear(); + }); + + it("runs step-level scorers when configured", async () => { + const memory = new Memory({ storage: new InMemoryStorageAdapter() }); + const onStepResult = vi.fn(); + + const stepScorer: LocalScorerDefinition> = { + id: "step-scorer", + name: "Step Scorer", + scorer: () => ({ + status: "success", + score: 0.9, + }), + }; + + const workflow = createWorkflow( + { + id: "eval-test", + name: "Eval Test", + input: z.object({ value: z.number() }), + result: z.object({ doubled: z.number() }), + memory, + }, + andThen({ + id: "multiply", + name: "Multiply", + execute: async ({ data }) => ({ doubled: data.value * 2 }), + eval: { + scorers: { + quality: { + scorer: stepScorer, + onResult: onStepResult, + }, + }, + }, + }), + andThen({ + id: "identity", + execute: async ({ data }) => data, + }), + ); + + const registry = WorkflowRegistry.getInstance(); + registry.registerWorkflow(workflow); + + await workflow.run({ value: 3 }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(onStepResult).toHaveBeenCalledTimes(1); + const [result] = onStepResult.mock.calls[0]; + expect(result.status).toBe("success"); + expect(result.payload.target).toBe("step"); + expect(result.payload.step?.id).toBe("multiply"); + expect(result.payload.status).toBe("completed"); + expect(result.metadata?.workflow?.id).toBe("eval-test"); + expect(result.metadata?.step).toEqual( + expect.objectContaining({ + id: "multiply", + metadata: expect.objectContaining({ + durationMs: expect.any(Number), + }), + }), + ); + }); +}); diff --git a/packages/core/src/workflow/core.ts b/packages/core/src/workflow/core.ts index 7c271766b..71ab56ade 100644 --- a/packages/core/src/workflow/core.ts +++ b/packages/core/src/workflow/core.ts @@ -9,6 +9,8 @@ import { type VoltAgentObservability, createVoltAgentObservability } from "../ob import { AgentRegistry } from "../registries/agent-registry"; import { randomUUID } from "../utils/id"; import type { WorkflowExecutionContext } from "./context"; +import { enqueueWorkflowEvalScoring, enqueueWorkflowStepEvalScoring } from "./eval"; +import type { WorkflowEvalHost, WorkflowEvalPayload } from "./eval/types"; import { createWorkflowStateManager } from "./internal/state"; import type { InternalBaseWorkflowInputSchema } from "./internal/types"; import { @@ -628,6 +630,7 @@ export function createWorkflow< resumeSchema, memory: workflowMemory, observability: workflowObservability, + eval: workflowEvalConfig, }: WorkflowConfig, ...steps: ReadonlyArray ) { @@ -690,6 +693,14 @@ export function createWorkflow< return cachedObservability; }; + const workflowEvalHost: WorkflowEvalHost = { + id, + name, + logger, + evalConfig: workflowEvalConfig, + getObservability: () => getObservability(), + }; + // Set default schemas if not provided const effectiveSuspendSchema = suspendSchema || z.any(); const effectiveResumeSchema = resumeSchema || z.any(); @@ -714,6 +725,7 @@ export function createWorkflow< // Only create stream controller if one is provided (for streaming execution) // For normal run, we don't need a stream controller const streamController = externalStreamController || null; + const operationType = streamController ? "stream" : "run"; // Get observability instance const observability = getObservability(); @@ -1176,6 +1188,48 @@ export function createWorkflow< }, }); + const stepInputForEval = stateManager.state.data; + const stepStartTime = Date.now(); + const userIdForEval = stateManager.state.userId ?? options?.userId; + const conversationIdForEval = + stateManager.state.conversationId ?? options?.conversationId; + const globalStepEvalConfig = + workflowEvalHost.evalConfig?.steps?.[step.id] ?? + workflowEvalHost.evalConfig?.steps?.["*"]; + const stepEvalConfig = step.eval ?? globalStepEvalConfig; + + const scheduleStepScoring = ( + status: WorkflowEvalPayload["status"], + details: { output?: unknown; metadata?: Record }, + ) => { + enqueueWorkflowStepEvalScoring( + workflowEvalHost, + { + span: stepSpan, + executionId, + workflowId: id, + workflowName: name, + status, + input: stepInputForEval, + output: details.output, + rawInput: stepInputForEval, + rawOutput: details.output, + userId: userIdForEval, + conversationId: conversationIdForEval, + operation: operationType, + metadata: details.metadata, + step: { + id: step.id, + name: step.name ?? stepName, + index, + type: step.type, + metadata: details.metadata, + }, + }, + stepEvalConfig ?? undefined, + ); + }; + // Create stream writer for this step - real one for streaming, no-op for regular execution const stepWriter = streamController ? new WorkflowStreamWriterImpl( @@ -1310,6 +1364,19 @@ export function createWorkflow< const isSkipped = step.type === "conditional-when" && result === stateManager.state.data; + const stepDuration = Date.now() - stepStartTime; + const stepMetadata: Record = { + durationMs: stepDuration, + }; + if (isSkipped) { + stepMetadata.skipped = true; + } + + scheduleStepScoring(isSkipped ? "skipped" : "completed", { + output: result, + metadata: stepMetadata, + }); + stateManager.update({ data: result, result: result, @@ -1379,6 +1446,19 @@ export function createWorkflow< // Get suspend data if provided const suspendData = executionContext.context.get("suspendData"); + const suspensionDuration = Date.now() - stepStartTime; + const suspensionEvalMetadata: Record = { + durationMs: suspensionDuration, + reason: suspensionReason, + }; + if (suspendData !== undefined) { + suspensionEvalMetadata.suspendData = suspendData; + } + + scheduleStepScoring("suspended", { + metadata: suspensionEvalMetadata, + }); + const suspensionMetadata = stateManager.suspend( suspensionReason, { @@ -1461,6 +1541,28 @@ export function createWorkflow< } // End step span with error + const errorDuration = Date.now() - stepStartTime; + const errorMetadata: Record = { + durationMs: errorDuration, + }; + let errorMessage: string | undefined; + if (stepError instanceof Error && stepError.message) { + errorMessage = stepError.message; + } else if (stepError !== undefined) { + try { + errorMessage = safeStringify(stepError); + } catch { + errorMessage = String(stepError); + } + } + if (errorMessage) { + errorMetadata.errorMessage = errorMessage; + } + + scheduleStepScoring("error", { + metadata: errorMetadata, + }); + traceContext.endStepSpan(stepSpan, "error", { error: stepError as Error, }); @@ -1470,10 +1572,44 @@ export function createWorkflow< } const finalState = stateManager.finish(); + const duration = finalState.endAt.getTime() - finalState.startAt.getTime(); // Record workflow completion in trace traceContext.setOutput(finalState.result); traceContext.setUsage(stateManager.state.usage); + + if (workflowEvalHost.evalConfig) { + const metadata: Record = { + durationMs: duration, + stepsExecuted: executionContext.steps.length, + }; + if (stateManager.state.usage) { + try { + metadata.usage = JSON.parse(safeStringify(stateManager.state.usage)); + } catch (error) { + runLogger.debug("Failed to serialize workflow usage for eval metadata", { + error: error instanceof Error ? error.message : error, + }); + } + } + + enqueueWorkflowEvalScoring(workflowEvalHost, { + span: rootSpan, + executionId, + workflowId: id, + workflowName: name, + status: "completed", + input: stateManager.state.input, + output: finalState.result, + rawInput: stateManager.state.input, + rawOutput: finalState.result, + userId: stateManager.state.userId ?? options?.userId, + conversationId: stateManager.state.conversationId ?? options?.conversationId, + operation: operationType, + metadata, + }); + } + traceContext.end("completed"); // Update Memory V2 state to completed @@ -1491,7 +1627,6 @@ export function createWorkflow< await hooks?.onEnd?.(stateManager.state); // Log workflow completion with context - const duration = finalState.endAt.getTime() - finalState.startAt.getTime(); runLogger.debug( `Workflow completed | user=${options?.userId || "anonymous"} conv=${options?.conversationId || "none"} duration=${duration}ms`, { @@ -1689,6 +1824,7 @@ export function createWorkflow< // ✅ Always expose memory for registry access memory: effectiveMemory, observability: workflowObservability, + evalConfig: workflowEvalConfig, getFullState: () => { // Return workflow state similar to agent.getFullState return { diff --git a/packages/core/src/workflow/eval/index.ts b/packages/core/src/workflow/eval/index.ts new file mode 100644 index 000000000..92f9d592f --- /dev/null +++ b/packages/core/src/workflow/eval/index.ts @@ -0,0 +1,1217 @@ +import { + type Attributes, + type Span, + type SpanContext, + SpanKind, + SpanStatusCode, + context as otelContext, + trace, +} from "@opentelemetry/api"; +import { safeStringify } from "@voltagent/internal/utils"; +import { + type LocalScorerDefinition, + type ScorerLifecycleScope, + runLocalScorers, +} from "../../eval/runtime"; +import type { VoltAgentObservability } from "../../observability"; +import { randomUUID } from "../../utils/id"; +import type { + WorkflowEvalConfig, + WorkflowEvalContext, + WorkflowEvalHost, + WorkflowEvalPayload, + WorkflowEvalResult, + WorkflowEvalScorerConfig, + WorkflowEvalScorerReference, + WorkflowEvalStepConfig, + WorkflowEvalStepInfo, +} from "./types"; + +const scheduleAsync = + typeof setImmediate === "function" + ? (fn: () => void) => { + setImmediate(fn); + } + : (fn: () => void) => { + setTimeout(fn, 0); + }; + +type WorkflowScorerDescriptor = { + key: string; + config: WorkflowEvalScorerConfig; + definition: LocalScorerDefinition>; +}; + +interface ScoreMetrics { + combinedMetadata: Record | null; + scoreValue: number | null; + thresholdValue?: number; + thresholdPassed: boolean | null; + datasetMetadata?: ReturnType; +} + +async function resolveScorerDescriptors( + config: WorkflowEvalStepConfig, + host: WorkflowEvalHost, + scope: string, +): Promise { + const scorerEntries = Object.entries(config.scorers ?? {}); + if (scorerEntries.length === 0) { + return []; + } + + const descriptors: WorkflowScorerDescriptor[] = []; + for (const [key, scorerConfig] of scorerEntries) { + try { + const definition = await resolveEvalScorersDefinition(key, scorerConfig); + if (!definition) { + host.logger.warn(`[Workflow:${host.name}][${scope}] Unknown eval scorer for key ${key}`); + continue; + } + descriptors.push({ key, config: scorerConfig, definition }); + } catch (error) { + host.logger.warn( + `[Workflow:${host.name}][${scope}] Failed to resolve eval scorer for key ${key}`, + { + error: error instanceof Error ? error.message : error, + }, + ); + } + } + + return descriptors; +} + +function buildScoreMetrics( + storagePayload: WorkflowEvalPayload, + result: Awaited>["results"][number], +): ScoreMetrics { + const combinedMetadata = combineEvalMetadata(storagePayload, result.metadata); + const scoreValue = result.score ?? null; + const thresholdValue = resolveThresholdFromMetadata(combinedMetadata); + let thresholdPassed = resolveThresholdPassedFromMetadata(combinedMetadata); + if (thresholdPassed === null && thresholdValue !== undefined && scoreValue !== null) { + thresholdPassed = scoreValue >= thresholdValue; + } + + const datasetMetadata = extractDatasetMetadataFromCombinedMetadata(combinedMetadata); + + return { + combinedMetadata, + scoreValue, + thresholdValue, + thresholdPassed, + datasetMetadata, + }; +} + +function createScorerSpanAttributes( + host: WorkflowEvalHost, + descriptor: WorkflowScorerDescriptor, + config: WorkflowEvalStepConfig, + storagePayload: WorkflowEvalPayload, + metrics: ScoreMetrics, + result: Awaited>["results"][number], +): Attributes { + const { definition } = descriptor; + const scorerLabel = definition.name ?? descriptor.key ?? definition.id; + const attributes: Attributes = { + "span.type": "scorer", + "voltagent.label": scorerLabel, + "entity.id": host.id, + "entity.name": host.name, + "entity.type": "workflow", + "eval.scorer.id": definition.id, + "eval.scorer.key": descriptor.key, + "eval.scorer.name": scorerLabel, + "eval.scorer.kind": "live", + "eval.scorer.status": result.status, + "eval.operation.id": storagePayload.executionId, + "eval.operation.type": storagePayload.operationType, + "eval.operation.status": storagePayload.status, + "eval.target": storagePayload.target, + "eval.trace.id": storagePayload.traceId, + "eval.source.span_id": storagePayload.spanId, + "eval.trigger_source": config.triggerSource ?? "live", + "eval.environment": config.environment, + "workflow.execution.id": storagePayload.executionId, + "workflow.id": storagePayload.workflowId, + "workflow.name": storagePayload.workflowName, + "workflow.status": storagePayload.status, + }; + + if (storagePayload.step) { + attributes["workflow.step.id"] = storagePayload.step.id; + if (storagePayload.step.name) { + attributes["workflow.step.name"] = storagePayload.step.name; + } + if (typeof storagePayload.step.index === "number") { + attributes["workflow.step.index"] = storagePayload.step.index; + } + if (storagePayload.step.type) { + attributes["workflow.step.type"] = storagePayload.step.type; + } + } + + if (metrics.scoreValue !== null) { + attributes["eval.scorer.score"] = metrics.scoreValue; + } + if (metrics.thresholdValue !== undefined) { + attributes["eval.scorer.threshold"] = metrics.thresholdValue; + } + if (metrics.thresholdPassed !== null) { + attributes["eval.scorer.threshold_passed"] = metrics.thresholdPassed; + } + if (result.durationMs !== undefined) { + attributes["eval.scorer.duration_ms"] = result.durationMs; + } + if (result.sampling?.applied !== undefined) { + attributes["eval.scorer.sampling.applied"] = result.sampling.applied; + } + if (result.sampling?.rate !== undefined) { + attributes["eval.scorer.sampling.rate"] = result.sampling.rate; + } + if (result.sampling?.strategy) { + attributes["eval.scorer.sampling.strategy"] = result.sampling.strategy; + } + if (metrics.datasetMetadata?.datasetId) { + attributes["eval.dataset.id"] = metrics.datasetMetadata.datasetId; + } + if (metrics.datasetMetadata?.datasetVersionId) { + attributes["eval.dataset.version_id"] = metrics.datasetMetadata.datasetVersionId; + } + if (metrics.datasetMetadata?.datasetItemId) { + attributes["eval.dataset.item_id"] = metrics.datasetMetadata.datasetItemId; + } + if (metrics.datasetMetadata?.datasetItemHash) { + attributes["eval.dataset.item_hash"] = metrics.datasetMetadata.datasetItemHash; + } + if (storagePayload.userId) { + attributes["user.id"] = storagePayload.userId; + } + if (storagePayload.conversationId) { + attributes["conversation.id"] = storagePayload.conversationId; + } + + return attributes; +} + +function finalizeScorerSpan( + span: Span, + host: WorkflowEvalHost, + descriptor: WorkflowScorerDescriptor, + config: WorkflowEvalStepConfig, + storagePayload: WorkflowEvalPayload, + metrics: ScoreMetrics, + result: Awaited>["results"][number], +): void { + const attributes = createScorerSpanAttributes( + host, + descriptor, + config, + storagePayload, + metrics, + result, + ); + + span.setAttributes(attributes); + + if (metrics.combinedMetadata && Object.keys(metrics.combinedMetadata).length > 0) { + try { + span.setAttribute("eval.scorer.metadata", safeStringify(metrics.combinedMetadata)); + } catch { + span.setAttribute("eval.scorer.metadata", "[unserializable]"); + } + } + + span.addEvent("eval.scorer.result", { + status: result.status, + score: metrics.scoreValue ?? undefined, + threshold: metrics.thresholdValue ?? undefined, + thresholdPassed: metrics.thresholdPassed ?? undefined, + }); + + if (result.status === "error") { + const errorMessage = extractErrorMessage(result.error); + span.setAttribute("eval.scorer.error_message", errorMessage); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: errorMessage, + }); + if (result.error instanceof Error) { + span.recordException(result.error); + } else if (result.error) { + span.recordException({ message: errorMessage }); + } + } else { + span.setStatus({ + code: SpanStatusCode.OK, + message: result.status === "skipped" ? "skipped" : undefined, + }); + } + + span.end(); +} + +export interface EnqueueWorkflowEvalScoringArgs { + span: Span; + executionId: string; + workflowId: string; + workflowName: string; + status: WorkflowEvalPayload["status"]; + input?: unknown; + output?: unknown; + rawInput?: unknown; + rawOutput?: unknown; + userId?: string; + conversationId?: string; + operation: WorkflowEvalPayload["operationType"]; + metadata?: Record; + target?: WorkflowEvalPayload["target"]; + step?: WorkflowEvalStepInfo; +} + +export function enqueueWorkflowEvalScoring( + host: WorkflowEvalHost, + args: EnqueueWorkflowEvalScoringArgs, +): void { + scheduleEvalScoring( + host, + host.evalConfig, + { ...args, target: args.target ?? "workflow" }, + "workflow", + ); +} + +export function enqueueWorkflowStepEvalScoring( + host: WorkflowEvalHost, + args: EnqueueWorkflowEvalScoringArgs & { step: WorkflowEvalStepInfo }, + overrideConfig?: WorkflowEvalStepConfig | null, +): void { + const stepId = args.step.id; + const config = + overrideConfig ?? host.evalConfig?.steps?.[stepId] ?? host.evalConfig?.steps?.["*"]; + if (!config) { + return; + } + + scheduleEvalScoring(host, config, { ...args, target: "step" }, `step:${stepId}`); +} + +function scheduleEvalScoring( + host: WorkflowEvalHost, + config: WorkflowEvalStepConfig | undefined, + args: EnqueueWorkflowEvalScoringArgs, + scope: string, +): void { + if (!config || !config.scorers || Object.keys(config.scorers).length === 0) { + return; + } + + const span = args.span; + const spanContext = span.spanContext(); + + const normalizedArgs: EnqueueWorkflowEvalScoringArgs = { + ...args, + target: args.target ?? (args.step ? "step" : "workflow"), + }; + + const rawPayload = buildEvalPayload(normalizedArgs, spanContext); + if (!rawPayload) { + return; + } + + const storagePayload = + config.redact?.(cloneEvalPayload(rawPayload)) ?? cloneEvalPayload(rawPayload); + + if (spanContext.traceId && spanContext.spanId) { + const scorerKeys = Object.keys(config.scorers ?? {}); + if (scorerKeys.length > 0) { + span.setAttribute("eval.scorers.count", scorerKeys.length); + span.setAttribute("eval.scorers.trigger_source", config.triggerSource ?? "live"); + span.setAttribute("eval.operation.type", rawPayload.operationType); + span.setAttribute("eval.operation.id", rawPayload.executionId); + span.setAttribute("eval.operation.status", rawPayload.status); + span.setAttribute("eval.target", rawPayload.target); + if (rawPayload.step) { + span.setAttribute("workflow.step.id", rawPayload.step.id); + if (rawPayload.step.name) { + span.setAttribute("workflow.step.name", rawPayload.step.name); + } + if (typeof rawPayload.step.index === "number") { + span.setAttribute("workflow.step.index", rawPayload.step.index); + } + if (rawPayload.step.type) { + span.setAttribute("workflow.step.type", rawPayload.step.type); + } + } + if (config.environment) { + span.setAttribute("eval.environment", config.environment); + } + if (config.sampling?.type === "ratio" && config.sampling.rate !== undefined) { + const boundedRate = Math.max(0, Math.min(1, config.sampling.rate)); + span.setAttribute("eval.sampling.rate", boundedRate); + span.setAttribute("eval.sampling.percentage", boundedRate * 100); + } + span.addEvent("eval.scorers.scheduled", { + count: scorerKeys.length, + operation: rawPayload.operationType, + trigger: config.triggerSource ?? "live", + target: rawPayload.target, + scope, + }); + } + } + + const context: WorkflowEvalContext = { + ...rawPayload, + timestamp: new Date().toISOString(), + rawPayload, + }; + + const observability = host.getObservability(); + + scheduleAsync(() => { + runEvalScorers(host, { + config, + context, + rawPayload, + storagePayload, + observability, + rootSpanContext: spanContext, + scope, + }).catch((error) => { + host.logger.warn(`[Workflow:${host.name}][${scope}] eval scoring failed`, { + error: error instanceof Error ? error.message : error, + }); + }); + }); +} + +interface RunEvalScorersArgs { + config: WorkflowEvalStepConfig; + context: WorkflowEvalContext; + rawPayload: WorkflowEvalPayload; + storagePayload: WorkflowEvalPayload; + observability: VoltAgentObservability; + rootSpanContext: SpanContext; + scope: string; +} + +async function runEvalScorers(host: WorkflowEvalHost, args: RunEvalScorersArgs): Promise { + const { config, context, rawPayload, storagePayload, observability, rootSpanContext, scope } = + args; + const descriptors = await resolveScorerDescriptors(config, host, scope); + if (descriptors.length === 0) { + return; + } + + const descriptorById = new Map(); + for (const descriptor of descriptors) { + descriptorById.set(descriptor.definition.id, descriptor); + } + + const tracer = observability.getTracer(); + const parentContext = + rootSpanContext.traceId && rootSpanContext.spanId + ? trace.setSpanContext(otelContext.active(), rootSpanContext) + : otelContext.active(); + + const execution = await runLocalScorers({ + payload: context, + defaultSampling: config.sampling, + baseArgs: (payload) => { + const base: Record = {}; + if (payload.input !== undefined) { + base.input = payload.input ?? ""; + } + if (payload.output !== undefined) { + base.output = payload.output ?? ""; + } + base.status = payload.status; + base.target = payload.target; + if (payload.step) { + base.step = payload.step; + } + return base; + }, + scorers: descriptors.map(({ definition }) => definition), + onScorerStart: ({ definition }) => { + const descriptor = descriptorById.get(definition.id); + if (!descriptor) { + return undefined; + } + + const links = + rootSpanContext.traceId && rootSpanContext.spanId + ? [ + { + context: { + traceId: rootSpanContext.traceId, + spanId: rootSpanContext.spanId, + traceFlags: rootSpanContext.traceFlags, + traceState: rootSpanContext.traceState, + }, + attributes: { + "link.type": "eval-scorer", + "eval.operation.id": storagePayload.executionId, + "eval.operation.type": storagePayload.operationType, + }, + }, + ] + : undefined; + + const span = tracer.startSpan( + `eval.scorer.${definition.id}`, + { + kind: SpanKind.INTERNAL, + attributes: { "span.type": "scorer" }, + links, + }, + parentContext, + ); + + span.setAttributes({ + "voltagent.label": definition.name ?? descriptor.key ?? definition.id, + "entity.id": host.id, + "entity.type": "workflow", + "entity.name": host.name, + "eval.scorer.id": definition.id, + "eval.scorer.key": descriptor.key, + "eval.scorer.name": definition.name ?? definition.id, + "eval.scorer.kind": "live", + "eval.scorer.status": "running", + "eval.operation.id": storagePayload.executionId, + "eval.operation.type": storagePayload.operationType, + "eval.operation.status": storagePayload.status, + "eval.target": storagePayload.target, + "eval.scope": scope, + "eval.trace.id": storagePayload.traceId, + "eval.source.span_id": storagePayload.spanId, + "eval.trigger_source": config.triggerSource ?? "live", + "eval.environment": config.environment, + }); + + if (storagePayload.step) { + span.setAttribute("workflow.step.id", storagePayload.step.id); + if (storagePayload.step.name) { + span.setAttribute("workflow.step.name", storagePayload.step.name); + } + if (typeof storagePayload.step.index === "number") { + span.setAttribute("workflow.step.index", storagePayload.step.index); + } + if (storagePayload.step.type) { + span.setAttribute("workflow.step.type", storagePayload.step.type); + } + } + + if (storagePayload.userId) { + span.setAttribute("user.id", storagePayload.userId); + } + if (storagePayload.conversationId) { + span.setAttribute("conversation.id", storagePayload.conversationId); + } + + span.addEvent("eval.scorer.started"); + const spanContext = trace.setSpan(parentContext, span); + return { + span, + run: (executor: () => T | Promise) => + otelContext.with(spanContext, () => { + try { + return Promise.resolve(executor()); + } catch (error) { + return Promise.reject(error); + } + }), + }; + }, + onScorerComplete: ({ definition, execution: scorerExecution, context: lifecycleContext }) => { + const lifecycleScope = lifecycleContext as + | (ScorerLifecycleScope & { span?: Span }) + | undefined; + const span = lifecycleScope?.span; + if (!span) { + return; + } + + const descriptor = descriptorById.get(definition.id); + if (!descriptor) { + span.end(); + return; + } + + const metrics = buildScoreMetrics(storagePayload, scorerExecution); + finalizeScorerSpan(span, host, descriptor, config, storagePayload, metrics, scorerExecution); + }, + }); + + for (const result of execution.results) { + const descriptor = descriptorById.get(result.id); + if (!descriptor) { + host.logger.warn( + `[Workflow:${host.name}][${scope}] Received eval scorer result for unknown id ${result.id}`, + ); + continue; + } + + const metrics = buildScoreMetrics(storagePayload, result); + + await invokeEvalResultCallback( + host, + descriptor.config, + { + scorerId: descriptor.definition.id, + scorerName: descriptor.definition.name, + status: result.status, + score: result.score ?? null, + metadata: metrics.combinedMetadata ?? undefined, + error: result.error, + durationMs: result.durationMs, + payload: storagePayload, + rawPayload, + }, + scope, + ); + + if (result.status === "error") { + host.logger.warn( + `[Workflow:${host.name}][${scope}] Eval scorer '${descriptor.definition.name}' failed`, + { + error: result.error instanceof Error ? result.error.message : result.error, + scorerId: descriptor.definition.id, + }, + ); + } + } +} + +async function resolveEvalScorersDefinition( + key: string, + config: WorkflowEvalScorerConfig, +): Promise> | null> { + const scorerRef = config.scorer; + let baseDefinition: LocalScorerDefinition> | null = null; + + if (isLocalScorerDefinition(scorerRef)) { + baseDefinition = scorerRef; + } else if (typeof scorerRef === "function") { + const resolved = await scorerRef(); + if (!isLocalScorerDefinition(resolved)) { + throw new Error( + `Workflow eval scorer factory for key '${key}' did not return a LocalScorerDefinition.`, + ); + } + baseDefinition = resolved; + } + + if (!baseDefinition) { + return null; + } + + const adaptedDefinition = adaptScorerDefinitionForWorkflow(baseDefinition, config); + return applyEvalConfigOverrides(adaptedDefinition, key, config); +} + +function applyEvalConfigOverrides( + baseDefinition: LocalScorerDefinition>, + key: string, + config: WorkflowEvalScorerConfig, +): LocalScorerDefinition> { + const resolvedId = config.id ?? baseDefinition.id ?? key ?? randomUUID(); + const resolvedName = baseDefinition.name ?? resolvedId; + + return { + ...baseDefinition, + id: resolvedId, + name: resolvedName, + sampling: config.sampling ?? baseDefinition.sampling, + params: mergeParamsSources(baseDefinition.params, config.params), + }; +} + +function adaptScorerDefinitionForWorkflow( + definition: LocalScorerDefinition>, + config: WorkflowEvalScorerConfig, +): LocalScorerDefinition> { + const { buildPayload, buildParams } = config; + + const baseParams = definition.params; + + const computeMergedParams = + buildParams || baseParams + ? async ( + workflowContext: WorkflowEvalContext, + normalizedPayload: Record, + ) => { + const merged: Record = {}; + + if (typeof baseParams === "function") { + const baseResult = await baseParams(normalizedPayload); + if (isPlainRecord(baseResult)) { + Object.assign(merged, baseResult); + } + } else if (isPlainRecord(baseParams)) { + Object.assign(merged, baseParams); + } + + if (buildParams) { + const override = await buildParams(workflowContext); + if (isPlainRecord(override)) { + Object.assign(merged, override); + } + } + + return merged; + } + : undefined; + + const adaptedParams = + computeMergedParams !== undefined + ? async (workflowContext: WorkflowEvalContext) => { + const rawPayload = buildPayload ? await buildPayload(workflowContext) : undefined; + const normalizedPayload = normalizeScorerPayload(workflowContext, rawPayload); + return computeMergedParams(workflowContext, normalizedPayload); + } + : undefined; + + const adaptedScorer: LocalScorerDefinition< + WorkflowEvalContext, + Record + >["scorer"] = async ({ payload, params }) => { + const workflowPayload = payload; + const rawPayload = buildPayload ? await buildPayload(workflowPayload) : undefined; + const payloadForBase = normalizeScorerPayload(workflowPayload, rawPayload); + + let resolvedParams = params; + if ((!resolvedParams || Object.keys(resolvedParams).length === 0) && computeMergedParams) { + resolvedParams = await computeMergedParams(workflowPayload, payloadForBase); + } + + return definition.scorer({ + payload: payloadForBase, + params: (resolvedParams ?? {}) as Record, + }); + }; + + return { + ...definition, + scorer: adaptedScorer, + params: adaptedParams, + } as LocalScorerDefinition>; +} + +function mergeParamsSources( + baseParams: LocalScorerDefinition>["params"], + override: WorkflowEvalScorerConfig["params"], +): LocalScorerDefinition>["params"] | undefined { + if (!override) { + return baseParams; + } + + if (!baseParams) { + return typeof override === "function" ? override : { ...override }; + } + + return async (payload: WorkflowEvalContext) => { + const baseValue = await resolveParamsSource(baseParams, payload); + const overrideValue = await resolveParamsSource(override, payload); + const merged = { + ...baseValue, + ...overrideValue, + }; + return Object.keys(merged).length > 0 ? merged : {}; + }; +} + +async function resolveParamsSource( + source: + | LocalScorerDefinition>["params"] + | WorkflowEvalScorerConfig["params"], + payload: WorkflowEvalContext, +): Promise> { + if (!source) { + return {}; + } + + if (typeof source === "function") { + const result = await source(payload); + return isPlainRecord(result) ? result : {}; + } + + if (isPlainRecord(source)) { + return source; + } + + return {}; +} + +function isLocalScorerDefinition( + value: WorkflowEvalScorerReference, +): value is LocalScorerDefinition> { + return ( + typeof value === "object" && + value !== null && + typeof (value as LocalScorerDefinition).scorer === "function" + ); +} + +function normalizeScorerPayload( + workflowContext: WorkflowEvalContext, + rawPayload?: Record, +): Record { + if (isPlainRecord(rawPayload)) { + return rawPayload; + } + + const payload: Record = { + target: workflowContext.target, + workflowId: workflowContext.workflowId, + workflowName: workflowContext.workflowName, + executionId: workflowContext.executionId, + operationType: workflowContext.operationType, + status: workflowContext.status, + input: ensureScorerText(workflowContext.input), + output: ensureScorerText(workflowContext.output), + metadata: isPlainRecord(workflowContext.metadata) ? workflowContext.metadata : undefined, + }; + + if (workflowContext.userId) { + payload.userId = workflowContext.userId; + } + if (workflowContext.conversationId) { + payload.conversationId = workflowContext.conversationId; + } + if (workflowContext.step) { + const stepPayload: Record = { + id: workflowContext.step.id, + }; + if (workflowContext.step.name) { + stepPayload.name = workflowContext.step.name; + } + if (typeof workflowContext.step.index === "number") { + stepPayload.index = workflowContext.step.index; + } + if (workflowContext.step.type) { + stepPayload.type = workflowContext.step.type; + } + if (workflowContext.step.metadata && Object.keys(workflowContext.step.metadata).length > 0) { + stepPayload.metadata = workflowContext.step.metadata; + } + payload.step = stepPayload; + } + + return payload; +} + +function ensureScorerText(value: unknown): string { + if (typeof value === "string") { + return value; + } + if (value === null || value === undefined) { + return ""; + } + if (typeof value === "object") { + try { + return safeStringify(value); + } catch { + return String(value); + } + } + return String(value); +} + +function buildEvalPayload( + args: EnqueueWorkflowEvalScoringArgs, + spanContext: SpanContext, +): WorkflowEvalPayload | undefined { + if (!spanContext.traceId || !spanContext.spanId) { + return undefined; + } + + const target = args.target ?? (args.step ? "step" : "workflow"); + const stepInfo = args.step + ? { + id: args.step.id, + name: args.step.name, + index: args.step.index, + type: args.step.type, + metadata: args.step.metadata, + } + : undefined; + + return { + target, + executionId: args.executionId, + workflowId: args.workflowId, + workflowName: args.workflowName, + operationType: args.operation, + status: args.status, + input: normalizeEvalString(args.input), + output: normalizeEvalString(args.output), + rawInput: args.rawInput, + rawOutput: args.rawOutput, + userId: args.userId, + conversationId: args.conversationId, + traceId: spanContext.traceId, + spanId: spanContext.spanId, + metadata: args.metadata, + step: stepInfo, + }; +} + +function normalizeEvalString(value: unknown): string | null { + if (value === undefined || value === null) { + return null; + } + if (typeof value === "string") { + return value; + } + return safeStringify(value); +} + +function cloneEvalPayload(payload: WorkflowEvalPayload): WorkflowEvalPayload { + return JSON.parse(safeStringify(payload)) as WorkflowEvalPayload; +} + +function combineEvalMetadata( + payload: WorkflowEvalPayload, + scorerMetadata: Record | null | undefined, +): Record | null { + const combined: Record = { + target: payload.target, + workflow: { + id: payload.workflowId, + name: payload.workflowName, + executionId: payload.executionId, + status: payload.status, + }, + }; + + if (payload.step) { + const stepRecord: Record = { + id: payload.step.id, + }; + if (payload.step.name) { + stepRecord.name = payload.step.name; + } + if (typeof payload.step.index === "number") { + stepRecord.index = payload.step.index; + } + if (payload.step.type) { + stepRecord.type = payload.step.type; + } + if (payload.step.metadata && Object.keys(payload.step.metadata).length > 0) { + stepRecord.metadata = payload.step.metadata; + } + combined.step = stepRecord; + } + + if (payload.input !== undefined) { + combined.input = payload.input; + } + if (payload.output !== undefined) { + combined.output = payload.output; + } + + const payloadMetadata = isPlainRecord(payload.metadata) + ? (payload.metadata as Record) + : undefined; + if (payloadMetadata && Object.keys(payloadMetadata).length > 0) { + combined.payload = payloadMetadata; + } + + const scorerRecord = isPlainRecord(scorerMetadata) + ? (scorerMetadata as Record) + : undefined; + if (scorerRecord && Object.keys(scorerRecord).length > 0) { + combined.scorer = scorerRecord; + const builderSnapshot = isPlainRecord(scorerRecord.scorerBuilder) + ? (scorerRecord.scorerBuilder as Record) + : undefined; + if (builderSnapshot) { + combined.scorerBuilder = builderSnapshot; + } + } + + const voltAgentMetadata = collectVoltAgentMetadataFromSources(payloadMetadata, scorerRecord); + const datasetMetadata = collectDatasetMetadataFromSources(payloadMetadata, scorerRecord); + const liveEvalMetadata = collectLiveEvalMetadata(payloadMetadata, scorerRecord); + + if (datasetMetadata) { + combined.dataset = { + ...(isPlainRecord(combined.dataset) ? (combined.dataset as Record) : {}), + ...datasetMetadata, + }; + } + + if (voltAgentMetadata || datasetMetadata) { + const mergedVoltAgent: Record = { + ...(voltAgentMetadata ?? {}), + }; + if (datasetMetadata) { + const baseDataset = isPlainRecord(mergedVoltAgent.dataset) + ? (mergedVoltAgent.dataset as Record) + : undefined; + mergedVoltAgent.dataset = { + ...(baseDataset ?? {}), + ...datasetMetadata, + }; + } + if (Object.keys(mergedVoltAgent).length > 0) { + combined.voltAgent = mergedVoltAgent; + } + } + + if (liveEvalMetadata && Object.keys(liveEvalMetadata).length > 0) { + combined.liveEval = liveEvalMetadata; + } + + return Object.keys(combined).length > 0 ? combined : null; +} + +function collectVoltAgentMetadataFromSources( + ...sources: Array | undefined> +): Record | undefined { + const voltAgentRecords: Record[] = []; + + for (const source of sources) { + if (!isPlainRecord(source)) { + continue; + } + + if (hasVoltAgentMetadataShape(source)) { + voltAgentRecords.push(source); + continue; + } + + const nestedVoltAgent = extractVoltAgentMetadataFromNested(source); + if (nestedVoltAgent && Object.keys(nestedVoltAgent).length > 0) { + voltAgentRecords.push(nestedVoltAgent); + } + } + + if (voltAgentRecords.length === 0) { + return undefined; + } + + return voltAgentRecords.reduce>((acc, record) => { + Object.assign(acc, record); + return acc; + }, {}); +} + +function hasVoltAgentMetadataShape(source: Record): boolean { + return ( + typeof source.threshold === "number" || + typeof source.thresholdPassed === "boolean" || + typeof source.useCase === "string" || + typeof source.dataset === "object" + ); +} + +function extractVoltAgentMetadataFromNested( + source: Record, +): Record | undefined { + const voltAgent = source.voltAgent; + if (!isPlainRecord(voltAgent)) { + return undefined; + } + + const metadata: Record = {}; + for (const key of Object.keys(voltAgent)) { + metadata[key] = (voltAgent as Record)[key]; + } + return metadata; +} + +function collectDatasetMetadataFromSources( + ...sources: Array | undefined> +): Record | undefined { + const datasetRecords: Record[] = []; + const seen = new Set>(); + + for (const source of sources) { + if (!isPlainRecord(source)) { + continue; + } + gatherDatasetRecords(source, datasetRecords, seen); + } + + if (datasetRecords.length === 0) { + return undefined; + } + + return datasetRecords.reduce>((acc, record) => { + Object.assign(acc, record); + return acc; + }, {}); +} + +function gatherDatasetRecords( + current: Record, + out: Record[], + seen: Set>, + deep = false, +): void { + if (seen.has(current)) { + return; + } + seen.add(current); + + if (hasDatasetShape(current)) { + out.push(current); + } + + if (!deep) { + const dataset = current.dataset; + if (isPlainRecord(dataset)) { + gatherDatasetRecords(dataset, out, seen, true); + } + const voltAgent = current.voltAgent; + if (isPlainRecord(voltAgent)) { + gatherDatasetRecords(voltAgent, out, seen, true); + } + } + + const payload = isPlainRecord(current.payload) + ? (current.payload as Record) + : undefined; + if (payload) { + gatherDatasetRecords(payload, out, seen, true); + } + + const scorer = isPlainRecord(current.scorer) + ? (current.scorer as Record) + : undefined; + if (scorer) { + gatherDatasetRecords(scorer, out, seen, true); + } +} + +function hasDatasetShape(source: Record): boolean { + return ( + typeof source.datasetId === "string" || + typeof source.datasetVersionId === "string" || + typeof source.datasetItemId === "string" || + typeof source.datasetItemHash === "string" || + typeof source.id === "string" || + typeof source.itemId === "string" + ); +} + +function resolveThresholdFromMetadata( + metadata: Record | null | undefined, +): number | undefined { + const record = isPlainRecord(metadata) ? (metadata as Record) : undefined; + const voltAgent = collectVoltAgentMetadataFromSources(record); + if (!voltAgent) { + return undefined; + } + const threshold = voltAgent.threshold; + return typeof threshold === "number" ? threshold : undefined; +} + +function resolveThresholdPassedFromMetadata( + metadata: Record | null | undefined, +): boolean | null { + const record = isPlainRecord(metadata) ? (metadata as Record) : undefined; + const voltAgent = collectVoltAgentMetadataFromSources(record); + if (!voltAgent) { + return null; + } + const value = voltAgent.thresholdPassed; + return typeof value === "boolean" ? value : null; +} + +function extractDatasetMetadataFromCombinedMetadata( + metadata: Record | null | undefined, +): + | { + datasetId?: string; + datasetVersionId?: string; + datasetItemHash?: string; + datasetItemId?: string; + } + | undefined { + const record = isPlainRecord(metadata) ? (metadata as Record) : undefined; + if (!record) { + return undefined; + } + + const datasetMetadata = collectDatasetMetadataFromSources(record); + if (!datasetMetadata) { + return undefined; + } + + return { + datasetId: datasetMetadata.datasetId as string | undefined, + datasetVersionId: datasetMetadata.datasetVersionId as string | undefined, + datasetItemHash: datasetMetadata.datasetItemHash as string | undefined, + datasetItemId: datasetMetadata.datasetItemId as string | undefined, + }; +} + +function collectLiveEvalMetadata( + ...sources: Array | undefined> +): Record | undefined { + const records: Record[] = []; + for (const source of sources) { + if (!isPlainRecord(source)) { + continue; + } + const liveEval = source.liveEval; + if (isPlainRecord(liveEval)) { + records.push(liveEval); + } + } + + if (records.length === 0) { + return undefined; + } + + return records.reduce>((acc, record) => { + Object.assign(acc, record); + return acc; + }, {}); +} + +function extractErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + if (typeof error === "string") { + return error; + } + try { + return safeStringify(error); + } catch { + return String(error); + } +} + +async function invokeEvalResultCallback( + host: WorkflowEvalHost, + config: WorkflowEvalScorerConfig, + result: WorkflowEvalResult, + scope: string, +): Promise { + if (!config.onResult) { + return; + } + + try { + await config.onResult(result); + } catch (error) { + host.logger.warn( + `[Workflow:${host.name}][${scope}] Eval scorer callback threw for scorer '${result.scorerId}'`, + { error: error instanceof Error ? error.message : error }, + ); + } +} + +function isPlainRecord(value: unknown): value is Record { + return ( + typeof value === "object" && value !== null && Object.getPrototypeOf(value) === Object.prototype + ); +} diff --git a/packages/core/src/workflow/eval/types.ts b/packages/core/src/workflow/eval/types.ts new file mode 100644 index 000000000..6297b9c6b --- /dev/null +++ b/packages/core/src/workflow/eval/types.ts @@ -0,0 +1,102 @@ +import type { Logger } from "@voltagent/internal"; +import type { LocalScorerDefinition, SamplingPolicy } from "../../eval/runtime"; +import type { VoltAgentObservability } from "../../observability"; + +export type WorkflowEvalOperationType = "run" | "stream"; +export type WorkflowEvalTarget = "workflow" | "step"; +export type WorkflowEvalStatus = "completed" | "suspended" | "cancelled" | "error" | "skipped"; + +export interface WorkflowEvalStepInfo { + id: string; + name?: string; + index?: number; + type?: string; + metadata?: Record; +} + +export interface WorkflowEvalPayload { + target: WorkflowEvalTarget; + executionId: string; + workflowId: string; + workflowName: string; + operationType: WorkflowEvalOperationType; + status: WorkflowEvalStatus; + input?: string | null; + output?: string | null; + rawInput?: unknown; + rawOutput?: unknown; + userId?: string; + conversationId?: string; + traceId: string; + spanId: string; + metadata?: Record; + step?: WorkflowEvalStepInfo; +} + +export type WorkflowEvalContext = WorkflowEvalPayload & + Record & { + timestamp: string; + rawPayload: WorkflowEvalPayload; + }; + +export type WorkflowEvalParams = Record; + +export type WorkflowEvalSamplingPolicy = SamplingPolicy; + +export type WorkflowEvalScorerFactory = () => + | LocalScorerDefinition> + | Promise>>; + +export type WorkflowEvalScorerReference = + | LocalScorerDefinition> + | WorkflowEvalScorerFactory; + +export interface WorkflowEvalResult { + scorerId: string; + scorerName?: string; + status: "success" | "error" | "skipped"; + score?: number | null; + metadata?: Record | null; + error?: unknown; + durationMs?: number; + payload: WorkflowEvalPayload; + rawPayload: WorkflowEvalPayload; +} + +export interface WorkflowEvalScorerConfig { + scorer: WorkflowEvalScorerReference; + params?: + | WorkflowEvalParams + | (( + context: WorkflowEvalContext, + ) => WorkflowEvalParams | undefined | Promise); + sampling?: WorkflowEvalSamplingPolicy; + id?: string; + onResult?: (result: WorkflowEvalResult) => void | Promise; + buildPayload?: ( + context: WorkflowEvalContext, + ) => Record | Promise>; + buildParams?: ( + context: WorkflowEvalContext, + ) => WorkflowEvalParams | undefined | Promise; +} + +export interface WorkflowEvalStepConfig { + scorers: Record; + triggerSource?: string; + environment?: string; + sampling?: WorkflowEvalSamplingPolicy; + redact?: (payload: WorkflowEvalPayload) => WorkflowEvalPayload; +} + +export interface WorkflowEvalConfig extends WorkflowEvalStepConfig { + steps?: Record; +} + +export interface WorkflowEvalHost { + readonly id: string; + readonly name: string; + readonly logger: Logger; + readonly evalConfig?: WorkflowEvalConfig; + getObservability(): VoltAgentObservability; +} diff --git a/packages/core/src/workflow/index.ts b/packages/core/src/workflow/index.ts index 9f62d85a3..bd44f49bb 100644 --- a/packages/core/src/workflow/index.ts +++ b/packages/core/src/workflow/index.ts @@ -16,3 +16,15 @@ export type { WorkflowTimelineEvent, } from "./types"; export type { WorkflowExecuteContext } from "./internal/types"; +export type { + WorkflowEvalConfig, + WorkflowEvalScorerConfig, + WorkflowEvalScorerFactory, + WorkflowEvalScorerReference, + WorkflowEvalResult, + WorkflowEvalSamplingPolicy, + WorkflowEvalOperationType, + WorkflowEvalPayload, + WorkflowEvalContext, + WorkflowEvalStepConfig, +} from "./eval/types"; diff --git a/packages/core/src/workflow/internal/types.ts b/packages/core/src/workflow/internal/types.ts index 40fceb356..9a049dc3e 100644 --- a/packages/core/src/workflow/internal/types.ts +++ b/packages/core/src/workflow/internal/types.ts @@ -5,6 +5,7 @@ import type * as TF from "type-fest"; import type { z } from "zod"; import type { BaseMessage } from "../../agent/providers"; import type { WorkflowExecutionContext } from "../context"; +import type { WorkflowEvalStepConfig } from "../eval/types"; import type { WorkflowStreamWriter } from "../types"; import type { WorkflowState } from "./state"; @@ -79,6 +80,10 @@ export type InternalWorkflowStepConfig = { * Description of what the step does */ purpose?: string; + /** + * Live evaluation configuration applied to this step + */ + eval?: WorkflowEvalStepConfig; } & T; /** @@ -126,6 +131,10 @@ export interface InternalBaseWorkflowStep, ) => Promise; + /** + * Live evaluation configuration applied directly to this step + */ + eval?: WorkflowEvalStepConfig; } /** diff --git a/packages/core/src/workflow/registry.ts b/packages/core/src/workflow/registry.ts index f64ee1e70..65008b045 100644 --- a/packages/core/src/workflow/registry.ts +++ b/packages/core/src/workflow/registry.ts @@ -1,6 +1,7 @@ import { LoggerProxy } from "../logger"; import { SimpleEventEmitter } from "../utils/simple-event-emitter"; import { serializeWorkflowStep } from "./core"; +import type { WorkflowEvalConfig } from "./eval/types"; import type { Workflow, WorkflowExecutionResult, WorkflowSuspendController } from "./types"; /** @@ -14,6 +15,7 @@ export interface RegisteredWorkflow { inputSchema?: any; // Store the input schema for API access suspendSchema?: any; // Store the suspend schema for API access resumeSchema?: any; // Store the resume schema for API access + evalConfig?: WorkflowEvalConfig; } /** @@ -66,6 +68,7 @@ export class WorkflowRegistry extends SimpleEventEmitter { inputSchema: workflow.inputSchema, suspendSchema: workflow.suspendSchema, resumeSchema: workflow.resumeSchema, + evalConfig: workflow.evalConfig, }; this.workflows.set(workflow.id, registeredWorkflow); diff --git a/packages/core/src/workflow/steps/and-agent.ts b/packages/core/src/workflow/steps/and-agent.ts index bc46c1480..7686f7126 100644 --- a/packages/core/src/workflow/steps/and-agent.ts +++ b/packages/core/src/workflow/steps/and-agent.ts @@ -3,6 +3,7 @@ import type { UIMessage } from "ai"; import type { z } from "zod"; import type { Agent, BaseGenerationOptions } from "../../agent/agent"; import { convertUsage } from "../../utils/usage-converter"; +import type { WorkflowEvalStepConfig } from "../eval/types"; import type { InternalWorkflowFunc, WorkflowExecuteContext } from "../internal/types"; import type { WorkflowStepAgent } from "./types"; @@ -12,6 +13,22 @@ export type AgentConfig = BaseGenerati | (( context: Omit, "suspend" | "writer">, ) => SCHEMA | Promise); + /** + * Override step identifier for workflow tracking + */ + stepId?: string; + /** + * Override step display name (defaults to agent name or ID) + */ + stepName?: string; + /** + * Override step purpose/description (defaults to agent purpose) + */ + stepPurpose?: string; + /** + * Live evaluation configuration for this step + */ + eval?: WorkflowEvalStepConfig; }; /** @@ -46,15 +63,21 @@ export function andAgent( agent: Agent, config: AgentConfig, ) { + const { schema, stepId, stepName, stepPurpose, eval: stepEvalConfig, ...restConfig } = config; + + const stepIdentifier = stepId ?? agent.id; + const resolvedStepName = stepName ?? agent.name ?? stepIdentifier; + const resolvedPurpose = stepPurpose ?? agent.purpose ?? null; + return { type: "agent", - id: agent.id, - name: agent.name || agent.id, - purpose: agent.purpose ?? null, + id: stepIdentifier, + name: resolvedStepName, + purpose: resolvedPurpose, + eval: stepEvalConfig, agent, execute: async (context) => { const { state } = context; - const { schema, ...restConfig } = config; const finalTask = typeof task === "function" ? await task(context) : task; const finalSchema = typeof schema === "function" ? await schema(context) : schema; diff --git a/packages/core/src/workflow/types.ts b/packages/core/src/workflow/types.ts index 09b6d5baf..2bca324e9 100644 --- a/packages/core/src/workflow/types.ts +++ b/packages/core/src/workflow/types.ts @@ -9,6 +9,7 @@ import type { UserContext } from "../agent/types"; import type { Memory } from "../memory"; import type { VoltAgentObservability } from "../observability"; import type { WorkflowExecutionContext } from "./context"; +import type { WorkflowEvalConfig } from "./eval/types"; import type { WorkflowState } from "./internal/state"; import type { InternalBaseWorkflowInputSchema } from "./internal/types"; import type { WorkflowStep } from "./steps"; @@ -371,6 +372,10 @@ export type WorkflowConfig< * If not provided, will use global observability or create a default one */ observability?: VoltAgentObservability; + /** + * Live evaluation configuration for workflow executions and optional per-step scorers + */ + eval?: WorkflowEvalConfig; }; /** @@ -419,6 +424,10 @@ export type Workflow< * Observability instance for OpenTelemetry integration */ observability?: VoltAgentObservability; + /** + * Live evaluation configuration applied to this workflow (including per-step overrides) + */ + evalConfig?: WorkflowEvalConfig; /** * Get the full state of the workflow including all steps * @returns The serialized workflow state