Implement Working with Workflows
Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
See:
- Resource:
using-workflowsin Fullstack Recipes - URL: https://fullstackrecipes.com/recipes/using-workflows
Creating a Workflow
Define workflows with the "use workflow" directive:
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const history = await getMessageHistory(chatId);
const { parts } = await agent.run(history, {
writable: getWritable(),
});
await persistMessageParts({ chatId, parts });
}export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const history = await getMessageHistory(chatId);
const { parts } = await agent.run(history, {
writable: getWritable(),
});
await persistMessageParts({ chatId, parts });
}Starting a Workflow
Use the start function from workflow/api:
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";
const run = await start(chatWorkflow, [{ chatId, userMessage }]);
// run.runId - unique identifier for this run
// run.readable - stream of UI message chunksimport { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";
const run = await start(chatWorkflow, [{ chatId, userMessage }]);
// run.runId - unique identifier for this run
// run.readable - stream of UI message chunksResuming a Workflow Stream
Use getRun to reconnect to an in-progress or completed workflow:
import { getRun } from "workflow/api";
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });import { getRun } from "workflow/api";
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });Using Steps
Steps are durable checkpoints that persist their results:
async function getMessageHistory(chatId: string) {
"use step";
return db.query.messages.findMany({
where: eq(messages.chatId, chatId),
});
}async function getMessageHistory(chatId: string) {
"use step";
return db.query.messages.findMany({
where: eq(messages.chatId, chatId),
});
}Streaming from Workflows
Use getWritable() to stream data to clients:
import { getWritable } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const writable = getWritable();
// Pass to agent for streaming
await agent.run(history, { writable });
}import { getWritable } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const writable = getWritable();
// Pass to agent for streaming
await agent.run(history, { writable });
}Getting Workflow Metadata
Access the current run's metadata:
import { getWorkflowMetadata } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
// Store runId for resumption
await saveRunId(chatId, workflowRunId);
}import { getWorkflowMetadata } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
// Store runId for resumption
await saveRunId(chatId, workflowRunId);
}Workflow-Safe Logging
The workflow runtime doesn't support Node.js modules. Wrap logger calls in steps:
import { logger } from "@/lib/common/logger";
export async function log(
level: "info" | "warn" | "error",
message: string,
data?: Record<string, unknown>,
): Promise<void> {
"use step";
if (data) {
logger[level](data, message);
} else {
logger[level](message);
}
}import { logger } from "@/lib/common/logger";
export async function log(
level: "info" | "warn" | "error",
message: string,
data?: Record<string, unknown>,
): Promise<void> {
"use step";
if (data) {
logger[level](data, message);
} else {
logger[level](message);
}
}Running Agents in Workflows
Use the custom Agent class for full streaming control:
import { getWritable } from "workflow";
import { researchAgent } from "@/lib/ai/research";
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const history = await getMessageHistory(chatId);
const { parts } = await researchAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, parts });
}import { getWritable } from "workflow";
import { researchAgent } from "@/lib/ai/research";
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const history = await getMessageHistory(chatId);
const { parts } = await researchAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, parts });
}Persisting Workflow Results
Save agent output to the database:
async function persistMessageParts({ chatId, parts }) {
"use step";
await db.insert(messages).values({
chatId,
role: "assistant",
parts: JSON.stringify(parts),
});
}async function persistMessageParts({ chatId, parts }) {
"use step";
await db.insert(messages).values({
chatId,
role: "assistant",
parts: JSON.stringify(parts),
});
}