4 changes: 2 additions & 2 deletions examples/chat/app/(chat)/actions.ts
@@ -1,6 +1,6 @@
'use server';

-import { generateText, Message } from 'ai';
+import { generateText, UIMessage } from 'ai';
import { cookies } from 'next/headers';

import {
@@ -19,7 +19,7 @@ export async function saveChatModelAsCookie(model: string) {
export async function generateTitleFromUserMessage({
  message,
}: {
-  message: Message;
+  message: UIMessage;
}) {
  const { text: title } = await generateText({
    model: myProvider.languageModel('title-model'),
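Note on the type swap above: AI SDK v5 replaces v4's Message with UIMessage, which stores content as a parts array rather than a plain content string. A minimal sketch of what that means for code that reads message text (assuming v5's text parts carry a text field):

import { UIMessage } from 'ai';

// Collect the plain text of a v5 UIMessage from its text parts.
function messageText(message: UIMessage): string {
  return message.parts
    .filter((part) => part.type === 'text')
    .map((part) => (part as { type: 'text'; text: string }).text)
    .join('');
}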
166 changes: 108 additions & 58 deletions examples/chat/app/(chat)/api/chat/route.ts
@@ -17,8 +17,6 @@ import { getEffectiveSession, shouldPersistData } from "@/lib/auth-utils"
import { MCPSessionManager } from "@/mods/mcp-client"
import {
  UIMessage,
-  appendResponseMessages,
-  createDataStreamResponse,
  smoothStream,
} from "ai"
import { generateTitleFromUserMessage } from "../../actions"
@@ -142,65 +140,117 @@ export async function POST(request: Request) {
    console.log('DEBUG: Final sessionId for MCPSessionManager:', sessionId)
    const mcpSession = new MCPSessionManager(MCP_BASE_URL, userId, id, sessionId)

-    return createDataStreamResponse({
-      execute: async (dataStream) => {
-        const system = systemPrompt({ selectedChatModel })
-        await streamText(
-          { dataStream, userMessage },
-          {
-            model: myProvider.languageModel(selectedChatModel),
-            system,
-            messages,
-            maxSteps: 20,
-            experimental_transform: smoothStream({ chunking: "word" }),
-            experimental_generateMessageId: generateUUID,
-            getTools: () => mcpSession.tools({ useCache: false }),
-            onFinish: async ({ response }) => {
-              if (userId && shouldPersistData()) {
-                try {
-                  const assistantId = getTrailingMessageId({
-                    messages: response.messages.filter(
-                      (message) => message.role === "assistant"
-                    ),
-                  })
-
-                  if (!assistantId) {
-                    throw new Error("No assistant message found!")
-                  }
-
-                  const [, assistantMessage] = appendResponseMessages({
-                    messages: [userMessage],
-                    responseMessages: response.messages,
-                  })
-
-                  await saveMessages({
-                    messages: [
-                      {
-                        id: assistantId,
-                        chatId: id,
-                        role: assistantMessage.role,
-                        parts: assistantMessage.parts,
-                        attachments:
-                          assistantMessage.experimental_attachments ?? [],
-                        createdAt: new Date(),
-                      },
-                    ],
-                  })
-                } catch (error) {
-                  console.error("Failed to save chat")
-                }
-              }
-            },
-            experimental_telemetry: {
-              isEnabled: isProductionEnvironment,
-              functionId: "stream-text",
-            },
-          }
-        )
-      },
-      onError: (error) => {
-        console.error("Error:", error)
-        return "Oops, an error occured!"
-      },
-    })
+    const system = systemPrompt({ selectedChatModel })
+    const result = await streamText(
+      { userMessage },
+      {
+        model: myProvider.languageModel(selectedChatModel),
+        system,
+        messages,
+        maxSteps: 20,
+        experimental_generateMessageId: generateUUID,
+        getTools: () => mcpSession.tools({ useCache: false }),
+        onFinish: async ({ response }) => {
+          if (userId && shouldPersistData()) {
+            try {
+              const assistantId = getTrailingMessageId({
+                messages: response.messages.filter(
+                  (message) => message.role === "assistant"
+                ),
+              })
+
+              if (!assistantId) {
+                throw new Error("No assistant message found!")
+              }
+
+              // Find the last assistant message in the response messages
+              const assistantMessage = response.messages.filter(
+                (message) => message.role === "assistant"
+              ).pop()
+
+              if (!assistantMessage) {
+                throw new Error("No assistant message found in response!")
+              }
+
+              await saveMessages({
+                messages: [
+                  {
+                    id: assistantId,
+                    chatId: id,
+                    role: assistantMessage.role,
+                    parts: assistantMessage.parts,
+                    attachments:
+                      assistantMessage.experimental_attachments ?? [],
+                    createdAt: new Date(),
+                  },
+                ],
+              })
+            } catch (error) {
+              console.error("Failed to save chat")
+            }
+          }
+        },
+        experimental_telemetry: {
+          isEnabled: isProductionEnvironment,
+          functionId: "stream-text",
+        },
+      }
+    )
+
+    // Use custom fullStream approach for true progressive streaming
+    const encoder = new TextEncoder()
+    const stream = new ReadableStream({
+      async start(controller) {
+        try {
+          let closed = false
+
+          // Handle stream closure
+          const cleanup = () => {
+            if (!closed) {
+              closed = true
+              controller.close()
+            }
+          }
+
+          for await (const chunk of result.fullStream) {
+            if (closed) break
+
+            console.log(`>> Streaming chunk type: ${chunk.type}`)
+
+            try {
+              const data = `data: ${JSON.stringify(chunk)}\n\n`
+              controller.enqueue(encoder.encode(data))
+            } catch (enqueueError) {
+              console.log('>> Controller already closed, stopping stream')
+              break
+            }
+
+            // If this is a finish chunk, prepare to close
+            if (chunk.type === 'finish') {
+              console.log('>> Stream finishing')
+              cleanup()
+              break
+            }
+          }
+
+          cleanup()
+        } catch (error) {
+          console.error('>> Stream error:', error)
+          try {
+            controller.error(error)
+          } catch (errorError) {
+            console.log('>> Controller already closed during error handling')
+          }
+        }
+      },
+    })
+
+    return new Response(stream, {
+      headers: {
+        'Content-Type': 'text/event-stream',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'x-vercel-ai-ui-message-stream': 'v1',
+      },
+    })
  } catch (error) {
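The rewritten handler above serializes raw fullStream chunks into SSE frames by hand and labels the response with x-vercel-ai-ui-message-stream: v1. Raw fullStream chunk JSON may not match what v1 UI-message-stream clients expect, so the client side is worth verifying against it. For comparison, a minimal sketch of the built-in v5 path (assuming AI SDK v5's toUIMessageStreamResponse(); the provider and model are hypothetical placeholders):

import { streamText, convertToModelMessages, UIMessage } from 'ai';
import { openai } from '@ai-sdk/openai'; // hypothetical provider choice

export async function POST(request: Request) {
  const { messages }: { messages: UIMessage[] } = await request.json();

  const result = streamText({
    model: openai('gpt-4o'), // placeholder model for illustration
    messages: convertToModelMessages(messages),
  });

  // Emits an SSE response with the UI message stream headers that useChat expects.
  return result.toUIMessageStreamResponse();
}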
120 changes: 52 additions & 68 deletions examples/chat/app/(chat)/api/chat/streamText.ts
@@ -1,95 +1,79 @@
-import { generateUUID, getTrailingMessageId } from "@/lib/utils"
+import { generateUUID } from "@/lib/utils"
import {
  streamText as _streamText,
-  appendResponseMessages,
-  DataStreamWriter,
  ToolSet,
  UIMessage,
-  Message,
-  smoothStream,
+  convertToModelMessages,
+  stepCountIs,
} from "ai"

export const streamText = async (
  {
-    dataStream,
    userMessage,
-  }: { dataStream: DataStreamWriter; userMessage: UIMessage },
+  }: { userMessage: UIMessage },
  args: Omit<Parameters<typeof _streamText>[0], "tools"> & {
    getTools: () => Promise<ToolSet>
  }
) => {
  const {
    maxSteps = 1,
    maxRetries,
    messages: _messages,
    getTools,
    ...rest
  } = args

+  // Convert UI messages to proper Message objects with IDs if needed
  let messages = (_messages ?? []).map((msg) =>
    "id" in msg ? msg : { ...msg, id: generateUUID() }
-  ) as Message[]
+  ) as UIMessage[]

-  for (let steps = 0; steps < maxSteps; steps++) {
-    const cont = await new Promise<boolean>(async (resolve, reject) => {
-      const tools = await getTools()
-      console.log(">> Using tools", Object.keys(tools).join(", "))
-      const result = _streamText({
-        ...rest,
-        messages,
-        tools,
-        experimental_transform: [
-          smoothStream({
-            chunking: /\s*\S+\s*/m,
-            delayInMs: 0
-          })
-        ],
-        onFinish: async (event) => {
-          console.log(">> Finish reason", event.finishReason)
-
-          switch (event.finishReason) {
-            case "stop":
-            case "content-filter":
-            case "error":
-              resolve(false)
-              break
-            case "length":
-            case "tool-calls":
-            case "other":
-            case "unknown":
-            default:
-              break
-          }
-
-          const assistantId = getTrailingMessageId({
-            messages: event.response.messages.filter(
-              (message) => message.role === "assistant"
-            ),
-          })
-
-          if (!assistantId) {
-            throw new Error("No assistant message found!")
-          }
-
-          messages = appendResponseMessages({
-            messages,
-            responseMessages: event.response.messages,
-          })
-          await rest.onFinish?.(event)
-          resolve(true)
-        },
-      })
-
-      result.consumeStream()
-
-      result.mergeIntoDataStream(dataStream, {
-        sendReasoning: true,
-      })
-    })
-
-    if (!cont) {
-      console.log("Ending loop", steps)
-      break
-    }
-  }
+  const tools = await getTools()
+  console.log(">> Using tools", Object.keys(tools).join(", "))
+  console.log(">> MaxSteps configured:", maxSteps)
+
+  // Go back to manual step approach for proper sequential execution and streaming
+  // This ensures tools execute sequentially and results are properly chained
+
+  // Use v5's built-in multi-step execution with stopWhen
+  const result = _streamText({
+    ...rest,
+    messages: convertToModelMessages(messages),
+    tools,
+
+    // v5 pattern: stopWhen evaluated only when step contains tool results
+    stopWhen: stepCountIs(maxSteps),
+
+    // Remove smoothStream to test if it's causing buffering in multi-step
+    // experimental_transform: [
+    //   smoothStream({
+    //     chunking: /\s*\S+\s*/m,
+    //     delayInMs: 25
+    //   })
+    // ],
+
+    // Monitor each step
+    onStepFinish: ({ stepNumber, finishReason, toolCalls, toolResults, steps }) => {
+      console.log(`>> Step ${stepNumber} finished with reason: ${finishReason}`)
+      console.log(`>> Total steps so far: ${steps?.length || 'unknown'}`)
+      if (toolCalls?.length) {
+        console.log(`>> Made ${toolCalls.length} tool calls`)
+      }
+      if (toolResults?.length) {
+        console.log(`>> Got ${toolResults.length} tool results`)
+      }
+      console.log(`>> Should continue? stepCount=${steps?.length}, maxSteps=${maxSteps}`)
+    },
+
+    onFinish: async (event) => {
+      console.log(">> All steps completed, final reason:", event.finishReason)
+      console.log(">> Total steps:", event.steps?.length || 0)
+
+      // Call the original onFinish handler
+      await rest.onFinish?.(event)
+    },
+  })
+
+  // Return the result directly - this will stream properly
+  return result
}
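The wrapper now delegates looping to the SDK: stopWhen: stepCountIs(maxSteps) replaces the v4-era manual for-loop that re-invoked _streamText and stitched messages together with appendResponseMessages. A stripped-down sketch of the same v5 pattern in isolation (the tool, provider, and model are hypothetical placeholders; v5 renames a tool's parameters to inputSchema):

import { streamText, stepCountIs, convertToModelMessages, tool, UIMessage } from 'ai';
import { openai } from '@ai-sdk/openai'; // hypothetical provider choice
import { z } from 'zod';

function runWithTools(messages: UIMessage[]) {
  return streamText({
    model: openai('gpt-4o'), // placeholder model for illustration
    messages: convertToModelMessages(messages),
    tools: {
      // Hypothetical tool for illustration only.
      currentTime: tool({
        description: 'Returns the current time as an ISO string',
        inputSchema: z.object({}),
        execute: async () => new Date().toISOString(),
      }),
    },
    // The SDK runs model/tool steps internally and stops after 5 steps.
    stopWhen: stepCountIs(5),
  });
}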
20 changes: 14 additions & 6 deletions examples/chat/artifacts/code/server.ts
@@ -26,9 +26,13 @@ export const codeDocumentHandler = createDocumentHandler<'code'>({
        const { code } = object;

        if (code) {
-          dataStream.writeData({
-            type: 'code-delta',
-            content: code ?? '',
+          dataStream.write({
+            'type': 'data',
+
+            'value': [{
+              type: 'code-delta',
+              content: code ?? '',
+            }]
          });

          draftContent = code;
@@ -58,9 +62,13 @@ export const codeDocumentHandler = createDocumentHandler<'code'>({
        const { code } = object;

        if (code) {
-          dataStream.writeData({
-            type: 'code-delta',
-            content: code ?? '',
+          dataStream.write({
+            'type': 'data',
+
+            'value': [{
+              type: 'code-delta',
+              content: code ?? '',
+            }]
          });

          draftContent = code;
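The same write envelope appears in both hunks above. If the { type: 'data', value: [...] } shape sticks, a small helper would keep the two call sites in sync. This is a sketch against the shape used in this diff only, with the writer typed loosely since the exact v5 writer interface isn't shown here:

// Minimal writer shape inferred from the calls in this diff (an assumption).
type LooseDataStreamWriter = {
  write: (chunk: { type: 'data'; value: unknown[] }) => void;
};

// Wraps the repeated code-delta envelope used by the create and update handlers.
function writeCodeDelta(dataStream: LooseDataStreamWriter, code: string) {
  dataStream.write({
    type: 'data',
    value: [{ type: 'code-delta', content: code }],
  });
}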