This guide demonstrates the basics of the LangGraph Graph API. It walks through state, as well as composing common graph structures such as sequences, branches, and loops. It also covers LangGraph's control features, including the Send API for map-reduce workflows and the Command API for combining state updates with "jumps" between nodes.

Setup

Install langgraph:
npm install @langchain/langgraph
Set up LangSmith for better debugging. Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor LLM applications built with LangGraph. Read more in the docs on how to get started.

Define and update state

Here we show how to define and update state in LangGraph. We will demonstrate:
  1. How to define a graph's schema using state
  2. How to use reducers to control how state updates are processed.

Define state

State in LangGraph can be defined with a Zod schema. We'll use Zod below; see this section for details on alternative approaches. By default, the graph will have the same input and output schema, and the state determines that schema. See this section for how to define distinct input and output schemas. Let's consider a simple example using messages. This represents a versatile formulation of state for many LLM applications. See our conceptual page for more detail.
import { BaseMessage } from "@langchain/core/messages";
import { MessagesZodMeta } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  messages: z.array(z.custom<BaseMessage>()).register(registry, MessagesZodMeta),
  extraField: z.number(),
});
This state tracks a list of message objects, as well as an extra integer field.

Update state

Let's build an example graph with a single node. Our node is just a TypeScript function that reads our graph's state and makes updates to it. The first argument to this function will always be the state:
import { AIMessage } from "@langchain/core/messages";

const node = (state: z.infer<typeof State>) => {
  const messages = state.messages;
  const newMessage = new AIMessage("Hello!");
  return { messages: messages.concat([newMessage]), extraField: 10 };
};
This node simply appends a message to our message list, and populates the extra field.
Nodes should return updates to the state directly, instead of mutating the state.
Next, let's define a simple graph containing this node. We use StateGraph to define a graph that operates on this state. We then use addNode to populate our graph.
import { StateGraph } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge("__start__", "node")
  .compile();
LangGraph provides built-in utilities for visualizing your graph. Let's inspect our graph. See this section for detail on visualization.
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
In this case, our graph just executes a single node. Let’s proceed with a simple invocation:
import { HumanMessage } from "@langchain/core/messages";

const result = await graph.invoke({ messages: [new HumanMessage("Hi")], extraField: 0 });
console.log(result);
{ messages: [HumanMessage { content: 'Hi' }, AIMessage { content: 'Hello!' }], extraField: 10 }
Note that:
  • We kicked off invocation by updating a single key of the state.
  • We receive the entire state in the invocation result.
For convenience, we frequently inspect the content of message objects via logging:
for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!
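The update semantics seen here can be pictured in plain TypeScript. This is a hedged sketch of the default behavior (last write wins per key), not LangGraph's actual implementation:

```typescript
// Hypothetical sketch of the default channel behavior: with no reducer,
// a node's partial update overwrites the matching keys, and untouched
// keys carry over.
type DemoState = { messages: string[]; extraField: number };

const applyUpdate = (state: DemoState, update: Partial<DemoState>): DemoState => {
  // Default semantics: last write wins for every key present in the update.
  return { ...state, ...update };
};

const before: DemoState = { messages: ["human: Hi"], extraField: 0 };
const after = applyUpdate(before, { extraField: 10 });
// after.extraField is 10; after.messages carries over unchanged.
```

This is why, without a reducer, a node that wants to "append" a message must return the whole concatenated list, as the node above does.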

Process state updates with reducers

Each key in the state can have its own independent reducer function, which controls how updates from nodes are applied. If no reducer function is explicitly specified, it is assumed that all updates to the key should override it. For Zod state schemas, we can define reducers by using the special .langgraph.reducer() method on a schema field. In the earlier example, our node updated the "messages" key in the state by appending a message to it. Below, we add a reducer to this key, such that updates are automatically appended:
import "@langchain/langgraph/zod";

const State = z.object({
  messages: z.array(z.custom<BaseMessage>()).langgraph.reducer((x, y) => x.concat(y)),  
  extraField: z.number(),
});
Now our node can be simplified:
const node = (state: z.infer<typeof State>) => {
  const newMessage = new AIMessage("Hello!");
  return { messages: [newMessage], extraField: 10 };  
};
import { START } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge(START, "node")
  .compile();

const result = await graph.invoke({ messages: [new HumanMessage("Hi")] });

for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!
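To make the reducer behavior concrete, here is a toy model in plain TypeScript (illustrative only; LangGraph's channel machinery differs): a key with a reducer merges each update into the current value, while a key without one falls back to overwrite.

```typescript
// Toy model of per-key reducers (illustrative only): "messages" has a
// concat reducer, while "extraField" falls back to overwrite.
type ChanState = { messages: string[]; extraField: number };

const mergeUpdate = (state: ChanState, update: Partial<ChanState>): ChanState => ({
  // Reducer for "messages": append the update to the current list.
  messages: update.messages ? state.messages.concat(update.messages) : state.messages,
  // No reducer for "extraField": the update overwrites the current value.
  extraField: update.extraField ?? state.extraField,
});

const merged = mergeUpdate(
  { messages: ["human: Hi"], extraField: 0 },
  { messages: ["ai: Hello!"], extraField: 10 },
);
// merged.messages: ["human: Hi", "ai: Hello!"]; merged.extraField: 10
```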

MessagesState

In practice, there are additional considerations for updating lists of messages:
  • We may wish to update an existing message in the state.
  • We may want to accept short-hands for message formats, such as OpenAI format.
LangGraph includes a built-in MessagesZodMeta that handles these considerations:
import { MessagesZodMeta } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({  
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
  extraField: z.number(),
});

const graph = new StateGraph(State)
  .addNode("node", (state) => {
    const newMessage = new AIMessage("Hello!");
    return { messages: [newMessage], extraField: 10 };
  })
  .addEdge(START, "node")
  .compile();
const inputMessage = { role: "user", content: "Hi" };  

const result = await graph.invoke({ messages: [inputMessage] });

for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!
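A rough sketch of the kind of shorthand coercion MessagesZodMeta performs (the real implementation also supports updating existing messages by ID; the type and function names below are hypothetical):

```typescript
// Hypothetical sketch: accept either a normalized message or an
// OpenAI-style { role, content } object, and convert the latter.
type SimpleMessage = { type: "human" | "ai" | "system"; content: string };
type OpenAIMessage = { role: "user" | "assistant" | "system"; content: string };

const coerceMessage = (m: SimpleMessage | OpenAIMessage): SimpleMessage => {
  if ("type" in m) return m; // already normalized
  const type = m.role === "user" ? "human" : m.role === "assistant" ? "ai" : "system";
  return { type, content: m.content };
};

const normalized = coerceMessage({ role: "user", content: "Hi" });
// normalized: { type: "human", content: "Hi" }
```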
This is a versatile representation of state for applications involving chat models. LangGraph includes this pre-built MessagesZodMeta for convenience, so that we can have:
import { MessagesZodMeta } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
  extraField: z.number(),
});

Define input and output schemas

By default, StateGraph operates with a single schema, and all nodes are expected to communicate using that schema. However, it is also possible to define distinct input and output schemas for a graph. When distinct schemas are specified, an internal schema will still be used for communication between nodes. The input schema ensures that the provided input matches the expected structure, while the output schema filters the internal data to return only the relevant information according to the defined output schema. Below, we will see how to define distinct input and output schemas.
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

// Define the schema for the input
const InputState = z.object({
  question: z.string(),
});

// Define the schema for the output
const OutputState = z.object({
  answer: z.string(),
});

// Define the overall schema, combining both input and output
const OverallState = InputState.merge(OutputState);

// Build the graph with input and output schemas specified
const graph = new StateGraph({
  input: InputState,
  output: OutputState,
  state: OverallState,
})
  .addNode("answerNode", (state) => {
    // Example answer and an extra key
    return { answer: "bye", question: state.question };
  })
  .addEdge(START, "answerNode")
  .addEdge("answerNode", END)
  .compile();

// Invoke the graph with an input and print the result
console.log(await graph.invoke({ question: "hi" }));
{ answer: 'bye' }
Notice that the output of invoke only includes the output schema.
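The filtering can be pictured as projecting the internal state onto the output schema's keys. This is a simplified sketch; LangGraph derives the keys from the schema itself, whereas here they are passed explicitly:

```typescript
// Simplified sketch: the output schema acts as a projection of the
// internal state onto its keys.
const project = <T extends object>(state: T, keys: (keyof T)[]): Partial<T> => {
  const out: Partial<T> = {};
  for (const key of keys) out[key] = state[key];
  return out;
};

const internalState = { question: "hi", answer: "bye" };
const output = project(internalState, ["answer"]);
// output contains only the output-schema keys: { answer: "bye" }
```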

Pass private state between nodes

In some cases, you may want nodes to exchange information that is crucial for intermediate logic but doesn't need to be part of the main schema of the graph. This private data is not relevant to the overall input/output of the graph and should only be shared between certain nodes. Below, we will create an example sequential graph consisting of three nodes (node_1, node_2 and node_3), where private data is passed between the first two steps (node_1 and node_2), while the third step (node_3) only has access to the public overall state.
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

// The overall state of the graph (this is the public state shared across nodes)
const OverallState = z.object({
  a: z.string(),
});

// Output from node1 contains private data that is not part of the overall state
const Node1Output = z.object({
  privateData: z.string(),
});

// The private data is only shared between node1 and node2
const node1 = (state: z.infer<typeof OverallState>): z.infer<typeof Node1Output> => {
  const output = { privateData: "set by node1" };
  console.log(`Entered node 'node1':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`);
  return output;
};

// Node 2 input only requests the private data available after node1
const Node2Input = z.object({
  privateData: z.string(),
});

const node2 = (state: z.infer<typeof Node2Input>): z.infer<typeof OverallState> => {
  const output = { a: "set by node2" };
  console.log(`Entered node 'node2':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`);
  return output;
};

// Node 3 only has access to the overall state (no access to private data from node1)
const node3 = (state: z.infer<typeof OverallState>): z.infer<typeof OverallState> => {
  const output = { a: "set by node3" };
  console.log(`Entered node 'node3':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`);
  return output;
};

// Connect nodes in a sequence
// node2 accepts private data from node1, whereas
// node3 does not see the private data.
const graph = new StateGraph({
  state: OverallState,
  nodes: {
    node1: { action: node1, output: Node1Output },
    node2: { action: node2, input: Node2Input },
    node3: { action: node3 },
  }
})
  .addEdge(START, "node1")
  .addEdge("node1", "node2")
  .addEdge("node2", "node3")
  .addEdge("node3", END)
  .compile();

// Invoke the graph with the initial state
const response = await graph.invoke({ a: "set at start" });

console.log(`\nOutput of graph invocation: ${JSON.stringify(response)}`);
Entered node 'node1':
    Input: {"a":"set at start"}.
    Returned: {"privateData":"set by node1"}
Entered node 'node2':
    Input: {"privateData":"set by node1"}.
    Returned: {"a":"set by node2"}
Entered node 'node3':
    Input: {"a":"set by node2"}.
    Returned: {"a":"set by node3"}

Output of graph invocation: {"a":"set by node3"}

Alternative state definitions

While Zod schemas are the recommended approach, LangGraph also supports other ways of defining state schemas:
import { BaseMessage } from "@langchain/core/messages";
import { StateGraph } from "@langchain/langgraph";

interface WorkflowChannelsState {
  messages: BaseMessage[];
  question: string;
  answer: string;
}

const workflowWithChannels = new StateGraph<WorkflowChannelsState>({
  channels: {
    messages: {
      reducer: (currentState, updateValue) => currentState.concat(updateValue),
      default: () => [],
    },
    question: null,
    answer: null,
  },
});

Add runtime configuration

Sometimes you want to be able to configure your graph when calling it. For example, you might want to specify which LLM or system prompt to use at runtime, without polluting the graph state with these parameters. To add runtime configuration:
  1. Specify a schema for your configuration
  2. Add the configuration to the function signature for nodes or conditional edges
  3. Pass the configuration into the graph.
See below for a simple example:
import { StateGraph, END, START } from "@langchain/langgraph";
import * as z from "zod";

// 1. Specify config schema
const ContextSchema = z.object({
  myRuntimeValue: z.string(),
});

// 2. Define a graph that accesses the config in a node
const StateSchema = z.object({
  myStateValue: z.number(),
});

const graph = new StateGraph(StateSchema, ContextSchema)
  .addNode("node", (state, runtime) => {
    if (runtime?.context?.myRuntimeValue === "a") {  
      return { myStateValue: 1 };
    } else if (runtime?.context?.myRuntimeValue === "b") {  
      return { myStateValue: 2 };
    } else {
      throw new Error("Unknown values.");
    }
  })
  .addEdge(START, "node")
  .addEdge("node", END)
  .compile();

// 3. Pass in configuration at runtime:
console.log(await graph.invoke({}, { context: { myRuntimeValue: "a" } }));  
console.log(await graph.invoke({}, { context: { myRuntimeValue: "b" } }));  
{ myStateValue: 1 }
{ myStateValue: 2 }
Below we demonstrate a practical example in which we configure what LLM to use at runtime. We will use both OpenAI and Anthropic models.
import { ChatOpenAI } from "@langchain/openai";
import { ChatAnthropic } from "@langchain/anthropic";
import { BaseMessage } from "@langchain/core/messages";
import { MessagesZodMeta, StateGraph, START, END } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import { RunnableConfig } from "@langchain/core/runnables";
import * as z from "zod";

const ConfigSchema = z.object({
  modelProvider: z.string().default("anthropic"),
});

const MessagesZodState = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
});

const MODELS = {
  anthropic: new ChatAnthropic({ model: "claude-haiku-4-5-20251001" }),
  openai: new ChatOpenAI({ model: "gpt-4o-mini" }),
};

const graph = new StateGraph(MessagesZodState, ConfigSchema)
  .addNode("model", async (state, config) => {
    const modelProvider = config?.configurable?.modelProvider || "anthropic";
    const model = MODELS[modelProvider as keyof typeof MODELS];
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  })
  .addEdge(START, "model")
  .addEdge("model", END)
  .compile();

// Usage
const inputMessage = { role: "user", content: "hi" };
// With no configuration, uses default (Anthropic)
const response1 = await graph.invoke({ messages: [inputMessage] });
// Or, can set OpenAI
const response2 = await graph.invoke(
  { messages: [inputMessage] },
  { configurable: { modelProvider: "openai" } },
);

console.log(response1.messages.at(-1)?.response_metadata?.model);
console.log(response2.messages.at(-1)?.response_metadata?.model);
claude-haiku-4-5-20251001
gpt-4o-mini-2024-07-18
Below we demonstrate a practical example in which we configure two parameters: the LLM and system message to use at runtime.
import { ChatOpenAI } from "@langchain/openai";
import { ChatAnthropic } from "@langchain/anthropic";
import { SystemMessage, BaseMessage } from "@langchain/core/messages";
import { MessagesZodMeta, StateGraph, START, END } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const ConfigSchema = z.object({
  modelProvider: z.string().default("anthropic"),
  systemMessage: z.string().optional(),
});

const MessagesZodState = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
});

const MODELS = {
  anthropic: new ChatAnthropic({ model: "claude-haiku-4-5-20251001" }),
  openai: new ChatOpenAI({ model: "gpt-4o-mini" }),
};

const graph = new StateGraph(MessagesZodState, ConfigSchema)
  .addNode("model", async (state, config) => {
    const modelProvider = config?.configurable?.modelProvider || "anthropic";
    const systemMessage = config?.configurable?.systemMessage;

    const model = MODELS[modelProvider as keyof typeof MODELS];
    let messages = state.messages;

    if (systemMessage) {
      messages = [new SystemMessage(systemMessage), ...messages];
    }

    const response = await model.invoke(messages);
    return { messages: [response] };
  })
  .addEdge(START, "model")
  .addEdge("model", END)
  .compile();

// Usage
const inputMessage = { role: "user", content: "hi" };
const response = await graph.invoke(
  { messages: [inputMessage] },
  {
    configurable: {
      modelProvider: "openai",
      systemMessage: "Respond in Italian."
    }
  }
);

for (const message of response.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: hi
ai: Ciao! Come posso aiutarti oggi?

Add retry policies

There are many use cases where you may wish for your node to have a custom retry policy, for example if you are calling an API, querying a database, or calling an LLM, etc. LangGraph lets you add retry policies to nodes. To configure a retry policy, pass the retryPolicy parameter to addNode. The retryPolicy parameter takes in a RetryPolicy object. Below we instantiate a RetryPolicy object with the default parameters and associate it with a node:
import { RetryPolicy } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("nodeName", nodeFunction, { retryPolicy: {} })
  .compile();
By default, the retry policy retries on any exception except for the following:
  • TypeError
  • SyntaxError
  • ReferenceError
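The retry semantics can be sketched in plain TypeScript. This is an illustration of the policy's shape (field names mirror RetryPolicy), not LangGraph's internals; the default predicate mirrors the exception list above:

```typescript
// Hedged sketch of retry-policy semantics: retry up to maxAttempts times,
// but only for errors that the retryOn predicate accepts. By default,
// never retry TypeError, SyntaxError, or ReferenceError.
type Policy = {
  maxAttempts?: number;
  retryOn?: (e: unknown) => boolean;
};

const withRetry = async <T>(fn: () => Promise<T>, policy: Policy = {}): Promise<T> => {
  const maxAttempts = policy.maxAttempts ?? 3;
  const retryOn =
    policy.retryOn ??
    ((e: unknown) =>
      !(e instanceof TypeError || e instanceof SyntaxError || e instanceof ReferenceError));
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (e) {
      lastError = e;
      if (!retryOn(e) || attempt === maxAttempts) throw e;
    }
  }
  throw lastError;
};

// A flaky call that fails twice before succeeding:
let calls = 0;
const flaky = async (): Promise<string> => {
  calls += 1;
  if (calls < 3) throw new Error("transient");
  return "ok";
};

const outcome = await withRetry(flaky, { maxAttempts: 5 });
```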
Consider an example in which we are reading from a SQL database. Below we pass two different retry policies to nodes:
import Database from "better-sqlite3";
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, START, END, MessagesZodMeta } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import { AIMessage, BaseMessage } from "@langchain/core/messages";
import * as z from "zod";

const MessagesZodState = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
});

// Create an in-memory database
const db: typeof Database.prototype = new Database(":memory:");

const model = new ChatAnthropic({ model: "claude-3-5-sonnet-20240620" });

const callModel = async (state: z.infer<typeof MessagesZodState>) => {
  const response = await model.invoke(state.messages);
  return { messages: [response] };
};

const queryDatabase = async (state: z.infer<typeof MessagesZodState>) => {
  const queryResult: string = JSON.stringify(
    db.prepare("SELECT * FROM Artist LIMIT 10;").all(),
  );

  return { messages: [new AIMessage({ content: queryResult })] };
};

const workflow = new StateGraph(MessagesZodState)
  // Define the two nodes we will cycle between
  .addNode("call_model", callModel, { retryPolicy: { maxAttempts: 5 } })
  .addNode("query_database", queryDatabase, {
    retryPolicy: {
      retryOn: (e: any): boolean => {
        if (e instanceof Database.SqliteError) {
          // Retry on "SQLITE_BUSY" error
          return e.code === "SQLITE_BUSY";
        }
        return false; // Don't retry on other errors
      },
    },
  })
  .addEdge(START, "call_model")
  .addEdge("call_model", "query_database")
  .addEdge("query_database", END);

const graph = workflow.compile();

Create a sequence of steps

Prerequisites: This guide assumes familiarity with the section on state above.
Here we demonstrate how to construct a simple sequence of steps. We will show:
  1. How to build a sequential graph
  2. Built-in short-hand for constructing similar graphs.
To add a sequence of nodes, we use the .addNode and .addEdge methods of our graph:
import { START, StateGraph } from "@langchain/langgraph";

const builder = new StateGraph(State)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addNode("step3", step3)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "step3");
LangGraph makes it easy to add an underlying persistence layer to your application. This allows state to be checkpointed in between the execution of nodes. Your nodes also determine how execution steps are streamed, and how your application is visualized and debugged using Studio. Let's demonstrate an end-to-end example. We will create a sequence of three steps:
  1. Populate a value in a key of the state
  2. Update the same value
  3. Populate a different value
Let's first define our state. This governs the schema of the graph, and can also specify how to apply updates. See this section for more detail. In our case, we will just keep track of two values:
import * as z from "zod";

const State = z.object({
  value1: z.string(),
  value2: z.number(),
});
Our nodes are just TypeScript functions that read our graph’s state and make updates to it. The first argument to this function will always be the state:
const step1 = (state: z.infer<typeof State>) => {
  return { value1: "a" };
};

const step2 = (state: z.infer<typeof State>) => {
  const currentValue1 = state.value1;
  return { value1: `${currentValue1} b` };
};

const step3 = (state: z.infer<typeof State>) => {
  return { value2: 10 };
};
Note that when issuing updates to the state, each node can just specify the value of the keys it wishes to update. By default, this will overwrite the value of the corresponding key. You can also use reducers to control how updates are processed; for example, you can append successive updates to a key instead. See this section for detail.
Finally, we define the graph. We use StateGraph to define a graph that operates on this state. We will then use addNode and addEdge to populate the graph and define its control flow.
import { START, StateGraph } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addNode("step3", step3)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "step3")
  .compile();
Specifying custom names You can specify custom names for nodes using .addNode:
const graph = new StateGraph(State)
  .addNode("myNode", step1)
  .compile();
Note that:
  • .addEdge takes the names of nodes, which for functions defaults to node.name.
  • We must specify the entry point of the graph. For this we add an edge from the START node.
  • The graph halts when there are no more nodes to execute.
We next compile our graph. This provides a few basic checks on the structure of the graph (e.g., identifying orphaned nodes). If we were adding persistence to our application via a checkpointer, it would also be passed in here. LangGraph provides built-in utilities for visualizing your graph. Let's inspect our sequence. See this guide for detail on visualization.
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
Let’s proceed with a simple invocation:
const result = await graph.invoke({ value1: "c" });
console.log(result);
{ value1: 'a b', value2: 10 }
Note that:
  • We kick off invocation by providing a value for a single state key. We must always provide a value for at least one key.
  • The value we passed in was overwritten by the first node.
  • The second node updated the value.
  • The third node populated a different value.
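These observations can be reproduced with a plain-TypeScript simulation of the sequence, using the default overwrite semantics (an illustration, not the LangGraph engine):

```typescript
// Simulate the three-step sequence with the default (overwrite) semantics.
type SeqState = { value1?: string; value2?: number };

const steps: Array<(s: SeqState) => Partial<SeqState>> = [
  () => ({ value1: "a" }),              // step1: overwrites the provided value
  (s) => ({ value1: `${s.value1} b` }), // step2: updates the same key
  () => ({ value2: 10 }),               // step3: populates a different key
];

let seqState: SeqState = { value1: "c" }; // the value provided at invocation
for (const step of steps) {
  seqState = { ...seqState, ...step(seqState) };
}
// seqState: { value1: "a b", value2: 10 }
```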

Create branches

Parallel execution of nodes is essential to speed up overall graph operation. LangGraph offers native support for parallel execution of nodes, which can significantly enhance the performance of graph-based workflows. This parallelization is achieved through fan-out and fan-in mechanisms, utilizing both standard edges and conditional edges. Below are some examples showing how to create branching dataflows that work for you.

Run graph nodes in parallel

In this example, we fan out from Node A to B and C and then fan in to D. With our state, we specify a reducer that performs an add operation. This will combine or accumulate values for the specific key in the State, rather than simply overwriting the existing value. For lists, this means concatenating the new list with the existing one. See the above section on state reducers for more detail on updating state with reducers.
import { StateGraph, START, END } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  // The reducer makes this append-only
  aggregate: z.array(z.string()).register(registry, {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [] as string[],
  }),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log(`Adding "A" to ${state.aggregate}`);
  return { aggregate: ["A"] };
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log(`Adding "B" to ${state.aggregate}`);
  return { aggregate: ["B"] };
};

const nodeC = (state: z.infer<typeof State>) => {
  console.log(`Adding "C" to ${state.aggregate}`);
  return { aggregate: ["C"] };
};

const nodeD = (state: z.infer<typeof State>) => {
  console.log(`Adding "D" to ${state.aggregate}`);
  return { aggregate: ["D"] };
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addNode("c", nodeC)
  .addNode("d", nodeD)
  .addEdge(START, "a")
  .addEdge("a", "b")
  .addEdge("a", "c")
  .addEdge("b", "d")
  .addEdge("c", "d")
  .addEdge("d", END)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
With the reducer, you can see that the values added in each node are accumulated.
const result = await graph.invoke({
  aggregate: [],
});
console.log(result);
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
{ aggregate: ['A', 'B', 'C', 'D'] }
In the above example, nodes "b" and "c" are executed concurrently in the same superstep. Because they are in the same step, node "d" executes after both "b" and "c" are finished.Importantly, updates from a parallel superstep may not be ordered consistently. If you need a consistent, predetermined ordering of updates from a parallel superstep, you should write the outputs to a separate field in the state together with a value with which to order them.
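One way to get a deterministic order, as suggested above, is to tag each parallel write with a sort key and order the results in the fan-in step. A hedged sketch:

```typescript
// Sketch: each parallel branch writes a { order, value } pair, and the
// fan-in step sorts by the order key, making the result deterministic.
type Tagged = { order: number; value: string };

// Updates from a parallel superstep may arrive in any order:
const updates: Tagged[] = [
  { order: 2, value: "C" },
  { order: 1, value: "B" },
];

const fanIn = (items: Tagged[]): string[] =>
  [...items].sort((a, b) => a.order - b.order).map((t) => t.value);

const ordered = fanIn(updates); // ["B", "C"] regardless of arrival order
```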
LangGraph executes nodes within supersteps, meaning that while parallel branches are executed in parallel, the entire superstep is transactional. If any of these branches raises an exception, none of the updates are applied to the state (the entire superstep errors). Importantly, when using a checkpointer, results from successful nodes within a superstep are saved, and don't repeat when resumed. If you have error-prone operations (perhaps you want to handle flaky API calls), LangGraph provides two ways to address this:
  1. You can write regular TypeScript code within your node to catch and handle exceptions.
  2. You can set a retryPolicy to direct the graph to retry nodes that raise certain types of exceptions. Only failing branches are retried, so you needn't worry about performing redundant work.
Together, these let you perform parallel execution and fully control exception handling.
Set max concurrency You can control the maximum number of concurrent tasks by setting maxConcurrency in the config when invoking the graph.
const result = await graph.invoke({ aggregate: [] }, { maxConcurrency: 10 });

Conditional branching

If your fan-out should vary at runtime based on the state, you can use addConditionalEdges to select one or more paths using the graph state. See example below, where node a generates a state update that determines the following node.
import { StateGraph, START, END } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  aggregate: z.array(z.string()).register(registry, {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [] as string[],
  }),
  // Add a key to the state. We will set this key to determine
  // how we branch.
  which: z.string().register(registry, {  
    reducer: {
      fn: (x, y) => y ?? x,
    },
  }),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log(`Adding "A" to ${state.aggregate}`);
  return { aggregate: ["A"], which: "c" };
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log(`Adding "B" to ${state.aggregate}`);
  return { aggregate: ["B"] };
};

const nodeC = (state: z.infer<typeof State>) => {
  console.log(`Adding "C" to ${state.aggregate}`);
  return { aggregate: ["C"] };  
};

const conditionalEdge = (state: z.infer<typeof State>): "b" | "c" => {
  // Fill in arbitrary logic here that uses the state
  // to determine the next node
  return state.which as "b" | "c";
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addNode("c", nodeC)
  .addEdge(START, "a")
  .addEdge("b", END)
  .addEdge("c", END)
  .addConditionalEdges("a", conditionalEdge)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
const result = await graph.invoke({ aggregate: [] });
console.log(result);
Adding "A" to []
Adding "C" to ['A']
{ aggregate: ['A', 'C'], which: 'c' }
Your conditional edges can route to multiple destination nodes. For example:
const routeBcOrCd = (state: z.infer<typeof State>): string[] => {
  if (state.which === "cd") {
    return ["c", "d"];
  }
  return ["b", "c"];
};

Map-Reduce and the Send API

LangGraph supports map-reduce and other advanced branching patterns using the Send API. Here is an example of how to use it:
import { StateGraph, START, END, Send } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const OverallState = z.object({
  topic: z.string(),
  subjects: z.array(z.string()),
  jokes: z.array(z.string()).register(registry, {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
  }),
  bestSelectedJoke: z.string(),
});

const generateTopics = (state: z.infer<typeof OverallState>) => {
  return { subjects: ["lions", "elephants", "penguins"] };
};

const generateJoke = (state: { subject: string }) => {
  const jokeMap: Record<string, string> = {
    lions: "Why don't lions like fast food? Because they can't catch it!",
    elephants: "Why don't elephants use computers? They're afraid of the mouse!",
    penguins: "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
  };
  return { jokes: [jokeMap[state.subject]] };
};

const continueToJokes = (state: z.infer<typeof OverallState>) => {
  return state.subjects.map((subject) => new Send("generateJoke", { subject }));
};

const bestJoke = (state: z.infer<typeof OverallState>) => {
  return { bestSelectedJoke: "penguins" };
};

const graph = new StateGraph(OverallState)
  .addNode("generateTopics", generateTopics)
  .addNode("generateJoke", generateJoke)
  .addNode("bestJoke", bestJoke)
  .addEdge(START, "generateTopics")
  .addConditionalEdges("generateTopics", continueToJokes)
  .addEdge("generateJoke", "bestJoke")
  .addEdge("bestJoke", END)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
// Call the graph: here we call it to generate a list of jokes
for await (const step of await graph.stream({ topic: "animals" })) {
  console.log(step);
}
{ generateTopics: { subjects: [ 'lions', 'elephants', 'penguins' ] } }
{ generateJoke: { jokes: [ "Why don't lions like fast food? Because they can't catch it!" ] } }
{ generateJoke: { jokes: [ "Why don't elephants use computers? They're afraid of the mouse!" ] } }
{ generateJoke: { jokes: [ "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." ] } }
{ bestJoke: { bestSelectedJoke: 'penguins' } }
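Stripped of the graph machinery, the Send pattern above is classic map-reduce. A minimal sketch (the joke text is a placeholder, not model output):

```typescript
// Map-reduce skeleton mirroring the Send example: fan out one task per
// subject, then reduce over the collected results.
const subjects = ["lions", "elephants", "penguins"];

const generateJokeFor = (subject: string): string => `A joke about ${subject}`;

const jokes = subjects.map(generateJokeFor);      // map: one task per subject
const bestSelectedJoke = jokes[jokes.length - 1]; // reduce: pick one
```

Send gives you this same fan-out at runtime, when the number of branches is not known until the previous node has run.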

Create and control loops

When creating a graph with a loop, we require a mechanism for terminating execution. This is most commonly done by adding a conditional edge that routes to the END node once we reach some termination condition. You can also set the graph recursion limit when invoking or streaming the graph. The recursion limit sets the number of supersteps that the graph is allowed to execute before it raises an error. Read more about the recursion limit concept here. Let's consider a simple graph with a loop to better understand how these mechanisms work.
To return the last value of your state instead of receiving a recursion limit error, see the next section.
When creating a loop, you can include a conditional edge that specifies a termination condition:
const route = (state: z.infer<typeof State>): "b" | typeof END => {
  if (terminationCondition(state)) {
    return END;
  } else {
    return "b";
  }
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addConditionalEdges("a", route)
  .addEdge("b", "a")
  .compile();
To control the recursion limit, specify "recursionLimit" in the config. This will raise a GraphRecursionError, which you can catch and handle:
import { GraphRecursionError } from "@langchain/langgraph";

try {
  await graph.invoke(inputs, { recursionLimit: 3 });
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion Error");
  }
}
Let's define a graph with a simple loop. Note that we use a conditional edge to implement the termination condition.
import { StateGraph, START, END } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  // The reducer makes this append-only
  aggregate: z.array(z.string()).register(registry, {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [] as string[],
  }),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log(`Node A sees ${state.aggregate}`);
  return { aggregate: ["A"] };
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log(`Node B sees ${state.aggregate}`);
  return { aggregate: ["B"] };
};

// Define edges
const route = (state: z.infer<typeof State>): "b" | typeof END => {
  if (state.aggregate.length < 7) {
    return "b";
  } else {
    return END;
  }
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addConditionalEdges("a", route)
  .addEdge("b", "a")
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
This architecture is similar to a ReAct agent in which node "a" is a tool-calling model, and node "b" represents the tools. In our route conditional edge, we specify that we should end after the "aggregate" list in the state passes a threshold length. Invoking the graph, we see that we alternate between nodes "a" and "b" before terminating once we reach the termination condition.
const result = await graph.invoke({ aggregate: [] });
console.log(result);
{ aggregate: ['A', 'B', 'A', 'B', 'A', 'B', 'A'] }
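The loop's behavior can be traced in plain TypeScript: append in each step, and stop once the termination condition holds (a simulation of the same route logic, not the graph engine):

```typescript
// Trace the a/b loop: node a appends "A", node b appends "B", and the
// route ends the loop once aggregate reaches length 7.
const aggregate: string[] = [];
while (true) {
  aggregate.push("A");              // node a
  if (aggregate.length >= 7) break; // route returns END
  aggregate.push("B");              // node b, then back to a
}
// aggregate: ["A","B","A","B","A","B","A"]
```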

Impose a recursion limit

In some applications, we may not have a guarantee that we will reach a given termination condition. In these cases, we can set the graph's recursion limit. This will raise a GraphRecursionError after a given number of supersteps. We can then catch and handle this exception:
import { GraphRecursionError } from "@langchain/langgraph";

try {
  await graph.invoke({ aggregate: [] }, { recursionLimit: 4 });
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion Error");
  }
}
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Recursion Error
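A simplified model of the mechanism: each node execution consumes one superstep, and execution fails once the count exceeds the limit. (LangGraph's exact step accounting differs slightly, so treat this only as intuition.)

```typescript
// Model the recursion limit: every node execution consumes one superstep,
// and execution fails once the count exceeds the limit.
class RecursionLimitError extends Error {}

const runWithLimit = (limit: number): string[] => {
  const agg: string[] = [];
  let steps = 0;
  const tick = () => {
    steps += 1;
    if (steps > limit) throw new RecursionLimitError("recursion limit reached");
  };
  while (true) {
    tick();
    agg.push("A"); // node a
    if (agg.length >= 7) return agg; // termination condition
    tick();
    agg.push("B"); // node b
  }
};

let hitLimit = false;
try {
  runWithLimit(4); // the loop needs more supersteps than this to terminate
} catch (e) {
  hitLimit = e instanceof RecursionLimitError;
}
// hitLimit === true: the limit is exceeded before the loop can terminate.
```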

Combine control flow and state updates with Command

It can be useful to combine control flow (edges) and state updates (nodes). For example, you might want to both perform state updates and decide which node to go to next in the same node. LangGraph provides a way to do so by returning a Command object from node functions:
import { Command } from "@langchain/langgraph";

const myNode = (state: State): Command => {
  return new Command({
    // state update
    update: { foo: "bar" },
    // control flow
    goto: "myOtherNode"
  });
};
We show an end-to-end example below. Let’s create a simple graph with 3 nodes: A, B and C. We will first execute node A, and then decide whether to go to Node B or Node C next based on the output of node A.
import { StateGraph, START, Command } from "@langchain/langgraph";
import * as z from "zod";

// Define graph state
const State = z.object({
  foo: z.string(),
});

// Define the nodes

const nodeA = (state: z.infer<typeof State>): Command => {
  console.log("Called A");
  const value = Math.random() > 0.5 ? "b" : "c";
  // this is a replacement for a conditional edge function
  const goto = value === "b" ? "nodeB" : "nodeC";

  // Note how Command allows you to both update the graph state and route to the next node
  return new Command({
    // this is the state update
    update: { foo: value },
    // this is a replacement for an edge
    goto,
  });
};

const nodeB = (state: z.infer<typeof State>) => {
  console.log("Called B");
  return { foo: state.foo + "b" };
};

const nodeC = (state: z.infer<typeof State>) => {
  console.log("Called C");
  return { foo: state.foo + "c" };
};
We can now create the StateGraph with the above nodes. Notice that the graph doesn’t have conditional edges for routing! This is because control flow is defined with Command inside nodeA.
const graph = new StateGraph(State)
  .addNode("nodeA", nodeA, {
    ends: ["nodeB", "nodeC"],
  })
  .addNode("nodeB", nodeB)
  .addNode("nodeC", nodeC)
  .addEdge(START, "nodeA")
  .compile();
You might have noticed that we used ends to specify which nodes nodeA can navigate to. This is necessary for the graph rendering and tells LangGraph that nodeA can navigate to nodeB and nodeC.
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
If we run the graph multiple times, we’d see it take different paths (A -> B or A -> C) based on the random choice in node A.
const result = await graph.invoke({ foo: "" });
console.log(result);
Called A
Called C
{ foo: 'cc' }

Navigating to a node in a parent graph

If you are using subgraphs, you might want to navigate from a node within a subgraph to a different subgraph (i.e. a different node in the parent graph). To do so, you can specify graph: Command.PARENT in the Command:
const myNode = (state: State): Command => {
  return new Command({
    update: { foo: "bar" },
    goto: "otherSubgraph",  // where `otherSubgraph` is a node in the parent graph
    graph: Command.PARENT
  });
};
Let's demonstrate this using the example above. We'll do so by changing nodeA from the example above into a single-node graph, which we'll add to the parent graph as a subgraph.
State updates with Command.PARENT When you send an update from a subgraph node to a parent graph node for a key that is shared by both the parent and subgraph state schemas, you must define a reducer in the parent graph state for the key you are updating. See the example below.
import { StateGraph, START, Command } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  // note: we define a reducer here
  foo: z.string().register(registry, {  
    reducer: {
      fn: (x, y) => x + y,
    },
  }),
});

const nodeA = (state: z.infer<typeof State>) => {
  console.log("Called A");
  const value = Math.random() > 0.5 ? "nodeB" : "nodeC";

  // note how Command allows you to both update the graph state and route to the next node
  return new Command({
    update: { foo: "a" },  
    goto: value,
    // this tells LangGraph to navigate to nodeB or nodeC in the parent graph
    // note: this navigates to the nearest parent graph relative to the subgraph
    graph: Command.PARENT,
  });
};

const subgraph = new StateGraph(State)
  .addNode("nodeA", nodeA, { ends: ["nodeB", "nodeC"] })
  .addEdge(START, "nodeA")
  .compile();

const nodeB = (state: z.infer<typeof State>) => {
  console.log("Called B");  
  // note: since we've defined a reducer, we don't need to manually append the new
  // character to the existing 'foo' value; the reducer appends it automatically
  return { foo: "b" };
};  

const nodeC = (state: z.infer<typeof State>) => {
  console.log("Called C");
  return { foo: "c" };
};

const graph = new StateGraph(State)
  .addNode("subgraph", subgraph, { ends: ["nodeB", "nodeC"] })
  .addNode("nodeB", nodeB)
  .addNode("nodeC", nodeC)
  .addEdge(START, "subgraph")
  .compile();
const result = await graph.invoke({ foo: "" });
console.log(result);
Called A
Called C
{ foo: 'ac' }
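The final value 'ac' comes from the concatenation reducer defined on foo. A minimal plain-TypeScript sketch (no LangGraph; the reducer is copied from the state definition above) of how the two updates combine:

```typescript
// The concatenation reducer from the state schema above
const fooReducer = (x: string, y: string) => x + y;

let foo = ""; // initial input: { foo: "" }
foo = fooReducer(foo, "a"); // nodeA (inside the subgraph) returns { foo: "a" }
foo = fooReducer(foo, "c"); // nodeC (in the parent graph) returns { foo: "c" }

console.log(foo); // "ac"
```

Without the reducer, the second update would overwrite the first and the result would be 'c' instead.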

Use inside tools

A common use case is updating graph state from inside a tool. For example, in a customer support application you might want to look up customer information based on their account number or ID at the start of the conversation. To update graph state from a tool, you can return a Command from the tool, e.g. new Command({ update: { myCustomKey: "foo", messages: [...] } }):
import { tool } from "@langchain/core/tools";
import { Command } from "@langchain/langgraph";
import * as z from "zod";

const lookupUserInfo = tool(
  async (input, config) => {
    const userId = config.configurable?.userId;
    const userInfo = getUserInfo(userId);
    return new Command({
      update: {
        // update the state keys
        userInfo: userInfo,
        // update the message history
        messages: [{
          role: "tool",
          content: "Successfully looked up user information",
          tool_call_id: config.toolCall.id
        }]
      }
    });
  },
  {
    name: "lookupUserInfo",
    description: "Use this to look up user information to better assist them with their questions.",
    schema: z.object({}),
  }
);
You MUST include messages (or any state key used for the message history) in Command.update when returning Command from a tool and the list of messages in messages MUST contain a ToolMessage. This is necessary for the resulting message history to be valid (LLM providers require AI messages with tool calls to be followed by the tool result messages).
If you are using tools that update state via Command, we recommend using prebuilt ToolNode which automatically handles tools returning Command objects and propagates them to the graph state. If you’re writing a custom node that calls tools, you would need to manually propagate Command objects returned by the tools as the update from the node.
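A plain-TypeScript sketch (stand-in types, not the real LangGraph imports) of what "manually propagating" a tool's Command from a custom node means — the node must return the Command unchanged rather than unwrapping it:

```typescript
// Stand-in types for illustration only; not the real @langchain/langgraph types.
type Command = { kind: "command"; update: Record<string, unknown> };
type StateUpdate = Record<string, unknown>;

// hypothetical tool that returns a Command carrying a state update
const lookupTool = (): Command | StateUpdate => ({
  kind: "command",
  update: { userInfo: { name: "Ada" } },
});

// custom node calling the tool: if the tool returned a Command,
// return it as-is so the graph applies both the state update and
// any routing it carries (this is what the prebuilt ToolNode does)
const customToolNode = (): Command | StateUpdate => {
  const result = lookupTool();
  return result;
};
```

Unwrapping result.update and returning it as a plain state update would drop any goto routing the Command carries.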

Visualize your graph

Here we demonstrate how to visualize the graphs you create. You can visualize any arbitrary Graph, including StateGraph. Let's create a simple example graph to demonstrate visualization.
import { StateGraph, START, END, MessagesZodMeta } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
  value: z.number().register(registry, {
    reducer: {
      fn: (x, y) => x + y,
    },
  }),
});

const app = new StateGraph(State)
  .addNode("node1", (state) => {
    return { value: state.value + 1 };
  })
  .addNode("node2", (state) => {
    return { value: state.value * 2 };
  })
  .addEdge(START, "node1")
  .addConditionalEdges("node1", (state) => {
    if (state.value < 10) {
      return "node2";
    }
    return END;
  })
  .addEdge("node2", "node1")
  .compile();

Mermaid

We can also convert the graph class into Mermaid syntax.
const drawableGraph = await app.getGraphAsync();
console.log(drawableGraph.drawMermaid());
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    __start__([<p>__start__</p>]):::first
    node1(node1)
    node2(node2)
    __end__([<p>__end__</p>]):::last
    __start__ --> node1;
    node1 -.-> node2;
    node1 -.-> __end__;
    node2 --> node1;
    classDef default fill:#f2f0ff,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#bfb6fc

PNG

If preferred, we could render the Graph into a .png. This uses the Mermaid.ink API to generate the diagram.
import * as fs from "node:fs/promises";

const drawableGraph = await app.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
