Pregel 实现 LangGraph 的运行时,管理 LangGraph 应用程序的执行。 编译 StateGraph 或创建 entrypoint 会生成一个可以使用输入调用的 Pregel 实例。 本指南在高层次上解释运行时,并提供使用 Pregel 直接实现应用程序的说明。
注意: Pregel 运行时以 Google 的 Pregel 算法命名,该算法描述了使用图进行大规模并行计算的高效方法。

概述

在 LangGraph 中,Pregel 将参与者通道组合成单个应用程序。参与者从通道读取数据并向通道写入数据。Pregel 遵循 Pregel 算法/批量同步并行模型将应用程序的执行组织成多个步骤。 每个步骤包括三个阶段:
  • 计划:确定在此步骤中执行哪些参与者。例如,在第一步中,选择订阅特殊输入通道的参与者;在后续步骤中,选择订阅在前一步中更新的通道的参与者
  • 执行:并行执行所有选定的参与者,直到所有参与者完成、一个失败或达到超时。在此阶段,通道更新对参与者不可见,直到下一步。
  • 更新:使用参与者在此步骤中写入的值更新通道。
重复,直到没有选择执行的参与者,或达到最大步骤数。

参与者

参与者PregelNode。它订阅通道,从中读取数据,并向其写入数据。它可以被认为是 Pregel 算法中的参与者PregelNodes 实现 LangChain 的 Runnable 接口。

通道

通道用于在参与者(PregelNodes)之间通信。每个通道都有一个值类型、一个更新类型和一个更新函数 - 它接受一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来步骤中将数据从链发送到自身。LangGraph 提供了许多内置通道:
  • @[LastValue]:默认通道,存储发送到通道的最后一个值,对输入和输出值很有用,或用于从一个步骤向下一个步骤发送数据。
  • @[Topic]:可配置的 PubSub 主题,对在参与者之间发送多个值或累积输出很有用。可以配置为去重值或在多个步骤过程中累积值。
  • BinaryOperatorAggregate:存储持久值,通过将二元运算符应用于当前值和发送到通道的每个更新来更新,对计算多个步骤的聚合很有用;例如,total = BinaryOperatorAggregate(int, operator.add)

示例

虽然大多数用户将通过 StateGraph API 或 entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。 以下是一些不同的示例,让您了解 Pregel API。
  • Single node
  • Multiple nodes
  • Topic
  • BinaryOperatorAggregate
  • Cycle
import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";

const node1 = new NodeBuilder()
  .subscribeOnly("a")
  .do((x: string) => x + x)
  .writeTo("b");

const app = new Pregel({
  nodes: { node1 },
  channels: {
    a: new EphemeralValue<string>(),
    b: new EphemeralValue<string>(),
  },
  inputChannels: ["a"],
  outputChannels: ["b"],
});

await app.invoke({ a: "foo" });
{ b: 'foofoo' }

高级 API

LangGraph 提供两个高级 API 用于创建 Pregel 应用程序:StateGraph (Graph API)Functional API
  • StateGraph (Graph API)
  • Functional API
The StateGraph (Graph API) is a higher-level abstraction that simplifies the creation of Pregel applications. It allows you to define a graph of nodes and edges. When you compile the graph, the StateGraph API automatically creates the Pregel application for you.
import { START, StateGraph } from "@langchain/langgraph";

interface Essay {
  topic: string;
  content?: string;
  score?: number;
}

const writeEssay = (essay: Essay) => {
  return {
    content: `Essay about ${essay.topic}`,
  };
};

const scoreEssay = (essay: Essay) => {
  return {
    score: 10
  };
};

const builder = new StateGraph<Essay>({
  channels: {
    topic: null,
    content: null,
    score: null,
  }
})
  .addNode("writeEssay", writeEssay)
  .addNode("scoreEssay", scoreEssay)
  .addEdge(START, "writeEssay")
  .addEdge("writeEssay", "scoreEssay");

// Compile the graph.
// This will return a Pregel instance.
const graph = builder.compile();
The compiled Pregel instance will be associated with a list of nodes and channels. You can inspect the nodes and channels by printing them.
console.log(graph.nodes);
You will see something like this:
{
  __start__: PregelNode { ... },
  writeEssay: PregelNode { ... },
  scoreEssay: PregelNode { ... }
}
console.log(graph.channels);
You should see something like this
{
  topic: LastValue { ... },
  content: LastValue { ... },
  score: LastValue { ... },
  __start__: EphemeralValue { ... },
  writeEssay: EphemeralValue { ... },
  scoreEssay: EphemeralValue { ... },
  'branch:__start__:__self__:writeEssay': EphemeralValue { ... },
  'branch:__start__:__self__:scoreEssay': EphemeralValue { ... },
  'branch:writeEssay:__self__:writeEssay': EphemeralValue { ... },
  'branch:writeEssay:__self__:scoreEssay': EphemeralValue { ... },
  'branch:scoreEssay:__self__:writeEssay': EphemeralValue { ... },
  'branch:scoreEssay:__self__:scoreEssay': EphemeralValue { ... },
  'start:writeEssay': EphemeralValue { ... }
}

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.