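This guide demonstrates the basics of LangGraph’s Graph API. It walks through state, as well as composing common graph structures such as sequences, branches, and loops. It also covers LangGraph’s control features, including the Send API for map-reduce workflows and the Command API for combining state updates with "hops" between nodes.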

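Setup

Install langgraph:
pip install -U langgraph
Set up LangSmith for better debugging: sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor LLM applications built with LangGraph. Read more about how to get started in the docs.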

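Define and update state

Here we show how to define and update state in LangGraph. We will demonstrate:
  1. How to use state to define a graph’s schema
  2. How to use reducers to control how state updates are processed.

Define state

State in LangGraph can be a TypedDict, a Pydantic model, or a dataclass. Below we will use TypedDict. For details on using Pydantic, see the section on using Pydantic models for graph state.
By default, graphs have the same input and output schema, and the state determines that schema. To define distinct input and output schemas, see the section on defining input and output schemas.
Let’s consider a simple example using messages. This represents a versatile formulation of state for many LLM applications. See our concepts page for more detail.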
from langchain.messages import AnyMessage
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[AnyMessage]
    extra_field: int
This state tracks a list of message objects, as well as an extra integer field.

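Update state

Let’s build an example graph with a single node. Our node is just a Python function that reads our graph’s state and makes updates to it. The first argument to this function will always be the state: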
from langchain.messages import AIMessage

def node(state: State):
    messages = state["messages"]
    new_message = AIMessage("Hello!")
    return {"messages": messages + [new_message], "extra_field": 10}
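This node simply appends a message to our message list and populates an extra field.
Nodes should return updates to the state directly, instead of mutating the state.
Next, let’s define a simple graph containing this node. We use StateGraph to define a graph that operates on this state. We then use add_node to populate our graph.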
from langgraph.graph import StateGraph

builder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()
LangGraph provides built-in utilities for visualizing your graph. Let’s inspect our graph. See the section on visualization for details.
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Simple graph with single node
In this case, our graph just executes a single node. Let’s proceed with a simple invocation:
from langchain.messages import HumanMessage

result = graph.invoke({"messages": [HumanMessage("Hi")]})
result
{'messages': [HumanMessage(content='Hi'), AIMessage(content='Hello!')], 'extra_field': 10}
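Note that:
  • We kicked off the invocation by updating a single key of the state.
  • The invocation result returns the entire state.
For convenience, we frequently inspect the content of message objects via pretty-print: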
for message in result["messages"]:
    message.pretty_print()

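Process state updates with reducers

Each key in the state can have its own independent reducer function, which controls how updates from nodes are applied. If no reducer function is explicitly specified, all updates to that key are assumed to override it.
For TypedDict state schemas, we can define reducers by annotating the corresponding field of the state with a reducer function.
In the earlier example, our node updated the "messages" key in the state by appending a message to it. Below, we add a reducer to this key, so that updates are automatically appended: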
from typing_extensions import Annotated

def add(left, right):
    """Can also import `add` from the `operator` built-in."""
    return left + right

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add]  
    extra_field: int
Now our node can be simplified:
def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}  
from langgraph.graph import START

graph = StateGraph(State).add_node(node).add_edge(START, "node").compile()

result = graph.invoke({"messages": [HumanMessage("Hi")]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!
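To make the reducer rule concrete, here is a minimal plain-Python sketch of how a per-key reducer could combine a node’s update with the current state. The helper apply_update is hypothetical and written only for illustration; it models the behavior described above and is not LangGraph’s implementation.

```python
import operator
from typing import Any, Callable


# Hypothetical helper: a simplified model of per-key reducers.
# This is an illustration only, not LangGraph's implementation.
def apply_update(
    state: dict[str, Any],
    update: dict[str, Any],
    reducers: dict[str, Callable[[Any, Any], Any]],
) -> dict[str, Any]:
    new_state = dict(state)
    for key, value in update.items():
        if key in reducers and key in new_state:
            # A reducer merges the existing value with the update.
            new_state[key] = reducers[key](new_state[key], value)
        else:
            # Without a reducer, the update overwrites the key.
            new_state[key] = value
    return new_state


reducers = {"messages": operator.add}
state = {"messages": ["Hi"], "extra_field": 0}
state = apply_update(state, {"messages": ["Hello!"], "extra_field": 10}, reducers)
print(state)  # {'messages': ['Hi', 'Hello!'], 'extra_field': 10}
```

The "messages" key is concatenated because it has a reducer, while "extra_field" is simply replaced.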

MessagesState

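In practice, there are additional considerations for updating lists of messages:
  • We may wish to update an existing message in the state.
  • We may want to accept shorthands for message formats, such as the OpenAI format.
LangGraph includes a built-in reducer, add_messages, that handles these considerations: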
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]  
    extra_field: int

def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}

graph = StateGraph(State).add_node(node).set_entry_point("node").compile()
input_message = {"role": "user", "content": "Hi"}  

result = graph.invoke({"messages": [input_message]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!
This is a versatile representation of state for applications involving chat models. LangGraph includes a pre-built MessagesState for convenience, so that we can have:
from langgraph.graph import MessagesState

class State(MessagesState):
    extra_field: int
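The two considerations that add_messages addresses (updating an existing message and accepting dict shorthand) can be modeled in plain Python. The sketch below is a simplified illustration under stated assumptions: Msg, to_msg, and merge_messages are hypothetical names, and the real reducer works on LangChain message objects and handles more cases.

```python
import uuid
from dataclasses import dataclass, replace


@dataclass
class Msg:
    """Hypothetical stand-in for a chat message object."""
    role: str
    content: str
    id: str = ""


def to_msg(item) -> Msg:
    """Accept a Msg or a dict shorthand such as {"role": ..., "content": ...}."""
    msg = replace(item) if isinstance(item, Msg) else Msg(**item)
    if not msg.id:
        msg.id = str(uuid.uuid4())  # give messages without an ID a fresh one
    return msg


def merge_messages(left: list, right: list) -> list[Msg]:
    """Append new messages; replace an existing message when the ID matches."""
    merged = [to_msg(m) for m in left]
    position = {m.id: i for i, m in enumerate(merged)}
    for item in right:
        msg = to_msg(item)
        if msg.id in position:
            merged[position[msg.id]] = msg  # same ID: update in place
        else:
            position[msg.id] = len(merged)  # new ID: append
            merged.append(msg)
    return merged


history = merge_messages([], [{"role": "user", "content": "Hi", "id": "1"}])
history = merge_messages(history, [Msg("ai", "Hello!", id="2")])
history = merge_messages(history, [{"role": "ai", "content": "Hello there!", "id": "2"}])
print([(m.id, m.content) for m in history])  # [('1', 'Hi'), ('2', 'Hello there!')]
```

The third call reuses ID "2", so the earlier reply is replaced rather than duplicated.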

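Bypass reducers with Overwrite

In some cases, you may want to bypass a reducer and directly overwrite a state value. LangGraph provides the Overwrite type for this purpose. When a node returns a value wrapped with Overwrite, the reducer is bypassed and the channel is set directly to that value.
This is useful when you want to reset or replace accumulated state rather than merge it with existing values.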
from langgraph.graph import StateGraph, START, END
from langgraph.types import Overwrite
from typing_extensions import Annotated, TypedDict
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]

def add_message(state: State):
    return {"messages": ["first message"]}

def replace_messages(state: State):
    # Bypass the reducer and replace the entire messages list
    return {"messages": Overwrite(["replacement message"])}

builder = StateGraph(State)
builder.add_node("add_message", add_message)
builder.add_node("replace_messages", replace_messages)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", "replace_messages")
builder.add_edge("replace_messages", END)

graph = builder.compile()

result = graph.invoke({"messages": ["initial"]})
print(result["messages"])
['replacement message']
You can also use JSON format with the special key "__overwrite__":
def replace_messages(state: State):
    return {"messages": {"__overwrite__": ["replacement message"]}}
When nodes execute in parallel, only one node can use Overwrite on the same state key in a given super-step. If multiple nodes attempt to overwrite the same key in the same super-step, an InvalidUpdateError will be raised.
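The rule above can be pictured with a small plain-Python model. Replace and merge_step are hypothetical names used only for this sketch, a ValueError stands in for the library’s error, and the way an overwrite interacts with ordinary updates in the same step is simplified here.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Replace:
    """Hypothetical stand-in for a wrapper that bypasses the reducer."""
    value: Any


def merge_step(current: Any, updates: list, reducer: Callable[[Any, Any], Any]) -> Any:
    """Combine all updates written to one key during a single step."""
    replacements = [u for u in updates if isinstance(u, Replace)]
    if len(replacements) > 1:
        # Two overwrites of the same key in one step are ambiguous.
        raise ValueError("more than one overwrite for the same key in one step")
    if replacements:
        # Simplification: the single overwrite wins outright.
        return replacements[0].value
    for update in updates:
        current = reducer(current, update)
    return current


print(merge_step(["initial"], [["first message"]], operator.add))
# ['initial', 'first message']
print(merge_step(["initial", "first message"], [Replace(["replacement message"])], operator.add))
# ['replacement message']
```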

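Define input and output schemas

By default, StateGraph operates with a single schema, and all nodes are expected to communicate using that schema. However, it is also possible to define distinct input and output schemas for a graph.
When distinct schemas are specified, an internal schema is still used for communication between nodes. The input schema ensures that the provided input matches the expected structure, while the output schema filters the internal data to return only the relevant information.
Below, we will see how to define distinct input and output schemas.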
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# Define the schema for the input
class InputState(TypedDict):
    question: str

# Define the schema for the output
class OutputState(TypedDict):
    answer: str

# Define the overall schema, combining both input and output
class OverallState(InputState, OutputState):
    pass

# Define the node that processes the input and generates an answer
def answer_node(state: InputState):
    # Example answer and an extra key
    return {"answer": "bye", "question": state["question"]}

# Build the graph with input and output schemas specified
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node)  # Add the answer node
builder.add_edge(START, "answer_node")  # Define the starting edge
builder.add_edge("answer_node", END)  # Define the ending edge
graph = builder.compile()  # Compile the graph

# Invoke the graph with an input and print the result
print(graph.invoke({"question": "hi"}))
{'answer': 'bye'}
Notice that the output of invoke only includes the output schema.
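The filtering step can be thought of as keeping only the keys that the output schema declares. The sketch below shows that idea in plain Python; filter_to_schema is a hypothetical helper written for illustration, not a LangGraph function.

```python
from typing import Any, TypedDict, get_type_hints


class InputState(TypedDict):
    question: str


class OutputState(TypedDict):
    answer: str


# Hypothetical helper: keep only the keys declared on the given schema.
def filter_to_schema(state: dict[str, Any], schema: type) -> dict[str, Any]:
    allowed = get_type_hints(schema).keys()
    return {key: value for key, value in state.items() if key in allowed}


internal_state = {"question": "hi", "answer": "bye"}
print(filter_to_schema(internal_state, OutputState))  # {'answer': 'bye'}
```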

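Pass private state between nodes

In some cases, you may want nodes to exchange information that is crucial for intermediate logic but does not need to be part of the main schema of the graph. This private data is not relevant to the overall input/output of the graph and should only be shared between certain nodes.
Below, we will create an example sequential graph consisting of three nodes (node_1, node_2, and node_3), where private data is passed between the first two steps (node_1 and node_2), while the third step (node_3) only has access to the public overall state.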
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# The overall state of the graph (this is the public state shared across nodes)
class OverallState(TypedDict):
    a: str

# Output from node_1 contains private data that is not part of the overall state
class Node1Output(TypedDict):
    private_data: str

# The private data is only shared between node_1 and node_2
def node_1(state: OverallState) -> Node1Output:
    output = {"private_data": "set by node_1"}
    print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 2 input only requests the private data available after node_1
class Node2Input(TypedDict):
    private_data: str

def node_2(state: Node2Input) -> OverallState:
    output = {"a": "set by node_2"}
    print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 3 only has access to the overall state (no access to private data from node_1)
def node_3(state: OverallState) -> OverallState:
    output = {"a": "set by node_3"}
    print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Connect nodes in a sequence
# node_2 accepts private data from node_1, whereas
# node_3 does not see the private data.
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()

# Invoke the graph with the initial state
response = graph.invoke(
    {
        "a": "set at start",
    }
)

print()
print(f"Output of graph invocation: {response}")
Entered node `node_1`:
    Input: {'a': 'set at start'}.
    Returned: {'private_data': 'set by node_1'}
Entered node `node_2`:
    Input: {'private_data': 'set by node_1'}.
    Returned: {'a': 'set by node_2'}
Entered node `node_3`:
    Input: {'a': 'set by node_2'}.
    Returned: {'a': 'set by node_3'}

Output of graph invocation: {'a': 'set by node_3'}

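Use Pydantic models for graph state

A StateGraph accepts a state_schema argument on initialization that specifies the "shape" of the state that the nodes in the graph can access and update.
In our examples, we typically use a Python-native TypedDict or dataclass for state_schema, but state_schema can be any type.
Here, we will see how a Pydantic BaseModel can be used as state_schema to add run-time validation on inputs.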
Known Limitations
  • Currently, the output of the graph will NOT be an instance of a pydantic model.
  • Run-time validation only occurs on inputs to the first node in the graph, not on subsequent nodes or outputs.
  • The validation error trace from pydantic does not show which node the error arises in.
  • Pydantic’s recursive validation can be slow. For performance-sensitive applications, you may want to consider using a dataclass instead.
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel

# The overall state of the graph (this is the public state shared across nodes)
class OverallState(BaseModel):
    a: str

def node(state: OverallState):
    return {"a": "goodbye"}

# Build the state graph
builder = StateGraph(OverallState)
builder.add_node(node)  # `node` is the only node in this graph
builder.add_edge(START, "node")  # Start the graph with `node`
builder.add_edge("node", END)  # End the graph after `node`
graph = builder.compile()

# Test the graph with a valid input
graph.invoke({"a": "hello"})
Invoke the graph with an invalid input
try:
    graph.invoke({"a": 123})  # Should be a string
except Exception as e:
    print("An exception was raised because `a` is an integer rather than a string.")
    print(e)
An exception was raised because `a` is an integer rather than a string.
1 validation error for OverallState
a
  Input should be a valid string [type=string_type, input_value=123, input_type=int]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type
See below for additional features of Pydantic model state:
When using Pydantic models as state schemas, it’s important to understand how serialization works, especially when:
  • Passing Pydantic objects as inputs
  • Receiving outputs from the graph
  • Working with nested Pydantic models
Let’s see these behaviors in action.
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class NestedModel(BaseModel):
    value: str

class ComplexState(BaseModel):
    text: str
    count: int
    nested: NestedModel

def process_node(state: ComplexState):
    # Node receives a validated Pydantic object
    print(f"Input state type: {type(state)}")
    print(f"Nested type: {type(state.nested)}")
    # Return a dictionary update
    return {"text": state.text + " processed", "count": state.count + 1}

# Build the graph
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

# Create a Pydantic instance for input
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"Input object type: {type(input_state)}")

# Invoke graph with a Pydantic instance
result = graph.invoke(input_state)
print(f"Output type: {type(result)}")
print(f"Output content: {result}")

# Convert back to Pydantic model if needed
output_model = ComplexState(**result)
print(f"Converted back to Pydantic: {type(output_model)}")
Pydantic performs runtime type coercion for certain data types. This can be helpful but also lead to unexpected behavior if you’re not aware of it.
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class CoercionExample(BaseModel):
    # Pydantic will coerce string numbers to integers
    number: int
    # Pydantic will parse string booleans to bool
    flag: bool

def inspect_node(state: CoercionExample):
    print(f"number: {state.number} (type: {type(state.number)})")
    print(f"flag: {state.flag} (type: {type(state.flag)})")
    return {}

builder = StateGraph(CoercionExample)
builder.add_node("inspect", inspect_node)
builder.add_edge(START, "inspect")
builder.add_edge("inspect", END)
graph = builder.compile()

# Demonstrate coercion with string inputs that will be converted
result = graph.invoke({"number": "42", "flag": "true"})

# This would fail with a validation error
try:
    graph.invoke({"number": "not-a-number", "flag": "true"})
except Exception as e:
    print(f"\nExpected validation error: {e}")
When working with LangChain message types in your state schema, there are important considerations for serialization. You should use AnyMessage (rather than BaseMessage) for proper serialization/deserialization when using message objects over the wire.
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
from langchain.messages import HumanMessage, AIMessage, AnyMessage
from typing import List

class ChatState(BaseModel):
    messages: List[AnyMessage]
    context: str

def add_message(state: ChatState):
    return {"messages": state.messages + [AIMessage(content="Hello there!")]}

builder = StateGraph(ChatState)
builder.add_node("add_message", add_message)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", END)
graph = builder.compile()

# Create input with a message
initial_state = ChatState(
    messages=[HumanMessage(content="Hi")], context="Customer support chat"
)

result = graph.invoke(initial_state)
print(f"Output: {result}")

# Convert back to Pydantic model to see message types
output_model = ChatState(**result)
for i, msg in enumerate(output_model.messages):
    print(f"Message {i}: {type(msg).__name__} - {msg.content}")

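Add runtime configuration

Sometimes you want to be able to configure your graph when calling it. For example, you might want to specify which LLM or system prompt to use at runtime, without polluting the graph state with these parameters.
To add runtime configuration:
  1. Specify a schema for your configuration
  2. Add the configuration to the function signature for nodes or conditional edges
  3. Pass the configuration into the graph.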
See below for a simple example:
from langgraph.graph import END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

# 1. Specify config schema
class ContextSchema(TypedDict):
    my_runtime_value: str

# 2. Define a graph that accesses the config in a node
class State(TypedDict):
    my_state_value: str

def node(state: State, runtime: Runtime[ContextSchema]):  
    if runtime.context["my_runtime_value"] == "a":  
        return {"my_state_value": 1}
    elif runtime.context["my_runtime_value"] == "b":  
        return {"my_state_value": 2}
    else:
        raise ValueError("Unknown values.")

builder = StateGraph(State, context_schema=ContextSchema)  
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)

graph = builder.compile()

# 3. Pass in configuration at runtime:
print(graph.invoke({}, context={"my_runtime_value": "a"}))  
print(graph.invoke({}, context={"my_runtime_value": "b"}))  
{'my_state_value': 1}
{'my_state_value': 2}
Below we demonstrate a practical example in which we configure what LLM to use at runtime. We will use both OpenAI and Anthropic models.
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"

MODELS = {
    "anthropic": init_chat_model("claude-haiku-4-5-20251001"),
    "openai": init_chat_model("gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    response = model.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# Usage
input_message = {"role": "user", "content": "hi"}
# With no configuration, uses default (Anthropic)
response_1 = graph.invoke({"messages": [input_message]}, context=ContextSchema())["messages"][-1]
# Or, can set OpenAI
response_2 = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai"})["messages"][-1]

print(response_1.response_metadata["model_name"])
print(response_2.response_metadata["model_name"])
claude-haiku-4-5-20251001
gpt-4.1-mini-2025-04-14
Below we demonstrate a practical example in which we configure two parameters: the LLM and system message to use at runtime.
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"
    system_message: str | None = None

MODELS = {
    "anthropic": init_chat_model("claude-haiku-4-5-20251001"),
    "openai": init_chat_model("gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    messages = state["messages"]
    if (system_message := runtime.context.system_message):
        messages = [SystemMessage(system_message)] + messages
    response = model.invoke(messages)
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# Usage
input_message = {"role": "user", "content": "hi"}
response = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai", "system_message": "Respond in Italian."})
for message in response["messages"]:
    message.pretty_print()
================================ Human Message ================================

hi
================================== Ai Message ==================================

Ciao! Come posso aiutarti oggi?

Add retry policies

In many use cases, you may wish for your node to have a custom retry policy, for example if you are calling an API, querying a database, or calling an LLM. LangGraph lets you add retry policies to nodes.
To configure a retry policy, pass the retry_policy parameter to add_node. The retry_policy parameter takes a RetryPolicy named tuple object. Below we instantiate a RetryPolicy object with the default parameters and associate it with a node:
from langgraph.types import RetryPolicy

builder.add_node(
    "node_name",
    node_function,
    retry_policy=RetryPolicy(),
)
By default, the retry_on parameter uses the default_retry_on function, which retries on any exception except for the following:
  • ValueError
  • TypeError
  • ArithmeticError
  • ImportError
  • LookupError
  • NameError
  • SyntaxError
  • RuntimeError
  • ReferenceError
  • StopIteration
  • StopAsyncIteration
  • OSError
In addition, for exceptions from popular http request libraries such as requests and httpx it only retries on 5xx status codes.
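As a rough mental model of what a retry policy does, the sketch below retries a callable with exponential backoff and a predicate that decides which exceptions are retryable. run_with_retry, its parameter names, and TransientError are all hypothetical and chosen for this illustration; this is not how LangGraph implements retries.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical error type standing in for a temporary failure."""


# Hypothetical helper: retry `fn` with exponential backoff.
def run_with_retry(
    fn: Callable[[], T],
    *,
    max_attempts: int = 3,
    initial_interval: float = 0.01,
    backoff_factor: float = 2.0,
    retry_on: Callable[[Exception], bool] = lambda exc: True,
) -> T:
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Give up on the last attempt or when the predicate rejects the error.
            if attempt == max_attempts or not retry_on(exc):
                raise
            time.sleep(interval)
            interval *= backoff_factor
    raise RuntimeError("max_attempts must be at least 1")


calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary failure")
    return "ok"


result = run_with_retry(
    flaky,
    max_attempts=5,
    retry_on=lambda exc: isinstance(exc, TransientError),
)
print(result, calls["count"])  # ok 3
```

Errors that the predicate rejects (a ValueError here, for instance) propagate immediately instead of being retried.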
Consider an example in which we are reading from a SQL database. Below we pass two different retry policies to nodes:
import sqlite3
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.types import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain.messages import AIMessage

db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("claude-haiku-4-5-20251001")

def query_database(state: MessagesState):
    query_result = db.run("SELECT * FROM Artist LIMIT 10;")
    return {"messages": [AIMessage(content=query_result)]}

def call_model(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# Define a new graph
builder = StateGraph(MessagesState)
builder.add_node(
    "query_database",
    query_database,
    retry_policy=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry_policy=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()

Add node caching

Node caching is useful in cases where you want to avoid repeating operations, such as when doing something expensive (in terms of time or cost). LangGraph lets you add individualized caching policies to nodes in a graph.
To configure a cache policy, pass the cache_policy parameter to the add_node function. In the following example, a CachePolicy object is instantiated with a time to live of 120 seconds and the default key_func generator. Then it is associated with a node:
from langgraph.types import CachePolicy

builder.add_node(
    "node_name",
    node_function,
    cache_policy=CachePolicy(ttl=120),
)
Then, to enable node-level caching for a graph, set the cache argument when compiling the graph. The example below uses InMemoryCache to set up a graph with in-memory cache, but SqliteCache is also available.
from langgraph.cache.memory import InMemoryCache

graph = builder.compile(cache=InMemoryCache())
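To illustrate what a time-to-live cache around a node amounts to, here is a small plain-Python sketch. TTLNodeCache is a hypothetical class, and keying entries by a hash of the pickled input is an assumption made for this example rather than a description of LangGraph’s default key function. The clock is injectable so the expiry can be demonstrated without waiting.

```python
import hashlib
import pickle
import time
from typing import Any, Callable


class TTLNodeCache:
    """Hypothetical cache: reuse a node's result for identical input until it expires."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._entries: dict[tuple[str, str], tuple[float, Any]] = {}

    def call(self, node_fn: Callable[[Any], Any], node_input: Any) -> Any:
        # Assumption for this sketch: the key is the node name plus a hash of its input.
        digest = hashlib.sha256(pickle.dumps(node_input)).hexdigest()
        key = (node_fn.__name__, digest)
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now < entry[0]:
            return entry[1]  # fresh entry: skip the node
        result = node_fn(node_input)
        self._entries[key] = (now + self._ttl, result)
        return result


fake_now = [0.0]
cache = TTLNodeCache(ttl=120, clock=lambda: fake_now[0])
calls = []


def expensive_node(state: dict) -> dict:
    calls.append(state["x"])
    return {"result": state["x"] * 2}


cache.call(expensive_node, {"x": 5})  # computed
cache.call(expensive_node, {"x": 5})  # served from the cache
fake_now[0] = 121.0                   # move past the 120-second TTL
cache.call(expensive_node, {"x": 5})  # expired, so computed again
print(len(calls))  # 2
```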

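Create a sequence of steps

Prerequisites: this guide assumes familiarity with the above section on state.
Here we demonstrate how to construct a simple sequence of steps. We will show:
  1. How to build a sequential graph
  2. Built-in shorthand for constructing similar graphs.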
To add a sequence of nodes, we use the add_node and add_edge methods of our graph:
from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
We can also use the built-in shorthand .add_sequence:
builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")
LangGraph makes it easy to add an underlying persistence layer to your application. This allows state to be checkpointed between the execution of nodes, so your LangGraph nodes govern how state updates are checkpointed. They also determine how execution steps are streamed, and how your application is visualized and debugged using Studio.
Let’s demonstrate an end-to-end example. We will create a sequence of three steps:
  1. Populate a value in a key of the state
  2. Update the same value
  3. Populate a different value
Let’s first define our state. This governs the schema of the graph, and can also specify how to apply updates. See the section on processing state updates with reducers for more detail.
In our case, we will just keep track of two values:
from typing_extensions import TypedDict

class State(TypedDict):
    value_1: str
    value_2: int
Our nodes are just Python functions that read our graph’s state and make updates to it. The first argument to this function will always be the state:
def step_1(state: State):
    return {"value_1": "a"}

def step_2(state: State):
    current_value_1 = state["value_1"]
    return {"value_1": f"{current_value_1} b"}

def step_3(state: State):
    return {"value_2": 10}
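Note that when issuing updates to the state, each node only needs to specify the value of the key it wishes to update. By default, this overwrites the value of the corresponding key. You can also use reducers to control how updates are processed; for example, you can append successive updates to a key instead. See the section on reducers for more detail.
Finally, we define the graph. We use StateGraph to define a graph that operates on this state. We will then use add_node and add_edge to populate our graph and define its control flow.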
from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
Specifying custom names: you can specify custom names for nodes using add_node:
builder.add_node("my_node", step_1)
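Note that:
  • add_edge takes the names of nodes, which for functions defaults to node.__name__.
  • We must specify the entry point of the graph. For this we add an edge from the START node.
  • The graph halts when there are no more nodes to execute.
We next compile our graph. This provides a few basic checks on the structure of the graph (for example, identifying orphaned nodes). If we were adding persistence to our application via a checkpointer, it would also be passed in here.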
graph = builder.compile()
LangGraph provides built-in utilities for visualizing your graph. Let’s inspect our sequence. See the visualization guide for details.
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Sequence of steps graph
Let’s proceed with a simple invocation:
graph.invoke({"value_1": "c"})
{'value_1': 'a b', 'value_2': 10}
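Note that:
  • We kicked off the invocation by providing a value for a single state key. We must always provide a value for at least one key.
  • The value we passed in was overwritten by the first node.
  • The second node updated the value.
  • The third node populated a different value.
Built-in shorthand: langgraph>=0.2.46 includes a built-in shorthand, add_sequence, for adding node sequences. You can compile the same graph as follows: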
builder = StateGraph(State).add_sequence([step_1, step_2, step_3])  
builder.add_edge(START, "step_1")

graph = builder.compile()

graph.invoke({"value_1": "c"})

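Create branches

Parallel execution of nodes is essential for speeding up overall graph operation. LangGraph offers native support for parallel execution of nodes, which can significantly improve the performance of graph-based workflows. This parallelization is achieved through fan-out and fan-in mechanisms, using both standard edges and conditional edges. Below are some examples showing how to create branching dataflows that work for you.

Run graph nodes in parallel

In this example, we fan out from node A to B and C, and then fan in to D. With our state, we specify the reducer add operation. This combines or accumulates values for the specific key in the State, rather than simply overwriting the existing value. For lists, this means concatenating the new list with the existing list. See the above section on state reducers for more detail on updating state with reducers.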
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Parallel execution graph
With the reducer, you can see that the values added in each node are accumulated.
graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
In the above example, nodes "b" and "c" are executed concurrently in the same superstep. Because they are in the same step, node "d" executes after both "b" and "c" are finished.
Importantly, updates from a parallel superstep may not be ordered consistently. If you need a consistent, predetermined ordering of updates from a parallel superstep, you should write the outputs to a separate field in the state together with a value with which to order them.
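One way to get a predetermined ordering is for each branch to write its output together with an explicit sort key, and to sort once the branches have finished. The plain-Python sketch below simulates that; the field name "results", the branch functions, and the sort keys are assumptions made for this example.

```python
import operator


# Each branch writes (sort_key, value) pairs to a separate field.
def branch_b(state: dict) -> dict:
    return {"results": [(1, "B")]}


def branch_c(state: dict) -> dict:
    return {"results": [(0, "C")]}


# Simulate updates arriving in an arbitrary order and being
# accumulated by an operator.add reducer.
accumulated: list = []
for update in (branch_b({}), branch_c({})):
    accumulated = operator.add(accumulated, update["results"])

# A downstream node restores the intended order using the sort key.
ordered = [value for _, value in sorted(accumulated, key=lambda pair: pair[0])]
print(ordered)  # ['C', 'B']
```

Whatever order the updates arrive in, sorting on the key yields the same sequence.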
LangGraph executes nodes within supersteps, meaning that while parallel branches are executed in parallel, the entire superstep is transactional. If any of these branches raises an exception, none of the updates are applied to the state (the entire superstep errors).
Importantly, when using a checkpointer, results from successful nodes within a superstep are saved and are not repeated when the graph is resumed.
If you have error-prone nodes (for example, nodes that call flaky APIs), LangGraph provides two ways to address this:
  1. You can write regular python code within your node to catch and handle exceptions.
  2. You can set a retry_policy to direct the graph to retry nodes that raise certain types of exceptions. Only failing branches are retried, so you needn’t worry about performing redundant work.
Together, these let you perform parallel execution and fully control exception handling.
Set max concurrency: you can control the maximum number of concurrent tasks by setting max_concurrency in the configuration when invoking the graph.
graph.invoke({"value_1": "c"}, {"configurable": {"max_concurrency": 10}})

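Defer node execution

Deferring node execution is useful when you want to delay the execution of a node until all other pending tasks are completed. This is particularly relevant when branches have different lengths, which is common in workflows such as map-reduce flows.
The above example showed how to fan out and fan in when each path was only one step. But what if one branch had more than one step? Let’s add a node "b_2" in the "b" branch: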
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def b_2(state: State):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True)  
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Deferred execution graph
graph.invoke({"aggregate": []})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']
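In the above example, nodes "b" and "c" are executed concurrently in the same superstep. We set defer=True on node d so that it does not execute until all pending tasks are finished. In this case, this means that "d" waits to execute until the entire "b" branch is finished.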

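Conditional branching

If your fan-out should vary at runtime based on the state, you can use add_conditional_edges to select one or more paths using the graph state. See the example below, where node a generates a state update that determines the following node.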
import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # Add a key to the state. We will set this key to determine
    # how we branch.
    which: str

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"], "which": "c"}  

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)

def conditional_edge(state: State) -> Literal["b", "c"]:
    # Fill in arbitrary logic here that uses the state
    # to determine the next node
    return state["which"]

builder.add_conditional_edges("a", conditional_edge)  

graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Conditional branching graph
result = graph.invoke({"aggregate": []})
print(result)
Adding "A" to []
Adding "C" to ['A']
{'aggregate': ['A', 'C'], 'which': 'c'}
Your conditional edges can route to multiple destination nodes. For example:
def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

Map-Reduce and the Send API

LangGraph supports map-reduce and other advanced branching patterns using the Send API. Here is an example of how to use it:
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from typing_extensions import TypedDict, Annotated
import operator

class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]
    best_selected_joke: str

def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}

def generate_joke(state: OverallState):
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
    }
    return {"jokes": [joke_map[state["subject"]]]}

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def best_joke(state: OverallState):
    return {"best_selected_joke": "penguins"}

builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Map-reduce graph with fanout
# Call the graph: here we call it to generate a list of jokes
for step in graph.stream({"topic": "animals"}):
    print(step)
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ["Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."]}}
{'best_joke': {'best_selected_joke': 'penguins'}}
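The reduce half of this pattern comes from the operator.add reducer on jokes: every generate_joke task returns a one-element list, and the lists are concatenated into the shared key. The following plain-Python sketch mimics that fan-out and fold outside of LangGraph. The list comprehension standing in for the Send dispatch is a simplification, not how the runtime schedules tasks.

```python
import operator
from functools import reduce

JOKES = {
    "lions": "Why don't lions like fast food? Because they can't catch it!",
    "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
}


def generate_joke(task_state: dict) -> dict:
    # Each task sees only the payload it was sent, here {"subject": ...}.
    return {"jokes": [JOKES[task_state["subject"]]]}


subjects = ["lions", "elephants"]

# Map: one payload per subject, like one Send(...) per subject.
partial_updates = [generate_joke({"subject": s}) for s in subjects]

# Reduce: fold the partial lists together with the same operator.add reducer.
jokes = reduce(operator.add, (u["jokes"] for u in partial_updates), [])
print(len(jokes), jokes[0])
```

Without a reducer on the key, concurrent writes to jokes would have no defined way to combine, which is why the state annotates it with operator.add.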

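Create and control loops

When creating a graph with a loop, we need a mechanism for terminating execution. This is most commonly done by adding a conditional edge that routes to the END node once some termination condition is reached. You can also set the graph recursion limit when invoking or streaming the graph. The recursion limit sets the number of supersteps that the graph is allowed to execute before it raises an error. Read more about the concept of recursion limits here. Let's consider a simple graph with a loop to better understand how these mechanisms work.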
To return the last value of your state instead of receiving a recursion limit error, see the next section.
When creating a loop, you can include a conditional edge that specifies a termination condition:
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

def route(state: State) -> Literal["b", END]:
    if termination_condition(state):
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
To control the recursion limit, specify "recursion_limit" in the config. Exceeding the limit raises a GraphRecursionError, which you can catch and handle:
from langgraph.errors import GraphRecursionError

try:
    graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
    print("Recursion Error")
Let's define a graph with a simple loop. Note that we use a conditional edge to implement a termination condition.
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer function makes this key append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Simple loop graph
This architecture is similar to a ReAct agent in which node "a" is a tool-calling model, and node "b" represents the tools. In our route conditional edge, we specify that we should end after the "aggregate" list in the state passes a threshold length. Invoking the graph, we see that we alternate between nodes "a" and "b" before terminating once we reach the termination condition.
graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']

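Impose a recursion limit

In some applications, we may not have a guarantee that a given termination condition will be reached. In these cases, we can set the graph's recursion limit. This raises a GraphRecursionError after a given number of supersteps. We can then catch and handle this exception: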
from langgraph.errors import GraphRecursionError

try:
    graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Recursion Error
Instead of raising GraphRecursionError, we can introduce a new key to the state that keeps track of the number of steps remaining until reaching the recursion limit. We can then use this key to determine if we should end the run. LangGraph implements a special RemainingSteps annotation. Under the hood, it creates a ManagedValue channel: a state channel that will exist for the duration of our graph run and no longer.
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    remaining_steps: RemainingSteps

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# Define edges
def route(state: State) -> Literal["b", END]:
    if state["remaining_steps"] <= 2:
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

# Test it out
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
{'aggregate': ['A', 'B', 'A']}
To better understand how the recursion limit works, let’s consider a more complex example. Below we implement a loop, but one step fans out into two nodes:
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Node C sees {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Node D sees {state["aggregate"]}')
    return {"aggregate": ["D"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)

# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
Complex loop graph with branches
This graph looks complex, but can be conceptualized as a loop of supersteps:
  1. Node A
  2. Node B
  3. Nodes C and D
  4. Node A
We have a loop of four supersteps, where nodes C and D are executed concurrently. Invoking the graph as before, we see that we complete two full “laps” before hitting the termination condition:
result = graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']
However, if we set the recursion limit to four, we only complete one lap because each lap is four supersteps:
from langgraph.errors import GraphRecursionError

try:
    result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error
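To make the superstep counting concrete, here is a toy plain-Python model of the graph above. It is not LangGraph's scheduler: the names run, EDGES and StepLimitReached are hypothetical, and the model simply assumes that the limit is the maximum number of supersteps allowed to execute, which matches the output shown above.

```python
import operator


class StepLimitReached(Exception):
    """Stand-in for GraphRecursionError in this toy model."""


# Static edges of the graph above; the edge out of "a" is conditional.
EDGES = {"b": ["c", "d"], "c": ["a"], "d": ["a"]}


def run(limit: int, threshold: int = 7):
    aggregate, frontier, supersteps = [], ["a"], []
    while frontier:
        if len(supersteps) == limit:
            raise StepLimitReached(supersteps)
        supersteps.append(frontier)
        # Every node scheduled in this superstep contributes one update,
        # merged into the list with the operator.add reducer.
        for name in frontier:
            aggregate = operator.add(aggregate, [name.upper()])
        next_nodes = set()
        for name in frontier:
            if name == "a":
                # Conditional edge: continue to "b" or end the run.
                if len(aggregate) < threshold:
                    next_nodes.add("b")
            else:
                next_nodes.update(EDGES[name])
        frontier = sorted(next_nodes)
    return aggregate, supersteps


aggregate, supersteps = run(limit=25)
print(len(supersteps), aggregate)

try:
    run(limit=4)
except StepLimitReached as exc:
    steps_before_error = exc.args[0]
    print("stopped after", steps_before_error)
```

Nodes C and D share a single superstep, so the fan-out costs one step against the limit rather than two.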

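Async

Using the async programming paradigm can produce significant performance improvements when running IO-bound code concurrently (e.g., making concurrent API requests to a chat model provider). To convert a sync implementation of the graph to an async implementation, you will need to:
  1. Update nodes to use async def instead of def.
  2. Update the code inside to use await appropriately.
  3. Invoke the graph with .ainvoke or .astream as desired.
Because many LangChain objects implement the Runnable Protocol, which has async variants of all the sync methods, it’s typically fairly quick to upgrade a sync graph to an async graph. See the example below. To demonstrate async invocations of underlying LLMs, we will include a chat model: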
pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_model

os.environ["OPENAI_API_KEY"] = "sk-..."

model = init_chat_model("gpt-4.1")
from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph

async def node(state: MessagesState):  
    new_message = await model.ainvoke(state["messages"])
    return {"messages": [new_message]}

builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()

input_message = {"role": "user", "content": "Hello"}
result = await graph.ainvoke({"messages": [input_message]})  
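To try the same sync-to-async conversion without an API key, the sketch below swaps the chat model for a hypothetical FakeChatModel whose ainvoke sleeps briefly to simulate network IO. It uses only asyncio, so it shows the node-level change (def to async def, plus await), not LangGraph's own execution.

```python
import asyncio


class FakeChatModel:
    """Hypothetical stand-in for a chat model with sync and async methods."""

    def invoke(self, messages: list) -> dict:
        return {"role": "assistant", "content": f"echo: {messages[-1]['content']}"}

    async def ainvoke(self, messages: list) -> dict:
        await asyncio.sleep(0.01)  # simulate waiting on a network call
        return self.invoke(messages)


model = FakeChatModel()


def sync_node(state: dict) -> dict:
    return {"messages": [model.invoke(state["messages"])]}


async def async_node(state: dict) -> dict:
    # Same body as sync_node, but awaiting the async variant.
    return {"messages": [await model.ainvoke(state["messages"])]}


states = [{"messages": [{"role": "user", "content": text}]} for text in ("Hello", "Hi")]


async def main() -> list:
    # Both calls wait on their simulated IO concurrently.
    return await asyncio.gather(*(async_node(s) for s in states))


results = asyncio.run(main())
print(results)
```

In a notebook, where an event loop is already running, you would await main() directly instead of calling asyncio.run.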
Async streaming See the streaming guide for examples of streaming with async.

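Combine control flow and state updates with Command

It can be useful to combine control flow (edges) and state updates (nodes). For example, you might want to both perform state updates and decide which node to go to next in the same node. LangGraph provides a way to do so by returning a Command object from node functions: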
def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        # state update
        update={"foo": "bar"},
        # control flow
        goto="my_other_node"
    )
We show an end-to-end example below. Let’s create a simple graph with 3 nodes: A, B and C. We will first execute node A, and then decide whether to go to Node B or Node C next based on the output of node A.
import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command

# Define graph state
class State(TypedDict):
    foo: str

# Define the nodes

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    print("Called A")
    value = random.choice(["b", "c"])
    # this is a replacement for a conditional edge function
    if value == "b":
        goto = "node_b"
    else:
        goto = "node_c"

    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        # this is the state update
        update={"foo": value},
        # this is a replacement for an edge
        goto=goto,
    )

def node_b(state: State):
    print("Called B")
    return {"foo": state["foo"] + "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": state["foo"] + "c"}
We can now create the StateGraph with the above nodes. Notice that the graph doesn’t have conditional edges for routing! This is because control flow is defined with Command inside node_a.
builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# NOTE: there are no edges between nodes A, B and C!

graph = builder.compile()
You might have noticed that we used Command as a return type annotation, e.g. Command[Literal["node_b", "node_c"]]. This is necessary for the graph rendering and tells LangGraph that node_a can navigate to node_b and node_c.
from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))
Command-based graph navigation
If we run the graph multiple times, we’d see it take different paths (A -> B or A -> C) based on the random choice in node A.
graph.invoke({"foo": ""})
Called A
Called C
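The idea behind Command, returning a state update and a routing decision together, can be modeled in a few lines of plain Python. This is a conceptual sketch only: MiniCommand and run are hypothetical names, the random choice is replaced by a prefer_b flag to keep the run deterministic, and none of it reflects LangGraph's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MiniCommand:
    """Hypothetical stand-in: a state update plus the next node to visit."""
    update: dict = field(default_factory=dict)
    goto: Optional[str] = None  # None means "stop here"


def node_a(state: dict) -> MiniCommand:
    value = "b" if state.get("prefer_b") else "c"
    return MiniCommand(update={"foo": value}, goto=f"node_{value}")


def node_b(state: dict) -> MiniCommand:
    return MiniCommand(update={"foo": state["foo"] + "b"})


def node_c(state: dict) -> MiniCommand:
    return MiniCommand(update={"foo": state["foo"] + "c"})


NODES = {"node_a": node_a, "node_b": node_b, "node_c": node_c}


def run(state: dict, start: str = "node_a"):
    state, current, visited = dict(state), start, []
    while current is not None:
        visited.append(current)
        command = NODES[current](state)
        state.update(command.update)  # apply the state update first
        current = command.goto        # then follow the routing decision
    return state, visited


final_state, visited = run({"foo": "", "prefer_b": True})
print(final_state, visited)
```

Note that no edges are declared anywhere in the sketch: the path is determined entirely by what each node returns.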

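Navigate to a node in a parent graph

If you are using subgraphs, you might want to navigate from a node within a subgraph to a different subgraph (i.e., a different node in the parent graph). To do so, you can specify graph=Command.PARENT in Command: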
def my_node(state: State) -> Command[Literal["other_subgraph"]]:
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # where `other_subgraph` is a node in the parent graph
        graph=Command.PARENT
    )
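Let's demonstrate this using the example above. We'll do so by changing node_a in the example above into a single-node graph that we add as a subgraph to our parent graph.
State updates with Command.PARENT: when you send updates from a subgraph node to a parent graph node for a key that is shared by both the parent and subgraph state schemas, you must define a reducer for the key you are updating in the parent graph state. See the example below.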
import operator
from typing_extensions import Annotated

class State(TypedDict):
    # NOTE: we define a reducer here
    foo: Annotated[str, operator.add]  

def node_a(state: State):
    print("Called A")
    value = random.choice(["a", "b"])
    # this is a replacement for a conditional edge function
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        update={"foo": value},
        goto=goto,
        # this tells LangGraph to navigate to node_b or node_c in the parent graph
        # NOTE: this will navigate to the closest parent graph relative to the subgraph
        graph=Command.PARENT,  
    )

subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()

def node_b(state: State):
    print("Called B")
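    # NOTE: since we've defined a reducer, we don't need to manually append
    # new characters to the existing 'foo' value. Instead, the reducer will
    # append these automatically (via operator.add).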
    return {"foo": "b"}  

def node_c(state: State):
    print("Called C")
    return {"foo": "c"}  

builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)

graph = builder.compile()
graph.invoke({"foo": ""})
Called A
Called C
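To see what the reducer contributes here, the sketch below reads the reducer out of the Annotated metadata with the standard library and applies the two writes from the run above ("b" from node_a, then "c" from node_c). The apply_update helper and the PlainState schema are hypothetical, and the merge rule is a simplified model rather than LangGraph's channel logic.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class State(TypedDict):
    foo: Annotated[str, operator.add]  # reducer attached as metadata


class PlainState(TypedDict):
    foo: str  # no reducer: a new value replaces the old one


def apply_update(schema, state: dict, update: dict) -> dict:
    """Hypothetical helper: merge an update using reducers found on the schema."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints.get(key), "__metadata__", ())
        if metadata and key in merged:
            merged[key] = metadata[0](merged[key], value)  # reducer(old, new)
        else:
            merged[key] = value  # last write wins
    return merged


with_reducer, without_reducer = {"foo": ""}, {"foo": ""}
for update in ({"foo": "b"}, {"foo": "c"}):
    with_reducer = apply_update(State, with_reducer, update)
    without_reducer = apply_update(PlainState, without_reducer, update)

print(with_reducer, without_reducer)
```

With the reducer, both writes survive in the final value. Without it, only the last write would remain.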

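Use inside tools

A common use case is updating graph state from inside a tool. For example, in a customer support application you might want to look up customer information based on their account number or ID at the start of the conversation. To update the graph state from the tool, you can return Command(update={"my_custom_key": "foo", "messages": [...]}) from the tool: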
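from typing import Annotated

from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import InjectedToolCallId, tool
from langgraph.types import Command

# get_user_info is assumed to be defined elsewhere in your application
@tool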
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):
    """Use this to look up user information to better assist them with their questions."""
    user_info = get_user_info(config.get("configurable", {}).get("user_id"))
    return Command(
        update={
            # update the state keys
            "user_info": user_info,
            # update the message history
            "messages": [ToolMessage("Successfully looked up user information", tool_call_id=tool_call_id)]
        }
    )
You MUST include messages (or any state key used for the message history) in Command.update when returning Command from a tool and the list of messages in messages MUST contain a ToolMessage. This is necessary for the resulting message history to be valid (LLM providers require AI messages with tool calls to be followed by the tool result messages).
If you are using tools that update state via Command, we recommend using prebuilt ToolNode which automatically handles tools returning Command objects and propagates them to the graph state. If you’re writing a custom node that calls tools, you would need to manually propagate Command objects returned by the tools as the update from the node.
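The validity requirement can be illustrated with a small check over a message history. This sketch uses plain dictionaries in an assumed OpenAI-like shape ("role", "tool_calls", "tool_call_id") rather than LangChain message classes, the helper name unanswered_tool_calls is hypothetical, and it only checks that each tool call has a result, not the ordering of messages.

```python
def unanswered_tool_calls(messages: list) -> set:
    """Return ids of tool calls that have no matching tool-result message."""
    requested, answered = set(), set()
    for message in messages:
        if message["role"] == "assistant":
            requested.update(call["id"] for call in message.get("tool_calls", []))
        elif message["role"] == "tool":
            answered.add(message["tool_call_id"])
    return requested - answered


history = [
    {"role": "user", "content": "What plan am I on?"},
    {
        "role": "assistant",
        "content": "",
        "tool_calls": [{"id": "call_1", "name": "lookup_user_info"}],
    },
]
missing_before = unanswered_tool_calls(history)

# Appending the tool result plays the role of the ToolMessage in Command.update.
history.append(
    {
        "role": "tool",
        "tool_call_id": "call_1",
        "content": "Successfully looked up user information",
    }
)
missing_after = unanswered_tool_calls(history)
print(missing_before, missing_after)
```

If a tool returned a Command that updated only user_info, the history would stay in the first state, with a tool call that never receives its result.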

Visualize your graph

Here we demonstrate how to visualize the graphs you create. You can visualize any arbitrary Graph, including StateGraph. Let’s have some fun by drawing fractals :).
import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

class MyNode:
    def __init__(self, name: str):
        self.name = name
    def __call__(self, state: State):
        return {"messages": [("assistant", f"Called node {self.name}")]}

def route(state) -> Literal["entry_node", END]:
    if len(state["messages"]) > 10:
        return END
    return "entry_node"

def add_fractal_nodes(builder, current_node, level, max_level):
    if level > max_level:
        return
    # Number of nodes to create at this level
    num_nodes = random.randint(1, 3)  # Adjust randomness as needed
    for i in range(num_nodes):
        nm = ["A", "B", "C"][i]
        node_name = f"node_{current_node}_{nm}"
        builder.add_node(node_name, MyNode(node_name))
        builder.add_edge(current_node, node_name)
        # Recursively add more nodes
        r = random.random()
        if r > 0.2 and level + 1 < max_level:
            add_fractal_nodes(builder, node_name, level + 1, max_level)
        elif r > 0.05:
            builder.add_conditional_edges(node_name, route, node_name)
        else:
            # End
            builder.add_edge(node_name, END)

def build_fractal_graph(max_level: int):
    builder = StateGraph(State)
    entry_point = "entry_node"
    builder.add_node(entry_point, MyNode(entry_point))
    builder.add_edge(START, entry_point)
    add_fractal_nodes(builder, entry_point, 1, max_level)
    # Optional: set a finish point if required
    builder.add_edge(entry_point, END)  # or any specific node
    return builder.compile()

app = build_fractal_graph(3)

Mermaid

We can also convert a graph class into Mermaid syntax.
print(app.get_graph().draw_mermaid())
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    __start__([<p>__start__</p>]):::first
    entry_node(entry_node)
    node_entry_node_A(node_entry_node_A)
    node_entry_node_B(node_entry_node_B)
    node_node_entry_node_B_A(node_node_entry_node_B_A)
    node_node_entry_node_B_B(node_node_entry_node_B_B)
    node_node_entry_node_B_C(node_node_entry_node_B_C)
    __end__([<p>__end__</p>]):::last
    __start__ --> entry_node;
    entry_node --> __end__;
    entry_node --> node_entry_node_A;
    entry_node --> node_entry_node_B;
    node_entry_node_B --> node_node_entry_node_B_A;
    node_entry_node_B --> node_node_entry_node_B_B;
    node_entry_node_B --> node_node_entry_node_B_C;
    node_entry_node_A -.-> entry_node;
    node_entry_node_A -.-> __end__;
    node_node_entry_node_B_A -.-> entry_node;
    node_node_entry_node_B_A -.-> __end__;
    node_node_entry_node_B_B -.-> entry_node;
    node_node_entry_node_B_B -.-> __end__;
    node_node_entry_node_B_C -.-> entry_node;
    node_node_entry_node_B_C -.-> __end__;
    classDef default fill:#f2f0ff,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#bfb6fc
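In the listing above, solid arrows (-->) come from fixed edges and dotted arrows (-.->) from conditional edges. As a rough illustration of how such text is assembled, the following sketch emits the same two arrow styles from plain edge lists. The to_mermaid function is hypothetical and far simpler than what draw_mermaid() produces (no node declarations or class definitions).

```python
def to_mermaid(edges, conditional_edges=()) -> str:
    """Hypothetical helper: render edge lists as a minimal Mermaid flowchart."""
    lines = ["graph TD;"]
    for source, target in edges:
        lines.append(f"    {source} --> {target};")   # fixed edge
    for source, target in conditional_edges:
        lines.append(f"    {source} -.-> {target};")  # conditional edge
    return "\n".join(lines)


diagram = to_mermaid(
    edges=[("__start__", "entry_node"), ("entry_node", "node_entry_node_A")],
    conditional_edges=[
        ("node_entry_node_A", "entry_node"),
        ("node_entry_node_A", "__end__"),
    ],
)
print(diagram)
```

The resulting text can be pasted into any Mermaid renderer to check a graph's shape by eye.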

PNG

If preferred, we can render the graph into a .png. There are three options:
  • Using Mermaid.ink API (does not require additional packages)
  • Using Mermaid + Pyppeteer (requires pip install pyppeteer)
  • Using graphviz (which requires pip install graphviz)
Using Mermaid.Ink
By default, draw_mermaid_png() uses Mermaid.Ink’s API to generate the diagram.
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

display(Image(app.get_graph().draw_mermaid_png()))
Fractal graph visualization
Using Mermaid + Pyppeteer
import nest_asyncio

nest_asyncio.apply()  # Required for Jupyter Notebook to run async functions

display(
    Image(
        app.get_graph().draw_mermaid_png(
            curve_style=CurveStyle.LINEAR,
            node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
            wrap_label_n_words=9,
            output_file_path=None,
            draw_method=MermaidDrawMethod.PYPPETEER,
            background_color="white",
            padding=10,
        )
    )
)
Using Graphviz
try:
    display(Image(app.get_graph().draw_png()))
except ImportError:
    print(
        "You likely need to install dependencies for pygraphviz, see more here https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
    )
