-
-
Notifications
You must be signed in to change notification settings - Fork 456
feat: add resumable streaming support via @voltagent/resumable-streams #921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
77bc378
feat: add resumable streaming support via @voltagent/resumable-streams
omeraplak 5985ca3
fix: type issue
omeraplak f200379
fix: sp lint
omeraplak 24ab4b6
feat: update examples and voltops
omeraplak dd87298
fix: voltops api url
omeraplak 3a2bf38
chore: update readme
omeraplak ef0e91f
chore: fix doc link
omeraplak e3999c3
chore: sp lint fix
omeraplak File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| --- | ||
| "@voltagent/resumable-streams": patch | ||
| "@voltagent/serverless-hono": patch | ||
| "@voltagent/server-core": patch | ||
| "@voltagent/server-hono": patch | ||
| "@voltagent/core": patch | ||
| --- | ||
|
|
||
| feat: add resumable streaming support via @voltagent/resumable-streams, with server adapters that let clients reconnect to in-flight streams. | ||
|
|
||
| ```ts | ||
| import { openai } from "@ai-sdk/openai"; | ||
| import { Agent, VoltAgent } from "@voltagent/core"; | ||
| import { | ||
| createResumableStreamAdapter, | ||
| createResumableStreamRedisStore, | ||
| } from "@voltagent/resumable-streams"; | ||
| import { honoServer } from "@voltagent/server-hono"; | ||
|
|
||
| const streamStore = await createResumableStreamRedisStore(); | ||
| const resumableStream = await createResumableStreamAdapter({ streamStore }); | ||
|
|
||
| const agent = new Agent({ | ||
| id: "assistant", | ||
| name: "Resumable Stream Agent", | ||
| instructions: "You are a helpful assistant.", | ||
| model: openai("gpt-4o-mini"), | ||
| }); | ||
|
|
||
| new VoltAgent({ | ||
| agents: { assistant: agent }, | ||
| server: honoServer({ | ||
| resumableStream: { adapter: resumableStream }, | ||
| }), | ||
| }); | ||
|
|
||
| await fetch("http://localhost:3141/agents/assistant/chat", { | ||
| method: "POST", | ||
| headers: { "Content-Type": "application/json" }, | ||
| body: `{"input":"Hello!","options":{"conversationId":"conv-1","userId":"user-1","resumableStream":true}}`, | ||
| }); | ||
|
|
||
| // Resume the same stream after reconnect/refresh | ||
| const resumeResponse = await fetch( | ||
| "http://localhost:3141/agents/assistant/chat/conv-1/stream?userId=user-1" | ||
| ); | ||
|
|
||
| const reader = resumeResponse.body?.getReader(); | ||
| const decoder = new TextDecoder(); | ||
| while (reader) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| const chunk = decoder.decode(value, { stream: true }); | ||
| console.log(chunk); | ||
| } | ||
| ``` | ||
|
|
||
| AI SDK client (resume on refresh): | ||
|
|
||
| ```tsx | ||
| import { useChat } from "@ai-sdk/react"; | ||
| import { DefaultChatTransport } from "ai"; | ||
|
|
||
| const { messages, sendMessage } = useChat({ | ||
| id: chatId, | ||
| messages: initialMessages, | ||
| resume: true, | ||
| transport: new DefaultChatTransport({ | ||
| api: "/api/chat", | ||
| prepareSendMessagesRequest: ({ id, messages }) => ({ | ||
| body: { | ||
| message: messages[messages.length - 1], | ||
| options: { conversationId: id, userId }, | ||
| }, | ||
| }), | ||
| prepareReconnectToStreamRequest: ({ id }) => ({ | ||
| api: `/api/chat/${id}/stream?userId=${encodeURIComponent(userId)}`, | ||
| }), | ||
| }), | ||
| }); | ||
| ``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| OPENAI_API_KEY=sk-proj- | ||
| REDIS_URL=redis://localhost:6379 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| # See https://help.github.com/articles/ignoring-files/ for more about ignoring files. | ||
|
|
||
| # dependencies | ||
| /node_modules | ||
| /.pnp | ||
| .pnp.* | ||
| .yarn/* | ||
| !.yarn/patches | ||
| !.yarn/plugins | ||
| !.yarn/releases | ||
| !.yarn/versions | ||
|
|
||
| # testing | ||
| /coverage | ||
|
|
||
| # next.js | ||
| /.next/ | ||
| /out/ | ||
|
|
||
| # production | ||
| /build | ||
|
|
||
| # misc | ||
| .DS_Store | ||
| *.pem | ||
|
|
||
| # debug | ||
| npm-debug.log* | ||
| yarn-debug.log* | ||
| yarn-error.log* | ||
| .pnpm-debug.log* | ||
|
|
||
| # env files (can opt-in for committing if needed) | ||
| .env | ||
|
|
||
| # vercel | ||
| .vercel | ||
|
|
||
| # typescript | ||
| *.tsbuildinfo | ||
| next-env.d.ts |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| <div align="center"> | ||
| <a href="https://voltagent.dev/"> | ||
| <img width="1800" alt="435380213-b6253409-8741-462b-a346-834cd18565a9" src="https://github.com/user-attachments/assets/452a03e7-eeda-4394-9ee7-0ffbcf37245c" /> | ||
| </a> | ||
|
|
||
| <br/> | ||
| <br/> | ||
|
|
||
| <div align="center"> | ||
| <a href="https://voltagent.dev">Home Page</a> | | ||
| <a href="https://voltagent.dev/docs/">Documentation</a> | | ||
| <a href="https://github.com/voltagent/voltagent/tree/main/examples">Examples</a> | | ||
| <a href="https://s.voltagent.dev/discord">Discord</a> | | ||
| <a href="https://voltagent.dev/blog/">Blog</a> | ||
| </div> | ||
| </div> | ||
|
|
||
| <br/> | ||
|
|
||
| <div align="center"> | ||
| <strong>VoltAgent is an open source TypeScript framework for building and orchestrating AI agents.</strong><br> | ||
| Escape the limitations of no-code builders and the complexity of starting from scratch. | ||
| <br /> | ||
| <br /> | ||
| </div> | ||
|
|
||
| <div align="center"> | ||
|
|
||
| [](https://www.npmjs.com/package/@voltagent/core) | ||
| [](CODE_OF_CONDUCT.md) | ||
| [](https://s.voltagent.dev/discord) | ||
| [](https://twitter.com/voltagent_dev) | ||
|
|
||
| </div> | ||
|
|
||
| <br/> | ||
|
|
||
| <div align="center"> | ||
| <a href="https://voltagent.dev/"> | ||
| <img width="896" alt="VoltAgent Schema" src="https://github.com/user-attachments/assets/f0627868-6153-4f63-ba7f-bdfcc5dd603d" /> | ||
| </a> | ||
|
|
||
| </div> | ||
|
|
||
| ## VoltAgent: Build AI Agents Fast and Flexibly | ||
|
|
||
| VoltAgent is an open-source TypeScript framework for creating and managing AI agents. It provides modular components to build, customize, and scale agents with ease. From connecting to APIs and memory management to supporting multiple LLMs, VoltAgent simplifies the process of creating sophisticated AI systems. It enables fast development, maintains clean code, and offers flexibility to switch between models and tools without vendor lock-in. | ||
|
|
||
| ## Try Example | ||
|
|
||
| ```bash | ||
| npm create voltagent-app@latest -- --example with-nextjs-resumable-stream | ||
| ``` | ||
37 changes: 37 additions & 0 deletions
37
examples/with-nextjs-resumable-stream/app/api/chat/[id]/stream/route.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| import { getResumableStreamAdapter } from "@/lib/resumable-stream"; | ||
| import { supervisorAgent } from "@/voltagent"; | ||
| import { safeStringify } from "@voltagent/internal/utils"; | ||
| import { createResumableChatSession } from "@voltagent/resumable-streams"; | ||
|
|
||
| const jsonError = (status: number, message: string) => | ||
| new Response(safeStringify({ error: message, message }), { | ||
| status, | ||
| headers: { "Content-Type": "application/json" }, | ||
| }); | ||
|
|
||
| export async function GET(request: Request, { params }: { params: Promise<{ id: string }> }) { | ||
| const { id } = await params; | ||
| if (!id) { | ||
| return jsonError(400, "conversationId is required"); | ||
| } | ||
|
|
||
| const userId = new URL(request.url).searchParams.get("userId"); | ||
| if (!userId) { | ||
| return jsonError(400, "userId is required"); | ||
| } | ||
| const agentId = supervisorAgent.id; | ||
| const resumableStream = await getResumableStreamAdapter(); | ||
| const session = createResumableChatSession({ | ||
| adapter: resumableStream, | ||
| conversationId: id, | ||
| userId, | ||
| agentId, | ||
| }); | ||
|
|
||
| try { | ||
| return await session.resumeResponse(); | ||
| } catch (error) { | ||
| console.error("[API] Failed to resume stream:", error); | ||
| return new Response(null, { status: 204 }); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Returning 204 for all errors masks actual failures. HTTP 204 means "success with no content", not "error occurred". Consider returning 500 for unexpected errors, or use the existing Prompt for AI agents |
||
| } | ||
| } | ||
87 changes: 87 additions & 0 deletions
87
examples/with-nextjs-resumable-stream/app/api/chat/route.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| import { getResumableStreamAdapter } from "@/lib/resumable-stream"; | ||
| import { supervisorAgent } from "@/voltagent"; | ||
| import { setWaitUntil } from "@voltagent/core"; | ||
| import { safeStringify } from "@voltagent/internal/utils"; | ||
| import { createResumableChatSession } from "@voltagent/resumable-streams"; | ||
| import { after } from "next/server"; | ||
|
|
||
| const jsonError = (status: number, message: string) => | ||
| new Response(safeStringify({ error: message, message }), { | ||
| status, | ||
| headers: { "Content-Type": "application/json" }, | ||
| }); | ||
|
|
||
| export async function POST(req: Request) { | ||
| try { | ||
| const body = await req.json(); | ||
| const messages = Array.isArray(body?.messages) ? body.messages : []; | ||
| const message = body?.message; | ||
| const options = | ||
| body?.options && typeof body.options === "object" | ||
| ? (body.options as Record<string, unknown>) | ||
| : undefined; | ||
| const conversationId = | ||
| typeof options?.conversationId === "string" ? options.conversationId : undefined; | ||
| const userId = typeof options?.userId === "string" ? options.userId : undefined; | ||
| const input = | ||
| message !== undefined ? (typeof message === "string" ? message : [message]) : messages; | ||
|
|
||
| if (!conversationId) { | ||
| return jsonError(400, "options.conversationId is required"); | ||
| } | ||
|
|
||
| if (!userId) { | ||
| return jsonError(400, "options.userId is required"); | ||
| } | ||
|
|
||
| if (isEmptyInput(input)) { | ||
| return jsonError(400, "Message input is required"); | ||
| } | ||
|
|
||
| // Enable non-blocking OTel export for Vercel/serverless | ||
| // This ensures spans are flushed in the background without blocking the response | ||
| setWaitUntil(after); | ||
|
|
||
| const agentId = supervisorAgent.id; | ||
| const resumableStream = await getResumableStreamAdapter(); | ||
| const session = createResumableChatSession({ | ||
| adapter: resumableStream, | ||
| conversationId, | ||
| userId, | ||
| agentId, | ||
| }); | ||
|
|
||
| try { | ||
| await session.clearActiveStream(); | ||
| } catch (error) { | ||
| console.error("[API] Failed to clear active resumable stream:", error); | ||
| } | ||
|
|
||
| // Stream text from the supervisor agent with proper context | ||
| // The agent accepts UIMessage[] directly | ||
| const result = await supervisorAgent.streamText(input, { | ||
| userId, | ||
| conversationId, | ||
| }); | ||
|
|
||
| return result.toUIMessageStreamResponse({ | ||
| consumeSseStream: session.consumeSseStream, | ||
| onFinish: session.onFinish, | ||
| }); | ||
| } catch (error) { | ||
| console.error("[API] Chat error:", error); | ||
| return jsonError(500, "Internal server error"); | ||
| } | ||
| } | ||
|
|
||
| function isEmptyInput(input: unknown) { | ||
| if (input == null) { | ||
| return true; | ||
| } | ||
|
|
||
| if (typeof input === "string") { | ||
| return input.trim().length === 0; | ||
| } | ||
|
|
||
| return Array.isArray(input) && input.length === 0; | ||
| } |
17 changes: 17 additions & 0 deletions
17
examples/with-nextjs-resumable-stream/app/api/messages/route.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| import { sharedMemory } from "@/voltagent/memory"; | ||
|
|
||
| export async function GET(request: Request) { | ||
| const { searchParams } = new URL(request.url); | ||
| const conversationId = searchParams.get("conversationId"); | ||
| const userId = searchParams.get("userId"); | ||
|
|
||
| if (!conversationId || !userId) { | ||
| return Response.json({ error: "conversationId and userId are required" }, { status: 400 }); | ||
| } | ||
|
|
||
| const uiMessages = await sharedMemory.getMessages(userId, conversationId); | ||
|
|
||
| return Response.json({ | ||
| data: uiMessages || [], | ||
| }); | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: This README change removes essential setup documentation. Users will not know that
OPENAI_API_KEYandREDIS_URLenvironment variables are required, nor will they understand the project structure (adapter location, route handlers, component paths). Consider keeping the Setup and Notes sections from the original README below the 'Try Example' section, similar to howwith-resumable-streamsdocuments its example.Prompt for AI agents
Setup
Update
OPENAI_API_KEYandREDIS_URLin.env.Open
http://localhost:3000.Notes
components/ai-elements.lib/resumable-stream.ts, and route handlers usecreateResumableChatSessionto manage lifecycle.app/api/chat/route.ts.app/api/chat/[id]/stream/route.ts.activeStreamStoreif you need a different backend.voltagent/.