# Workflow Development Kit Setup

Install and configure the Workflow Development Kit for resumable, durable AI agent workflows with step-level persistence, stream resumption, and agent orchestration.

### Step 1: Install the packages

```bash
bun add workflow @workflow/ai
```

### Step 2: Create the workflows folder

Create the `src/workflows/` folder structure:

```
src/workflows/
  steps/           # Shared step functions (reusable across workflows)
  chat/
    index.ts       # Workflow orchestration function ("use workflow")
    steps/         # Workflow-specific steps ("use step")
      history.ts
      logger.ts
      name-chat.ts
    types.ts       # Workflow-specific types
```

- **`workflows/steps/`** - Shared step functions reusable across workflows.
- **`workflows/chat/`** - A specific workflow with its own orchestration and steps.

### Step 3: Update Next.js config

Update the Next.js configuration:

```ts
// next.config.ts
import type { NextConfig } from "next";
import { withWorkflow } from "workflow/next";

const nextConfig: NextConfig = {
  /* config options here */
  reactCompiler: true,
};

export default withWorkflow(nextConfig);
```

### Step 4: Add stream step utilities

**Install via shadcn registry:**

```bash
bunx --bun shadcn@latest add https://fullstackrecipes.com/r/workflow-stream.json
```

**Or copy the source code:**

`workflows/steps/stream.ts`:

```typescript
import { getWritable } from "workflow";
import type { UIMessageChunk } from "ai";

/**
 * Signal the start of a UI message stream.
 * Must be called before agent.run() when streaming UIMessageChunks.
 */
export async function startStream(messageId: string): Promise<void> {
  "use step";

  const writable = getWritable<UIMessageChunk>();
  const writer = writable.getWriter();
  try {
    await writer.write({
      type: "start",
      messageId,
    });
  } finally {
    writer.releaseLock();
  }
}

/**
 * Signal the end of a UI message stream.
 * Must be called after agent.run() completes to close the stream properly.
 */
export async function finishStream(): Promise<void> {
  "use step";

  const writable = getWritable<UIMessageChunk>();
  const writer = writable.getWriter();
  try {
    await writer.write({
      type: "finish",
      finishReason: "stop",
    });
  } finally {
    writer.releaseLock();
  }

  await writable.close();
}
```

When streaming `UIMessageChunk` responses (like chat messages), you must signal the start and end of the stream. This is required for proper stream framing with `WorkflowChatTransport`.

---

## The Chat Workflow

Create the main workflow that processes user messages and generates AI responses:

```typescript
// src/workflows/chat/index.ts
import { getWorkflowMetadata, getWritable } from "workflow";
import type { ChatAgentUIMessage } from "./types";
import {
  persistUserMessage,
  createAssistantMessage,
  getMessageHistory,
  removeRunId,
  persistMessageParts,
} from "./steps/history";
import { startStream, finishStream } from "../steps/stream";
import { log } from "./steps/logger";
import { nameChatStep } from "./steps/name-chat";
import { chatAgent } from "@/lib/ai/chat-agent";

/**
 * Main chat workflow that processes user messages and generates AI responses.
 * Uses runId for stream resumability on client reconnection.
 */
export async function chatWorkflow({
  chatId,
  userMessage,
}: {
  chatId: string;
  userMessage: ChatAgentUIMessage;
}) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  await log("info", "Starting chat workflow", { chatId, runId: workflowRunId });

  // Persist the user message
  await persistUserMessage({ chatId, message: userMessage });

  // Create a placeholder assistant message with runId for resumability
  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });

  // Get full message history
  const history = await getMessageHistory(chatId);

  // Start the UI message stream
  await startStream(messageId);

  // Run the agent with streaming
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  // Persist the assistant message parts
  await persistMessageParts({ chatId, messageId, parts });

  // Finish the UI message stream
  await finishStream();

  // Clear the runId to mark the message as complete
  await removeRunId(messageId);

  // Generate a chat title if this is the first message
  await nameChatStep(chatId, userMessage);

  await log("info", "Chat workflow completed", {
    chatId,
    runId: workflowRunId,
    partsCount: parts.length,
  });
}
```

---

## History Steps

Create step functions for message persistence:

```typescript
// src/workflows/chat/steps/history.ts
import type { UIMessage } from "ai";
import { db } from "@/lib/db/client";
import { messages, chats } from "@/lib/chat/schema";
import {
  persistMessage,
  insertMessageParts,
  getChatMessages,
  convertDbMessagesToUIMessages,
  clearMessageRunId,
} from "@/lib/chat/queries";
import { eq } from "drizzle-orm";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
import { v7 as uuidv7 } from "uuid";

/**
 * Persist a user message to the database.
 */
export async function persistUserMessage({
  chatId,
  message,
}: {
  chatId: string;
  message: ChatAgentUIMessage;
}): Promise<void> {
  "use step";

  await persistMessage({ chatId, message });

  // Update chat timestamp
  await db
    .update(chats)
    .set({ updatedAt: new Date() })
    .where(eq(chats.id, chatId));
}

/**
 * Create a placeholder assistant message with a runId for stream resumption.
 * Parts will be added later when streaming completes.
 */
export async function createAssistantMessage({
  chatId,
  runId,
}: {
  chatId: string;
  runId: string;
}): Promise<string> {
  "use step";

  const [{ messageId }] = await db
    .insert(messages)
    .values({
      id: uuidv7(),
      chatId,
      role: "assistant",
      runId,
    })
    .returning({ messageId: messages.id });

  return messageId;
}

/**
 * Persist message parts after streaming completes.
 * Validates and narrows generic UIMessage parts to ChatAgentUIMessage parts.
 */
export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  assertChatAgentParts(parts);

  await insertMessageParts(chatId, messageId, parts);

  // Update chat timestamp
  await db
    .update(chats)
    .set({ updatedAt: new Date() })
    .where(eq(chats.id, chatId));
}

/**
 * Get message history for a chat, converted to UI message format.
 */
export async function getMessageHistory(
  chatId: string,
): Promise<ChatAgentUIMessage[]> {
  "use step";

  const dbMessages = await getChatMessages(chatId);
  return convertDbMessagesToUIMessages(dbMessages);
}

/**
 * Clear the runId from a message after streaming is complete.
 * This marks the message as finalized.
 */
export async function removeRunId(messageId: string): Promise<void> {
  "use step";

  await clearMessageRunId(messageId);
}
```

---

## Logging in Workflows

Workflow functions run in a restricted environment that doesn't support Node.js modules like `fs`, `events`, or `worker_threads`.
Since pino uses these modules, you cannot import the logger directly in workflow functions.

Instead, wrap logger calls in a step function:

```ts
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/logging/logger";

type LogLevel = "info" | "warn" | "error" | "debug";

/**
 * Workflow-safe logger step.
 * Wraps pino logger calls in a step function to avoid bundling
 * Node.js modules (fs, events, worker_threads) into workflow functions.
 */
export async function log(
  level: LogLevel,
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  if (data) {
    logger[level](data, message);
  } else {
    logger[level](message);
  }
}
```

This pattern applies to any library that uses Node.js modules. Move the import and usage into a step function to isolate it from the workflow runtime.

---

## References

- [Workflow Development Kit Documentation](https://useworkflow.dev/docs)
- [Getting Started on Next.js](https://useworkflow.dev/docs/getting-started/next)
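---

## Appendix: A `types.ts` Sketch

The chat workflow and history steps import `ChatAgentUIMessage` and `assertChatAgentParts` from `./types` without showing that module. Below is a minimal, hypothetical sketch of what it might contain. The part types used here (`text`, `reasoning`) are placeholder assumptions; in a real project you would derive the union from the AI SDK's `UIMessage` types and the tools your agent actually exposes:

```typescript
// src/workflows/chat/types.ts (illustrative sketch, not the canonical module)

// Placeholder stand-ins for the AI SDK part shapes this agent produces.
type TextPart = { type: "text"; text: string };
type ReasoningPart = { type: "reasoning"; text: string };

export type ChatAgentPart = TextPart | ReasoningPart;

export type ChatAgentUIMessage = {
  id: string;
  role: "user" | "assistant";
  parts: ChatAgentPart[];
};

// Part types this agent is known to emit (assumed set).
const KNOWN_PART_TYPES = new Set<string>(["text", "reasoning"]);

/**
 * Narrow generic message parts to this agent's part union.
 * Throwing on unknown part types makes bad data fail loudly
 * before persistMessageParts() writes it to the database.
 */
export function assertChatAgentParts(
  parts: Array<{ type: string }>,
): asserts parts is ChatAgentPart[] {
  for (const part of parts) {
    if (!KNOWN_PART_TYPES.has(part.type)) {
      throw new Error(`Unexpected message part type: ${part.type}`);
    }
  }
}
```

The assertion-function form (`asserts parts is ChatAgentPart[]`) is what lets `persistMessageParts` accept generic `UIMessage["parts"]` while still handing a narrowed type to the database layer.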