Step 1: Install the packages
bun add workflow @workflow/ai
bun add workflow @workflow/ai
Step 2: Create the workflows folder
Create the src/workflows/ folder structure:
src/workflows/
steps/ # Shared step functions (reusable across workflows)
chat/
index.ts # Workflow orchestration function ("use workflow")
steps/ # Workflow-specific steps ("use step")
history.ts
logger.ts
name-chat.ts
types.ts # Workflow-specific types
src/workflows/
steps/ # Shared step functions (reusable across workflows)
chat/
index.ts # Workflow orchestration function ("use workflow")
steps/ # Workflow-specific steps ("use step")
history.ts
logger.ts
name-chat.ts
types.ts # Workflow-specific types
- workflows/steps/ - Shared step functions reusable across workflows.
- workflows/chat/ - A specific workflow with its own orchestration and steps.
Step 3: Update Next.js config
Update the Next.js configuration:
// next.config.ts
import type { NextConfig } from "next";
import { withWorkflow } from "workflow/next";
// Next.js options that get wrapped by withWorkflow in the default export.
// Add further Next.js settings alongside reactCompiler as needed.
const nextConfig: NextConfig = {
  reactCompiler: true,
};
export default withWorkflow(nextConfig);
// next.config.ts
import type { NextConfig } from "next";
import { withWorkflow } from "workflow/next";
// Next.js options that get wrapped by withWorkflow in the default export.
// Add further Next.js settings alongside reactCompiler as needed.
const nextConfig: NextConfig = {
  reactCompiler: true,
};
export default withWorkflow(nextConfig);
Step 4: Add stream step utilities
Install via shadcn registry
bunx --bun shadcn@latest add https://fullstackrecipes.com/r/workflow-stream.json
When streaming UIMessageChunk responses (like chat messages), you must signal the start and end of the stream. This is required for proper stream framing with WorkflowChatTransport.
The Chat Workflow
Create the main workflow that processes user messages and generates AI responses:
import { getWorkflowMetadata, getWritable } from "workflow";
import type { ChatAgentUIMessage } from "./types";
import {
persistUserMessage,
createAssistantMessage,
getMessageHistory,
removeRunId,
persistMessageParts,
} from "./steps/history";
import { startStream, finishStream } from "../steps/stream";
import { log } from "./steps/logger";
import { nameChatStep } from "./steps/name-chat";
import { chatAgent } from "@/lib/ai/chat-agent";
/**
 * Chat workflow entry point: handles one user message and produces the
 * assistant's streamed reply.
 *
 * The workflow run id is stored on the placeholder assistant message so a
 * client that reconnects can resume the stream; it is cleared again once the
 * reply has been persisted and the stream has been finished.
 *
 * @param chatId - Id of the chat the message belongs to.
 * @param userMessage - The user message that triggered this run.
 */
export async function chatWorkflow({
  chatId,
  userMessage,
}: {
  chatId: string;
  userMessage: ChatAgentUIMessage;
}) {
  "use workflow";

  const { workflowRunId: runId } = getWorkflowMetadata();
  await log("info", "Starting chat workflow", { chatId, runId });

  // Save the incoming user message.
  await persistUserMessage({ chatId, message: userMessage });

  // Insert an empty assistant message tagged with the run id (resumability).
  const assistantMessageId = await createAssistantMessage({ chatId, runId });

  // Load the conversation that is handed to the agent.
  const conversation = await getMessageHistory(chatId);

  // Open the UI message stream for the assistant message.
  await startStream(assistantMessageId);

  // Let the agent generate the reply, streaming into the workflow's writable.
  const reply = await chatAgent.run(conversation, {
    maxSteps: 10,
    writable: getWritable(),
  });
  const replyParts = reply.parts;

  // Store the generated parts on the assistant message.
  await persistMessageParts({
    chatId,
    messageId: assistantMessageId,
    parts: replyParts,
  });

  // Close the UI message stream.
  await finishStream();

  // Dropping the run id marks the assistant message as complete.
  await removeRunId(assistantMessageId);

  // Generate a chat title if this is the first message.
  await nameChatStep(chatId, userMessage);

  await log("info", "Chat workflow completed", {
    chatId,
    runId,
    partsCount: replyParts.length,
  });
}
import { getWorkflowMetadata, getWritable } from "workflow";
import type { ChatAgentUIMessage } from "./types";
import {
persistUserMessage,
createAssistantMessage,
getMessageHistory,
removeRunId,
persistMessageParts,
} from "./steps/history";
import { startStream, finishStream } from "../steps/stream";
import { log } from "./steps/logger";
import { nameChatStep } from "./steps/name-chat";
import { chatAgent } from "@/lib/ai/chat-agent";
/**
 * Chat workflow entry point: handles one user message and produces the
 * assistant's streamed reply.
 *
 * The workflow run id is stored on the placeholder assistant message so a
 * client that reconnects can resume the stream; it is cleared again once the
 * reply has been persisted and the stream has been finished.
 *
 * @param chatId - Id of the chat the message belongs to.
 * @param userMessage - The user message that triggered this run.
 */
export async function chatWorkflow({
  chatId,
  userMessage,
}: {
  chatId: string;
  userMessage: ChatAgentUIMessage;
}) {
  "use workflow";

  const { workflowRunId: runId } = getWorkflowMetadata();
  await log("info", "Starting chat workflow", { chatId, runId });

  // Save the incoming user message.
  await persistUserMessage({ chatId, message: userMessage });

  // Insert an empty assistant message tagged with the run id (resumability).
  const assistantMessageId = await createAssistantMessage({ chatId, runId });

  // Load the conversation that is handed to the agent.
  const conversation = await getMessageHistory(chatId);

  // Open the UI message stream for the assistant message.
  await startStream(assistantMessageId);

  // Let the agent generate the reply, streaming into the workflow's writable.
  const reply = await chatAgent.run(conversation, {
    maxSteps: 10,
    writable: getWritable(),
  });
  const replyParts = reply.parts;

  // Store the generated parts on the assistant message.
  await persistMessageParts({
    chatId,
    messageId: assistantMessageId,
    parts: replyParts,
  });

  // Close the UI message stream.
  await finishStream();

  // Dropping the run id marks the assistant message as complete.
  await removeRunId(assistantMessageId);

  // Generate a chat title if this is the first message.
  await nameChatStep(chatId, userMessage);

  await log("info", "Chat workflow completed", {
    chatId,
    runId,
    partsCount: replyParts.length,
  });
}
History Steps
Create step functions for message persistence:
import type { UIMessage } from "ai";
import { db } from "@/lib/db/client";
import { messages, chats } from "@/lib/chat/schema";
import {
persistMessage,
insertMessageParts,
getChatMessages,
convertDbMessagesToUIMessages,
clearMessageRunId,
} from "@/lib/chat/queries";
import { eq } from "drizzle-orm";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
import { v7 as uuidv7 } from "uuid";
/**
 * Step: store a user message and bump the owning chat's `updatedAt`.
 *
 * @param chatId - Id of the chat the message belongs to.
 * @param message - The user message to store.
 */
export async function persistUserMessage({
  chatId,
  message,
}: {
  chatId: string;
  message: ChatAgentUIMessage;
}): Promise<void> {
  "use step";

  await persistMessage({ chatId, message });

  // Touch the chat row so its timestamp reflects the new message.
  const touchedAt = new Date();
  const matchesChat = eq(chats.id, chatId);
  await db.update(chats).set({ updatedAt: touchedAt }).where(matchesChat);
}
/**
 * Create a placeholder assistant message with a runId for stream resumption.
 * Parts will be added later when streaming completes.
 *
 * @param chatId - Id of the chat the assistant message belongs to.
 * @param runId - Workflow run id stored on the message.
 * @returns The id of the newly inserted message.
 * @throws Error if the insert reports no returned row.
 */
export async function createAssistantMessage({
  chatId,
  runId,
}: {
  chatId: string;
  runId: string;
}): Promise<string> {
  "use step";

  const [inserted] = await db
    .insert(messages)
    .values({
      id: uuidv7(),
      chatId,
      role: "assistant",
      runId,
    })
    .returning({ messageId: messages.id });

  // Guard the first row explicitly: destructuring an absent row would fail
  // with an opaque TypeError, and is rejected under noUncheckedIndexedAccess.
  if (!inserted) {
    throw new Error(
      `Failed to create assistant message for chat ${chatId}: insert returned no rows`,
    );
  }

  return inserted.messageId;
}
/**
 * Step: store the parts produced for an assistant message once streaming has
 * completed, then bump the owning chat's `updatedAt`.
 *
 * The generic UIMessage parts are first passed through assertChatAgentParts,
 * which validates and narrows them to ChatAgentUIMessage parts.
 *
 * @param chatId - Id of the chat the message belongs to.
 * @param messageId - Id of the assistant message receiving the parts.
 * @param parts - Parts to validate and store.
 */
export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  assertChatAgentParts(parts);
  await insertMessageParts(chatId, messageId, parts);

  // Touch the chat row so its timestamp reflects the stored reply.
  const touchedAt = new Date();
  const matchesChat = eq(chats.id, chatId);
  await db.update(chats).set({ updatedAt: touchedAt }).where(matchesChat);
}
/**
 * Step: load a chat's stored messages and return them in UI message format.
 *
 * @param chatId - Id of the chat whose messages are loaded.
 * @returns The chat's messages converted by convertDbMessagesToUIMessages.
 */
export async function getMessageHistory(
  chatId: string,
): Promise<ChatAgentUIMessage[]> {
  "use step";

  return convertDbMessagesToUIMessages(await getChatMessages(chatId));
}
/**
 * Step: finalize a message by clearing its runId once streaming is complete.
 *
 * @param messageId - Id of the message whose runId is cleared.
 */
export async function removeRunId(messageId: string): Promise<void> {
  "use step";

  await clearMessageRunId(messageId);
}
import type { UIMessage } from "ai";
import { db } from "@/lib/db/client";
import { messages, chats } from "@/lib/chat/schema";
import {
persistMessage,
insertMessageParts,
getChatMessages,
convertDbMessagesToUIMessages,
clearMessageRunId,
} from "@/lib/chat/queries";
import { eq } from "drizzle-orm";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
import { v7 as uuidv7 } from "uuid";
/**
 * Step: store a user message and bump the owning chat's `updatedAt`.
 *
 * @param chatId - Id of the chat the message belongs to.
 * @param message - The user message to store.
 */
export async function persistUserMessage({
  chatId,
  message,
}: {
  chatId: string;
  message: ChatAgentUIMessage;
}): Promise<void> {
  "use step";

  await persistMessage({ chatId, message });

  // Touch the chat row so its timestamp reflects the new message.
  const touchedAt = new Date();
  const matchesChat = eq(chats.id, chatId);
  await db.update(chats).set({ updatedAt: touchedAt }).where(matchesChat);
}
/**
 * Create a placeholder assistant message with a runId for stream resumption.
 * Parts will be added later when streaming completes.
 *
 * @param chatId - Id of the chat the assistant message belongs to.
 * @param runId - Workflow run id stored on the message.
 * @returns The id of the newly inserted message.
 * @throws Error if the insert reports no returned row.
 */
export async function createAssistantMessage({
  chatId,
  runId,
}: {
  chatId: string;
  runId: string;
}): Promise<string> {
  "use step";

  const [inserted] = await db
    .insert(messages)
    .values({
      id: uuidv7(),
      chatId,
      role: "assistant",
      runId,
    })
    .returning({ messageId: messages.id });

  // Guard the first row explicitly: destructuring an absent row would fail
  // with an opaque TypeError, and is rejected under noUncheckedIndexedAccess.
  if (!inserted) {
    throw new Error(
      `Failed to create assistant message for chat ${chatId}: insert returned no rows`,
    );
  }

  return inserted.messageId;
}
/**
 * Step: store the parts produced for an assistant message once streaming has
 * completed, then bump the owning chat's `updatedAt`.
 *
 * The generic UIMessage parts are first passed through assertChatAgentParts,
 * which validates and narrows them to ChatAgentUIMessage parts.
 *
 * @param chatId - Id of the chat the message belongs to.
 * @param messageId - Id of the assistant message receiving the parts.
 * @param parts - Parts to validate and store.
 */
export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  assertChatAgentParts(parts);
  await insertMessageParts(chatId, messageId, parts);

  // Touch the chat row so its timestamp reflects the stored reply.
  const touchedAt = new Date();
  const matchesChat = eq(chats.id, chatId);
  await db.update(chats).set({ updatedAt: touchedAt }).where(matchesChat);
}
/**
 * Step: load a chat's stored messages and return them in UI message format.
 *
 * @param chatId - Id of the chat whose messages are loaded.
 * @returns The chat's messages converted by convertDbMessagesToUIMessages.
 */
export async function getMessageHistory(
  chatId: string,
): Promise<ChatAgentUIMessage[]> {
  "use step";

  return convertDbMessagesToUIMessages(await getChatMessages(chatId));
}
/**
 * Step: finalize a message by clearing its runId once streaming is complete.
 *
 * @param messageId - Id of the message whose runId is cleared.
 */
export async function removeRunId(messageId: string): Promise<void> {
  "use step";

  await clearMessageRunId(messageId);
}
Logging in Workflows
Workflow functions run in a restricted environment that doesn't support Node.js modules like fs, events, or worker_threads. Since pino uses these modules, you cannot import the logger directly in workflow functions.
Instead, wrap logger calls in a step function:
import { logger } from "@/lib/logging/logger";
/** Log levels accepted by the workflow-safe `log` step. */
type LogLevel = "info" | "warn" | "error" | "debug";

/**
 * Logger step that is safe to call from workflow functions.
 *
 * The pino logger is only touched inside this step, which keeps the Node.js
 * modules it relies on (fs, events, worker_threads) out of workflow code.
 *
 * @param level - Logger method to invoke.
 * @param message - Log message.
 * @param data - Optional structured fields; when given, they are passed to
 *   the logger ahead of the message.
 */
export async function log(
  level: LogLevel,
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  // No structured fields: log the message on its own.
  if (!data) {
    logger[level](message);
    return;
  }

  logger[level](data, message);
}
import { logger } from "@/lib/logging/logger";
/** Log levels accepted by the workflow-safe `log` step. */
type LogLevel = "info" | "warn" | "error" | "debug";

/**
 * Logger step that is safe to call from workflow functions.
 *
 * The pino logger is only touched inside this step, which keeps the Node.js
 * modules it relies on (fs, events, worker_threads) out of workflow code.
 *
 * @param level - Logger method to invoke.
 * @param message - Log message.
 * @param data - Optional structured fields; when given, they are passed to
 *   the logger ahead of the message.
 */
export async function log(
  level: LogLevel,
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  // No structured fields: log the message on its own.
  if (!data) {
    logger[level](message);
    return;
  }

  logger[level](data, message);
}
This pattern applies to any library that uses Node.js modules. Move the import and usage into a step function to isolate it from the workflow runtime.