checkpoint(检查点)。这些检查点保存到 thread(线程),可在图执行后访问。由于 threads 允许在执行后访问图的状态,因此可以实现多个强大的功能,包括人在回路、内存、时间旅行和容错。下面,我们将更详细地讨论这些概念。
LangGraph API 自动处理检查点
使用 LangGraph API 时,您无需手动实现或配置检查点器。API 会在后台为您处理所有持久化基础设施。
线程
线程是分配给检查点器保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。当执行运行时,助手底层图的状态将持久化到线程。 使用检查点器调用图时,您必须在配置的configurable 部分指定 thread_id。
检查点
线程在特定时间点的状态称为检查点。检查点是在每个超级步骤保存的图状态的快照,由具有以下关键属性的StateSnapshot 对象表示:
config:与此检查点关联的配置。metadata:与此检查点关联的元数据。values:此时间点状态通道的值。next:图中下一个要执行的节点名称的元组。tasks:包含下一个要执行的任务信息的PregelTask对象的元组。如果先前尝试过该步骤,它将包括错误信息。如果图从节点内动态中断,任务将包含与中断关联的其他数据。
- Empty checkpoint with
STARTas the next node to be executed - Checkpoint with the user input
{'foo': '', 'bar': []}andnode_aas the next node to be executed - Checkpoint with the outputs of
node_a{'foo': 'a', 'bar': ['a']}andnode_bas the next node to be executed - Checkpoint with the outputs of
node_b{'foo': 'b', 'bar': ['a', 'b']}and no next nodes to be executed
bar 通道值包含来自两个节点的输出,因为我们为 bar 通道定义了一个 reducer。
Get state
与保存的图状态交互时,您必须指定线程标识符。您可以通过调用graph.get_state(config) 查看图的_最新_状态。这将返回一个 StateSnapshot 对象,该对象对应于与配置中提供的线程 ID 关联的最新检查点,或者与线程的检查点 ID 关联的检查点(如果提供)。
get_state 的输出将如下所示:
Get state history
您可以通过调用graph.get_state_history(config) 获取给定线程的图执行完整历史。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最新的检查点 / StateSnapshot 位于列表的第一位。
get_state_history 的输出将如下所示:
重放
也可以回放先前的图执行。如果我们使用thread_id 和 checkpoint_id 调用图,那么我们将_重放_对应于 checkpoint_id 的检查点_之前_先前执行的步骤,并且只执行检查点_之后_的步骤。
thread_id是线程的 ID。checkpoint_id是指向线程内特定检查点的标识符。
configurable 部分传递:
checkpoint_id_之前_的步骤。checkpoint_id_之后_的所有步骤都将被执行(即,新的分叉),即使它们先前已执行过。请参阅此关于时间旅行的操作指南以了解更多关于重放的信息。
更新状态
除了从特定checkpoints 重放图之外,我们还可以_编辑_图状态。我们使用 update_state 来执行此操作。此方法接受三个不同的参数:
config
The config should contain thread_id specifying which thread to update. When only the thread_id is passed, we update (or fork) the current state. Optionally, if we include checkpoint_id field, then we fork that selected checkpoint.
values
These are the values that will be used to update the state. Note that this update is treated exactly as any update from a node is treated. This means that these values will be passed to the reducer functions, if they are defined for some of the channels in the graph state. This means that update_state does NOT automatically overwrite the channel values for every channel, but only for the channels without reducers. Let’s walk through an example.
Let’s assume you have defined the state of your graph with the following schema (see full example above):
foo key (channel) is completely changed (because there is no reducer specified for that channel, so update_state overwrites it). However, there is a reducer specified for the bar key, and so it appends "b" to the state of bar.
as_node
The final thing you can optionally specify when calling update_state is as_node. If you provided it, the update will be applied as if it came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous. The reason this matters is that the next steps to execute depend on the last node to have given an update, so this can be used to control which node executes next. See this how to guide on time-travel to learn more about forking state.
Memory Store
A state schema specifies a set of keys that are populated as a graph is executed. As discussed above, state can be written by a checkpointer to a thread at each graph step, enabling state persistence.
但是,如果我们想要_跨线程_保留一些信息怎么办?考虑聊天机器人的情况,我们希望在与该用户的所有聊天对话(例如,线程)中保留有关用户的特定信息!
仅使用检查点器,我们无法跨线程共享信息。这促使需要 Store 接口。作为说明,我们可以定义一个 InMemoryStore 来跨线程存储有关用户的信息。我们只需像以前一样使用检查点器和新的 in_memory_store 变量编译我们的图。
LangGraph API handles stores automatically
When using the LangGraph API, you don’t need to implement or configure stores manually. The API handles all storage infrastructure for you behind the scenes.
基本用法
首先,让我们在不使用 LangGraph 的情况下单独展示这一点。tuple 命名空间化,在这个特定示例中将是 (<user_id>, "memories")。命名空间可以是任何长度并代表任何内容,不必是用户特定的。
store.put 方法将记忆保存到存储中的命名空间。当我们这样做时,我们指定命名空间(如上所述)和记忆的键值对:键只是记忆的唯一标识符(memory_id),值(字典)是记忆本身。
store.search 方法读取命名空间中的记忆,该方法将返回给定用户的所有记忆作为列表。最新的记忆是列表中的最后一个。
Item)。我们可以通过如上所示的 .dict 转换将其作为字典访问。
它具有的属性是:
value:此记忆的值(本身是一个字典)key:此命名空间中此记忆的唯一键namespace:字符串列表,此记忆类型的命名空间created_at:创建此记忆时的时间戳updated_at:更新此记忆时的时间戳
Semantic Search
Beyond simple retrieval, the store also supports semantic search, allowing you to find memories based on meaning rather than exact matches. To enable this, configure the store with an embedding model:fields parameter or by specifying the index parameter when storing memories:
Using in LangGraph
有了这一切,我们在 LangGraph 中使用in_memory_store。in_memory_store 与检查点器协同工作:检查点器将状态保存到线程,如上所述,in_memory_store 允许我们存储任意信息以_跨_线程访问。我们使用检查点器和 in_memory_store 编译图,如下所示。
thread_id 调用图,还使用 user_id,我们将使用它来将我们的记忆命名空间化到此特定用户,如上所示。
store: BaseStore 和 config: RunnableConfig 作为节点参数来访问 in_memory_store 和 user_id。以下是我们如何在节点中使用语义搜索来查找相关记忆:
store.search method to get memories. Recall the memories are returned as a list of objects that can be converted to a dictionary.
user_id 相同,我们仍然可以访问相同的记忆。
langgraph.json 文件中配置索引设置。例如:
检查点器库
在底层,检查点由符合BaseCheckpointSaver 接口的检查点器对象提供支持。LangGraph 提供了几种检查点器实现,所有这些都通过独立的可安装库实现:
langgraph-checkpoint: The base interface for checkpointer savers (BaseCheckpointSaver) and serialization/deserialization interface (SerializerProtocol). Includes in-memory checkpointer implementation (InMemorySaver) for experimentation. LangGraph comes withlanggraph-checkpointincluded.langgraph-checkpoint-sqlite: An implementation of LangGraph checkpointer that uses SQLite database (SqliteSaver/AsyncSqliteSaver). Ideal for experimentation and local workflows. Needs to be installed separately.langgraph-checkpoint-postgres: An advanced checkpointer that uses Postgres database (PostgresSaver/AsyncPostgresSaver), used in LangSmith. Ideal for using in production. Needs to be installed separately.
检查点器接口
每个检查点器都符合BaseCheckpointSaver 接口并实现以下方法:
.put- 存储检查点及其配置和元数据。.put_writes- 存储与检查点链接的中间写入(即待处理写入)。.get_tuple- 使用给定配置(thread_id和checkpoint_id)获取检查点元组。这用于在graph.get_state()中填充StateSnapshot。.list- 列出与给定配置和过滤条件匹配的检查点。这用于在graph.get_state_history()中填充状态历史
.ainvoke、.astream、.abatch 执行图),将使用上述方法的异步版本(.aput、.aput_writes、.aget_tuple、.alist)。
要异步运行图,您可以使用
InMemorySaver,或 Sqlite/Postgres 检查点器的异步版本 — AsyncSqliteSaver / AsyncPostgresSaver 检查点器。序列化器
当检查点器保存图状态时,它们需要序列化状态中的通道值。这是使用序列化器对象完成的。langgraph_checkpoint 定义了用于实现序列化器的 协议,并提供了一个默认实现(JsonPlusSerializer),该实现处理多种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。
Serialization with pickle
The default serializer, JsonPlusSerializer, uses ormsgpack and JSON under the hood, which is not suitable for all types of objects.
If you want to fallback to pickle for objects not currently supported by our msgpack encoder (such as Pandas dataframes),
you can use the pickle_fallback argument of the JsonPlusSerializer:
Encryption
Checkpointers can optionally encrypt all persisted state. To enable this, pass an instance ofEncryptedSerializer to the serde argument of any BaseCheckpointSaver implementation. The easiest way to create an encrypted serializer is via from_pycryptodome_aes, which reads the AES key from the LANGGRAPH_AES_KEY environment variable (or accepts a key argument):
LANGGRAPH_AES_KEY,加密就会自动启用,因此您只需要提供环境变量。可以通过实现 CipherProtocol 并将其提供给 EncryptedSerializer 来使用其他加密方案。