Resumable AI Agent Workflows

Build resumable AI agent workflows with durable execution, tool loops, and automatic stream recovery on client reconnection.

Tags: AI, Agents, Workflow Dev Kit, Streaming

Set up Workflow Development Kit

The Workflow Development Kit provides resumable, durable workflows for AI agents. It enables step-level persistence, stream resumption, and agent orchestration.

Step 1: Install the packages

bun add workflow @workflow/ai
bash

Step 2: Update Next.js config

// next.config.ts
import type { NextConfig } from "next";
import { withWorkflow } from "workflow/next";

const nextConfig: NextConfig = {
  /* config options here */
  reactCompiler: true,
};

export default withWorkflow(nextConfig);
ts

Workflow Type Definitions

Define types for your chat messages and parts that work with the Workflow SDK.

Chat Message Types

Create src/workflows/chat/types.ts:

import type { UIMessage, UIMessagePart, InferUITools } from "ai";
import { z } from "zod";
import { allTools } from "@/lib/ai/tools";

const metadataSchema = z.object({});
type ChatMetadata = z.infer<typeof metadataSchema>;

const dataPartSchema = z.object({
  progress: z.object({
    text: z.string(),
  }),
});
export type ChatDataPart = z.infer<typeof dataPartSchema>;

export type ChatToolSet = InferUITools<typeof allTools>;

export type ChatAgentUIMessage = UIMessage<
  ChatMetadata,
  ChatDataPart,
  ChatToolSet
>;
export type ChatUIMessagePart = UIMessagePart<ChatDataPart, ChatToolSet>;

export type ChatTextPart = Extract<ChatUIMessagePart, { type: "text" }>;
export type ChatReasoningPart = Extract<
  ChatUIMessagePart,
  { type: "reasoning" }
>;
export type ChatSourceUrlPart = Extract<
  ChatUIMessagePart,
  { type: "source-url" }
>;
export type ChatToolPart = Extract<
  ChatUIMessagePart,
  { type: `tool-${string}` }
>;
export type ChatDataProgressPart = Extract<
  ChatUIMessagePart,
  { type: "data-progress" }
>;
export type ChatFilePart = Extract<ChatUIMessagePart, { type: "file" }>;

export function isToolPart(part: ChatUIMessagePart): part is ChatToolPart {
  return part.type.startsWith("tool-");
}

export function isDataProgressPart(
  part: ChatUIMessagePart,
): part is ChatDataProgressPart {
  return part.type === "data-progress";
}
typescript

Key Types

  • ChatAgentUIMessage - Full message type with metadata, data parts, and tools
  • ChatUIMessagePart - Union of all possible message part types
  • ChatDataPart - Custom data parts (like progress updates)
  • ChatToolSet - Inferred tool types from your tool definitions

Type Guards

The isToolPart and isDataProgressPart functions help narrow types when processing message parts:

for (const part of message.parts) {
  if (isToolPart(part)) {
    // part is ChatToolPart
    console.log(part.toolCallId, part.input);
  } else if (isDataProgressPart(part)) {
    // part is ChatDataProgressPart
    console.log(part.data.text);
  }
}
typescript

Workflow Tool Definitions

Define tools that your agents can use within workflows.

Tool Structure

Create src/lib/ai/tools.ts:

import { google } from "@ai-sdk/google";
import { tool, type Tool } from "ai";
import { z } from "zod";

// Cast needed: @ai-sdk/google returns Tool<{}, unknown> but AI SDK expects Tool<any, any>
function asToolSetCompatible<T>(tool: T): Tool<any, any> {
  return tool as Tool<any, any>;
}

export const researchTools = {
  googleSearch: asToolSetCompatible(google.tools.googleSearch({})),
  urlContext: asToolSetCompatible(google.tools.urlContext({})),
};

export const draftingTools = {
  countCharacters: tool({
    description:
      "Count the number of characters in a text. Use this to verify tweet length before finalizing.",
    inputSchema: z.object({
      text: z.string().describe("The text to count characters for"),
    }),
    execute: async ({ text }) => {
      const count = text.length;
      const remaining = 280 - count;
      return {
        characterCount: count,
        remainingCharacters: remaining,
        isWithinLimit: count <= 280,
        status:
          count <= 280
            ? `${count}/280 characters (${remaining} remaining)`
            : `${count}/280 characters (${Math.abs(remaining)} over limit)`,
      };
    },
  }),
};

export const allTools = {
  ...researchTools,
  ...draftingTools,
};

// Tool type names for database schema - must match keys in allTools as "tool-{key}"
export const TOOL_TYPES = [
  "tool-googleSearch",
  "tool-urlContext",
  "tool-countCharacters",
] as const;

export type ToolType = (typeof TOOL_TYPES)[number];
typescript

Why Separate Tool Sets?

Tools are grouped by function (research vs. drafting) so each agent gets only the capabilities it needs:

  • Research tools - Web search, URL context extraction
  • Drafting tools - Character counting, formatting helpers
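
The character-count logic inside countCharacters can be exercised on its own. The sketch below copies the body of execute() into a hypothetical helper, checkTweetLength (a name introduced here for illustration, not part of the original code), to show the shape of the result the drafting agent receives. Note that text.length counts UTF-16 code units, so most emoji count as two.

```typescript
// Hypothetical helper mirroring the execute() body of the countCharacters tool.
const TWEET_LIMIT = 280;

function checkTweetLength(text: string) {
  const count = text.length; // UTF-16 code units, not grapheme clusters
  const remaining = TWEET_LIMIT - count;
  return {
    characterCount: count,
    remainingCharacters: remaining,
    isWithinLimit: count <= TWEET_LIMIT,
    status:
      count <= TWEET_LIMIT
        ? `${count}/${TWEET_LIMIT} characters (${remaining} remaining)`
        : `${count}/${TWEET_LIMIT} characters (${Math.abs(remaining)} over limit)`,
  };
}

console.log(checkTweetLength("Hello, world!").status);
// 13/280 characters (267 remaining)
console.log(checkTweetLength("a".repeat(300)).status);
// 300/280 characters (20 over limit)
```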

Database Schema Integration

The TOOL_TYPES array must match your tool keys prefixed with tool- for the database schema's enum constraint. When adding new tools:

  1. Add the tool to the appropriate tool set
  2. Add "tool-{toolKey}" to TOOL_TYPES
  3. Update your database schema's tool type enum
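
Because TOOL_TYPES is maintained by hand, it can drift from the keys of allTools. One way to catch that early is sketched below, using empty stand-in objects in place of the real tools. The Equal type and the findMissingToolTypes helper are illustrative names introduced here, not part of the original code.

```typescript
// Stand-ins for the tool objects defined in src/lib/ai/tools.ts.
const allTools = {
  googleSearch: {},
  urlContext: {},
  countCharacters: {},
};

const TOOL_TYPES = [
  "tool-googleSearch",
  "tool-urlContext",
  "tool-countCharacters",
] as const;

type ToolType = (typeof TOOL_TYPES)[number];
type ExpectedToolType = `tool-${keyof typeof allTools & string}`;

// Compile-time check: resolves to `true` only when both unions are identical,
// so a missing or extra entry makes the assignment below a type error.
type Equal<A, B> = [A] extends [B] ? ([B] extends [A] ? true : false) : false;
const toolTypesMatch: Equal<ToolType, ExpectedToolType> = true;
void toolTypesMatch;

// Runtime variant (hypothetical helper): lists tool keys missing from TOOL_TYPES.
function findMissingToolTypes(
  tools: Record<string, unknown>,
  toolTypes: readonly string[],
): string[] {
  const declared = new Set(toolTypes);
  return Object.keys(tools)
    .map((key) => `tool-${key}`)
    .filter((toolType) => !declared.has(toolType));
}

console.log(findMissingToolTypes(allTools, TOOL_TYPES)); // []
```

The compile-time check costs nothing at runtime; the runtime helper is useful in a test that also compares against the database enum.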

Provider Tools

Some providers offer built-in tools (like Google's googleSearch). These need type casting for compatibility:

// Google's tools have different type signatures
function asToolSetCompatible<T>(tool: T): Tool<any, any> {
  return tool as Tool<any, any>;
}

const searchTool = asToolSetCompatible(google.tools.googleSearch({}));
typescript

Workflow Definition

The main workflow orchestrates agents and manages message persistence with durable execution.

Project Structure

src/workflows/chat/
├── index.ts       # Main workflow definition
├── types.ts       # Type definitions
└── steps/
    ├── history.ts   # Message persistence steps
    ├── stream.ts    # Stream control steps
    ├── router.ts    # Agent routing logic
    └── progress.ts  # Progress updates

Main Workflow

Create src/workflows/chat/index.ts:

import { getWorkflowMetadata, getWritable } from "workflow";
import type { ChatAgentUIMessage } from "./types";
import {
  persistUserMessage,
  createAssistantMessage,
  getMessageHistory,
  removeRunId,
  persistMessageParts,
} from "./steps/history";
import { startStream, finishStream } from "./steps/stream";
import { routerStep } from "./steps/router";
import { writeProgress } from "./steps/progress";
import { researchAgent } from "@/lib/ai/research";
import { draftingAgent } from "@/lib/ai/drafting";

/**
 * Main chat workflow that routes between research and drafting agents.
 * Uses runId for stream resumability on client reconnection.
 */
export async function chatWorkflow({
  chatId,
  userMessage,
}: {
  chatId: string;
  userMessage: ChatAgentUIMessage;
}) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  await persistUserMessage({ chatId, message: userMessage });

  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });

  const history = await getMessageHistory(chatId);

  await startStream(messageId);

  const { next, reasoning } = await routerStep(chatId, messageId, history);
  console.log(`Router: ${next} - ${reasoning}`);

  const progressText =
    next === "research" ? "Researching topic..." : "Authoring tweet...";
  await writeProgress(progressText, chatId, messageId);

  const agent = next === "research" ? researchAgent : draftingAgent;

  const { parts } = await agent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });

  await finishStream();

  await removeRunId(messageId);
}
typescript

The "use workflow" Directive

The "use workflow" directive marks the function as a durable workflow entry point:

  • Provides access to getWorkflowMetadata() for the run ID
  • Provides access to getWritable() for streaming
  • Enables step-level durability for all called steps

Workflow Flow

  1. Persist user message - Save incoming message to database
  2. Create assistant message - Create placeholder with runId for resumability
  3. Load history - Fetch full conversation context
  4. Start stream - Send stream start event to client
  5. Route - Determine which agent to invoke
  6. Progress update - Inform client what's happening
  7. Run agent - Execute the selected agent with tool loop
  8. Persist parts - Save agent response to database
  9. Finish stream - Send stream finish event
  10. Clear runId - Mark message as complete

Why runId Matters

The runId enables stream resumption:

  • Stored in the assistant message before streaming starts
  • Client receives it in response headers
  • If connection drops, client can reconnect using the runId
  • Cleared after successful completion

Workflow Steps

Steps are durable functions that persist their results. If a workflow is interrupted and restarted, completed steps are not re-executed; their stored results are replayed instead.

The "use step" Directive

Mark functions as durable steps with "use step":

async function myStep(input: string): Promise<string> {
  "use step";
  // This function's result is persisted
  return await someOperation(input);
}
typescript

Important: "use step" only works in standalone functions, not class methods.


History Steps

Create src/workflows/chat/steps/history.ts:

import type { UIMessage } from "ai";
import {
  convertDbMessagesToUIMessages,
  persistMessage,
  getChatMessages,
  clearMessageRunId,
  insertMessageParts,
} from "@/lib/chat/queries";
import { messages } from "@/lib/chat/schema";
import type { ChatAgentUIMessage } from "../types";
import { db } from "@/lib/db/client";

export async function persistUserMessage({
  chatId,
  message,
}: {
  chatId: string;
  message: ChatAgentUIMessage;
}): Promise<void> {
  "use step";

  await persistMessage({ chatId, message, runId: null });
}

/**
 * Creates message record with runId before streaming starts,
 * enabling client stream resumption on reconnection.
 */
export async function createAssistantMessage({
  chatId,
  runId,
}: {
  chatId: string;
  runId: string;
}): Promise<string> {
  "use step";

  const [{ messageId }] = await db
    .insert(messages)
    .values({
      chatId,
      role: "assistant",
      runId,
    })
    .returning({ messageId: messages.id });

  return messageId;
}

export async function getMessageHistory(
  chatId: string,
): Promise<ChatAgentUIMessage[]> {
  "use step";

  const messageHistory = await getChatMessages(chatId);
  return convertDbMessagesToUIMessages(messageHistory);
}

export async function removeRunId(messageId: string): Promise<void> {
  "use step";

  await clearMessageRunId(messageId);
}

export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  await insertMessageParts(
    chatId,
    messageId,
    parts as ChatAgentUIMessage["parts"],
  );
}
typescript

Stream Steps

Create src/workflows/chat/steps/stream.ts:

import { getWritable } from "workflow";
import type { UIMessageChunk } from "ai";

export async function startStream(messageId: string): Promise<void> {
  "use step";

  const writable = getWritable<UIMessageChunk>();
  const writer = writable.getWriter();
  try {
    await writer.write({
      type: "start",
      messageId,
    });
  } finally {
    writer.releaseLock();
  }
}

export async function finishStream(): Promise<void> {
  "use step";

  const writable = getWritable<UIMessageChunk>();
  const writer = writable.getWriter();
  try {
    await writer.write({
      type: "finish",
      finishReason: "stop",
    });
  } finally {
    writer.releaseLock();
  }

  await writable.close();
}
typescript

Progress Step

Create src/workflows/chat/steps/progress.ts:

import { getWritable } from "workflow";
import type { UIMessageChunk } from "ai";
import type { ChatDataProgressPart } from "../types";
import { insertMessageParts } from "@/lib/chat/queries";

/** Writes a progress update to both the stream and database. */
export async function writeProgress(
  text: string,
  chatId: string,
  messageId: string,
): Promise<void> {
  "use step";

  const progressPart: ChatDataProgressPart = {
    type: "data-progress",
    data: {
      text,
    },
  };

  const writable = getWritable<UIMessageChunk>();
  const writer = writable.getWriter();
  try {
    await writer.write(progressPart);
  } finally {
    writer.releaseLock();
  }

  await insertMessageParts(chatId, messageId, [progressPart]);
}
typescript

Router Step

Create src/workflows/chat/steps/router.ts:

import { generateObject, convertToModelMessages, type UIMessage } from "ai";
import { z } from "zod";
import { writeProgress } from "./progress";

const routerSystemPrompt = `You are an orchestrator agent for a tweet author system.

Analyze the conversation and determine what should happen next:

1. If the user provides a draft tweet idea, prompt, or topic that needs research:
   - Return { next: 'research' }

2. If research has been completed and the user confirms they want to proceed with drafting:
   - Return { next: 'drafting' }

3. If the user has feedback or questions about the research, or wants more information:
   - Return { next: 'research' }

4. If the conversation is just starting with a new tweet request:
   - Return { next: 'research' }

Look at the conversation history to understand the current state.`;

const routerSchema = z.object({
  next: z.enum(["research", "drafting"]).describe("The next agent to invoke"),
  reasoning: z
    .string()
    .describe("Brief explanation of why this route was chosen"),
});

export type RouterDecision = z.infer<typeof routerSchema>;

export async function routerStep(
  chatId: string,
  messageId: string,
  history: UIMessage[],
): Promise<RouterDecision> {
  "use step";

  await writeProgress("Thinking about the next step...", chatId, messageId);

  const result = await generateObject({
    model: "google/gemini-2.5-flash",
    system: routerSystemPrompt,
    schema: routerSchema,
    messages: convertToModelMessages(history),
  });

  return result.object;
}
typescript

Step Durability

Each step's result is persisted. If the workflow crashes after persistUserMessage completes but before createAssistantMessage:

  1. Workflow restarts
  2. persistUserMessage replays from storage (no duplicate insert)
  3. createAssistantMessage executes fresh

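Because completed steps are replayed rather than re-run, a restart does not repeat their database writes. A step that fails partway through may still be retried, so keep step bodies idempotent where possible.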
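
The replay behavior can be illustrated with a toy model. The sketch below is not how the Workflow Development Kit is implemented; runStep, StepLog, and toyWorkflow are hypothetical names, and an in-memory Map stands in for durable storage. It only shows why a completed step does not insert twice after a restart.

```typescript
// Toy model of step replay, with a Map standing in for durable storage.
type StepLog = Map<string, unknown>;

function runStep<T>(log: StepLog, name: string, fn: () => T): T {
  if (log.has(name)) {
    return log.get(name) as T; // replay the stored result, skip the side effect
  }
  const result = fn();
  log.set(name, result);
  return result;
}

const insertedRows: string[] = []; // stands in for the messages table

function toyWorkflow(log: StepLog, crashAfterFirstStep: boolean): string {
  runStep(log, "persistUserMessage", () => {
    insertedRows.push("user");
    return null;
  });
  if (crashAfterFirstStep) throw new Error("simulated crash");
  return runStep(log, "createAssistantMessage", () => {
    insertedRows.push("assistant");
    return "msg_1";
  });
}

const stepLog: StepLog = new Map();
try {
  toyWorkflow(stepLog, true); // first attempt crashes after step one
} catch {
  // the runtime would restart the workflow here
}
const assistantMessageId = toyWorkflow(stepLog, false); // restart
console.log(insertedRows.join(", ")); // user, assistant
```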


Workflow API Routes

API routes start workflows and handle stream resumption.

Start Workflow Endpoint

Create src/app/api/chats/[chatId]/messages/route.ts:

import { headers } from "next/headers";
import { verifyChatOwnership } from "@/lib/chat/queries";
import { auth } from "@/lib/auth/server";
import { chatWorkflow } from "@/workflows/chat";
import { start } from "workflow/api";
import { createUIMessageStreamResponse } from "ai";
import type { ChatAgentUIMessage } from "@/workflows/chat/types";

export async function POST(request: Request) {
  const session = await auth.api.getSession({
    headers: await headers(),
  });

  if (!session) {
    return new Response("Unauthorized", { status: 401 });
  }

  const { chatId, message } = (await request.json()) as {
    chatId: string;
    message: ChatAgentUIMessage;
  };

  if (!chatId || !message) {
    return new Response("Missing chatId or message", { status: 400 });
  }

  const isAuthorized = await verifyChatOwnership(chatId, session.user.id);
  if (!isAuthorized) {
    return new Response("Forbidden", { status: 403 });
  }

  // Start workflow with user message
  const run = await start(chatWorkflow, [
    {
      chatId,
      userMessage: message,
    },
  ]);

  // Return stream with runId for resumability
  return createUIMessageStreamResponse({
    stream: run.readable,
    headers: {
      "x-workflow-run-id": run.runId,
    },
  });
}
typescript

Resume Stream Endpoint

Create src/app/api/chats/[chatId]/messages/[runId]/stream/route.ts:

import { headers } from "next/headers";
import { getRun } from "workflow/api";
import { createUIMessageStreamResponse } from "ai";
import { auth } from "@/lib/auth/server";
import { verifyChatOwnership } from "@/lib/chat/queries";

/**
 * GET /api/chats/:chatId/messages/:runId/stream
 * Resume endpoint for workflow streams
 *
 * Query params:
 *   - startIndex: optional chunk index to resume from
 */
export async function GET(
  request: Request,
  { params }: { params: Promise<{ chatId: string; runId: string }> },
) {
  const session = await auth.api.getSession({
    headers: await headers(),
  });

  if (!session) {
    return new Response("Unauthorized", { status: 401 });
  }

  const { chatId, runId } = await params;

  if (!runId) {
    return new Response("Missing runId parameter", { status: 400 });
  }

  const isAuthorized = await verifyChatOwnership(chatId, session.user.id);
  if (!isAuthorized) {
    return new Response("Forbidden", { status: 403 });
  }

  const { searchParams } = new URL(request.url);
  const startIndexParam = searchParams.get("startIndex");
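  const parsedStartIndex =
    startIndexParam !== null ? Number.parseInt(startIndexParam, 10) : undefined;
  // Ignore malformed values: parseInt returns NaN for non-numeric input
  const startIndex = Number.isNaN(parsedStartIndex)
    ? undefined
    : parsedStartIndex;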

  const run = await getRun(runId);
  const readable = await run.getReadable({ startIndex });

  return createUIMessageStreamResponse({
    stream: readable,
  });
}
typescript

Key Workflow API Functions

start(workflow, args)

Starts a new workflow run:

  • Returns { runId, readable }
  • runId uniquely identifies this run for resumption
  • readable is a ReadableStream of UI message chunks

getRun(runId)

Gets an existing workflow run:

  • Returns a run object with getReadable({ startIndex? })
  • startIndex allows resuming from a specific chunk

Response Headers

The x-workflow-run-id header is critical for resumability:

return createUIMessageStreamResponse({
  stream: run.readable,
  headers: {
    "x-workflow-run-id": run.runId,
  },
});
typescript

The client captures this header and uses it for reconnection.

Authorization

Both endpoints verify:

  1. User is authenticated (session exists)
  2. User owns the chat (verifyChatOwnership)

This prevents unauthorized access to other users' workflow streams.
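
The order of the two checks determines the status code the client sees. A minimal sketch of that decision follows, assuming simplified Session and Chat shapes; decideAccess is a hypothetical helper, not a function from the original routes.

```typescript
// Simplified stand-ins for the session and chat records (assumptions).
interface Session {
  user: { id: string };
}
interface Chat {
  id: string;
  ownerId: string;
}

type AccessDecision =
  | { allowed: true }
  | { allowed: false; status: 401 | 403 };

function decideAccess(
  session: Session | null,
  chat: Chat | undefined,
): AccessDecision {
  // 1. No session: the caller is not authenticated.
  if (!session) return { allowed: false, status: 401 };
  // 2. Session exists but the chat is missing or owned by someone else.
  if (!chat || chat.ownerId !== session.user.id) {
    return { allowed: false, status: 403 };
  }
  return { allowed: true };
}

const ownedChat: Chat = { id: "chat_1", ownerId: "user_1" };
console.log(decideAccess({ user: { id: "user_1" } }, ownedChat).allowed); // true
```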


Workflow Client Integration

The client uses WorkflowChatTransport for automatic stream resumption.

Resumable Chat Hook

Create src/hooks/use-resumable-chat.ts:

"use client";

import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { v7 as uuidv7 } from "uuid";
import type { ChatAgentUIMessage } from "@/workflows/chat/types";
import { useRef } from "react";

interface UseResumableChatOptions {
  chatId: string;
  messageHistory: ChatAgentUIMessage[];
  /** Initial workflow run ID for resuming an interrupted stream */
  initialRunId?: string;
}

/**
 * Custom hook that wraps useChat with WorkflowChatTransport for resumable streaming.
 */
export function useResumableChat({
  chatId,
  messageHistory,
  initialRunId,
}: UseResumableChatOptions) {
  const activeRunIdRef = useRef<string | undefined>(initialRunId);

  const chatResult = useChat<ChatAgentUIMessage>({
    messages: messageHistory,
    resume: !!initialRunId,
    transport: new WorkflowChatTransport({
      // Send new messages
      prepareSendMessagesRequest: ({ messages }) => ({
        api: `/api/chats/${chatId}/messages`,
        body: {
          chatId,
          message: messages[messages.length - 1],
        },
      }),

      // Store the workflow run ID when a message is sent
      onChatSendMessage: (response) => {
        const workflowRunId = response.headers.get("x-workflow-run-id");
        if (workflowRunId) {
          activeRunIdRef.current = workflowRunId;
        }
      },

      // Configure reconnection to use the ref for the latest value
      prepareReconnectToStreamRequest: ({ api, ...rest }) => {
        const currentRunId = activeRunIdRef.current;
        if (!currentRunId) {
          throw new Error("No active workflow run ID found for reconnection");
        }
        return {
          ...rest,
          api: `/api/chats/${chatId}/messages/${encodeURIComponent(currentRunId)}/stream`,
        };
      },

      // Clear the workflow run ID when the chat stream ends
      onChatEnd: () => {
        activeRunIdRef.current = undefined;
      },

      // Retry up to 5 times on reconnection errors
      maxConsecutiveErrors: 5,
    }),
    id: chatId,
    generateId: () => uuidv7(),
  });

  return chatResult;
}
typescript

Chat Page with Resumption Detection

Create or update src/app/[chatId]/page.tsx:

import { redirect } from "next/navigation";
import { headers } from "next/headers";
import { Chat } from "@/components/chat/chat";
import {
  convertDbMessagesToUIMessages,
  ensureChatExists,
  getChatMessages,
} from "@/lib/chat/queries";
import { auth } from "@/lib/auth/server";

interface PageProps {
  params: Promise<{
    chatId: string;
  }>;
}

export default async function ChatPage({ params }: PageProps) {
  const session = await auth.api.getSession({
    headers: await headers(),
  });

  if (!session) {
    redirect("/sign-in");
  }

  const { chatId } = await params;
  const userId = session.user.id;

  const isAuthorized = await ensureChatExists(chatId, userId);
  if (!isAuthorized) {
    redirect("/");
  }

  const persistedMessages = await getChatMessages(chatId);

  // Check for incomplete assistant message (has runId but no parts)
  // This happens when a workflow was interrupted mid-stream
  const lastMessage = persistedMessages.at(-1);
  const isIncompleteMessage =
    lastMessage?.role === "assistant" &&
    lastMessage?.runId &&
    lastMessage?.parts.length === 0;

  // Extract runId for resumption and remove empty message from history
  const initialRunId = isIncompleteMessage ? lastMessage.runId : undefined;
  const messagesToConvert = isIncompleteMessage
    ? persistedMessages.slice(0, -1)
    : persistedMessages;

  const history = convertDbMessagesToUIMessages(messagesToConvert);

  return (
    <Chat
      messageHistory={history}
      chatId={chatId}
      initialRunId={initialRunId ?? undefined}
    />
  );
}
typescript

How Resumption Detection Works

  1. On page load, fetch all messages for the chat
  2. Check last message - if it's an assistant message with runId but no parts, it's incomplete
  3. Extract runId - pass to client for resumption
  4. Remove empty message - don't show the incomplete message in UI
  5. Client resumes - WorkflowChatTransport reconnects using the runId
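
The detection steps above can be pulled into a pure function, which makes them easy to unit test. This is a sketch under assumptions: StoredMessage is a simplified stand-in for the database row type, and detectResumableRun is a hypothetical helper name.

```typescript
// Simplified stand-in for a database message row (assumption).
interface StoredMessage {
  role: "user" | "assistant";
  runId: string | null;
  parts: unknown[];
}

function detectResumableRun(messages: StoredMessage[]): {
  initialRunId: string | undefined;
  history: StoredMessage[];
} {
  const last = messages[messages.length - 1];

  // Complete (or empty) history: nothing to resume.
  if (
    last === undefined ||
    last.role !== "assistant" ||
    !last.runId ||
    last.parts.length > 0
  ) {
    return { initialRunId: undefined, history: messages };
  }

  // Incomplete assistant message: resume its run and hide the empty message.
  return { initialRunId: last.runId, history: messages.slice(0, -1) };
}

const interruptedChat = detectResumableRun([
  { role: "user", runId: null, parts: [{ type: "text", text: "Hi" }] },
  { role: "assistant", runId: "run_123", parts: [] },
]);
console.log(interruptedChat.initialRunId, interruptedChat.history.length);
// run_123 1
```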

WorkflowChatTransport Options

Option | Description
prepareSendMessagesRequest | Configure the initial message send request
onChatSendMessage | Callback when message is sent (capture runId)
prepareReconnectToStreamRequest | Configure reconnection request URL
onChatEnd | Callback when stream completes
maxConsecutiveErrors | Number of reconnection retries
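
To make the idea of a consecutive-error budget concrete, here is a toy, synchronous model. The real transport is asynchronous and its exact retry logic may differ; withRetryBudget is a hypothetical helper written only for illustration.

```typescript
// Toy model of a consecutive-error budget; the real transport's logic may differ.
function withRetryBudget<T>(attempt: () => T, maxConsecutiveErrors: number): T {
  let consecutiveErrors = 0;
  for (;;) {
    try {
      return attempt(); // success ends the loop
    } catch (error) {
      consecutiveErrors++;
      // Give up once the budget is exhausted and surface the last error.
      if (consecutiveErrors >= maxConsecutiveErrors) throw error;
    }
  }
}

// Simulated reconnect that fails twice before succeeding.
let reconnectCalls = 0;
const reconnectResult = withRetryBudget(() => {
  reconnectCalls++;
  if (reconnectCalls < 3) throw new Error("connection reset");
  return "stream resumed";
}, 5);

console.log(reconnectResult, reconnectCalls); // stream resumed 3
```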

Using the Hook in Components

"use client";

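import { useResumableChat } from "@/hooks/use-resumable-chat";
import type { ChatAgentUIMessage } from "@/workflows/chat/types";

interface ChatProps {
  chatId: string;
  messageHistory: ChatAgentUIMessage[];
  initialRunId?: string;
}

export function Chat({ chatId, messageHistory, initialRunId }: ChatProps) {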
  const { messages, sendMessage, status } = useResumableChat({
    chatId,
    messageHistory,
    initialRunId,
  });

  // Render messages and input...
}
tsx

Workflow Key Concepts

Understanding the core concepts behind resumable workflows.

How Resumability Works

  1. Workflow starts - workflowRunId is generated and returned in response header
  2. Message created - Assistant message is created with runId before streaming
  3. Client stores runId - WorkflowChatTransport captures it from header
  4. Connection lost - Client detects disconnect
  5. Auto-reconnect - Transport calls resume endpoint with runId and startIndex
  6. Stream resumes - Workflow SDK returns stream from where client left off
  7. Page reload - Server detects incomplete message by checking for runId, passes to client to resume
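
Steps 4 to 6 can be pictured with a toy model in which the persisted stream is just an array of chunks. This is a simplification for illustration, not the Workflow Development Kit's implementation; readFromIndex and the chunk names are hypothetical.

```typescript
// Toy model: the persisted stream is an array of chunks, and a reader asks for
// everything from startIndex onward.
const persistedChunks = ["start", "data-progress", "text-delta", "finish"];

function readFromIndex<T>(chunks: readonly T[], startIndex = 0): T[] {
  return chunks.slice(startIndex);
}

// First connection drops after delivering two chunks.
const receivedBeforeDrop = readFromIndex(persistedChunks).slice(0, 2);

// On reconnect the client passes the number of chunks it already has.
const receivedAfterResume = readFromIndex(
  persistedChunks,
  receivedBeforeDrop.length,
);

const fullStream = [...receivedBeforeDrop, ...receivedAfterResume];
console.log(fullStream.join(" > "));
// start > data-progress > text-delta > finish
```

Because the client resumes from its own chunk count, no chunk is delivered twice and none is skipped.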

The "use step" Directive

Marks functions as durable workflow steps:

async function myStep(input: string): Promise<string> {
  "use step";
  // Result is persisted and replayed on restart
  return await someOperation(input);
}
typescript

  • Each step's result is persisted and replayed if the workflow is interrupted
  • Only works in standalone functions, not class methods
  • Avoids re-running completed steps, so their database writes are not repeated on replay

The "use workflow" Directive

Marks the main workflow function:

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();
  const writable = getWritable();
  // ...
}
typescript

Provides access to:

  • getWorkflowMetadata() - Get the run ID and other metadata
  • getWritable() - Get the writable stream for sending chunks

Stream Ordering

  • UUID v7 IDs start with a millisecond timestamp, so sorting by ID keeps message parts in creation order
  • startIndex parameter allows resuming from a specific chunk
  • Parts are sorted by ID when loading from database
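
A small sketch of the sort-by-ID idea, using hand-written IDs in the UUID v7 layout rather than a generator. StoredPart and sortPartsById are illustrative names, and a plain string comparison is assumed to be sufficient because the IDs are lowercase hex of equal length.

```typescript
interface StoredPart {
  id: string;
  type: string;
}

// Hand-written IDs in UUID v7 layout: the first 48 bits (the first 12 hex
// digits) are a millisecond timestamp, so later parts have larger prefixes.
const storedParts: StoredPart[] = [
  { id: "0190a5b0-0002-7000-8000-000000000003", type: "text" },
  { id: "0190a5b0-0000-7000-8000-000000000001", type: "data-progress" },
  { id: "0190a5b0-0001-7000-8000-000000000002", type: "tool-googleSearch" },
];

function sortPartsById(parts: readonly StoredPart[]): StoredPart[] {
  // Copy before sorting so the input array is left untouched.
  return [...parts].sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0));
}

console.log(sortPartsById(storedParts).map((part) => part.type).join(", "));
// data-progress, tool-googleSearch, text
```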

Tool Loops

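Agents keep executing steps while the model's finishReason is "tool-calls", up to maxSteps:

let shouldContinue = true;
let stepCount = 0;

// Run one model step at a time until the model stops requesting tools
// or the step budget (maxSteps) is exhausted.
while (shouldContinue && stepCount < maxSteps) {
  const result = await executeAgentStep(modelMessages, stepConfig);
  shouldContinue = result.finishReason === "tool-calls";
  stepCount++;
}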
typescript

This allows models to call tools multiple times before responding.

Why Tools Are Referenced by Key

Workflow runtimes serialize step inputs/outputs. Function references can't be serialized, so tools are stored in a lookup object:

const toolSets = {
  research: researchTools,
  drafting: draftingTools,
} as const;

// Inside step executor:
const tools = toolSets[config.stepOptions.tools];
typescript
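
The serialization constraint can be demonstrated with a JSON round trip. JSON is used here only as a stand-in for the runtime's serializer, which is an assumption for illustration; the toy tool functions and the roundTrip helper are hypothetical as well.

```typescript
// Toy tool sets: plain functions stand in for real tool definitions.
const toolSets = {
  research: { googleSearch: (query: string) => `results for ${query}` },
  drafting: { countCharacters: (text: string) => text.length },
} as const;

type ToolSetKey = keyof typeof toolSets;

// JSON stands in for the workflow runtime's serializer (assumption).
function roundTrip<T>(value: T): unknown {
  return JSON.parse(JSON.stringify(value));
}

// Passing the tools themselves: function-valued properties are dropped.
const lostTools = roundTrip({ tools: toolSets.drafting }) as {
  tools: Record<string, unknown>;
};
console.log(Object.keys(lostTools.tools).length); // 0

// Passing the key: it survives, and the step looks the tools up again.
const restoredConfig = roundTrip({ tools: "drafting" as ToolSetKey }) as {
  tools: ToolSetKey;
};
const restoredTools = toolSets[restoredConfig.tools];
console.log(Object.keys(restoredTools).join(", ")); // countCharacters
```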

Why Step Executors Are Separate

The "use step" directive only works in standalone functions:

// This works:
async function executeAgentStep(...) {
  "use step";
  // ...
}

// This does NOT work:
class Agent {
  async executeStep(...) {
    "use step"; // Error: directive not supported in methods
  }
}
typescript

Progress Updates

Use progress updates to keep users informed during long operations:

await writeProgress("Researching topic...", chatId, messageId);
typescript

Progress updates are:

  • Streamed to the client immediately
  • Persisted to the database for history
