Skip to content

Commit d3ed347

Browse files
authored
fix: persist workflow execution timeline events to prevent data loss after completion - VoltAgent#647 (VoltAgent#738)
1 parent 6892fac commit d3ed347

File tree

7 files changed

+357
-110
lines changed

7 files changed

+357
-110
lines changed

.changeset/thirty-geckos-design.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
---
2+
"@voltagent/postgres": patch
3+
"@voltagent/supabase": patch
4+
"@voltagent/libsql": patch
5+
"@voltagent/core": patch
6+
---
7+
8+
fix: persist workflow execution timeline events to prevent data loss after completion - #647
9+
10+
## The Problem
11+
12+
When workflows executed, their timeline events (step-start, step-complete, workflow-complete, etc.) were only visible during streaming. Once the workflow completed, the WebSocket state update would replace the execution object without the events field, causing the timeline UI to reset and lose all execution history. Users couldn't see what happened in completed or suspended workflows.
13+
14+
**Symptoms:**
15+
16+
- Timeline showed events during execution
17+
- Timeline cleared/reset when workflow completed
18+
- No execution history for completed workflows
19+
- Events were lost after browser refresh
20+
21+
## The Solution
22+
23+
**Backend (Framework)**:
24+
25+
- Added `events`, `output`, and `cancellation` fields to `WorkflowStateEntry` interface
26+
- Modified workflow execution to collect all stream events in memory during execution
27+
- Persist collected events to workflow state when workflow completes, suspends, fails, or is cancelled
28+
- Updated all storage adapters to support the new fields:
29+
- **LibSQL**: Added schema columns + automatic migration method (`addWorkflowStateColumns`)
30+
- **Supabase**: Added schema columns + migration detection + ALTER TABLE migration SQL
31+
- **Postgres**: Added schema columns + INSERT/UPDATE queries
32+
- **In-Memory**: Automatically supported via TypeScript interface
33+
34+
**Frontend (Console)**:
35+
36+
- Updated `WorkflowPlaygroundProvider` to include events when converting `WorkflowStateEntry``WorkflowHistoryEntry`
37+
- Implemented smart merge strategy for WebSocket updates: Use backend persisted events when workflow finishes, keep streaming events during execution
38+
- Events are now preserved across page refreshes and always visible in timeline UI
39+
40+
## What Gets Persisted
41+
42+
```typescript
43+
// In WorkflowStateEntry (stored in Memory V2):
44+
{
45+
"events": [
46+
{
47+
"id": "evt_123",
48+
"type": "workflow-start",
49+
"name": "Workflow Started",
50+
"startTime": "2025-01-24T10:00:00Z",
51+
"status": "running",
52+
"input": { "userId": "123" }
53+
},
54+
{
55+
"id": "evt_124",
56+
"type": "step-complete",
57+
"name": "Step: fetch-user",
58+
"startTime": "2025-01-24T10:00:01Z",
59+
"endTime": "2025-01-24T10:00:02Z",
60+
"status": "success",
61+
"output": { "user": { "name": "John" } }
62+
}
63+
],
64+
"output": { "result": "success" },
65+
"cancellation": {
66+
"cancelledAt": "2025-01-24T10:00:05Z",
67+
"reason": "User requested cancellation"
68+
}
69+
}
70+
```
71+
72+
## Migration Guide
73+
74+
### LibSQL Users
75+
76+
No action required - migrations run automatically on next initialization.
77+
78+
### Supabase Users
79+
80+
When you upgrade and initialize the adapter, you'll see migration SQL in the console. Run it in your Supabase SQL Editor:
81+
82+
```sql
83+
-- Add workflow event persistence columns
84+
ALTER TABLE voltagent_workflow_states
85+
ADD COLUMN IF NOT EXISTS events JSONB;
86+
87+
ALTER TABLE voltagent_workflow_states
88+
ADD COLUMN IF NOT EXISTS output JSONB;
89+
90+
ALTER TABLE voltagent_workflow_states
91+
ADD COLUMN IF NOT EXISTS cancellation JSONB;
92+
```
93+
94+
### Postgres Users
95+
96+
New deployments get the columns automatically. For existing tables, run:
97+
98+
```sql
99+
ALTER TABLE voltagent_workflow_states
100+
ADD COLUMN IF NOT EXISTS events JSONB,
101+
ADD COLUMN IF NOT EXISTS output JSONB,
102+
ADD COLUMN IF NOT EXISTS cancellation JSONB;
103+
```
104+
105+
### In-Memory Users
106+
107+
No action required - automatically supported.
108+
109+
## Impact
110+
111+
- ✅ Workflow execution timeline is now persistent and survives completion
112+
- ✅ Full execution history visible for completed, suspended, and failed workflows
113+
- ✅ Events, output, and cancellation metadata preserved in database
114+
- ✅ Console UI timeline works consistently across all workflow states
115+
- ✅ All storage backends (LibSQL, Supabase, Postgres, In-Memory) behave consistently
116+
- ✅ No data loss on workflow completion or page refresh

packages/core/src/memory/types.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,30 @@ export interface WorkflowStateEntry {
105105
};
106106
suspendData?: any;
107107
};
108+
/**
109+
* Stream events collected during execution
110+
* Used for timeline visualization in UI
111+
*/
112+
events?: Array<{
113+
id: string;
114+
type: string;
115+
name?: string;
116+
from?: string;
117+
startTime: string;
118+
endTime?: string;
119+
status?: string;
120+
input?: any;
121+
output?: any;
122+
metadata?: Record<string, unknown>;
123+
context?: Record<string, unknown>;
124+
}>;
125+
/** Final output of the workflow execution */
126+
output?: unknown;
127+
/** Cancellation metadata */
128+
cancellation?: {
129+
cancelledAt: Date;
130+
reason?: string;
131+
};
108132
/** User ID if applicable */
109133
userId?: string;
110134
/** Conversation ID if applicable */

packages/core/src/workflow/core.ts

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,19 @@ export function createWorkflow<
640640
executionId: string,
641641
memory: typeof effectiveMemory,
642642
logger: Logger,
643+
events: Array<{
644+
id: string;
645+
type: string;
646+
name?: string;
647+
from?: string;
648+
startTime: string;
649+
endTime?: string;
650+
status?: string;
651+
input?: any;
652+
output?: any;
653+
metadata?: Record<string, unknown>;
654+
context?: Record<string, unknown>;
655+
}>,
643656
): Promise<void> => {
644657
try {
645658
logger.trace(`Storing suspension checkpoint for execution ${executionId}`);
@@ -655,6 +668,7 @@ export function createWorkflow<
655668
suspendData: suspensionData.suspendData,
656669
}
657670
: undefined,
671+
events,
658672
updatedAt: new Date(),
659673
});
660674
logger.trace(`Successfully stored suspension checkpoint for execution ${executionId}`);
@@ -715,6 +729,58 @@ export function createWorkflow<
715729
// For normal run, we don't need a stream controller
716730
const streamController = externalStreamController || null;
717731

732+
// Collect events during execution for persistence
733+
const collectedEvents: Array<{
734+
id: string;
735+
type: string;
736+
name?: string;
737+
from?: string;
738+
startTime: string;
739+
endTime?: string;
740+
status?: string;
741+
input?: any;
742+
output?: any;
743+
metadata?: Record<string, unknown>;
744+
context?: Record<string, unknown>;
745+
}> = [];
746+
747+
// Helper to emit event and collect for persistence
748+
const emitAndCollectEvent = (event: {
749+
type: string;
750+
executionId: string;
751+
from: string;
752+
input?: any;
753+
output?: any;
754+
status: string;
755+
context?: any;
756+
timestamp: string;
757+
stepIndex?: number;
758+
stepType?: string;
759+
metadata?: Record<string, any>;
760+
error?: any;
761+
}) => {
762+
// Emit to stream if available
763+
if (streamController) {
764+
streamController.emit(event as any);
765+
}
766+
767+
// Collect for persistence (convert to storage format)
768+
const collectedEvent = {
769+
id: randomUUID(),
770+
type: event.type,
771+
name: event.from,
772+
from: event.from,
773+
startTime: event.timestamp,
774+
endTime: event.timestamp, // Will be updated on complete events
775+
status: event.status,
776+
input: event.input,
777+
output: event.output,
778+
metadata: event.metadata,
779+
context: event.context as Record<string, unknown> | undefined,
780+
};
781+
collectedEvents.push(collectedEvent);
782+
};
783+
718784
// Get observability instance
719785
const observability = getObservability();
720786

@@ -882,7 +948,7 @@ export function createWorkflow<
882948
};
883949

884950
// Emit workflow start event
885-
streamController?.emit({
951+
emitAndCollectEvent({
886952
type: "workflow-start",
887953
executionId,
888954
from: name,
@@ -967,7 +1033,7 @@ export function createWorkflow<
9671033
stepData.output = stateManager.state.data;
9681034
}
9691035

970-
streamController?.emit({
1036+
emitAndCollectEvent({
9711037
type: "step-complete",
9721038
executionId,
9731039
from: stepName,
@@ -991,6 +1057,11 @@ export function createWorkflow<
9911057
try {
9921058
await effectiveMemory.updateWorkflowState(executionId, {
9931059
status: "cancelled",
1060+
events: collectedEvents,
1061+
cancellation: {
1062+
cancelledAt: new Date(),
1063+
reason,
1064+
},
9941065
metadata: {
9951066
...(stateManager.state?.usage ? { usage: stateManager.state.usage } : {}),
9961067
cancellationReason: reason,
@@ -1003,7 +1074,7 @@ export function createWorkflow<
10031074
});
10041075
}
10051076

1006-
streamController?.emit({
1077+
emitAndCollectEvent({
10071078
type: "workflow-cancelled",
10081079
executionId,
10091080
from: name,
@@ -1124,7 +1195,13 @@ export function createWorkflow<
11241195
// Save suspension state to memory
11251196
const suspensionData = stateManager.state.suspension;
11261197
try {
1127-
await saveSuspensionState(suspensionData, executionId, effectiveMemory, runLogger);
1198+
await saveSuspensionState(
1199+
suspensionData,
1200+
executionId,
1201+
effectiveMemory,
1202+
runLogger,
1203+
collectedEvents,
1204+
);
11281205
} catch (_) {
11291206
// Error already logged in saveSuspensionState, don't throw
11301207
}
@@ -1191,7 +1268,7 @@ export function createWorkflow<
11911268
executionContext.streamWriter = stepWriter;
11921269

11931270
// Emit step start event
1194-
streamController?.emit({
1271+
emitAndCollectEvent({
11951272
type: "step-start",
11961273
executionId,
11971274
from: step.name || step.id,
@@ -1341,7 +1418,7 @@ export function createWorkflow<
13411418
);
13421419

13431420
// Emit step complete event
1344-
streamController?.emit({
1421+
emitAndCollectEvent({
13451422
type: "step-complete",
13461423
executionId,
13471424
from: stepName,
@@ -1398,7 +1475,7 @@ export function createWorkflow<
13981475
runLogger.debug(`Workflow suspended at step ${index}`, suspensionMetadata);
13991476

14001477
// Emit suspension event to stream
1401-
streamController?.emit({
1478+
emitAndCollectEvent({
14021479
type: "workflow-suspended",
14031480
executionId,
14041481
from: step.name || step.id,
@@ -1437,6 +1514,7 @@ export function createWorkflow<
14371514
executionId,
14381515
effectiveMemory,
14391516
runLogger,
1517+
collectedEvents,
14401518
);
14411519
} catch (_) {
14421520
// Error already logged in saveSuspensionState, don't throw
@@ -1477,10 +1555,12 @@ export function createWorkflow<
14771555
traceContext.setUsage(stateManager.state.usage);
14781556
traceContext.end("completed");
14791557

1480-
// Update Memory V2 state to completed
1558+
// Update Memory V2 state to completed with events and output
14811559
try {
14821560
await effectiveMemory.updateWorkflowState(executionContext.executionId, {
14831561
status: "completed",
1562+
events: collectedEvents,
1563+
output: finalState.result,
14841564
updatedAt: new Date(),
14851565
});
14861566
} catch (memoryError) {
@@ -1502,7 +1582,7 @@ export function createWorkflow<
15021582
);
15031583

15041584
// Emit workflow complete event
1505-
streamController?.emit({
1585+
emitAndCollectEvent({
15061586
type: "workflow-complete",
15071587
executionId,
15081588
from: name,
@@ -1543,7 +1623,7 @@ export function createWorkflow<
15431623

15441624
workflowRegistry.activeExecutions.delete(executionId);
15451625

1546-
streamController?.emit({
1626+
emitAndCollectEvent({
15471627
type: "workflow-cancelled",
15481628
executionId,
15491629
from: name,
@@ -1626,7 +1706,7 @@ export function createWorkflow<
16261706
);
16271707

16281708
// Emit workflow error event
1629-
streamController?.emit({
1709+
emitAndCollectEvent({
16301710
type: "workflow-error",
16311711
executionId,
16321712
from: name,
@@ -1644,6 +1724,7 @@ export function createWorkflow<
16441724
try {
16451725
await effectiveMemory.updateWorkflowState(executionId, {
16461726
status: "error",
1727+
events: collectedEvents,
16471728
// Store a lightweight error summary in metadata for debugging
16481729
metadata: {
16491730
...(stateManager.state?.usage ? { usage: stateManager.state.usage } : {}),

0 commit comments

Comments
 (0)