Back to recipes

Workflow Development Kit Setup

Install and configure the Workflow Development Kit for resumable, durable AI agent workflows with step-level persistence, stream resumption, and agent orchestration.

Setup Instructions

Step 1: Install the packages

bash
bun add workflow @workflow/ai

Step 2: Create the workflows folder

Create the src/workflows/ folder structure:

src/workflows/
  steps/           # Shared step functions (reusable across workflows)
  chat/
    index.ts       # Workflow orchestration function ("use workflow")
    steps/         # Workflow-specific steps ("use step")
      history.ts
      logger.ts
      name-chat.ts
    types.ts       # Workflow-specific types
- workflows/steps/ — Shared step functions reusable across workflows.
- workflows/chat/ — A specific workflow with its own orchestration and steps.

Step 3: Update Next.js config

Update the Next.js configuration:

ts
// next.config.ts
// Wraps the Next.js config with the Workflow plugin so "use workflow" and
// "use step" directives are compiled and wired up by the build.
import type { NextConfig } from "next";
import { withWorkflow } from "workflow/next";

const nextConfig: NextConfig = {
  /* config options here */
  // NOTE(review): reactCompiler is set at the top level here; on some
  // Next.js versions this option lives under `experimental.reactCompiler`
  // — confirm against the project's Next.js version.
  reactCompiler: true,
};

// withWorkflow augments the config with the workflow build hooks.
export default withWorkflow(nextConfig);

Step 4: Add stream step utilities

Install via shadcn registry

Optional:
bunx --bun shadcn@latest add https://fullstackrecipes.com/r/workflow-stream.json

When streaming UIMessageChunk responses (like chat messages), you must signal the start and end of the stream. This is required for proper stream framing with WorkflowChatTransport.


The Chat Workflow

Create the main workflow that processes user messages and generates AI responses:

ts
import { getWorkflowMetadata, getWritable } from "workflow";
import type { ChatAgentUIMessage } from "./types";
import {
  persistUserMessage,
  createAssistantMessage,
  getMessageHistory,
  removeRunId,
  persistMessageParts,
} from "./steps/history";
import { startStream, finishStream } from "../steps/stream";
import { log } from "./steps/logger";
import { nameChatStep } from "./steps/name-chat";
import { chatAgent } from "@/lib/ai/chat-agent";

/**
 * Main chat workflow that processes user messages and generates AI responses.
 *
 * Declared with "use workflow": each awaited step below is persisted, so the
 * workflow can resume after interruption without re-running completed steps.
 * The workflowRunId is stored on the placeholder assistant message so a
 * reconnecting client can resume the in-flight stream.
 *
 * @param chatId - id of the chat this turn belongs to
 * @param userMessage - the user's incoming UI message
 */
export async function chatWorkflow({
  chatId,
  userMessage,
}: {
  chatId: string;
  userMessage: ChatAgentUIMessage;
}) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  await log("info", "Starting chat workflow", { chatId, runId: workflowRunId });

  // Persist the user message first so it survives an interruption.
  await persistUserMessage({ chatId, message: userMessage });

  // Create a placeholder assistant message tagged with the runId; clients
  // use the runId to resume the in-flight stream on reconnection.
  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });

  // Full message history (including the user message just persisted).
  const history = await getMessageHistory(chatId);

  // Signal stream start — required framing for WorkflowChatTransport.
  await startStream(messageId);

  // Run the agent; streamed chunks go to the workflow's writable stream.
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  // Persist the final assistant message parts now that streaming is done.
  await persistMessageParts({ chatId, messageId, parts });

  // Signal stream end (closes the UI message stream framing).
  await finishStream();

  // Clear the runId so the message reads as finalized, not resumable.
  await removeRunId(messageId);

  // Generate a chat title if this is the first message.
  await nameChatStep(chatId, userMessage);

  await log("info", "Chat workflow completed", {
    chatId,
    runId: workflowRunId,
    partsCount: parts.length,
  });
}

History Steps

Create step functions for message persistence:

ts
import type { UIMessage } from "ai";
import { db } from "@/lib/db/client";
import { messages, chats } from "@/lib/chat/schema";
import {
  persistMessage,
  insertMessageParts,
  getChatMessages,
  convertDbMessagesToUIMessages,
  clearMessageRunId,
} from "@/lib/chat/queries";
import { eq } from "drizzle-orm";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
import { v7 as uuidv7 } from "uuid";

/**
 * Store an incoming user message, then bump the parent chat's
 * updatedAt timestamp so the chat reflects the latest activity.
 */
export async function persistUserMessage({
  chatId,
  message,
}: {
  chatId: string;
  message: ChatAgentUIMessage;
}): Promise<void> {
  "use step";

  await persistMessage({ chatId, message });

  // Touch the chat row so updatedAt records this new message.
  const touchedAt = new Date();
  await db
    .update(chats)
    .set({ updatedAt: touchedAt })
    .where(eq(chats.id, chatId));
}

/**
 * Create a placeholder assistant message with a runId for stream resumption.
 * Parts will be added later when streaming completes.
 */
export async function createAssistantMessage({
  chatId,
  runId,
}: {
  chatId: string;
  runId: string;
}): Promise<string> {
  "use step";

  const [{ messageId }] = await db
    .insert(messages)
    .values({
      id: uuidv7(),
      chatId,
      role: "assistant",
      runId,
    })
    .returning({ messageId: messages.id });

  return messageId;
}

/**
 * Validate and store the parts of a completed assistant message, then
 * bump the parent chat's updatedAt timestamp.
 *
 * Narrows generic UIMessage parts to ChatAgentUIMessage parts via
 * assertChatAgentParts before persisting.
 */
export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  // Throws if any part is not a valid ChatAgentUIMessage part.
  assertChatAgentParts(parts);
  await insertMessageParts(chatId, messageId, parts);

  // Touch the chat row so updatedAt records the new assistant content.
  const touchedAt = new Date();
  await db
    .update(chats)
    .set({ updatedAt: touchedAt })
    .where(eq(chats.id, chatId));
}

/**
 * Load every message in a chat and convert the database rows into the
 * UI message format consumed by the agent.
 */
export async function getMessageHistory(
  chatId: string,
): Promise<ChatAgentUIMessage[]> {
  "use step";

  return convertDbMessagesToUIMessages(await getChatMessages(chatId));
}

/**
 * Detach the workflow runId from a message once streaming is done.
 * With the runId cleared, the message is treated as finalized rather
 * than an in-flight stream that could be resumed.
 */
export async function removeRunId(messageId: string): Promise<void> {
  "use step";

  await clearMessageRunId(messageId);
}

Logging in Workflows

Workflow functions run in a restricted environment that doesn't support Node.js modules like fs, events, or worker_threads. Since pino uses these modules, you cannot import the logger directly in workflow functions.

Instead, wrap logger calls in a step function:

ts
import { logger } from "@/lib/logging/logger";

type LogLevel = "info" | "warn" | "error" | "debug";

/**
 * Workflow-safe logging step.
 *
 * pino pulls in Node.js builtins (fs, events, worker_threads) that the
 * restricted workflow runtime cannot load, so the logger is only ever
 * touched inside this "use step" boundary.
 */
export async function log(
  level: LogLevel,
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  // Keep logger[level](...) as a direct method call so pino's `this`
  // binding is preserved.
  if (!data) {
    logger[level](message);
  } else {
    logger[level](data, message);
  }
}

This pattern applies to any library that uses Node.js modules. Move the import and usage into a step function to isolate it from the workflow runtime.


References