Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
392 changes: 272 additions & 120 deletions examples/with-live-evals/src/index.ts

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ export type {
WorkflowStats,
WorkflowTimelineEvent,
RegisteredWorkflow,
WorkflowEvalConfig,
WorkflowEvalScorerConfig,
WorkflowEvalScorerFactory,
WorkflowEvalScorerReference,
WorkflowEvalResult,
WorkflowEvalSamplingPolicy,
WorkflowEvalOperationType,
WorkflowEvalPayload,
WorkflowEvalContext,
WorkflowEvalStepConfig,
} from "./workflow";
// Export new Agent from agent.ts
export {
Expand Down
75 changes: 74 additions & 1 deletion packages/core/src/workflow/core.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { UIMessageChunk } from "ai";
import { beforeEach, describe, expect, it } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { z } from "zod";
import type { LocalScorerDefinition } from "../eval/runtime";
import { Memory } from "../memory";
import { InMemoryStorageAdapter } from "../memory/adapters/storage/in-memory";
import { createWorkflow } from "./core";
import type { WorkflowEvalContext } from "./eval/types";
import { WorkflowRegistry } from "./registry";
import { andThen } from "./steps";

Expand Down Expand Up @@ -206,3 +208,74 @@ describe.sequential("workflow streaming", () => {
}
});
});

// Covers step-level eval scoring: a scorer attached to one step through `eval.scorers`
// should fire once, and its `onResult` callback should see the step payload and metadata.
describe.sequential("workflow eval scoring", () => {
  beforeEach(() => {
    // Empty the shared registry so workflows registered by earlier tests are not visible here.
    (WorkflowRegistry.getInstance() as any).workflows.clear();
  });

  it("runs step-level scorers when configured", async () => {
    const storage = new InMemoryStorageAdapter();
    const memory = new Memory({ storage });
    const resultSpy = vi.fn();

    // Scorer that always reports success with a fixed score.
    const fixedScorer: LocalScorerDefinition<WorkflowEvalContext, Record<string, unknown>> = {
      id: "step-scorer",
      name: "Step Scorer",
      scorer: () => {
        return { status: "success", score: 0.9 };
      },
    };

    const workflow = createWorkflow(
      {
        id: "eval-test",
        name: "Eval Test",
        input: z.object({ value: z.number() }),
        result: z.object({ doubled: z.number() }),
        memory,
      },
      // Only this step carries an eval config; the assertions below expect a single callback for it.
      andThen({
        id: "multiply",
        name: "Multiply",
        execute: async ({ data }) => {
          return { doubled: data.value * 2 };
        },
        eval: {
          scorers: {
            quality: { scorer: fixedScorer, onResult: resultSpy },
          },
        },
      }),
      andThen({
        id: "identity",
        execute: async ({ data }) => data,
      }),
    );

    WorkflowRegistry.getInstance().registerWorkflow(workflow);

    await workflow.run({ value: 3 });

    // Yield one macrotask before asserting, giving the scoring callback a chance to run.
    await new Promise((resolve) => {
      setTimeout(resolve, 0);
    });

    expect(resultSpy).toHaveBeenCalledTimes(1);

    const result = resultSpy.mock.calls[0][0];
    expect(result.status).toBe("success");
    expect(result.payload.target).toBe("step");
    expect(result.payload.step?.id).toBe("multiply");
    expect(result.payload.status).toBe("completed");
    expect(result.metadata?.workflow?.id).toBe("eval-test");
    expect(result.metadata?.step).toEqual(
      expect.objectContaining({
        id: "multiply",
        metadata: expect.objectContaining({
          durationMs: expect.any(Number),
        }),
      }),
    );
  });
});
138 changes: 137 additions & 1 deletion packages/core/src/workflow/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { type VoltAgentObservability, createVoltAgentObservability } from "../ob
import { AgentRegistry } from "../registries/agent-registry";
import { randomUUID } from "../utils/id";
import type { WorkflowExecutionContext } from "./context";
import { enqueueWorkflowEvalScoring, enqueueWorkflowStepEvalScoring } from "./eval";
import type { WorkflowEvalHost, WorkflowEvalPayload } from "./eval/types";
import { createWorkflowStateManager } from "./internal/state";
import type { InternalBaseWorkflowInputSchema } from "./internal/types";
import {
Expand Down Expand Up @@ -628,6 +630,7 @@ export function createWorkflow<
resumeSchema,
memory: workflowMemory,
observability: workflowObservability,
eval: workflowEvalConfig,
}: WorkflowConfig<INPUT_SCHEMA, RESULT_SCHEMA, SUSPEND_SCHEMA, RESUME_SCHEMA>,
...steps: ReadonlyArray<BaseStep>
) {
Expand Down Expand Up @@ -690,6 +693,14 @@ export function createWorkflow<
return cachedObservability;
};

// Host object handed to the workflow eval helpers: identifies this workflow and
// carries its logger and the `eval` config taken from the workflow definition.
const workflowEvalHost: WorkflowEvalHost = {
id,
name,
logger,
evalConfig: workflowEvalConfig,
// Wrapped in a closure so observability is looked up when the host is asked for it,
// not when this object is created.
getObservability: () => getObservability(),
};

// Set default schemas if not provided
const effectiveSuspendSchema = suspendSchema || z.any();
const effectiveResumeSchema = resumeSchema || z.any();
Expand All @@ -714,6 +725,7 @@ export function createWorkflow<
// Only create stream controller if one is provided (for streaming execution)
// For normal run, we don't need a stream controller
const streamController = externalStreamController || null;
const operationType = streamController ? "stream" : "run";

// Get observability instance
const observability = getObservability();
Expand Down Expand Up @@ -1176,6 +1188,48 @@ export function createWorkflow<
},
});

// Capture the current workflow data here; it is reported as the step's eval input.
const stepInputForEval = stateManager.state.data;
// Reference point for the `durationMs` values attached to step eval metadata.
const stepStartTime = Date.now();
// Prefer identifiers stored on the workflow state, falling back to the run options.
const userIdForEval = stateManager.state.userId ?? options?.userId;
const conversationIdForEval =
stateManager.state.conversationId ?? options?.conversationId;
// Workflow-level step eval config: an entry keyed by this step's id wins over the "*" entry.
const globalStepEvalConfig =
workflowEvalHost.evalConfig?.steps?.[step.id] ??
workflowEvalHost.evalConfig?.steps?.["*"];
// A step's own `eval` config takes precedence over the workflow-level one.
const stepEvalConfig = step.eval ?? globalStepEvalConfig;

// Enqueues eval scoring for the current step with the given outcome.
// `status` is the step outcome to report; `output` and `metadata` are optional and
// passed through as-is. The same `metadata` object is attached both to the payload
// and to the nested step descriptor.
const scheduleStepScoring = (
  status: WorkflowEvalPayload["status"],
  { output, metadata }: { output?: unknown; metadata?: Record<string, unknown> },
) => {
  // Describes which step the payload belongs to.
  const stepDescriptor = {
    id: step.id,
    name: step.name ?? stepName,
    index,
    type: step.type,
    metadata,
  };

  enqueueWorkflowStepEvalScoring(
    workflowEvalHost,
    {
      span: stepSpan,
      executionId,
      workflowId: id,
      workflowName: name,
      status,
      // The raw and processed values are the same here: the step input captured earlier
      // and whatever output the caller supplied.
      input: stepInputForEval,
      output,
      rawInput: stepInputForEval,
      rawOutput: output,
      userId: userIdForEval,
      conversationId: conversationIdForEval,
      operation: operationType,
      metadata,
      step: stepDescriptor,
    },
    stepEvalConfig ?? undefined,
  );
};

// Create stream writer for this step - real one for streaming, no-op for regular execution
const stepWriter = streamController
? new WorkflowStreamWriterImpl(
Expand Down Expand Up @@ -1310,6 +1364,19 @@ export function createWorkflow<
const isSkipped =
step.type === "conditional-when" && result === stateManager.state.data;

// Report the finished step to eval scoring: elapsed time is always recorded, and a
// `skipped` flag is added only when the step was skipped.
const stepMetadata: Record<string, unknown> = {
  durationMs: Date.now() - stepStartTime,
  ...(isSkipped ? { skipped: true } : {}),
};
const stepOutcome = isSkipped ? "skipped" : "completed";

scheduleStepScoring(stepOutcome, { output: result, metadata: stepMetadata });

stateManager.update({
data: result,
result: result,
Expand Down Expand Up @@ -1379,6 +1446,19 @@ export function createWorkflow<
// Get suspend data if provided
const suspendData = executionContext.context.get("suspendData");

// Report the suspended step to eval scoring: elapsed time and the suspension reason are
// always recorded; `suspendData` is attached only when a value was provided.
const suspensionEvalMetadata: Record<string, unknown> = {
  durationMs: Date.now() - stepStartTime,
  reason: suspensionReason,
  ...(suspendData !== undefined ? { suspendData } : {}),
};

scheduleStepScoring("suspended", { metadata: suspensionEvalMetadata });

const suspensionMetadata = stateManager.suspend(
suspensionReason,
{
Expand Down Expand Up @@ -1461,6 +1541,28 @@ export function createWorkflow<
}

// Build eval metadata for the failed step: elapsed time plus a best-effort error message.
const errorDuration = Date.now() - stepStartTime;
const errorMetadata: Record<string, unknown> = {
durationMs: errorDuration,
};
let errorMessage: string | undefined;
if (stepError instanceof Error && stepError.message) {
errorMessage = stepError.message;
} else if (stepError !== undefined) {
// Non-Error values (and Errors with an empty message) are serialized instead.
try {
errorMessage = safeStringify(stepError);
} catch {
// Last resort if serialization itself throws.
errorMessage = String(stepError);
}
}
// An empty message is omitted rather than recorded.
if (errorMessage) {
errorMetadata.errorMessage = errorMessage;
}

// Enqueue step scoring with status "error"; no output is passed for a failed step.
// NOTE(review): this runs before the step span is ended below — presumably so the
// scorer receives a still-open span; confirm before reordering.
scheduleStepScoring("error", {
metadata: errorMetadata,
});

// End step span with error
traceContext.endStepSpan(stepSpan, "error", {
error: stepError as Error,
});
Expand All @@ -1470,10 +1572,44 @@ export function createWorkflow<
}

const finalState = stateManager.finish();
const duration = finalState.endAt.getTime() - finalState.startAt.getTime();

// Record workflow completion in trace
traceContext.setOutput(finalState.result);
traceContext.setUsage(stateManager.state.usage);

// Only build metadata and enqueue workflow-level scoring when an eval config was supplied.
if (workflowEvalHost.evalConfig) {
const metadata: Record<string, unknown> = {
durationMs: duration,
// NOTE(review): `executionContext.steps` is reportedly initialized empty and never
// populated, so its length would always be 0. Use the workflow's own step list instead.
// This assumes every declared step has been visited on the completed path — confirm.
stepsExecuted: steps.length,
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: executionContext.steps.length will always be 0 because this array is initialized empty and never populated. Consider using steps.length or tracking executed steps with a counter.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/core/src/workflow/core.ts, line 1584:

<comment>`executionContext.steps.length` will always be 0 because this array is initialized empty and never populated. Consider using `steps.length` or tracking executed steps with a counter.</comment>

<file context>
@@ -1470,10 +1572,44 @@ export function createWorkflow&lt;
+        if (workflowEvalHost.evalConfig) {
+          const metadata: Record&lt;string, unknown&gt; = {
+            durationMs: duration,
+            stepsExecuted: executionContext.steps.length,
+          };
+          if (stateManager.state.usage) {
</file context>
Fix with Cubic

};
if (stateManager.state.usage) {
try {
metadata.usage = JSON.parse(safeStringify(stateManager.state.usage));
} catch (error) {
runLogger.debug("Failed to serialize workflow usage for eval metadata", {
error: error instanceof Error ? error.message : error,
});
}
}

enqueueWorkflowEvalScoring(workflowEvalHost, {
span: rootSpan,
executionId,
workflowId: id,
workflowName: name,
status: "completed",
input: stateManager.state.input,
output: finalState.result,
rawInput: stateManager.state.input,
rawOutput: finalState.result,
userId: stateManager.state.userId ?? options?.userId,
conversationId: stateManager.state.conversationId ?? options?.conversationId,
operation: operationType,
metadata,
});
}

traceContext.end("completed");

// Update Memory V2 state to completed
Expand All @@ -1491,7 +1627,6 @@ export function createWorkflow<
await hooks?.onEnd?.(stateManager.state);

// Log workflow completion with context
const duration = finalState.endAt.getTime() - finalState.startAt.getTime();
runLogger.debug(
`Workflow completed | user=${options?.userId || "anonymous"} conv=${options?.conversationId || "none"} duration=${duration}ms`,
{
Expand Down Expand Up @@ -1689,6 +1824,7 @@ export function createWorkflow<
// ✅ Always expose memory for registry access
memory: effectiveMemory,
observability: workflowObservability,
evalConfig: workflowEvalConfig,
getFullState: () => {
// Return workflow state similar to agent.getFullState
return {
Expand Down
Loading
Loading