---
title: Build a resumable streaming chat
pcx_content_type: tutorial
tags:
  - AI SDK
  - Streaming
sidebar:
  order: 8
---

import { TypeScriptExample, PackageManagers, Steps } from "~/components";

Build a chat application with streaming AI responses that continue seamlessly when users disconnect and reconnect. The `AIChatAgent` class provides built-in resumable streaming, automatically persisting AI responses and resuming them after network interruptions.

## What you will build

A real-time AI chat application that:
- Streams AI responses in real time
- Automatically resumes streams after network disconnections
- Persists chat history across sessions
- Works without additional configuration

## Prerequisites

- An OpenAI API key (or an API key for another provider supported by [Vercel AI SDK](https://sdk.vercel.ai/docs/introduction))

## How resumable streaming works

When you use `AIChatAgent` with `useAgentChat`:

1. **During streaming**: All chunks are automatically persisted to SQLite
2. **On disconnect**: The stream continues server-side, buffering chunks
3. **On reconnect**: The client receives all buffered chunks and continues streaming

No additional setup is required; it works automatically.
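
To make those three steps concrete, here is a minimal conceptual sketch of the underlying buffer-and-replay pattern. It is illustrative only, not the `AIChatAgent` implementation: the `store` map and the `persistStream`/`resumeStream` helpers are assumed stand-ins for the SQLite-backed buffering the SDK manages for you.

```ts
// Conceptual sketch of buffer-and-replay (illustrative; not the SDK's internals).
type StoredChunk = { index: number; data: string };

// Stand-in for the agent's SQLite storage, keyed by stream ID.
const store = new Map<string, StoredChunk[]>();

// Producer: persist every chunk as the model streams it.
async function persistStream(streamId: string, chunks: AsyncIterable<string>) {
  let index = 0;
  for await (const data of chunks) {
    const buffered = store.get(streamId) ?? [];
    buffered.push({ index: index++, data });
    store.set(streamId, buffered);
  }
}

// Consumer: on reconnect, replay everything buffered so far, then continue live.
// (A real implementation must also avoid dropping or duplicating chunks that
// arrive during the handoff from replay to live delivery.)
async function* resumeStream(streamId: string, live: AsyncIterable<string>) {
  for (const chunk of store.get(streamId) ?? []) {
    yield chunk.data;
  }
  yield* live;
}
```

In this tutorial you never write code like this yourself; the agent persists chunks to its own SQLite storage and the client hook replays them for you.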

## 1. Create a chat agent

<Steps>

1. Create a new Agent project using the `hello-world` template:

   <PackageManagers
     type="create"
     pkg="cloudflare@latest"
     args={"my-chat --template=cloudflare/ai/demos/hello-world"}
   />

2. Move into the project directory:

   ```sh
   cd my-chat
   ```

3. Install the AI SDK and OpenAI provider:

   <PackageManagers type="install" pkg="ai @ai-sdk/openai" />

4. Add your OpenAI API key to `.dev.vars`:

   ```txt title=".dev.vars"
   OPENAI_API_KEY=your_openai_api_key_here
   ```

</Steps>

## 2. Implement the server-side agent

<Steps>

1. Replace the contents of `src/index.ts` with a chat agent that extends `AIChatAgent`:

   <TypeScriptExample>

   ```ts title="src/index.ts"
   import { openai } from "@ai-sdk/openai";
   import { type AgentNamespace, routeAgentRequest } from "agents";
   import { AIChatAgent } from "agents/ai-chat-agent";
   import {
     streamText,
     convertToModelMessages,
     createUIMessageStream,
     createUIMessageStreamResponse
   } from "ai";

   type Env = {
     OPENAI_API_KEY: string;
     ChatAgent: AgentNamespace<ChatAgent>;
   };

   export class ChatAgent extends AIChatAgent<Env> {
     async onChatMessage() {
       const stream = createUIMessageStream({
         execute: async ({ writer }) => {
           const result = streamText({
             model: openai("gpt-4o"),
             messages: convertToModelMessages(this.messages)
           });

           writer.merge(result.toUIMessageStream());
         }
       });
       return createUIMessageStreamResponse({ stream });
     }
   }

   export default {
     async fetch(request: Request, env: Env) {
       return (
         (await routeAgentRequest(request, env)) ||
         new Response("Not found", { status: 404 })
       );
     }
   } satisfies ExportedHandler<Env>;
   ```
   </TypeScriptExample>

2. Update `wrangler.jsonc` to enable SQLite for stream persistence:

   ```jsonc title="wrangler.jsonc"
   {
     "$schema": "node_modules/wrangler/config-schema.json",
     "compatibility_date": "2025-03-14",
     "compatibility_flags": [
       "nodejs_compat",
       "nodejs_compat_populate_process_env"
     ],
     "durable_objects": {
       "bindings": [
         {
           "class_name": "ChatAgent",
           "name": "ChatAgent"
         }
       ]
     },
     "main": "src/index.ts",
     "migrations": [
       {
         "new_sqlite_classes": ["ChatAgent"],
         "tag": "v1"
       }
     ],
     "name": "my-chat"
   }
   ```

</Steps>

The `AIChatAgent` class automatically handles:
- Creating SQLite tables for stream chunks and metadata
- Buffering and flushing chunks every 100ms
- Detecting reconnections and sending `CF_AGENT_STREAM_RESUMING` notifications
- Cleaning up old streams after 24 hours
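
To see why the 100ms flush matters, consider write volume: persisting every token individually would mean one SQLite write per chunk. The snippet below is a conceptual sketch of interval-based batching only; `pending` and `flushToStorage` are assumed names for illustration, not the SDK's internals.

```ts
// Conceptual sketch of interval-based chunk flushing (assumed names, not SDK internals).
const FLUSH_INTERVAL_MS = 100;

let pending: { index: number; data: string }[] = [];
let nextIndex = 0;

// Called for every chunk the model produces.
function onChunk(data: string) {
  pending.push({ index: nextIndex++, data });
}

// Periodically write buffered chunks in one batch instead of one write per token.
setInterval(() => {
  if (pending.length === 0) return;
  const batch = pending;
  pending = [];
  flushToStorage(batch);
}, FLUSH_INTERVAL_MS);

// Hypothetical persistence helper; in the real SDK this is handled for you.
function flushToStorage(batch: { index: number; data: string }[]) {
  console.log(`persisting ${batch.length} chunk(s)`);
}
```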

## 3. Build the client interface

<Steps>

1. Install React dependencies:

   <PackageManagers type="install" pkg="react react-dom agents" />

   <PackageManagers type="install" pkg="-D @types/react @types/react-dom" />

2. Create a chat component that uses `useAgentChat`:

   <TypeScriptExample>

   ```tsx title="src/client.tsx"
   import { useState, useRef, useEffect } from "react";
   import { useAgent } from "agents/react";
   import { useAgentChat } from "agents/ai-react";

   export default function Chat() {
     const [input, setInput] = useState("");
     const messagesEndRef = useRef<HTMLDivElement>(null);

     const agent = useAgent({
       agent: "ChatAgent",
       name: "my-chat"
     });

     const { messages, sendMessage, status } = useAgentChat({
       agent
       // resume: true is the default - streams automatically resume on reconnect
     });

     const isStreaming = status === "streaming";

     useEffect(() => {
       messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
     }, [messages]);

     const handleSubmit = async (e: React.FormEvent) => {
       e.preventDefault();
       if (!input.trim() || isStreaming) return;

       const message = input;
       setInput("");

       await sendMessage({
         role: "user",
         parts: [{ type: "text", text: message }]
       });
     };

     return (
       <div style={{ display: "flex", flexDirection: "column", height: "100vh" }}>
         <div style={{ flex: 1, overflowY: "auto", padding: "1rem" }}>
           {messages.map((message) => {
             const text = message.parts
               .filter((part) => part.type === "text")
               .map((part) => part.text)
               .join("");

             return (
               <div key={message.id} style={{ marginBottom: "1rem" }}>
                 <strong>{message.role === "user" ? "You" : "Assistant"}:</strong>
                 <div>{text}</div>
               </div>
             );
           })}
           <div ref={messagesEndRef} />
         </div>

         <form onSubmit={handleSubmit} style={{ padding: "1rem" }}>
           <input
             type="text"
             value={input}
             onChange={(e) => setInput(e.target.value)}
             disabled={isStreaming}
             placeholder="Type your message..."
             style={{ width: "100%", padding: "0.5rem" }}
           />
           <button type="submit" disabled={!input.trim() || isStreaming}>
             {isStreaming ? "Streaming..." : "Send"}
           </button>
         </form>
       </div>
     );
   }
   ```
   </TypeScriptExample>

</Steps>

The `useAgentChat` hook automatically handles:
- Listening for `CF_AGENT_STREAM_RESUMING` notifications
- Sending acknowledgment when ready to resume
- Reconstructing messages from buffered chunks
- Continuing to receive live chunks
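
Because resumed output arrives through the same `messages` and `status` values used above, you can optionally surface it in the UI with no extra wiring. The helper below assumes a resumed stream reports the same `"streaming"` status as a live one (the status values come from the underlying AI SDK chat state):

```tsx
// Optional: show a lightweight indicator while a response is streaming or resuming.
// Relies only on the `status` value already returned by useAgentChat.
function StreamingIndicator({ status }: { status: string }) {
  if (status !== "streaming") return null;
  return (
    <div style={{ padding: "0.25rem 1rem", fontStyle: "italic", color: "#666" }}>
      Assistant is responding… (continues automatically if you reconnect)
    </div>
  );
}
```

Render it wherever it fits, for example above the form: `<StreamingIndicator status={status} />`.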

## 4. Test resumable streaming

<Steps>

1. Start your development server:

   ```sh
   npm start
   ```

2. Open your chat application in a browser.

3. Start a long response by asking the AI a question that requires a lengthy answer.

4. While the response is streaming, refresh the page or close and reopen the tab.

5. Watch the stream automatically resume from where it left off.

</Steps>

## Optional: Disable automatic resume

For use cases where resuming is not needed (for example, short responses), you can disable automatic resume:

<TypeScriptExample>

```tsx
const { messages } = useAgentChat({
  agent,
  resume: false // Disable automatic stream resumption
});
```
</TypeScriptExample>


## Under the hood

### Server-side implementation

The `AIChatAgent` class:
- Creates SQLite tables for stream chunks and metadata on startup
- Assigns a unique ID to each stream and tracks chunk indices
- Buffers chunks and flushes to SQLite every 100ms for performance
- Checks for active streams when clients connect
- Sends a `CF_AGENT_STREAM_RESUMING` notification if an active stream exists
- Cleans up completed streams after 24 hours
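
The tables themselves are internal to the SDK, but it can help to picture the per-stream state being tracked. The schema below is a hypothetical illustration only; every table and column name is an assumption, not the real layout.

```ts
// Hypothetical illustration of the kind of state a resumable stream needs.
// Table and column names are assumptions for explanation, not the SDK's schema.
const illustrativeSchema = `
  CREATE TABLE IF NOT EXISTS stream_metadata (
    stream_id   TEXT PRIMARY KEY,
    completed   INTEGER NOT NULL DEFAULT 0, -- still active, or finished?
    created_at  INTEGER NOT NULL            -- used for the 24-hour cleanup
  );
  CREATE TABLE IF NOT EXISTS stream_chunks (
    stream_id   TEXT NOT NULL,
    chunk_index INTEGER NOT NULL,           -- preserves ordering for replay
    data        TEXT NOT NULL,
    PRIMARY KEY (stream_id, chunk_index)
  );
`;

// On connect, the agent can check for an unfinished stream and, if one exists,
// tell the client that a resume is about to happen.
const findActiveStream = `
  SELECT stream_id FROM stream_metadata
  WHERE completed = 0
  ORDER BY created_at DESC
  LIMIT 1;
`;
```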

### Client-side implementation

The `useAgentChat` hook:
- Listens for `CF_AGENT_STREAM_RESUMING` notifications
- Sends `CF_AGENT_STREAM_RESUME_ACK` when ready to resume
- Receives all buffered chunks in order
- Reconstructs the message state
- Continues receiving live chunks as they arrive
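
Conceptually, the exchange looks like the sketch below. It uses a bare WebSocket to show the ordering of events; `useAgentChat` performs the equivalent steps for you, and the message shapes (other than the notification names above) are assumptions rather than the actual wire format.

```ts
// Simplified sketch of the resume handshake (assumed message shapes, not the wire format).
function attachResumeHandler(socket: WebSocket) {
  socket.addEventListener("message", (event) => {
    const msg = JSON.parse(String(event.data));

    if (msg.type === "CF_AGENT_STREAM_RESUMING") {
      // 1. Server announces an in-progress stream for this conversation.
      // 2. Client signals it is ready to receive the buffered chunks.
      socket.send(JSON.stringify({ type: "CF_AGENT_STREAM_RESUME_ACK" }));
      return;
    }

    if (msg.type === "chunk") {
      // 3. Buffered chunks arrive first, in order, to rebuild the partial message...
      // 4. ...followed by live chunks as the model keeps generating.
      appendToCurrentMessage(msg.data);
    }
  });
}

// Illustrative stand-in for whatever updates your UI state.
function appendToCurrentMessage(text: string) {
  console.log("received chunk:", text);
}
```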

## Summary

You built a chat application with automatic resumable streaming that:
- Continues AI responses seamlessly after network interruptions
- Requires no additional configuration or code
- Persists stream state using SQLite
- Works with any AI provider supported by Vercel AI SDK

## Related resources

- [AIChatAgent — API reference](/agents/api-reference/ai-chat-agent/) — Complete API documentation
- [Vercel AI SDK](https://sdk.vercel.ai/docs/introduction) — AI SDK documentation
- [Example: Resumable stream chat](https://github.com/cloudflare/agents/tree/main/examples/resumable-stream-chat) — Complete working example