Working with Workflows
Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
See:
- Resource: using-workflows in Fullstack Recipes
- URL: https://fullstackrecipes.com/recipes/using-workflows
Workflow Folder Structure
Each workflow has its own subfolder in src/workflows/:
src/workflows/
  steps/            # Shared step functions
    stream.ts       # UI message stream helpers
  chat/
    index.ts        # Workflow orchestration function ("use workflow")
    steps/          # Workflow-specific steps ("use step")
      history.ts
      logger.ts
      name-chat.ts
    types.ts        # Workflow-specific types
- workflows/steps/ - Shared step functions reusable across workflows (e.g., stream helpers).
- index.ts - Contains the main workflow function with the "use workflow" directive. Orchestrates the workflow by calling step functions.
- steps/ - Contains individual step functions with the "use step" directive. Each step is a durable checkpoint.
- types.ts - Type definitions for the workflow's UI messages.
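To make "each step is a durable checkpoint" concrete, here is a toy in-memory model of the idea. It is only a sketch: `checkpoint`, `StepLog`, and `demoWorkflow` are hypothetical names invented for illustration, and this is not how the workflow runtime is actually implemented. It shows only the general pattern of recording a step's result once and reusing it when the orchestration function runs again.

```typescript
// Toy model of a durable checkpoint. NOT the workflow runtime's real mechanism;
// all names here are hypothetical and exist only for illustration.
type StepLog = Map<string, unknown>;

// Runs `fn` the first time a key is seen and stores its result.
// Later calls with the same log and key return the stored result instead.
async function checkpoint<T>(
  log: StepLog,
  key: string,
  fn: () => Promise<T>,
): Promise<T> {
  if (log.has(key)) {
    return log.get(key) as T; // replay: reuse the recorded result
  }
  const result = await fn();
  log.set(key, result); // record the result before moving on
  return result;
}

// Hypothetical orchestration function that calls two checkpointed "steps".
// `calls` records which step bodies actually executed.
async function demoWorkflow(log: StepLog, calls: string[]): Promise<string> {
  const history = await checkpoint(log, "getHistory", async () => {
    calls.push("getHistory");
    return ["hello"];
  });
  const reply = await checkpoint(log, "reply", async () => {
    calls.push("reply");
    return `echo: ${history[0]}`;
  });
  return reply;
}
```

Running `demoWorkflow` twice with the same `log` executes each step body once; the second run returns the recorded results. A real durable runtime would persist such a log outside process memory, which this sketch does not attempt.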
Creating a Workflow
Define workflows with the "use workflow" directive:
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
// persistUserMessage, createAssistantMessage, getMessageHistory,
// persistMessageParts, and removeRunId are step functions; their imports
// are omitted here (assumed to live in this workflow's steps/ folder).

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Persist user message
  await persistUserMessage({ chatId, message: userMessage });

  // Create assistant placeholder with runId for resumption
  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });

  // Get message history
  const history = await getMessageHistory(chatId);

  // Start the UI message stream
  await startStream(messageId);

  // Run agent with streaming
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  // Persist and finalize
  await persistMessageParts({ chatId, messageId, parts });

  // Finish the UI message stream
  await finishStream();
  await removeRunId(messageId);
}
Starting a Workflow
Use the start function from workflow/api:
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";

const run = await start(chatWorkflow, [{ chatId, userMessage }]);
// run.runId - unique identifier for this run
// run.readable - stream of UI message chunks
Resuming a Workflow Stream
Use getRun to reconnect to an in-progress or completed workflow:
import { getRun } from "workflow/api";

const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });
Using Steps
Steps are durable checkpoints that persist their results:
async function getMessageHistory(chatId: string) {
  "use step";
  const dbMessages = await getChatMessages(chatId);
  return convertDbMessagesToUIMessages(dbMessages);
}
Streaming UIMessageChunks
When streaming UIMessageChunk responses to clients (e.g., chat messages), you must signal the start and end of the stream. This is required for proper stream framing with WorkflowChatTransport.
Always call startStream() before agent.run() and finishStream() after:
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, messageId }) {
  "use workflow";

  const history = await getMessageHistory(chatId);

  // Signal stream start with the message ID
  await startStream(messageId);

  // Run agent - streams UIMessageChunks to the client
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });

  // Signal stream end and close the writable
  await finishStream();
}
The stream step functions write UIMessageChunk messages:
- startStream(messageId) - Writes { type: "start", messageId } to signal a new message
- finishStream() - Writes { type: "finish", finishReason: "stop" } and closes the stream
Without these signals, the client's WorkflowChatTransport cannot properly parse the streamed response.
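The framing described above can be sketched with the standard Web Streams API. This is a minimal illustration and not the recipe's actual stream.ts: `startStreamSketch`, `finishStreamSketch`, and `StreamChunk` are hypothetical names, the chunk type is a simplified stand-in for the AI SDK's UIMessageChunk, and the writable is passed in as a parameter where a real step would presumably obtain it from getWritable().

```typescript
// Simplified stand-in for UIMessageChunk; the real union has more variants.
type StreamChunk =
  | { type: "start"; messageId: string }
  | { type: "text-delta"; id: string; delta: string }
  | { type: "finish"; finishReason: "stop" };

// Writes the opening frame, then releases the lock so another writer
// (for example the agent) can acquire the same stream afterwards.
async function startStreamSketch(
  writable: WritableStream<StreamChunk>,
  messageId: string,
): Promise<void> {
  const writer = writable.getWriter();
  try {
    await writer.write({ type: "start", messageId });
  } finally {
    writer.releaseLock();
  }
}

// Writes the closing frame and then closes the stream.
async function finishStreamSketch(
  writable: WritableStream<StreamChunk>,
): Promise<void> {
  const writer = writable.getWriter();
  try {
    await writer.write({ type: "finish", finishReason: "stop" });
  } finally {
    writer.releaseLock();
  }
  await writable.close();
}

// Demo: collect every chunk a client would see, in order.
async function demoFraming(): Promise<string[]> {
  const received: StreamChunk[] = [];
  const sink = new WritableStream<StreamChunk>({
    write(chunk) {
      received.push(chunk);
    },
  });

  await startStreamSketch(sink, "msg_1");

  // Stand-in for the agent writing its output between the two signals.
  const agentWriter = sink.getWriter();
  await agentWriter.write({ type: "text-delta", id: "t1", delta: "Hi" });
  agentWriter.releaseLock();

  await finishStreamSketch(sink);
  return received.map((chunk) => chunk.type);
}
```

Releasing the writer lock after each write matters in this sketch because a WritableStream allows only one active writer at a time; holding the lock in the start helper would block the agent from writing.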
Getting Workflow Metadata
Access the current run's metadata:
import { getWorkflowMetadata } from "workflow";

export async function chatWorkflow({ chatId }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Store runId for resumption
  await createAssistantMessage({ chatId, runId: workflowRunId });
}
Workflow-Safe Logging
The workflow runtime doesn't support Node.js modules. Wrap logger calls in steps:
import { logger } from "@/lib/logging/logger";

export async function log(
  level: "info" | "warn" | "error" | "debug",
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  if (data) {
    logger[level](data, message);
  } else {
    logger[level](message);
  }
}
Running Agents in Workflows
Use the custom Agent class for full streaming control:
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  // The run ID comes from the workflow metadata
  const { workflowRunId } = getWorkflowMetadata();

  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });
  const history = await getMessageHistory(chatId);

  await startStream(messageId);

  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });

  await finishStream();
}
Persisting Workflow Results
Save agent output using step functions. The assertChatAgentParts function validates that generic UIMessage["parts"] (returned by agents) match your application's specific tool and data types:
import type { UIMessage } from "ai";
import { eq } from "drizzle-orm";
import { insertMessageParts } from "@/lib/chat/queries";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
// `db` and the `chats` table come from the app's Drizzle setup; their
// import paths are project-specific and are not shown here.

export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  // Throws if any part is not one of the app's known part types
  assertChatAgentParts(parts);

  await insertMessageParts(chatId, messageId, parts);

  // Update chat timestamp
  await db
    .update(chats)
    .set({ updatedAt: new Date() })
    .where(eq(chats.id, chatId));
}
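The source does not show assertChatAgentParts itself, so the following is only a guess at its general shape, written as a TypeScript assertion function. Everything here is an assumption: `GenericPart` stands in for the AI SDK's part type, and the allowed part types (including the `tool-searchDocs` and `data-progress` names) are hypothetical examples, not the recipe's real list.

```typescript
// Stand-in for a generic UI message part; the real AI SDK type is richer.
type GenericPart = { type: string; [key: string]: unknown };

// Hypothetical list of part types this app knows how to store.
// A real app would list its own tool and data part names here.
const ALLOWED_PART_TYPES = [
  "text",
  "reasoning",
  "tool-searchDocs",
  "data-progress",
] as const;

type ChatAgentPartType = (typeof ALLOWED_PART_TYPES)[number];
type ChatAgentPart = GenericPart & { type: ChatAgentPartType };

// Assertion function: returns normally if every part has a known type,
// otherwise throws. After a successful call, TypeScript treats the
// argument as ChatAgentPart[].
function assertChatAgentPartsSketch(
  parts: GenericPart[],
): asserts parts is ChatAgentPart[] {
  const allowed: readonly string[] = ALLOWED_PART_TYPES;
  for (const part of parts) {
    if (!allowed.includes(part.type)) {
      throw new Error(`Unexpected message part type: ${part.type}`);
    }
  }
}
```

Validating inside the persisting step means an unexpected part type fails before anything is written, and the insert call receives the narrowed, application-specific type instead of the generic one.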