import { BaseMessage } from "@langchain/core/messages";import { MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ messages: z.array(z.custom<BaseMessage>()).register(registry, MessagesZodMeta), extraField: z.number(),});
This is a versatile representation of state for applications involving chat models. LangGraph includes this pre-built MessagesZodMeta for convenience, so that we can have:
Copy
import { MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta), extraField: z.number(),});
import { StateGraph, START, END } from "@langchain/langgraph";import * as z from "zod";// Define the schema for the inputconst InputState = z.object({ question: z.string(),});// Define the schema for the outputconst OutputState = z.object({ answer: z.string(),});// Define the overall schema, combining both input and outputconst OverallState = InputState.merge(OutputState);// Build the graph with input and output schemas specifiedconst graph = new StateGraph({ input: InputState, output: OutputState, state: OverallState,}) .addNode("answerNode", (state) => { // Example answer and an extra key return { answer: "bye", question: state.question }; }) .addEdge(START, "answerNode") .addEdge("answerNode", END) .compile();// Invoke the graph with an input and print the resultconsole.log(await graph.invoke({ question: "hi" }));
Copy
{ answer: 'bye' }
Notice that the output of invoke only includes the output schema.
import { StateGraph, START, END } from "@langchain/langgraph";import * as z from "zod";// The overall state of the graph (this is the public state shared across nodes)const OverallState = z.object({ a: z.string(),});// Output from node1 contains private data that is not part of the overall stateconst Node1Output = z.object({ privateData: z.string(),});// The private data is only shared between node1 and node2const node1 = (state: z.infer<typeof OverallState>): z.infer<typeof Node1Output> => { const output = { privateData: "set by node1" }; console.log(`Entered node 'node1':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Node 2 input only requests the private data available after node1const Node2Input = z.object({ privateData: z.string(),});const node2 = (state: z.infer<typeof Node2Input>): z.infer<typeof OverallState> => { const output = { a: "set by node2" }; console.log(`Entered node 'node2':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Node 3 only has access to the overall state (no access to private data from node1)const node3 = (state: z.infer<typeof OverallState>): z.infer<typeof OverallState> => { const output = { a: "set by node3" }; console.log(`Entered node 'node3':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Connect nodes in a sequence// node2 accepts private data from node1, whereas// node3 does not see the private data.const graph = new StateGraph({ state: OverallState, nodes: { node1: { action: node1, output: Node1Output }, node2: { action: node2, input: Node2Input }, node3: { action: node3 }, }}) .addEdge(START, "node1") .addEdge("node1", "node2") .addEdge("node2", "node3") .addEdge("node3", END) .compile();// Invoke the graph with the initial stateconst response = await graph.invoke({ a: "set at start" });console.log(`\nOutput of graph invocation: ${JSON.stringify(response)}`);
Copy
Entered node 'node1': ut: {"a":"set at start"}. urned: {"privateData":"set by node1"}Entered node 'node2': ut: {"privateData":"set by node1"}. urned: {"a":"set by node2"}Entered node 'node3': ut: {"a":"set by node2"}. urned: {"a":"set by node3"}Output of graph invocation: {"a":"set by node3"}
在许多用例中,您可能希望节点具有自定义重试策略,例如,如果您正在调用 API、查询数据库或调用 LLM 等。LangGraph 允许您向节点添加重试策略。To configure a retry policy, pass the retryPolicy parameter to the addNode. The retryPolicy parameter takes in a RetryPolicy object. Below we instantiate a RetryPolicy object with the default parameters and associate it with a node:
Copy
import { RetryPolicy } from "@langchain/langgraph";const graph = new StateGraph(State) .addNode("nodeName", nodeFunction, { retryPolicy: {} }) .compile();
By default, the retry policy retries on any exception except for the following:
TypeError
SyntaxError
ReferenceError
Extended example: customizing retry policies
Consider an example in which we are reading from a SQL database. Below we pass two different retry policies to nodes:
Copy
import Database from "better-sqlite3";import { ChatAnthropic } from "@langchain/anthropic";import { StateGraph, START, END, MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import { AIMessage, BaseMessage } from "@langchain/core/messages";import * as z from "zod";const MessagesZodState = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta),});// Create an in-memory databaseconst db: typeof Database.prototype = new Database(":memory:");const model = new ChatAnthropic({ model: "claude-3-5-sonnet-20240620" });const callModel = async (state: z.infer<typeof MessagesZodState>) => { const response = await model.invoke(state.messages); return { messages: [response] };};const queryDatabase = async (state: z.infer<typeof MessagesZodState>) => { const queryResult: string = JSON.stringify( db.prepare("SELECT * FROM Artist LIMIT 10;").all(), ); return { messages: [new AIMessage({ content: "queryResult" })] };};const workflow = new StateGraph(MessagesZodState) // Define the two nodes we will cycle between .addNode("call_model", callModel, { retryPolicy: { maxAttempts: 5 } }) .addNode("query_database", queryDatabase, { retryPolicy: { retryOn: (e: any): boolean => { if (e instanceof Database.SqliteError) { // Retry on "SQLITE_BUSY" error return e.code === "SQLITE_BUSY"; } return false; // Don't retry on other errors }, }, }) .addEdge(START, "call_model") .addEdge("call_model", "query_database") .addEdge("query_database", END);const graph = workflow.compile();
Why split application steps into a sequence with LangGraph?
LangGraph makes it easy to add an underlying persistence layer to your application.
This allows state to be checkpointed in between the execution of nodes, so your LangGraph nodes govern:
How we can “rewind” and branch-off executions using LangGraph’s time travel features
They also determine how execution steps are streamed, and how your application is visualized and debugged using Studio.Let’s demonstrate an end-to-end example. We will create a sequence of three steps:
Populate a value in a key of the state
Update the same value
Populate a different value
Let’s first define our state. This governs the schema of the graph, and can also specify how to apply updates. See this section for more detail.In our case, we will just keep track of two values:
Copy
import * as z from "zod";const State = z.object({ value1: z.string(), value2: z.number(),});
Our nodes are just TypeScript functions that read our graph’s state and make updates to it. The first argument to this function will always be the state:
在此示例中,我们从 Node A 扇出到 B 和 C,然后扇入到 D。对于我们的状态,我们指定 reducer add 操作。这将合并或累积 State 中特定键的值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接。有关使用 reducer 更新状态的更多详细信息,请参阅上面关于状态 reducer的部分。
Copy
import { StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ // The reducer makes this append-only aggregate: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, default: () => [] as string[], }),});const nodeA = (state: z.infer<typeof State>) => { console.log(`Adding "A" to ${state.aggregate}`); return { aggregate: ["A"] };};const nodeB = (state: z.infer<typeof State>) => { console.log(`Adding "B" to ${state.aggregate}`); return { aggregate: ["B"] };};const nodeC = (state: z.infer<typeof State>) => { console.log(`Adding "C" to ${state.aggregate}`); return { aggregate: ["C"] };};const nodeD = (state: z.infer<typeof State>) => { console.log(`Adding "D" to ${state.aggregate}`); return { aggregate: ["D"] };};const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addNode("c", nodeC) .addNode("d", nodeD) .addEdge(START, "a") .addEdge("a", "b") .addEdge("a", "c") .addEdge("b", "d") .addEdge("c", "d") .addEdge("d", END) .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
With the reducer, you can see that the values added in each node are accumulated.
Copy
const result = await graph.invoke({ aggregate: [],});console.log(result);
Copy
Adding "A" to []Adding "B" to ['A']Adding "C" to ['A']Adding "D" to ['A', 'B', 'C']{ aggregate: ['A', 'B', 'C', 'D'] }
In the above example, nodes "b" and "c" are executed concurrently in the same superstep. Because they are in the same step, node "d" executes after both "b" and "c" are finished.Importantly, updates from a parallel superstep may not be ordered consistently. If you need a consistent, predetermined ordering of updates from a parallel superstep, you should write the outputs to a separate field in the state together with a value with which to order them.
Exception handling?
LangGraph executes nodes within supersteps, meaning that while parallel branches are executed in parallel, the entire superstep is transactional. If any of these branches raises an exception, none of the updates are applied to the state (the entire superstep errors).Importantly, when using a checkpointer, results from successful nodes within a superstep are saved, and don’t repeat when resumed.If you have error-prone (perhaps want to handle flakey API calls), LangGraph provides two ways to address this:
You can write regular python code within your node to catch and handle exceptions.
You can set a retry_policy to direct the graph to retry nodes that raise certain types of exceptions. Only failing branches are retried, so you needn’t worry about performing redundant work.
Together, these let you perform parallel execution and fully control exception handling.
Set max concurrency
You can control the maximum number of concurrent tasks by setting max_concurrency in the configuration when invoking the graph.
If your fan-out should vary at runtime based on the state, you can use addConditionalEdges to select one or more paths using the graph state. See example below, where node a generates a state update that determines the following node.
Copy
import { StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ aggregate: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, default: () => [] as string[], }), // Add a key to the state. We will set this key to determine // how we branch. which: z.string().register(registry, { reducer: { fn: (x, y) => y ?? x, }, }),});const nodeA = (state: z.infer<typeof State>) => { console.log(`Adding "A" to ${state.aggregate}`); return { aggregate: ["A"], which: "c" };};const nodeB = (state: z.infer<typeof State>) => { console.log(`Adding "B" to ${state.aggregate}`); return { aggregate: ["B"] };};const nodeC = (state: z.infer<typeof State>) => { console.log(`Adding "C" to ${state.aggregate}`); return { aggregate: ["C"] }; };const conditionalEdge = (state: z.infer<typeof State>): "b" | "c" => { // Fill in arbitrary logic here that uses the state // to determine the next node return state.which as "b" | "c";};const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addNode("c", nodeC) .addEdge(START, "a") .addEdge("b", END) .addEdge("c", END) .addConditionalEdges("a", conditionalEdge) .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
Copy
const result = await graph.invoke({ aggregate: [] });console.log(result);
Copy
Adding "A" to []Adding "C" to ['A']{ aggregate: ['A', 'C'], which: 'c' }
Your conditional edges can route to multiple destination nodes. For example:
LangGraph 支持使用 Send API 进行 map-reduce 和其他高级分支模式。以下是如何使用它的示例:
Copy
import { StateGraph, START, END, Send } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const OverallState = z.object({ topic: z.string(), subjects: z.array(z.string()), jokes: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, }), bestSelectedJoke: z.string(),});const generateTopics = (state: z.infer<typeof OverallState>) => { return { subjects: ["lions", "elephants", "penguins"] };};const generateJoke = (state: { subject: string }) => { const jokeMap: Record<string, string> = { lions: "Why don't lions like fast food? Because they can't catch it!", elephants: "Why don't elephants use computers? They're afraid of the mouse!", penguins: "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." }; return { jokes: [jokeMap[state.subject]] };};const continueToJokes = (state: z.infer<typeof OverallState>) => { return state.subjects.map((subject) => new Send("generateJoke", { subject }));};const bestJoke = (state: z.infer<typeof OverallState>) => { return { bestSelectedJoke: "penguins" };};const graph = new StateGraph(OverallState) .addNode("generateTopics", generateTopics) .addNode("generateJoke", generateJoke) .addNode("bestJoke", bestJoke) .addEdge(START, "generateTopics") .addConditionalEdges("generateTopics", continueToJokes) .addEdge("generateJoke", "bestJoke") .addEdge("bestJoke", END) .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
Copy
// Call the graph: here we call it to generate a list of jokesfor await (const step of await graph.stream({ topic: "animals" })) { console.log(step);}
Copy
{ generateTopics: { subjects: [ 'lions', 'elephants', 'penguins' ] } }{ generateJoke: { jokes: [ "Why don't lions like fast food? Because they can't catch it!" ] } }{ generateJoke: { jokes: [ "Why don't elephants use computers? They're afraid of the mouse!" ] } }{ generateJoke: { jokes: [ "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." ] } }{ bestJoke: { bestSelectedJoke: 'penguins' } }
创建带循环的图时,我们需要一种终止执行的机制。最常见的方法是通过添加条件边,一旦达到某个终止条件,该边就会路由到 END 节点。您还可以在调用或流式传输图时设置图递归限制。递归限制设置图在引发错误之前允许执行的超级步骤数。有关递归限制概念的更多信息,请参阅此处。让我们考虑一个带循环的简单图,以更好地理解这些机制的工作原理。
To return the last value of your state instead of receiving a recursion limit error, see the next section.
When creating a loop, you can include a conditional edge that specifies a termination condition:
import { StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ // The reducer makes this append-only aggregate: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, default: () => [] as string[], }),});const nodeA = (state: z.infer<typeof State>) => { console.log(`Node A sees ${state.aggregate}`); return { aggregate: ["A"] };};const nodeB = (state: z.infer<typeof State>) => { console.log(`Node B sees ${state.aggregate}`); return { aggregate: ["B"] };};// Define edgesconst route = (state: z.infer<typeof State>): "b" | typeof END => { if (state.aggregate.length < 7) { return "b"; } else { return END; }};const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addEdge(START, "a") .addConditionalEdges("a", route) .addEdge("b", "a") .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
This architecture is similar to a ReAct agent in which node "a" is a tool-calling model, and node "b" represents the tools.In our route conditional edge, we specify that we should end after the "aggregate" list in the state passes a threshold length.Invoking the graph, we see that we alternate between nodes "a" and "b" before terminating once we reach the termination condition.
Copy
const result = await graph.invoke({ aggregate: [] });console.log(result);
import { Command } from "@langchain/langgraph";const myNode = (state: State): Command => { return new Command({ # state update update: { foo: "bar" }, # control flow goto: "myOtherNode" });};
We show an end-to-end example below. Let’s create a simple graph with 3 nodes: A, B and C. We will first execute node A, and then decide whether to go to Node B or Node C next based on the output of node A.
Copy
import { StateGraph, START, Command } from "@langchain/langgraph";import * as z from "zod";// Define graph stateconst State = z.object({ foo: z.string(),});// Define the nodesconst nodeA = (state: z.infer<typeof State>): Command => { console.log("Called A"); const value = Math.random() > 0.5 ? "b" : "c"; // this is a replacement for a conditional edge function const goto = value === "b" ? "nodeB" : "nodeC"; // 请注意 Command 如何允许您同时更新图状态并路由到下一个节点 return new Command({ // this is the state update update: { foo: value }, // this is a replacement for an edge goto, });};const nodeB = (state: z.infer<typeof State>) => { console.log("Called B"); return { foo: state.foo + "b" };};const nodeC = (state: z.infer<typeof State>) => { console.log("Called C"); return { foo: state.foo + "c" };};
We can now create the StateGraph with the above nodes. Notice that the graph doesn’t have conditional edges for routing! This is because control flow is defined with Command inside nodeA.
You might have noticed that we used ends to specify which nodes nodeA can navigate to. This is necessary for the graph rendering and tells LangGraph that nodeA can navigate to nodeB and nodeC.
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
If we run the graph multiple times, we’d see it take different paths (A -> B or A -> C) based on the random choice in node A.
Copy
const result = await graph.invoke({ foo: "" });console.log(result);
一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,您可能希望在对话开始时根据客户的帐号或 ID 查找客户信息。要从工具更新图状态,您可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]}):
Copy
import { tool } from "@langchain/core/tools";import { Command } from "@langchain/langgraph";import * as z from "zod";const lookupUserInfo = tool( async (input, config) => { const userId = config.configurable?.userId; const userInfo = getUserInfo(userId); return new Command({ update: { # update the state keys userInfo: userInfo, # update the message history messages: [{ role: "tool", content: "Successfully looked up user information", tool_call_id: config.toolCall.id }] } }); }, { name: "lookupUserInfo", description: "Use this to look up user information to better assist them with their questions.", schema: z.object({}), });
You MUST include messages (or any state key used for the message history) in Command.update when returning Command from a tool and the list of messages in messages MUST contain a ToolMessage. This is necessary for the resulting message history to be valid (LLM providers require AI messages with tool calls to be followed by the tool result messages).
If you are using tools that update state via Command, we recommend using prebuilt ToolNode which automatically handles tools returning Command objects and propagates them to the graph state. If you’re writing a custom node that calls tools, you would need to manually propagate Command objects returned by the tools as the update from the node.