持久化执行是一种技术,其中过程或工作流程在关键点保存其进度,允许它暂停并稍后从中断的位置准确恢复。这在需要人在回路的场景中特别有用,用户可以在继续之前检查、验证或修改过程,以及在可能遇到中断或错误(例如,对 LLM 的调用超时)的长时间运行任务中。通过保留已完成的工作,持久化执行使过程能够在不重新处理先前步骤的情况下恢复 — 即使在显著延迟之后(例如,一周后)。 LangGraph 的内置持久化层为工作流程提供持久化执行,确保每个执行步骤的状态保存到持久存储中。此功能保证,如果工作流程被中断 — 无论是由于系统故障还是人在回路交互 — 它都可以从其最后记录的状态恢复。
如果您使用带有检查点器的 LangGraph,您已经启用了持久化执行。您可以在任何点暂停和恢复工作流程,即使在中断或失败后也是如此。 要充分利用持久化执行,请确保您的工作流程设计为确定性幂等,并将任何副作用或非确定性操作包装在任务中。您可以从StateGraph(图 API)函数式 API中使用任务

要求

要在 LangGraph 中利用持久化执行,您需要:
  1. 通过指定将保存工作流程进度的检查点器在工作流程中启用持久化
  2. 在执行工作流程时指定线程标识符。这将跟踪工作流程特定实例的执行历史。
  3. 将任何非确定性操作(例如,随机数生成)或具有副作用的操作(例如,文件写入、API 调用)包装在 @[task] 中,以确保当恢复工作流程时,这些操作不会针对特定运行重复,而是从持久层检索其结果。有关更多信息,请参阅确定性和一致重放

确定性和一致重放

当您恢复工作流程运行时,代码不会从执行停止的相同代码行恢复;相反,它将识别一个适当的起点,从中继续进行。这意味着工作流程将从起点重放所有步骤,直到达到停止的点。 因此,当您为持久化执行编写工作流程时,必须将任何非确定性操作(例如,随机数生成)和任何具有副作用的操作(例如,文件写入、API 调用)包装在任务节点中。 要确保您的工作流程是确定性的并且可以一致地重放,请遵循以下准则:
  • 避免重复工作:如果节点包含多个具有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作包装在单独的任务中。这确保当恢复工作流程时,操作不会重复,并且它们的结果从持久层检索。
  • 封装非确定性操作:将可能产生非确定性结果的任何代码(例如,随机数生成)包装在任务节点中。这确保在恢复时,工作流程遵循具有相同结果的确切记录的步骤序列。
  • 使用幂等操作:在可能的情况下,确保副作用(例如,API 调用、文件写入)是幂等的。这意味着如果在工作流程失败后重试操作,它将具有与第一次执行时相同的效果。这对于导致数据写入的操作特别重要。如果任务开始但未能成功完成,工作流程的恢复将重新运行任务,依赖记录的结果来保持一致性。使用幂等键或验证现有结果以避免意外重复,确保平滑和可预测的工作流程执行。
有关要避免的陷阱的一些示例,请参阅函数式 API 中的常见陷阱部分,它展示了如何使用任务构建代码以避免这些问题。相同的原则适用于 StateGraph(图 API)

持久性模式

LangGraph 支持三种持久性模式,允许您根据应用程序的要求平衡性能和数据一致性。持久性模式,从最不持久到最持久,如下所示: 更高的持久性模式会给工作流程执行增加更多开销。
在 v0.6.0 中添加 使用 durability 参数而不是 checkpoint_during(在 v0.6.0 中已弃用)进行持久化策略管理:
  • durability="async" 替换 checkpoint_during=True
  • durability="exit" 替换 checkpoint_during=False
用于持久化策略管理,具有以下映射:
  • checkpoint_during=True -> durability="async"
  • checkpoint_during=False -> durability="exit"

"exit"

仅当图执行完成(成功或出错)时才持久化更改。这为长时间运行的图提供了最佳性能,但意味着不保存中间状态,因此您无法从执行中期失败中恢复或中断图执行。

"async"

在执行下一步时异步持久化更改。这提供了良好的性能和持久性,但如果进程在执行期间崩溃,则存在检查点可能不会被写入的小风险。

"sync"

在下一步开始之前同步持久化更改。这确保在继续执行之前写入每个检查点,以一些性能开销为代价提供高持久性。 您可以在调用任何图执行方法时指定持久性模式:
graph.stream(
    {"input": "test"},
    durability="sync"
)

在节点中使用任务

如果节点包含多个操作,您可能会发现将每个操作转换为任务比将操作重构为单个节点更容易。
  • Original
  • With task
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# Define a TypedDict to represent the state
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """Example node that makes an API request."""
    result = requests.get(state['url']).text[:100]  # Side-effect  #
    return {
        "result": result
    }

# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# Specify a checkpointer
checkpointer = InMemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)

# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# Invoke the graph
graph.invoke({"url": "https://www.example.com"}, config)

Resuming Workflows

一旦您在工作流程中启用了持久化执行,您可以为以下场景恢复执行:
  • **暂停和恢复工作流程:**使用 interrupt 函数在特定点暂停工作流程,并使用 Command 原语使用更新的状态恢复它。有关更多详细信息,请参阅中断
  • **从故障中恢复:**在异常(例如,LLM 提供商中断)后自动从最后一个成功的检查点恢复工作流程。这涉及通过提供 None 作为输入值,使用相同的线程标识符执行工作流程(请参阅函数式 API 的此示例)。

Starting Points for Resuming Workflows

  • If you’re using a StateGraph (Graph API), the starting point is the beginning of the node where execution stopped.
  • If you’re making a subgraph call inside a node, the starting point will be the parent node that called the subgraph that was halted. Inside the subgraph, the starting point will be the specific node where execution stopped.
  • If you’re using the Functional API, the starting point is the beginning of the entrypoint where execution stopped.

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.