LangGraph has a built-in persistence layer, implemented through checkpointers. When you compile a graph with a checkpointer, the checkpointer saves a checkpoint of the graph state at every super-step. Those checkpoints are saved to a thread, which can be accessed after graph execution. Because threads allow access to a graph's state after execution, several powerful capabilities are possible, including human-in-the-loop, memory, time travel, and fault tolerance. Below, we'll discuss each of these concepts in more detail.
The LangGraph API handles checkpointing automatically. When using the LangGraph API, you don't need to implement or configure checkpointers manually; the API handles all the persistence infrastructure for you behind the scenes.

Threads

A thread is a unique ID or thread identifier assigned to each checkpoint saved by a checkpointer. It contains the accumulated state of a sequence of runs. When a run is executed, the state of the assistant's underlying graph is persisted to the thread. When invoking a graph with a checkpointer, you must specify a thread_id in the configurable portion of the config:
{"configurable": {"thread_id": "1"}}
The current and historical state of a thread can be retrieved. To persist state, a thread must be created before a run is executed. The LangSmith API provides several endpoints for creating and managing threads and thread state. See the API reference for more details.

Checkpoints

The state of a thread at a particular point in time is called a checkpoint. A checkpoint is a snapshot of the graph state saved at each super-step, and is represented by a StateSnapshot object with the following key properties:
  • config: Config associated with this checkpoint.
  • metadata: Metadata associated with this checkpoint.
  • values: Values of the state channels at this point in time.
  • next: A tuple of the node names to execute next in the graph.
  • tasks: A tuple of PregelTask objects that contain information about the next tasks to be executed. If the step was previously attempted, it will include error information. If the graph was interrupted dynamically from within a node, tasks will contain additional data associated with the interrupt.
Checkpoints are persisted and can be used to restore the state of a thread later. Let's see what checkpoints are saved when a simple graph is invoked as follows:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
After we run the graph, we expect to see exactly 4 checkpoints:
  • Empty checkpoint with START as the next node to be executed
  • Checkpoint with the user input {'foo': '', 'bar': []} and node_a as the next node to be executed
  • Checkpoint with the outputs of node_a {'foo': 'a', 'bar': ['a']} and node_b as the next node to be executed
  • Checkpoint with the outputs of node_b {'foo': 'b', 'bar': ['a', 'b']} and no next nodes to be executed
Note that the bar channel values contain outputs from both nodes because we defined a reducer for the bar channel.
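To make the reducer's role concrete, here is a minimal plain-Python sketch (an illustration only, not LangGraph's actual implementation) of how a node's update is applied to the state: channels without a reducer are overwritten, while channels with a reducer combine the old value with the update.

```python
from operator import add

# Mirrors Annotated[list[str], add] in the schema above: only "bar" has a reducer
reducers = {"bar": add}

def apply_update(state, update):
    new_state = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        # Reducer channels combine old + new; other channels are overwritten
        new_state[key] = reducer(state[key], value) if reducer else value
    return new_state

state = {"foo": "", "bar": []}                           # the user input
state = apply_update(state, {"foo": "a", "bar": ["a"]})  # node_a's output
state = apply_update(state, {"foo": "b", "bar": ["b"]})  # node_b's output
# state is now {"foo": "b", "bar": ["a", "b"]}
```

This is why foo ends up as "b" (last write wins) while bar accumulates ["a", "b"].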

Get state

When interacting with the saved graph state, you must specify a thread identifier. You can view the latest state of the graph by calling graph.get_state(config). This returns a StateSnapshot object that corresponds to the latest checkpoint associated with the thread ID provided in the config or, if provided, the checkpoint associated with a checkpoint ID for the thread.
# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
In our example, the output of get_state will look like this:
StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

Get state history

You can get the full history of the graph execution for a given thread by calling graph.get_state_history(config). This returns a list of StateSnapshot objects associated with the thread ID provided in the config. Importantly, the checkpoints are ordered chronologically, with the most recent checkpoint / StateSnapshot first in the list.
config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
In our example, the output of get_state_history will look like this:
[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]

Replay

It is also possible to play back a prior graph execution. If we invoke a graph with a thread_id and a checkpoint_id, we re-play the previously executed steps before the checkpoint that corresponds to checkpoint_id, and execute only the steps after the checkpoint.
  • thread_id is the ID of a thread.
  • checkpoint_id is an identifier that refers to a specific checkpoint within a thread.
You must pass these as part of the configurable portion of the config when invoking the graph:
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
Importantly, LangGraph knows whether a particular step has been executed previously. If it has, LangGraph simply re-plays that step rather than re-executing it, but only for the steps before the provided checkpoint_id. All of the steps after checkpoint_id will be executed (i.e., a new fork), even if they have been executed previously. See this how-to guide on time travel to learn more about replaying.

Update state

In addition to re-playing the graph from specific checkpoints, we can also edit the graph state. We do this using update_state. This method accepts three different arguments:

config

The config should contain thread_id specifying which thread to update. When only the thread_id is passed, we update (or fork) the current state. Optionally, if we include the checkpoint_id field, then we fork that selected checkpoint.

values

These are the values that will be used to update the state. Note that this update is treated exactly as any update from a node: the values are passed to the reducer functions defined for the relevant channels in the graph state. This means that update_state does NOT automatically overwrite the channel values for every channel; it overwrites only the channels without reducers. Let's walk through an example. Assume you have defined the state of your graph with the following schema (see the full example above):
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]
Let’s now assume the current state of the graph is
{"foo": 1, "bar": ["a"]}
If you update the state as below:
graph.update_state(config, {"foo": 2, "bar": ["b"]})
Then the new state of the graph will be:
{"foo": 2, "bar": ["a", "b"]}
The foo key (channel) is completely changed (because there is no reducer specified for that channel, so update_state overwrites it). However, there is a reducer specified for the bar key, and so it appends "b" to the state of bar.

as_node

The final thing you can optionally specify when calling update_state is as_node. If you provide it, the update will be applied as if it came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous. This matters because the next steps to execute depend on the last node to have given an update, so as_node can be used to control which node executes next. See this how-to guide on time travel to learn more about forking state.

Memory Store

A state schema specifies a set of keys that are populated as a graph is executed. As discussed above, state can be written by a checkpointer to a thread at each graph step, enabling state persistence. But what if we want to retain some information across threads? Consider the case of a chatbot where we want to retain specific information about a user across all of their chat conversations (i.e., threads)! With checkpointers alone, we cannot share information across threads. This motivates the need for the Store interface. As an illustration, we can define an InMemoryStore to store information about a user across threads. We simply compile our graph with a checkpointer, as before, and with our new in_memory_store variable.
LangGraph API handles stores automatically When using the LangGraph API, you don’t need to implement or configure stores manually. The API handles all storage infrastructure for you behind the scenes.

Basic usage

First, let's showcase this in isolation, without using LangGraph.
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()
Memories are namespaced by a tuple, which in this specific example will be (<user_id>, "memories"). The namespace can be of any length and represent anything; it does not have to be user-specific.
user_id = "1"
namespace_for_memory = (user_id, "memories")
We use the store.put method to save memories to our namespace in the store. When we do this, we specify the namespace as defined above, along with a key-value pair for the memory: the key is a unique identifier for the memory (memory_id), and the value (a dictionary) is the memory itself.
import uuid

memory_id = str(uuid.uuid4())
memory = {"food_preference": "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)
We can read out memories in our namespace using the store.search method, which returns all memories for a given user as a list. The most recent memory is the last in the list.
memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
Each memory type is a Python class (Item) with certain attributes. We can access it as a dictionary by converting via .dict as above. The attributes it has are:
  • value: The value of this memory (itself a dictionary)
  • key: A unique key for this memory within this namespace
  • namespace: A list of strings, the namespace of this memory type
  • created_at: Timestamp for when this memory was created
  • updated_at: Timestamp for when this memory was updated
Beyond simple retrieval, the store also supports semantic search, allowing you to find memories based on meaning rather than exact matches. To enable this, configure the store with an embedding model:
from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # Embedding provider
        "dims": 1536,                              # Embedding dimensions
        "fields": ["food_preference", "$"]              # Fields to embed
    }
)
Now when searching, you can use natural language queries to find relevant memories:
# Find memories about food preferences
# (This can be done after putting memories into the store)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # Return top 3 matches
)
You can control which parts of your memories get embedded by configuring the fields parameter or by specifying the index parameter when storing memories:
# Store with specific fields to embed
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # Only embed the "food_preference" field
)

# Store without embedding (still retrievable, but not searchable)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

Using in LangGraph

With all this in place, we use the in_memory_store in LangGraph. The in_memory_store works hand-in-hand with the checkpointer: the checkpointer saves state to threads, as discussed above, and the in_memory_store allows us to store arbitrary information for access across threads. We compile the graph with both the checkpointer and the in_memory_store as follows.
from langgraph.checkpoint.memory import InMemorySaver

# We need this because we want to enable threads (conversations)
checkpointer = InMemorySaver()

# ... Define the graph ...

# Compile the graph with the checkpointer and store
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
We invoke the graph with a thread_id, as before, and also with a user_id, which we will use to namespace our memories to this particular user as shown above.
# Invoke the graph
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# First let's just say hi to the AI
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)
We can access the in_memory_store and the user_id in any node by passing store: BaseStore and config: RunnableConfig as node arguments. Here is how we might use semantic search in a node to find relevant memories:
import uuid

from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.store.base import BaseStore

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # ... Analyze conversation and create a new memory

    # Create a new memory ID
    memory_id = str(uuid.uuid4())

    # We create a new memory
    store.put(namespace, memory_id, {"memory": memory})

As we showed above, we can also access the store in any node and use the store.search method to get memories. Recall the memories are returned as a list of objects that can be converted to a dictionary.
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
We can access the memories and use them in our model call.
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # Search based on the most recent message
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... Use memories in the model call
If we create a new thread, we can still access the same memories as long as the user_id is the same.
# Invoke the graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# Let's say hi again
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)
When we use LangSmith, whether locally (e.g., in Studio) or hosted on LangSmith, the base store is available by default and does not need to be specified during graph compilation. To enable semantic search, however, you do need to configure the indexing settings in your langgraph.json file. For example:
{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embedding-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}
See the deployment guide for more details and configuration options.

Checkpointer libraries

Under the hood, checkpointing is powered by checkpointer objects that conform to the BaseCheckpointSaver interface. LangGraph provides several checkpointer implementations, all available as standalone, installable libraries:
  • langgraph-checkpoint: The base interface for checkpoint savers (BaseCheckpointSaver) and serialization/deserialization interface (SerializerProtocol). Includes an in-memory checkpointer implementation (InMemorySaver) for experimentation. LangGraph comes with langgraph-checkpoint included.
  • langgraph-checkpoint-sqlite: An implementation of LangGraph checkpointer that uses SQLite database (SqliteSaver / AsyncSqliteSaver). Ideal for experimentation and local workflows. Needs to be installed separately.
  • langgraph-checkpoint-postgres: An advanced checkpointer that uses Postgres database (PostgresSaver / AsyncPostgresSaver), used in LangSmith. Ideal for using in production. Needs to be installed separately.

Checkpointer interface

Each checkpointer conforms to the BaseCheckpointSaver interface and implements the following methods:
  • .put - Store a checkpoint with its configuration and metadata.
  • .put_writes - Store intermediate writes linked to a checkpoint (i.e., pending writes).
  • .get_tuple - Fetch a checkpoint tuple for a given configuration (thread_id and checkpoint_id). This is used to populate StateSnapshot in graph.get_state().
  • .list - List checkpoints that match a given configuration and filter criteria. This is used to populate the state history in graph.get_state_history().
If the checkpointer is used with asynchronous graph execution (i.e., executing the graph via .ainvoke, .astream, .abatch), the asynchronous versions of the above methods will be used (.aput, .aput_writes, .aget_tuple, .alist).
To run your graph asynchronously, you can use InMemorySaver, or the async versions of the Sqlite/Postgres checkpointers: AsyncSqliteSaver / AsyncPostgresSaver.

Serializers

When checkpointers save the graph state, they need to serialize the channel values in the state. This is done using serializer objects. langgraph_checkpoint defines a protocol for implementing serializers and provides a default implementation (JsonPlusSerializer) that handles a wide variety of types, including LangChain and LangGraph primitives, datetimes, enums, and more.

Serialization with pickle

The default serializer, JsonPlusSerializer, uses ormsgpack and JSON under the hood, which is not suitable for all types of objects. If you want to fall back to pickle for objects not currently supported by our msgpack encoder (such as Pandas dataframes), you can use the pickle_fallback argument of the JsonPlusSerializer:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... Define the graph ...
graph.compile(
    checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

Encryption

Checkpointers can optionally encrypt all persisted state. To enable this, pass an instance of EncryptedSerializer to the serde argument of any BaseCheckpointSaver implementation. The easiest way to create an encrypted serializer is via from_pycryptodome_aes, which reads the AES key from the LANGGRAPH_AES_KEY environment variable (or accepts a key argument):
import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # reads LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()
When running on LangSmith, encryption is automatically enabled whenever LANGGRAPH_AES_KEY is present, so you only need to provide the environment variable. Other encryption schemes can be used by implementing CipherProtocol and supplying it to EncryptedSerializer.

Capabilities

Human-in-the-loop

First, checkpointers facilitate human-in-the-loop workflows by allowing humans to inspect, interrupt, and approve graph steps. These workflows require a checkpointer because the human must be able to view the state of the graph at any point in time, and the graph must be able to resume execution after the human makes any updates to the state. See the how-to guides for examples.

Memory

Second, checkpointers allow for "memory" between interactions. In the case of repeated human interactions, such as conversations, any follow-up messages can be sent to the thread, which retains its memory of previous messages. See Add memory for information on how to add and manage conversation memory using checkpointers.

Time travel

Third, checkpointers enable "time travel", allowing users to replay prior graph executions to review and/or debug specific graph steps. Checkpointers also make it possible to fork the graph state at arbitrary checkpoints to explore alternative trajectories.

Fault-tolerance

Lastly, checkpointing also provides fault-tolerance and error recovery: if one or more nodes fail at a given super-step, you can restart your graph from the last successful step.

Pending writes

Additionally, when a graph node fails mid-execution at a given super-step, LangGraph stores pending checkpoint writes from any other nodes that completed successfully at that super-step, so that whenever we resume graph execution from that super-step we don't re-run the successful nodes.