{"content":"# Resumable AI Response Streams\n\nAdd automatic stream recovery to AI chat with WorkflowChatTransport, start/resume API endpoints, and the useResumableChat hook.\n\n### Start Workflow Endpoint\n\nCreate the endpoint to start workflow runs:\n\n```typescript\n// src/app/api/chats/[chatId]/messages/route.ts\nimport { headers } from \"next/headers\";\nimport { ensureChatExists } from \"@/lib/chat/queries\";\nimport { auth } from \"@/lib/auth/server\";\nimport { chatWorkflow } from \"@/workflows/chat\";\nimport { start } from \"workflow/api\";\nimport { createUIMessageStreamResponse } from \"ai\";\nimport type { ChatAgentUIMessage } from \"@/workflows/chat/types\";\n\nexport async function POST(request: Request) {\n  const session = await auth.api.getSession({\n    headers: await headers(),\n  });\n\n  if (!session) {\n    return new Response(\"Unauthorized\", { status: 401 });\n  }\n\n  const { chatId, message } = (await request.json()) as {\n    chatId: string;\n    message: ChatAgentUIMessage;\n  };\n\n  if (!chatId || !message) {\n    return new Response(\"Missing chatId or message\", { status: 400 });\n  }\n\n  // Ensure chat exists (create if needed) and verify ownership\n  const isAuthorized = await ensureChatExists(chatId, session.user.id);\n  if (!isAuthorized) {\n    return new Response(\"Forbidden\", { status: 403 });\n  }\n\n  // Start workflow with user message\n  const run = await start(chatWorkflow, [\n    {\n      chatId,\n      userMessage: message,\n    },\n  ]);\n\n  // Return stream with runId for resumability\n  return createUIMessageStreamResponse({\n    stream: run.readable,\n    headers: {\n      \"x-workflow-run-id\": run.runId,\n    },\n  });\n}\n```\n\n### Resume Stream Endpoint\n\nCreate the endpoint to resume workflow streams:\n\n```typescript\n// src/app/api/chats/[chatId]/messages/[runId]/stream/route.ts\nimport { headers } from \"next/headers\";\nimport { getRun } from \"workflow/api\";\nimport { createUIMessageStreamResponse } from \"ai\";\nimport { auth } from \"@/lib/auth/server\";\nimport { verifyChatOwnership } from \"@/lib/chat/queries\";\n\n/**\n * GET /api/chats/:chatId/messages/:runId/stream\n * Resume endpoint for workflow streams\n *\n * Query params:\n *   - startIndex: optional chunk index to resume from\n */\nexport async function GET(\n  request: Request,\n  { params }: { params: Promise<{ chatId: string; runId: string }> },\n) {\n  const session = await auth.api.getSession({\n    headers: await headers(),\n  });\n\n  if (!session) {\n    return new Response(\"Unauthorized\", { status: 401 });\n  }\n\n  const { chatId, runId } = await params;\n\n  if (!runId) {\n    return new Response(\"Missing runId parameter\", { status: 400 });\n  }\n\n  const isAuthorized = await verifyChatOwnership(chatId, session.user.id);\n  if (!isAuthorized) {\n    return new Response(\"Forbidden\", { status: 403 });\n  }\n\n  const { searchParams } = new URL(request.url);\n  const startIndexParam = searchParams.get(\"startIndex\");\n  const startIndex =\n    startIndexParam !== null ? parseInt(startIndexParam, 10) : undefined;\n\n  const run = await getRun(runId);\n  const readable = await run.getReadable({ startIndex });\n\n  return createUIMessageStreamResponse({\n    stream: readable,\n  });\n}\n```\n\n### Key Workflow API Functions\n\n**`start(workflow, args)`**\n\nStarts a new workflow run:\n\n- Returns `{ runId, readable }`\n- `runId` uniquely identifies this run for resumption\n- `readable` is a `ReadableStream` of UI message chunks\n\n**`getRun(runId)`**\n\nGets an existing workflow run:\n\n- Returns a run object with `getReadable({ startIndex? })`\n- `startIndex` allows resuming from a specific chunk\n\n### Response Headers\n\nThe `x-workflow-run-id` header is critical for resumability:\n\n```typescript\nreturn createUIMessageStreamResponse({\n  stream: run.readable,\n  headers: {\n    \"x-workflow-run-id\": run.runId,\n  },\n});\n```\n\nThe client captures this header and uses it for reconnection.\n\n### Authorization\n\nBoth endpoints verify:\n\n1. User is authenticated (session exists)\n2. User owns the chat (`ensureChatExists` / `verifyChatOwnership`)\n\nThis prevents unauthorized access to other users' workflow streams.\n\n---\n\n## Workflow Client Integration\n\nThe client uses `WorkflowChatTransport` for automatic stream resumption.\n\n### Resumable Chat Hook\n\n**Install via shadcn registry:**\n\n```bash\nbunx --bun shadcn@latest add https://fullstackrecipes.com/r/use-resumable-chat.json\n```\n\n**Or copy the source code:**\n\n`hooks/use-resumable-chat.ts`:\n\n```typescript\n\"use client\";\n\nimport { useChat } from \"@ai-sdk/react\";\nimport { WorkflowChatTransport } from \"@workflow/ai\";\nimport { v7 as uuidv7 } from \"uuid\";\nimport type { ChatAgentUIMessage } from \"@/workflows/chat/types\";\nimport { useRef } from \"react\";\n\ninterface UseResumableChatOptions {\n  chatId: string;\n  messageHistory: ChatAgentUIMessage[];\n  /** Initial workflow run ID for resuming an interrupted stream */\n  initialRunId?: string;\n}\n\n/**\n * Custom hook that wraps useChat with WorkflowChatTransport for resumable streaming.\n *\n * Uses useStateRef to track the active workflow run ID, enabling automatic\n * reconnection to interrupted streams without stale closure issues.\n */\nexport function useResumableChat({\n  chatId,\n  messageHistory,\n  initialRunId,\n}: UseResumableChatOptions) {\n  const activeRunIdRef = useRef<string | undefined>(initialRunId);\n\n  const chatResult = useChat<ChatAgentUIMessage>({\n    messages: messageHistory,\n    resume: !!initialRunId,\n    transport: new WorkflowChatTransport({\n      // Send new messages\n      prepareSendMessagesRequest: ({ messages }) => ({\n        api: `/api/chats/${chatId}/messages`,\n        body: {\n          chatId,\n          message: messages[messages.length - 1],\n        },\n      }),\n\n      // Store the workflow run ID when a message is sent\n      onChatSendMessage: (response) => {\n        const workflowRunId = response.headers.get(\"x-workflow-run-id\");\n        if (workflowRunId) {\n          activeRunIdRef.current = workflowRunId;\n        }\n      },\n\n      // Configure reconnection to use the ref for the latest value\n      prepareReconnectToStreamRequest: ({ api, ...rest }) => {\n        const currentRunId = activeRunIdRef.current;\n        if (!currentRunId) {\n          throw new Error(\"No active workflow run ID found for reconnection\");\n        }\n        return {\n          ...rest,\n          api: `/api/chats/${chatId}/messages/${encodeURIComponent(currentRunId)}/stream`,\n        };\n      },\n\n      // Clear the workflow run ID when the chat stream ends\n      onChatEnd: () => {\n        activeRunIdRef.current = undefined;\n      },\n\n      // Retry up to 5 times on reconnection errors\n      maxConsecutiveErrors: 5,\n    }),\n    id: chatId,\n    generateId: () => uuidv7(),\n  });\n\n  return {\n    ...chatResult,\n  };\n}\n```\n\nThe hook requires a `ChatAgentUIMessage` type definition. Update the import path to match your project structure.\n\n### Chat Page with Resumption Detection\n\nCreate or update the chat page with resumption detection:\n\n```typescript\n// src/app/chats/[chatId]/page.tsx\nimport type { Metadata } from \"next\";\nimport { redirect } from \"next/navigation\";\nimport { headers } from \"next/headers\";\nimport Link from \"next/link\";\nimport { ArrowLeft } from \"lucide-react\";\nimport { Chat } from \"@/components/chat/chat\";\nimport {\n  convertDbMessagesToUIMessages,\n  ensureChatExists,\n  getChatMessages,\n} from \"@/lib/chat/queries\";\nimport { auth } from \"@/lib/auth/server\";\nimport { UserMenu } from \"@/components/auth/user-menu\";\nimport { ThemeSelector } from \"@/components/themes/selector\";\n\nexport const metadata: Metadata = {\n  title: \"Chat\",\n  description: \"Continue your AI-powered conversation.\",\n};\n\ninterface PageProps {\n  params: Promise<{\n    chatId: string;\n  }>;\n}\n\nexport default async function ChatPage({ params }: PageProps) {\n  const session = await auth.api.getSession({\n    headers: await headers(),\n  });\n\n  if (!session) {\n    redirect(\"/sign-in\");\n  }\n\n  const { chatId } = await params;\n  const userId = session.user.id;\n\n  const isAuthorized = await ensureChatExists(chatId, userId);\n  if (!isAuthorized) {\n    redirect(\"/\");\n  }\n\n  // Fetch all messages for this chat\n  const persistedMessages = await getChatMessages(chatId);\n\n  // Check if the last message is an incomplete assistant message (has runId but no parts)\n  // This happens when a workflow was interrupted mid-stream\n  const lastMessage = persistedMessages.at(-1);\n  const isIncompleteMessage =\n    lastMessage?.role === \"assistant\" &&\n    lastMessage?.runId &&\n    lastMessage?.parts.length === 0;\n\n  // If incomplete, extract the runId for resumption and remove the empty message from history\n  const initialRunId = isIncompleteMessage ? lastMessage.runId : undefined;\n  const messagesToConvert = isIncompleteMessage\n    ? persistedMessages.slice(0, -1)\n    : persistedMessages;\n\n  const history = convertDbMessagesToUIMessages(messagesToConvert);\n\n  return (\n    <div className=\"h-dvh bg-gradient-to-b from-background via-background to-muted/20 grid grid-rows-[auto_1fr]\">\n      <header className=\"z-50 border-b border-border/50 bg-background/80 backdrop-blur-xl\">\n        <div className=\"container mx-auto px-4 h-14 flex items-center justify-between\">\n          <div className=\"flex items-center gap-2 sm:gap-4\">\n            <Link\n              href=\"/chats\"\n              className=\"flex items-center gap-2 text-muted-foreground hover:text-foreground transition-colors\"\n            >\n              <ArrowLeft className=\"h-4 w-4\" />\n              <span className=\"sr-only sm:not-sr-only text-sm font-medium\">\n                Back to chats\n              </span>\n            </Link>\n            <span className=\"text-border hidden sm:inline\">|</span>\n            <span className=\"hidden sm:block font-mono text-lg font-semibold tracking-tight\">\n              AI Chat\n            </span>\n          </div>\n          <div className=\"flex items-center gap-2\">\n            <ThemeSelector />\n            <UserMenu />\n          </div>\n        </div>\n      </header>\n\n      <main className=\"min-h-0 overflow-hidden\">\n        <Chat\n          messageHistory={history}\n          chatId={chatId}\n          initialRunId={initialRunId ?? undefined}\n        />\n      </main>\n    </div>\n  );\n}\n```\n\n### How Resumption Detection Works\n\n1. **On page load**, fetch all messages for the chat\n2. **Check last message** - if it's an assistant message with `runId` but no parts, it's incomplete\n3. **Extract `runId`** - pass to client for resumption\n4. **Remove empty message** - don't show the incomplete message in UI\n5. **Client resumes** - `WorkflowChatTransport` reconnects using the `runId`\n\n### WorkflowChatTransport Options\n\n| Option | Description |\n| `prepareSendMessagesRequest` | Configure the initial message send request |\n| `onChatSendMessage` | Callback when message is sent (capture `runId`) |\n| `prepareReconnectToStreamRequest` | Configure reconnection request URL |\n| `onChatEnd` | Callback when stream completes |\n| `maxConsecutiveErrors` | Number of reconnection retries |\n\n### Using the Hook in Components\n\n```tsx\n\"use client\";\n\nimport { useResumableChat } from \"@/hooks/use-resumable-chat\";\n\nexport function Chat({ chatId, messageHistory, initialRunId }) {\n  const { messages, sendMessage, status } = useResumableChat({\n    chatId,\n    messageHistory,\n    initialRunId,\n  });\n\n  // Render messages and input...\n}\n```"}