Set up Workflow Development Kit
The Workflow Development Kit provides resumable, durable workflows for AI agents. It enables step-level persistence, stream resumption, and agent orchestration.
Step 1: Install the packages
bun add workflow @workflow/aibun add workflow @workflow/aiStep 2: Update Next.js config
// next.config.ts
import type { NextConfig } from "next";
import { withWorkflow } from "workflow/next";
const nextConfig: NextConfig = {
/* config options here */
reactCompiler: true,
};
export default withWorkflow(nextConfig);// next.config.ts
import type { NextConfig } from "next";
import { withWorkflow } from "workflow/next";
const nextConfig: NextConfig = {
/* config options here */
reactCompiler: true,
};
export default withWorkflow(nextConfig);References
Workflow Type Definitions
Define types for your chat messages and parts that work with the Workflow SDK.
Prerequisites
- Completed Workflow SDK Setup (Step 1)
Chat Message Types
Create src/workflows/chat/types.ts:
import type { UIMessage, UIMessagePart, InferUITools } from "ai";
import { z } from "zod";
import { allTools } from "@/lib/ai/tools";
const metadataSchema = z.object({});
type ChatMetadata = z.infer<typeof metadataSchema>;
const dataPartSchema = z.object({
progress: z.object({
text: z.string(),
}),
});
export type ChatDataPart = z.infer<typeof dataPartSchema>;
export type ChatToolSet = InferUITools<typeof allTools>;
export type ChatAgentUIMessage = UIMessage<
ChatMetadata,
ChatDataPart,
ChatToolSet
>;
export type ChatUIMessagePart = UIMessagePart<ChatDataPart, ChatToolSet>;
export type ChatTextPart = Extract<ChatUIMessagePart, { type: "text" }>;
export type ChatReasoningPart = Extract<
ChatUIMessagePart,
{ type: "reasoning" }
>;
export type ChatSourceUrlPart = Extract<
ChatUIMessagePart,
{ type: "source-url" }
>;
export type ChatToolPart = Extract<
ChatUIMessagePart,
{ type: `tool-${string}` }
>;
export type ChatDataProgressPart = Extract<
ChatUIMessagePart,
{ type: "data-progress" }
>;
export type ChatFilePart = Extract<ChatUIMessagePart, { type: "file" }>;
export function isToolPart(part: ChatUIMessagePart): part is ChatToolPart {
return part.type.startsWith("tool-");
}
export function isDataProgressPart(
part: ChatUIMessagePart,
): part is ChatDataProgressPart {
return part.type === "data-progress";
}import type { UIMessage, UIMessagePart, InferUITools } from "ai";
import { z } from "zod";
import { allTools } from "@/lib/ai/tools";
const metadataSchema = z.object({});
type ChatMetadata = z.infer<typeof metadataSchema>;
const dataPartSchema = z.object({
progress: z.object({
text: z.string(),
}),
});
export type ChatDataPart = z.infer<typeof dataPartSchema>;
export type ChatToolSet = InferUITools<typeof allTools>;
export type ChatAgentUIMessage = UIMessage<
ChatMetadata,
ChatDataPart,
ChatToolSet
>;
export type ChatUIMessagePart = UIMessagePart<ChatDataPart, ChatToolSet>;
export type ChatTextPart = Extract<ChatUIMessagePart, { type: "text" }>;
export type ChatReasoningPart = Extract<
ChatUIMessagePart,
{ type: "reasoning" }
>;
export type ChatSourceUrlPart = Extract<
ChatUIMessagePart,
{ type: "source-url" }
>;
export type ChatToolPart = Extract<
ChatUIMessagePart,
{ type: `tool-${string}` }
>;
export type ChatDataProgressPart = Extract<
ChatUIMessagePart,
{ type: "data-progress" }
>;
export type ChatFilePart = Extract<ChatUIMessagePart, { type: "file" }>;
export function isToolPart(part: ChatUIMessagePart): part is ChatToolPart {
return part.type.startsWith("tool-");
}
export function isDataProgressPart(
part: ChatUIMessagePart,
): part is ChatDataProgressPart {
return part.type === "data-progress";
}Key Types
ChatAgentUIMessage- Full message type with metadata, data parts, and toolsChatUIMessagePart- Union of all possible message part typesChatDataPart- Custom data parts (like progress updates)ChatToolSet- Inferred tool types from your tool definitions
Type Guards
The isToolPart and isDataProgressPart functions help narrow types when processing message parts:
for (const part of message.parts) {
if (isToolPart(part)) {
// part is ChatToolPart
console.log(part.toolCallId, part.input);
} else if (isDataProgressPart(part)) {
// part is ChatDataProgressPart
console.log(part.data.text);
}
}for (const part of message.parts) {
if (isToolPart(part)) {
// part is ChatToolPart
console.log(part.toolCallId, part.input);
} else if (isDataProgressPart(part)) {
// part is ChatDataProgressPart
console.log(part.data.text);
}
}Workflow Tool Definitions
Define tools that your agents can use within workflows.
Tool Structure
Create src/lib/ai/tools.ts:
import { google } from "@ai-sdk/google";
import { tool, type Tool } from "ai";
import { z } from "zod";
// Cast needed: @ai-sdk/google returns Tool<{}, unknown> but AI SDK expects Tool<any, any>
function asToolSetCompatible<T>(tool: T): Tool<any, any> {
return tool as Tool<any, any>;
}
export const researchTools = {
googleSearch: asToolSetCompatible(google.tools.googleSearch({})),
urlContext: asToolSetCompatible(google.tools.urlContext({})),
};
export const draftingTools = {
countCharacters: tool({
description:
"Count the number of characters in a text. Use this to verify tweet length before finalizing.",
inputSchema: z.object({
text: z.string().describe("The text to count characters for"),
}),
execute: async ({ text }) => {
const count = text.length;
const remaining = 280 - count;
return {
characterCount: count,
remainingCharacters: remaining,
isWithinLimit: count <= 280,
status:
count <= 280
? `${count}/280 characters (${remaining} remaining)`
: `${count}/280 characters (${Math.abs(remaining)} over limit)`,
};
},
}),
};
export const allTools = {
...researchTools,
...draftingTools,
};
// Tool type names for database schema - must match keys in allTools as "tool-{key}"
export const TOOL_TYPES = [
"tool-googleSearch",
"tool-urlContext",
"tool-countCharacters",
] as const;
export type ToolType = (typeof TOOL_TYPES)[number];import { google } from "@ai-sdk/google";
import { tool, type Tool } from "ai";
import { z } from "zod";
// Cast needed: @ai-sdk/google returns Tool<{}, unknown> but AI SDK expects Tool<any, any>
function asToolSetCompatible<T>(tool: T): Tool<any, any> {
return tool as Tool<any, any>;
}
export const researchTools = {
googleSearch: asToolSetCompatible(google.tools.googleSearch({})),
urlContext: asToolSetCompatible(google.tools.urlContext({})),
};
export const draftingTools = {
countCharacters: tool({
description:
"Count the number of characters in a text. Use this to verify tweet length before finalizing.",
inputSchema: z.object({
text: z.string().describe("The text to count characters for"),
}),
execute: async ({ text }) => {
const count = text.length;
const remaining = 280 - count;
return {
characterCount: count,
remainingCharacters: remaining,
isWithinLimit: count <= 280,
status:
count <= 280
? `${count}/280 characters (${remaining} remaining)`
: `${count}/280 characters (${Math.abs(remaining)} over limit)`,
};
},
}),
};
export const allTools = {
...researchTools,
...draftingTools,
};
// Tool type names for database schema - must match keys in allTools as "tool-{key}"
export const TOOL_TYPES = [
"tool-googleSearch",
"tool-urlContext",
"tool-countCharacters",
] as const;
export type ToolType = (typeof TOOL_TYPES)[number];Why Separate Tool Sets?
Tools are grouped by function (research vs drafting) so different agents can use different capabilities:
- Research tools - Web search, URL context extraction
- Drafting tools - Character counting, formatting helpers
Database Schema Integration
The TOOL_TYPES array must match your tool keys prefixed with tool- for the database schema's enum constraint. When adding new tools:
- Add the tool to the appropriate tool set
- Add
"tool-{toolKey}"toTOOL_TYPES - Update your database schema's tool type enum
Provider Tools
Some providers offer built-in tools (like Google's googleSearch). These need type casting for compatibility:
// Google's tools have different type signatures
function asToolSetCompatible<T>(tool: T): Tool<any, any> {
return tool as Tool<any, any>;
}
const searchTool = asToolSetCompatible(google.tools.googleSearch({}));// Google's tools have different type signatures
function asToolSetCompatible<T>(tool: T): Tool<any, any> {
return tool as Tool<any, any>;
}
const searchTool = asToolSetCompatible(google.tools.googleSearch({}));Workflow Definition
The main workflow orchestrates agents and manages message persistence with durable execution.
Project Structure
src/workflows/chat/
├── index.ts # Main workflow definition
├── types.ts # Type definitions
└── steps/
├── history.ts # Message persistence steps
├── stream.ts # Stream control steps
├── router.ts # Agent routing logic
└── progress.ts # Progress updatessrc/workflows/chat/
├── index.ts # Main workflow definition
├── types.ts # Type definitions
└── steps/
├── history.ts # Message persistence steps
├── stream.ts # Stream control steps
├── router.ts # Agent routing logic
└── progress.ts # Progress updatesMain Workflow
Create src/workflows/chat/index.ts:
import { getWorkflowMetadata, getWritable } from "workflow";
import type { ChatAgentUIMessage } from "./types";
import {
persistUserMessage,
createAssistantMessage,
getMessageHistory,
removeRunId,
persistMessageParts,
} from "./steps/history";
import { startStream, finishStream } from "./steps/stream";
import { routerStep } from "./steps/router";
import { writeProgress } from "./steps/progress";
import { researchAgent } from "@/lib/ai/research";
import { draftingAgent } from "@/lib/ai/drafting";
/**
* Main chat workflow that routes between research and drafting agents.
* Uses runId for stream resumability on client reconnection.
*/
export async function chatWorkflow({
chatId,
userMessage,
}: {
chatId: string;
userMessage: ChatAgentUIMessage;
}) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
await persistUserMessage({ chatId, message: userMessage });
const messageId = await createAssistantMessage({
chatId,
runId: workflowRunId,
});
const history = await getMessageHistory(chatId);
await startStream(messageId);
const { next, reasoning } = await routerStep(chatId, messageId, history);
console.log(`Router: ${next} - ${reasoning}`);
const progressText =
next === "research" ? "Researching topic..." : "Authoring tweet...";
await writeProgress(progressText, chatId, messageId);
const agent = next === "research" ? researchAgent : draftingAgent;
const { parts } = await agent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, messageId, parts });
await finishStream();
await removeRunId(messageId);
}import { getWorkflowMetadata, getWritable } from "workflow";
import type { ChatAgentUIMessage } from "./types";
import {
persistUserMessage,
createAssistantMessage,
getMessageHistory,
removeRunId,
persistMessageParts,
} from "./steps/history";
import { startStream, finishStream } from "./steps/stream";
import { routerStep } from "./steps/router";
import { writeProgress } from "./steps/progress";
import { researchAgent } from "@/lib/ai/research";
import { draftingAgent } from "@/lib/ai/drafting";
/**
* Main chat workflow that routes between research and drafting agents.
* Uses runId for stream resumability on client reconnection.
*/
export async function chatWorkflow({
chatId,
userMessage,
}: {
chatId: string;
userMessage: ChatAgentUIMessage;
}) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
await persistUserMessage({ chatId, message: userMessage });
const messageId = await createAssistantMessage({
chatId,
runId: workflowRunId,
});
const history = await getMessageHistory(chatId);
await startStream(messageId);
const { next, reasoning } = await routerStep(chatId, messageId, history);
console.log(`Router: ${next} - ${reasoning}`);
const progressText =
next === "research" ? "Researching topic..." : "Authoring tweet...";
await writeProgress(progressText, chatId, messageId);
const agent = next === "research" ? researchAgent : draftingAgent;
const { parts } = await agent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, messageId, parts });
await finishStream();
await removeRunId(messageId);
}The "use workflow" Directive
The "use workflow" directive marks the function as a durable workflow entry point:
- Provides access to
getWorkflowMetadata()for the run ID - Provides access to
getWritable()for streaming - Enables step-level durability for all called steps
Workflow Flow
- Persist user message - Save incoming message to database
- Create assistant message - Create placeholder with
runIdfor resumability - Load history - Fetch full conversation context
- Start stream - Send stream start event to client
- Route - Determine which agent to invoke
- Progress update - Inform client what's happening
- Run agent - Execute the selected agent with tool loop
- Persist parts - Save agent response to database
- Finish stream - Send stream finish event
- Clear runId - Mark message as complete
Why runId Matters
The runId enables stream resumption:
- Stored in the assistant message before streaming starts
- Client receives it in response headers
- If connection drops, client can reconnect using the
runId - Cleared after successful completion
Workflow Steps
Steps are durable functions that persist their results. If a workflow is interrupted, completed steps are replayed from storage.
The "use step" Directive
Mark functions as durable steps with "use step":
async function myStep(input: string): Promise<string> {
"use step";
// This function's result is persisted
return await someOperation(input);
}async function myStep(input: string): Promise<string> {
"use step";
// This function's result is persisted
return await someOperation(input);
}Important: "use step" only works in standalone functions, not class methods.
History Steps
Create src/workflows/chat/steps/history.ts:
import type { UIMessage } from "ai";
import {
convertDbMessagesToUIMessages,
persistMessage,
getChatMessages,
clearMessageRunId,
insertMessageParts,
} from "@/lib/chat/queries";
import { messages } from "@/lib/chat/schema";
import type { ChatAgentUIMessage } from "../types";
import { db } from "@/lib/db/client";
export async function persistUserMessage({
chatId,
message,
}: {
chatId: string;
message: ChatAgentUIMessage;
}): Promise<void> {
"use step";
await persistMessage({ chatId, message, runId: null });
}
/**
* Creates message record with runId before streaming starts,
* enabling client stream resumption on reconnection.
*/
export async function createAssistantMessage({
chatId,
runId,
}: {
chatId: string;
runId: string;
}): Promise<string> {
"use step";
const [{ messageId }] = await db
.insert(messages)
.values({
chatId,
role: "assistant",
runId,
})
.returning({ messageId: messages.id });
return messageId;
}
export async function getMessageHistory(
chatId: string,
): Promise<ChatAgentUIMessage[]> {
"use step";
const messageHistory = await getChatMessages(chatId);
return convertDbMessagesToUIMessages(messageHistory);
}
export async function removeRunId(messageId: string): Promise<void> {
"use step";
await clearMessageRunId(messageId);
}
export async function persistMessageParts({
chatId,
messageId,
parts,
}: {
chatId: string;
messageId: string;
parts: UIMessage["parts"];
}): Promise<void> {
"use step";
await insertMessageParts(
chatId,
messageId,
parts as ChatAgentUIMessage["parts"],
);
}import type { UIMessage } from "ai";
import {
convertDbMessagesToUIMessages,
persistMessage,
getChatMessages,
clearMessageRunId,
insertMessageParts,
} from "@/lib/chat/queries";
import { messages } from "@/lib/chat/schema";
import type { ChatAgentUIMessage } from "../types";
import { db } from "@/lib/db/client";
export async function persistUserMessage({
chatId,
message,
}: {
chatId: string;
message: ChatAgentUIMessage;
}): Promise<void> {
"use step";
await persistMessage({ chatId, message, runId: null });
}
/**
* Creates message record with runId before streaming starts,
* enabling client stream resumption on reconnection.
*/
export async function createAssistantMessage({
chatId,
runId,
}: {
chatId: string;
runId: string;
}): Promise<string> {
"use step";
const [{ messageId }] = await db
.insert(messages)
.values({
chatId,
role: "assistant",
runId,
})
.returning({ messageId: messages.id });
return messageId;
}
export async function getMessageHistory(
chatId: string,
): Promise<ChatAgentUIMessage[]> {
"use step";
const messageHistory = await getChatMessages(chatId);
return convertDbMessagesToUIMessages(messageHistory);
}
export async function removeRunId(messageId: string): Promise<void> {
"use step";
await clearMessageRunId(messageId);
}
export async function persistMessageParts({
chatId,
messageId,
parts,
}: {
chatId: string;
messageId: string;
parts: UIMessage["parts"];
}): Promise<void> {
"use step";
await insertMessageParts(
chatId,
messageId,
parts as ChatAgentUIMessage["parts"],
);
}Stream Steps
Create src/workflows/chat/steps/stream.ts:
import { getWritable } from "workflow";
import type { UIMessageChunk } from "ai";
export async function startStream(messageId: string): Promise<void> {
"use step";
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
await writer.write({
type: "start",
messageId,
});
} finally {
writer.releaseLock();
}
}
export async function finishStream(): Promise<void> {
"use step";
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
await writer.write({
type: "finish",
finishReason: "stop",
});
} finally {
writer.releaseLock();
}
await writable.close();
}import { getWritable } from "workflow";
import type { UIMessageChunk } from "ai";
export async function startStream(messageId: string): Promise<void> {
"use step";
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
await writer.write({
type: "start",
messageId,
});
} finally {
writer.releaseLock();
}
}
export async function finishStream(): Promise<void> {
"use step";
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
await writer.write({
type: "finish",
finishReason: "stop",
});
} finally {
writer.releaseLock();
}
await writable.close();
}Progress Step
Create src/workflows/chat/steps/progress.ts:
import { getWritable } from "workflow";
import type { UIMessageChunk } from "ai";
import type { ChatDataProgressPart } from "../types";
import { insertMessageParts } from "@/lib/chat/queries";
/** Writes a progress update to both the stream and database. */
export async function writeProgress(
text: string,
chatId: string,
messageId: string,
): Promise<void> {
"use step";
const progressPart: ChatDataProgressPart = {
type: "data-progress",
data: {
text,
},
};
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
await writer.write(progressPart);
} finally {
writer.releaseLock();
}
await insertMessageParts(chatId, messageId, [progressPart]);
}import { getWritable } from "workflow";
import type { UIMessageChunk } from "ai";
import type { ChatDataProgressPart } from "../types";
import { insertMessageParts } from "@/lib/chat/queries";
/** Writes a progress update to both the stream and database. */
export async function writeProgress(
text: string,
chatId: string,
messageId: string,
): Promise<void> {
"use step";
const progressPart: ChatDataProgressPart = {
type: "data-progress",
data: {
text,
},
};
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
await writer.write(progressPart);
} finally {
writer.releaseLock();
}
await insertMessageParts(chatId, messageId, [progressPart]);
}Router Step
Create src/workflows/chat/steps/router.ts:
import { generateObject, convertToModelMessages, type UIMessage } from "ai";
import { z } from "zod";
import { writeProgress } from "./progress";
const routerSystemPrompt = `You are an orchestrator agent for a tweet author system.
Analyze the conversation and determine what should happen next:
1. If the user provides a draft tweet idea, prompt, or topic that needs research:
- Return { next: 'research' }
2. If research has been completed and the user confirms they want to proceed with drafting:
- Return { next: 'drafting' }
3. If the user has feedback or questions about the research, or wants more information:
- Return { next: 'research' }
4. If the conversation is just starting with a new tweet request:
- Return { next: 'research' }
Look at the conversation history to understand the current state.`;
const routerSchema = z.object({
next: z.enum(["research", "drafting"]).describe("The next agent to invoke"),
reasoning: z
.string()
.describe("Brief explanation of why this route was chosen"),
});
export type RouterDecision = z.infer<typeof routerSchema>;
export async function routerStep(
chatId: string,
messageId: string,
history: UIMessage[],
): Promise<RouterDecision> {
"use step";
await writeProgress("Thinking about the next step...", chatId, messageId);
const result = await generateObject({
model: "google/gemini-2.5-flash",
system: routerSystemPrompt,
schema: routerSchema,
messages: convertToModelMessages(history),
});
return result.object;
}import { generateObject, convertToModelMessages, type UIMessage } from "ai";
import { z } from "zod";
import { writeProgress } from "./progress";
const routerSystemPrompt = `You are an orchestrator agent for a tweet author system.
Analyze the conversation and determine what should happen next:
1. If the user provides a draft tweet idea, prompt, or topic that needs research:
- Return { next: 'research' }
2. If research has been completed and the user confirms they want to proceed with drafting:
- Return { next: 'drafting' }
3. If the user has feedback or questions about the research, or wants more information:
- Return { next: 'research' }
4. If the conversation is just starting with a new tweet request:
- Return { next: 'research' }
Look at the conversation history to understand the current state.`;
const routerSchema = z.object({
next: z.enum(["research", "drafting"]).describe("The next agent to invoke"),
reasoning: z
.string()
.describe("Brief explanation of why this route was chosen"),
});
export type RouterDecision = z.infer<typeof routerSchema>;
export async function routerStep(
chatId: string,
messageId: string,
history: UIMessage[],
): Promise<RouterDecision> {
"use step";
await writeProgress("Thinking about the next step...", chatId, messageId);
const result = await generateObject({
model: "google/gemini-2.5-flash",
system: routerSystemPrompt,
schema: routerSchema,
messages: convertToModelMessages(history),
});
return result.object;
}Step Durability
Each step's result is persisted. If the workflow crashes after persistUserMessage completes but before createAssistantMessage:
- Workflow restarts
persistUserMessagereplays from storage (no duplicate insert)createAssistantMessageexecutes fresh
This ensures exactly-once semantics for database operations.
Workflow API Routes
API routes start workflows and handle stream resumption.
Start Workflow Endpoint
Create src/app/api/chats/[chatId]/messages/route.ts:
import { headers } from "next/headers";
import { verifyChatOwnership } from "@/lib/chat/queries";
import { auth } from "@/lib/auth/server";
import { chatWorkflow } from "@/workflows/chat";
import { start } from "workflow/api";
import { createUIMessageStreamResponse } from "ai";
import type { ChatAgentUIMessage } from "@/workflows/chat/types";
export async function POST(request: Request) {
const session = await auth.api.getSession({
headers: await headers(),
});
if (!session) {
return new Response("Unauthorized", { status: 401 });
}
const { chatId, message } = (await request.json()) as {
chatId: string;
message: ChatAgentUIMessage;
};
if (!chatId || !message) {
return new Response("Missing chatId or message", { status: 400 });
}
const isAuthorized = await verifyChatOwnership(chatId, session.user.id);
if (!isAuthorized) {
return new Response("Forbidden", { status: 403 });
}
// Start workflow with user message
const run = await start(chatWorkflow, [
{
chatId,
userMessage: message,
},
]);
// Return stream with runId for resumability
return createUIMessageStreamResponse({
stream: run.readable,
headers: {
"x-workflow-run-id": run.runId,
},
});
}import { headers } from "next/headers";
import { verifyChatOwnership } from "@/lib/chat/queries";
import { auth } from "@/lib/auth/server";
import { chatWorkflow } from "@/workflows/chat";
import { start } from "workflow/api";
import { createUIMessageStreamResponse } from "ai";
import type { ChatAgentUIMessage } from "@/workflows/chat/types";
export async function POST(request: Request) {
const session = await auth.api.getSession({
headers: await headers(),
});
if (!session) {
return new Response("Unauthorized", { status: 401 });
}
const { chatId, message } = (await request.json()) as {
chatId: string;
message: ChatAgentUIMessage;
};
if (!chatId || !message) {
return new Response("Missing chatId or message", { status: 400 });
}
const isAuthorized = await verifyChatOwnership(chatId, session.user.id);
if (!isAuthorized) {
return new Response("Forbidden", { status: 403 });
}
// Start workflow with user message
const run = await start(chatWorkflow, [
{
chatId,
userMessage: message,
},
]);
// Return stream with runId for resumability
return createUIMessageStreamResponse({
stream: run.readable,
headers: {
"x-workflow-run-id": run.runId,
},
});
}Resume Stream Endpoint
Create src/app/api/chats/[chatId]/messages/[runId]/stream/route.ts:
import { headers } from "next/headers";
import { getRun } from "workflow/api";
import { createUIMessageStreamResponse } from "ai";
import { auth } from "@/lib/auth/server";
import { verifyChatOwnership } from "@/lib/chat/queries";
/**
* GET /api/chats/:chatId/messages/:runId/stream
* Resume endpoint for workflow streams
*
* Query params:
* - startIndex: optional chunk index to resume from
*/
export async function GET(
request: Request,
{ params }: { params: Promise<{ chatId: string; runId: string }> },
) {
const session = await auth.api.getSession({
headers: await headers(),
});
if (!session) {
return new Response("Unauthorized", { status: 401 });
}
const { chatId, runId } = await params;
if (!runId) {
return new Response("Missing runId parameter", { status: 400 });
}
const isAuthorized = await verifyChatOwnership(chatId, session.user.id);
if (!isAuthorized) {
return new Response("Forbidden", { status: 403 });
}
const { searchParams } = new URL(request.url);
const startIndexParam = searchParams.get("startIndex");
const startIndex =
startIndexParam !== null ? parseInt(startIndexParam, 10) : undefined;
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });
return createUIMessageStreamResponse({
stream: readable,
});
}import { headers } from "next/headers";
import { getRun } from "workflow/api";
import { createUIMessageStreamResponse } from "ai";
import { auth } from "@/lib/auth/server";
import { verifyChatOwnership } from "@/lib/chat/queries";
/**
* GET /api/chats/:chatId/messages/:runId/stream
* Resume endpoint for workflow streams
*
* Query params:
* - startIndex: optional chunk index to resume from
*/
export async function GET(
request: Request,
{ params }: { params: Promise<{ chatId: string; runId: string }> },
) {
const session = await auth.api.getSession({
headers: await headers(),
});
if (!session) {
return new Response("Unauthorized", { status: 401 });
}
const { chatId, runId } = await params;
if (!runId) {
return new Response("Missing runId parameter", { status: 400 });
}
const isAuthorized = await verifyChatOwnership(chatId, session.user.id);
if (!isAuthorized) {
return new Response("Forbidden", { status: 403 });
}
const { searchParams } = new URL(request.url);
const startIndexParam = searchParams.get("startIndex");
const startIndex =
startIndexParam !== null ? parseInt(startIndexParam, 10) : undefined;
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });
return createUIMessageStreamResponse({
stream: readable,
});
}Key Workflow API Functions
start(workflow, args)
Starts a new workflow run:
- Returns
{ runId, readable } runIduniquely identifies this run for resumptionreadableis aReadableStreamof UI message chunks
getRun(runId)
Gets an existing workflow run:
- Returns a run object with
getReadable({ startIndex? }) startIndexallows resuming from a specific chunk
Response Headers
The x-workflow-run-id header is critical for resumability:
return createUIMessageStreamResponse({
stream: run.readable,
headers: {
"x-workflow-run-id": run.runId,
},
});return createUIMessageStreamResponse({
stream: run.readable,
headers: {
"x-workflow-run-id": run.runId,
},
});The client captures this header and uses it for reconnection.
Authorization
Both endpoints verify:
- User is authenticated (session exists)
- User owns the chat (
verifyChatOwnership)
This prevents unauthorized access to other users' workflow streams.
Workflow Client Integration
The client uses WorkflowChatTransport for automatic stream resumption.
Resumable Chat Hook
Create src/hooks/use-resumable-chat.ts:
"use client";
import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { v7 as uuidv7 } from "uuid";
import type { ChatAgentUIMessage } from "@/workflows/chat/types";
import { useRef } from "react";
interface UseResumableChatOptions {
chatId: string;
messageHistory: ChatAgentUIMessage[];
/** Initial workflow run ID for resuming an interrupted stream */
initialRunId?: string;
}
/**
* Custom hook that wraps useChat with WorkflowChatTransport for resumable streaming.
*/
export function useResumableChat({
chatId,
messageHistory,
initialRunId,
}: UseResumableChatOptions) {
const activeRunIdRef = useRef<string | undefined>(initialRunId);
const chatResult = useChat<ChatAgentUIMessage>({
messages: messageHistory,
resume: !!initialRunId,
transport: new WorkflowChatTransport({
// Send new messages
prepareSendMessagesRequest: ({ messages }) => ({
api: `/api/chats/${chatId}/messages`,
body: {
chatId,
message: messages[messages.length - 1],
},
}),
// Store the workflow run ID when a message is sent
onChatSendMessage: (response) => {
const workflowRunId = response.headers.get("x-workflow-run-id");
if (workflowRunId) {
activeRunIdRef.current = workflowRunId;
}
},
// Configure reconnection to use the ref for the latest value
prepareReconnectToStreamRequest: ({ api, ...rest }) => {
const currentRunId = activeRunIdRef.current;
if (!currentRunId) {
throw new Error("No active workflow run ID found for reconnection");
}
return {
...rest,
api: `/api/chats/${chatId}/messages/${encodeURIComponent(currentRunId)}/stream`,
};
},
// Clear the workflow run ID when the chat stream ends
onChatEnd: () => {
activeRunIdRef.current = undefined;
},
// Retry up to 5 times on reconnection errors
maxConsecutiveErrors: 5,
}),
id: chatId,
generateId: () => uuidv7(),
});
return chatResult;
}"use client";
import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { v7 as uuidv7 } from "uuid";
import type { ChatAgentUIMessage } from "@/workflows/chat/types";
import { useRef } from "react";
interface UseResumableChatOptions {
chatId: string;
messageHistory: ChatAgentUIMessage[];
/** Initial workflow run ID for resuming an interrupted stream */
initialRunId?: string;
}
/**
* Custom hook that wraps useChat with WorkflowChatTransport for resumable streaming.
*/
export function useResumableChat({
chatId,
messageHistory,
initialRunId,
}: UseResumableChatOptions) {
const activeRunIdRef = useRef<string | undefined>(initialRunId);
const chatResult = useChat<ChatAgentUIMessage>({
messages: messageHistory,
resume: !!initialRunId,
transport: new WorkflowChatTransport({
// Send new messages
prepareSendMessagesRequest: ({ messages }) => ({
api: `/api/chats/${chatId}/messages`,
body: {
chatId,
message: messages[messages.length - 1],
},
}),
// Store the workflow run ID when a message is sent
onChatSendMessage: (response) => {
const workflowRunId = response.headers.get("x-workflow-run-id");
if (workflowRunId) {
activeRunIdRef.current = workflowRunId;
}
},
// Configure reconnection to use the ref for the latest value
prepareReconnectToStreamRequest: ({ api, ...rest }) => {
const currentRunId = activeRunIdRef.current;
if (!currentRunId) {
throw new Error("No active workflow run ID found for reconnection");
}
return {
...rest,
api: `/api/chats/${chatId}/messages/${encodeURIComponent(currentRunId)}/stream`,
};
},
// Clear the workflow run ID when the chat stream ends
onChatEnd: () => {
activeRunIdRef.current = undefined;
},
// Retry up to 5 times on reconnection errors
maxConsecutiveErrors: 5,
}),
id: chatId,
generateId: () => uuidv7(),
});
return chatResult;
}Chat Page with Resumption Detection
Create or update src/app/[chatId]/page.tsx:
import { redirect } from "next/navigation";
import { headers } from "next/headers";
import { Chat } from "@/components/chat/chat";
import {
convertDbMessagesToUIMessages,
ensureChatExists,
getChatMessages,
} from "@/lib/chat/queries";
import { auth } from "@/lib/auth/server";
interface PageProps {
params: Promise<{
chatId: string;
}>;
}
export default async function ChatPage({ params }: PageProps) {
const session = await auth.api.getSession({
headers: await headers(),
});
if (!session) {
redirect("/sign-in");
}
const { chatId } = await params;
const userId = session.user.id;
const isAuthorized = await ensureChatExists(chatId, userId);
if (!isAuthorized) {
redirect("/");
}
const persistedMessages = await getChatMessages(chatId);
// Check for incomplete assistant message (has runId but no parts)
// This happens when a workflow was interrupted mid-stream
const lastMessage = persistedMessages.at(-1);
const isIncompleteMessage =
lastMessage?.role === "assistant" &&
lastMessage?.runId &&
lastMessage?.parts.length === 0;
// Extract runId for resumption and remove empty message from history
const initialRunId = isIncompleteMessage ? lastMessage.runId : undefined;
const messagesToConvert = isIncompleteMessage
? persistedMessages.slice(0, -1)
: persistedMessages;
const history = convertDbMessagesToUIMessages(messagesToConvert);
return (
<Chat
messageHistory={history}
chatId={chatId}
initialRunId={initialRunId ?? undefined}
/>
);
}import { redirect } from "next/navigation";
import { headers } from "next/headers";
import { Chat } from "@/components/chat/chat";
import {
convertDbMessagesToUIMessages,
ensureChatExists,
getChatMessages,
} from "@/lib/chat/queries";
import { auth } from "@/lib/auth/server";
interface PageProps {
params: Promise<{
chatId: string;
}>;
}
export default async function ChatPage({ params }: PageProps) {
const session = await auth.api.getSession({
headers: await headers(),
});
if (!session) {
redirect("/sign-in");
}
const { chatId } = await params;
const userId = session.user.id;
const isAuthorized = await ensureChatExists(chatId, userId);
if (!isAuthorized) {
redirect("/");
}
const persistedMessages = await getChatMessages(chatId);
// Check for incomplete assistant message (has runId but no parts)
// This happens when a workflow was interrupted mid-stream
const lastMessage = persistedMessages.at(-1);
const isIncompleteMessage =
lastMessage?.role === "assistant" &&
lastMessage?.runId &&
lastMessage?.parts.length === 0;
// Extract runId for resumption and remove empty message from history
const initialRunId = isIncompleteMessage ? lastMessage.runId : undefined;
const messagesToConvert = isIncompleteMessage
? persistedMessages.slice(0, -1)
: persistedMessages;
const history = convertDbMessagesToUIMessages(messagesToConvert);
return (
<Chat
messageHistory={history}
chatId={chatId}
initialRunId={initialRunId ?? undefined}
/>
);
}How Resumption Detection Works
- On page load, fetch all messages for the chat
- Check last message - if it's an assistant message with
runIdbut no parts, it's incomplete - Extract
runId- pass to client for resumption - Remove empty message - don't show the incomplete message in UI
- Client resumes -
WorkflowChatTransportreconnects using therunId
WorkflowChatTransport Options
| Option | Description |
|---|---|
prepareSendMessagesRequest | Configure the initial message send request |
onChatSendMessage | Callback when message is sent (capture runId) |
prepareReconnectToStreamRequest | Configure reconnection request URL |
onChatEnd | Callback when stream completes |
maxConsecutiveErrors | Number of reconnection retries |
Using the Hook in Components
"use client";
import { useResumableChat } from "@/hooks/use-resumable-chat";
export function Chat({ chatId, messageHistory, initialRunId }) {
const { messages, sendMessage, status } = useResumableChat({
chatId,
messageHistory,
initialRunId,
});
// Render messages and input...
}"use client";
import { useResumableChat } from "@/hooks/use-resumable-chat";
export function Chat({ chatId, messageHistory, initialRunId }) {
const { messages, sendMessage, status } = useResumableChat({
chatId,
messageHistory,
initialRunId,
});
// Render messages and input...
}Workflow Key Concepts
Understanding the core concepts behind resumable workflows.
How Resumability Works
- Workflow starts -
workflowRunIdis generated and returned in response header - Message created - Assistant message is created with
runIdbefore streaming - Client stores runId -
WorkflowChatTransportcaptures it from header - Connection lost - Client detects disconnect
- Auto-reconnect - Transport calls resume endpoint with
runIdandstartIndex - Stream resumes - Workflow SDK returns stream from where client left off
- Page reload - Server detects incomplete message by checking for
runId, passes to client to resume
The "use step" Directive
Marks functions as durable workflow steps:
async function myStep(input: string): Promise<string> {
"use step";
// Result is persisted and replayed on restart
return await someOperation(input);
}async function myStep(input: string): Promise<string> {
"use step";
// Result is persisted and replayed on restart
return await someOperation(input);
}- Each step is persisted and can be replayed if the workflow is interrupted
- Only works in standalone functions, not class methods
- Enables exactly-once semantics for database operations
The "use workflow" Directive
Marks the main workflow function:
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
const writable = getWritable();
// ...
}export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
const writable = getWritable();
// ...
}Provides access to:
getWorkflowMetadata()- Get the run ID and other metadatagetWritable()- Get the writable stream for sending chunks
Stream Ordering
- UUID v7 IDs ensure chronological ordering of message parts
startIndexparameter allows resuming from a specific chunk- Parts are sorted by ID when loading from database
Tool Loops
Agents continue executing until finishReason !== "tool-calls":
while (shouldContinue && stepCount < maxSteps) {
const result = await executeAgentStep(modelMessages, stepConfig);
shouldContinue = result.finishReason === "tool-calls";
stepCount++;
}while (shouldContinue && stepCount < maxSteps) {
const result = await executeAgentStep(modelMessages, stepConfig);
shouldContinue = result.finishReason === "tool-calls";
stepCount++;
}This allows models to call tools multiple times before responding.
Why Tools Are Referenced by Key
Workflow runtimes serialize step inputs/outputs. Function references can't be serialized, so tools are stored in a lookup object:
const toolSets = {
research: researchTools,
drafting: draftingTools,
} as const;
// Inside step executor:
const tools = toolSets[config.stepOptions.tools];const toolSets = {
research: researchTools,
drafting: draftingTools,
} as const;
// Inside step executor:
const tools = toolSets[config.stepOptions.tools];Why Step Executors Are Separate
The "use step" directive only works in standalone functions:
// This works:
async function executeAgentStep(...) {
"use step";
// ...
}
// This does NOT work:
class Agent {
async executeStep(...) {
"use step"; // Error: directive not supported in methods
}
}// This works:
async function executeAgentStep(...) {
"use step";
// ...
}
// This does NOT work:
class Agent {
async executeStep(...) {
"use step"; // Error: directive not supported in methods
}
}Progress Updates
Use progress updates to keep users informed during long operations:
await writeProgress("Researching topic...", chatId, messageId);await writeProgress("Researching topic...", chatId, messageId);Progress updates are:
- Streamed to the client immediately
- Persisted to the database for history