Pregel 实现 LangGraph 的运行时,管理 LangGraph 应用程序的执行。 编译 StateGraph 或创建 @entrypoint 会生成一个可以使用输入调用的 Pregel 实例。 本指南在高层次上解释运行时,并提供使用 Pregel 直接实现应用程序的说明。
注意: Pregel 运行时以 Google 的 Pregel 算法命名,该算法描述了使用图进行大规模并行计算的高效方法。

概述

在 LangGraph 中,Pregel 将参与者通道组合成单个应用程序。参与者从通道读取数据并向通道写入数据。Pregel 遵循 Pregel 算法/批量同步并行模型将应用程序的执行组织成多个步骤。 每个步骤包括三个阶段:
  • 计划:确定在此步骤中执行哪些参与者。例如,在第一步中,选择订阅特殊输入通道的参与者;在后续步骤中,选择订阅在前一步中更新的通道的参与者
  • 执行:并行执行所有选定的参与者,直到所有参与者完成、一个失败或达到超时。在此阶段,通道更新对参与者不可见,直到下一步。
  • 更新:使用参与者在此步骤中写入的值更新通道。
重复,直到没有选择执行的参与者,或达到最大步骤数。

参与者

参与者PregelNode。它订阅通道,从中读取数据,并向其写入数据。它可以被认为是 Pregel 算法中的参与者PregelNodes 实现 LangChain 的 Runnable 接口。

通道

通道用于在参与者(PregelNodes)之间通信。每个通道都有一个值类型、一个更新类型和一个更新函数 - 它接受一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来步骤中将数据从链发送到自身。LangGraph 提供了许多内置通道:
  • LastValue:默认通道,存储发送到通道的最后一个值,对输入和输出值很有用,或用于从一个步骤向下一个步骤发送数据。
  • Topic:可配置的 PubSub 主题,对在参与者之间发送多个值或累积输出很有用。可以配置为去重值或在多个步骤过程中累积值。
  • BinaryOperatorAggregate:存储持久值,通过将二元运算符应用于当前值和发送到通道的每个更新来更新,对计算多个步骤的聚合很有用;例如,total = BinaryOperatorAggregate(int, operator.add)

示例

虽然大多数用户将通过 StateGraph API 或 @entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。 以下是一些不同的示例,让您了解 Pregel API。
  • Single node
  • Multiple nodes
  • Topic
  • BinaryOperatorAggregate
  • Cycle
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}

高级 API

LangGraph 提供两个高级 API 用于创建 Pregel 应用程序:StateGraph (Graph API)Functional API
  • StateGraph (Graph API)
  • Functional API
The StateGraph (Graph API) is a higher-level abstraction that simplifies the creation of Pregel applications. It allows you to define a graph of nodes and edges. When you compile the graph, the StateGraph API automatically creates the Pregel application for you.
from typing import TypedDict

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: str | None
    score: float | None

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
builder.add_edge("write_essay", "score_essay")

# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()
The compiled Pregel instance will be associated with a list of nodes and channels. You can inspect the nodes and channels by printing them.
print(graph.nodes)
You will see something like this:
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)
You should see something like this
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.