本指南解释了使用子图的机制。子图是在另一个图中用作节点 的图 。
子图对以下情况有用:
构建多智能体系统
在多个图中重用一组节点
分布式开发:当您希望不同的团队独立处理图的不同部分时,您可以将每个部分定义为子图,只要尊重子图接口(输入和输出模式),就可以在不了解子图任何细节的情况下构建父图
添加子图时,您需要定义父图和子图如何通信:
npm install @langchain/langgraph
为 LangGraph 开发设置 LangSmith
注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 让您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序 — 在这里 阅读更多关于如何开始的信息。
从节点调用图
实现子图的一种简单方法是从另一个图的节点内部调用图。在这种情况下,子图可以具有与父图完全不同的模式 (没有共享键)。例如,您可能希望为多智能体 系统中的每个智能体保留私有消息历史。
如果您的应用程序就是这种情况,您需要定义一个调用子图的节点函数 。此函数需要在调用子图之前将输入(父)状态转换为子图状态,并在从节点返回状态更新之前将结果转换回父状态。
import { StateGraph , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
const SubgraphState = z . object ({
bar: z . string (),
});
// Subgraph
const subgraphBuilder = new StateGraph ( SubgraphState )
. addNode ( "subgraphNode1" , ( state ) => {
return { bar: "hi! " + state . bar };
})
. addEdge ( START , "subgraphNode1" );
const subgraph = subgraphBuilder . compile ();
// Parent graph
const State = z . object ({
foo: z . string (),
});
// Transform the state to the subgraph state and back
const builder = new StateGraph ( State )
. addNode ( "node1" , async ( state ) => {
const subgraphOutput = await subgraph . invoke ({ bar: state . foo });
return { foo: subgraphOutput . bar };
})
. addEdge ( START , "node1" );
const graph = builder . compile ();
Full example: different state schemas
import { StateGraph , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// Define subgraph
const SubgraphState = z . object ({
// note that none of these keys are shared with the parent graph state
bar: z . string (),
baz: z . string (),
});
const subgraphBuilder = new StateGraph ( SubgraphState )
. addNode ( "subgraphNode1" , ( state ) => {
return { baz: "baz" };
})
. addNode ( "subgraphNode2" , ( state ) => {
return { bar: state . bar + state . baz };
})
. addEdge ( START , "subgraphNode1" )
. addEdge ( "subgraphNode1" , "subgraphNode2" );
const subgraph = subgraphBuilder . compile ();
// Define parent graph
const ParentState = z . object ({
foo: z . string (),
});
const builder = new StateGraph ( ParentState )
. addNode ( "node1" , ( state ) => {
return { foo: "hi! " + state . foo };
})
. addNode ( "node2" , async ( state ) => {
const response = await subgraph . invoke ({ bar: state . foo });
return { foo: response . bar };
})
. addEdge ( START , "node1" )
. addEdge ( "node1" , "node2" );
const graph = builder . compile ();
for await ( const chunk of await graph . stream (
{ foo: "foo" },
{ subgraphs: true }
)) {
console . log ( chunk );
}
Transform the state to the subgraph state
Transform response back to the parent state
[[], { node1: { foo: 'hi! foo' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode1: { baz: 'baz' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode2: { bar: 'hi! foobaz' } }]
[[], { node2: { foo: 'hi! foobaz' } }]
Full example: different state schemas (two levels of subgraphs)
This is an example with two levels of subgraphs: parent -> child -> grandchild. import { StateGraph , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
// Grandchild graph
const GrandChildState = z . object ({
myGrandchildKey: z . string (),
});
const grandchild = new StateGraph ( GrandChildState )
. addNode ( "grandchild1" , ( state ) => {
// NOTE: child or parent keys will not be accessible here
return { myGrandchildKey: state . myGrandchildKey + ", how are you" };
})
. addEdge ( START , "grandchild1" )
. addEdge ( "grandchild1" , END );
const grandchildGraph = grandchild . compile ();
// Child graph
const ChildState = z . object ({
myChildKey: z . string (),
});
const child = new StateGraph ( ChildState )
. addNode ( "child1" , async ( state ) => {
// NOTE: parent or grandchild keys won't be accessible here
const grandchildGraphInput = { myGrandchildKey: state . myChildKey };
const grandchildGraphOutput = await grandchildGraph . invoke ( grandchildGraphInput );
return { myChildKey: grandchildGraphOutput . myGrandchildKey + " today?" };
})
. addEdge ( START , "child1" )
. addEdge ( "child1" , END );
const childGraph = child . compile ();
// Parent graph
const ParentState = z . object ({
myKey: z . string (),
});
const parent = new StateGraph ( ParentState )
. addNode ( "parent1" , ( state ) => {
// NOTE: child or grandchild keys won't be accessible here
return { myKey: "hi " + state . myKey };
})
. addNode ( "child" , async ( state ) => {
const childGraphInput = { myChildKey: state . myKey };
const childGraphOutput = await childGraph . invoke ( childGraphInput );
return { myKey: childGraphOutput . myChildKey };
})
. addNode ( "parent2" , ( state ) => {
return { myKey: state . myKey + " bye!" };
})
. addEdge ( START , "parent1" )
. addEdge ( "parent1" , "child" )
. addEdge ( "child" , "parent2" )
. addEdge ( "parent2" , END );
const parentGraph = parent . compile ();
for await ( const chunk of await parentGraph . stream (
{ myKey: "Bob" },
{ subgraphs: true }
)) {
console . log ( chunk );
}
We’re transforming the state from the child state channels (myChildKey) to the grandchild state channels (myGrandchildKey)
We’re transforming the state from the grandchild state channels (myGrandchildKey) back to the child state channels (myChildKey)
We’re passing a function here instead of just compiled graph (grandchildGraph)
We’re transforming the state from the parent state channels (myKey) to the child state channels (myChildKey)
We’re transforming the state from the child state channels (myChildKey) back to the parent state channels (myKey)
We’re passing a function here instead of just a compiled graph (childGraph)
[[], { parent1: { myKey: 'hi Bob' } }]
[['child:2e26e9ce-602f-862c-aa66-1ea5a4655e3b', 'child1:781bb3b1-3971-84ce-810b-acf819a03f9c'], { grandchild1: { myGrandchildKey: 'hi Bob, how are you' } }]
[['child:2e26e9ce-602f-862c-aa66-1ea5a4655e3b'], { child1: { myChildKey: 'hi Bob, how are you today?' } }]
[[], { child: { myKey: 'hi Bob, how are you today?' } }]
[[], { parent2: { myKey: 'hi Bob, how are you today? bye!' } }]
将图添加为节点
当父图和子图可以通过模式 中的共享状态键(通道)进行通信时,您可以将图添加为另一个图中的节点 。例如,在多智能体 系统中,智能体通常通过共享的 messages 键进行通信。
如果您的子图与父图共享状态键,您可以按照以下步骤将其添加到您的图中:
Define the subgraph workflow (subgraphBuilder in the example below) and compile it
Pass compiled subgraph to the .addNode method when defining the parent graph workflow
import { StateGraph , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = z . object ({
foo: z . string (),
});
// Subgraph
const subgraphBuilder = new StateGraph ( State )
. addNode ( "subgraphNode1" , ( state ) => {
return { foo: "hi! " + state . foo };
})
. addEdge ( START , "subgraphNode1" );
const subgraph = subgraphBuilder . compile ();
// Parent graph
const builder = new StateGraph ( State )
. addNode ( "node1" , subgraph )
. addEdge ( START , "node1" );
const graph = builder . compile ();
Full example: shared state schemas
import { StateGraph , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// Define subgraph
const SubgraphState = z . object ({
foo: z . string (),
bar: z . string (),
});
const subgraphBuilder = new StateGraph ( SubgraphState )
. addNode ( "subgraphNode1" , ( state ) => {
return { bar: "bar" };
})
. addNode ( "subgraphNode2" , ( state ) => {
// note that this node is using a state key ('bar') that is only available in the subgraph
// and is sending update on the shared state key ('foo')
return { foo: state . foo + state . bar };
})
. addEdge ( START , "subgraphNode1" )
. addEdge ( "subgraphNode1" , "subgraphNode2" );
const subgraph = subgraphBuilder . compile ();
// Define parent graph
const ParentState = z . object ({
foo: z . string (),
});
const builder = new StateGraph ( ParentState )
. addNode ( "node1" , ( state ) => {
return { foo: "hi! " + state . foo };
})
. addNode ( "node2" , subgraph )
. addEdge ( START , "node1" )
. addEdge ( "node1" , "node2" );
const graph = builder . compile ();
for await ( const chunk of await graph . stream ({ foo: "foo" })) {
console . log ( chunk );
}
This key is shared with the parent graph state
This key is private to the SubgraphState and is not visible to the parent graph
{ node1: { foo: 'hi! foo' } }
{ node2: { foo: 'hi! foobar' } }
添加持久化
您只需要在编译父图时提供检查点器 。LangGraph 会自动将检查点器传播到子图。
import { StateGraph , START , MemorySaver } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = z . object ({
foo: z . string (),
});
// Subgraph
const subgraphBuilder = new StateGraph ( State )
. addNode ( "subgraphNode1" , ( state ) => {
return { foo: state . foo + "bar" };
})
. addEdge ( START , "subgraphNode1" );
const subgraph = subgraphBuilder . compile ();
// Parent graph
const builder = new StateGraph ( State )
. addNode ( "node1" , subgraph )
. addEdge ( START , "node1" );
const checkpointer = new MemorySaver ();
const graph = builder . compile ({ checkpointer });
如果您希望子图拥有自己的内存 ,可以使用适当的检查点器选项编译它。这在多智能体 系统中很有用,如果您希望智能体跟踪其内部消息历史:
const subgraphBuilder = new StateGraph ( ... )
const subgraph = subgraphBuilder . compile ({ checkpointer: true });
查看子图状态
当您启用持久化 时,您可以通过适当的方法检查图状态 (检查点)。要查看子图状态,您可以使用 subgraphs 选项。
您可以通过 graph.getState(config) 检查图状态。要查看子图状态,可以使用 graph.getState(config, { subgraphs: true })。
仅在中断时可用
子图状态只能在子图被中断时 查看。一旦您恢复图,您将无法访问子图状态。
View interrupted subgraph state
import { StateGraph , START , MemorySaver , interrupt , Command } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = z . object ({
foo: z . string (),
});
// Subgraph
const subgraphBuilder = new StateGraph ( State )
. addNode ( "subgraphNode1" , ( state ) => {
const value = interrupt ( "Provide value:" );
return { foo: state . foo + value };
})
. addEdge ( START , "subgraphNode1" );
const subgraph = subgraphBuilder . compile ();
// Parent graph
const builder = new StateGraph ( State )
. addNode ( "node1" , subgraph )
. addEdge ( START , "node1" );
const checkpointer = new MemorySaver ();
const graph = builder . compile ({ checkpointer });
const config = { configurable: { thread_id: "1" } };
await graph . invoke ({ foo: "" }, config );
const parentState = await graph . getState ( config );
const subgraphState = ( await graph . getState ( config , { subgraphs: true })). tasks [ 0 ]. state ;
// resume the subgraph
await graph . invoke ( new Command ({ resume: "bar" }), config );
流式传输子图输出
要在流式输出中包含子图的输出,您可以在父图的 stream 方法中设置 subgraphs 选项。这将从父图和任何子图流式传输输出。
for await ( const chunk of await graph . stream (
{ foo: "foo" },
{
subgraphs: true ,
streamMode: "updates" ,
}
)) {
console . log ( chunk );
}
设置 subgraphs: true 以流式传输子图的输出。
import { StateGraph , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// Define subgraph
const SubgraphState = z . object ({
foo: z . string (),
bar: z . string (),
});
const subgraphBuilder = new StateGraph ( SubgraphState )
. addNode ( "subgraphNode1" , ( state ) => {
return { bar: "bar" };
})
. addNode ( "subgraphNode2" , ( state ) => {
// note that this node is using a state key ('bar') that is only available in the subgraph
// and is sending update on the shared state key ('foo')
return { foo: state . foo + state . bar };
})
. addEdge ( START , "subgraphNode1" )
. addEdge ( "subgraphNode1" , "subgraphNode2" );
const subgraph = subgraphBuilder . compile ();
// Define parent graph
const ParentState = z . object ({
foo: z . string (),
});
const builder = new StateGraph ( ParentState )
. addNode ( "node1" , ( state ) => {
return { foo: "hi! " + state . foo };
})
. addNode ( "node2" , subgraph )
. addEdge ( START , "node1" )
. addEdge ( "node1" , "node2" );
const graph = builder . compile ();
for await ( const chunk of await graph . stream (
{ foo: "foo" },
{
streamMode: "updates" ,
subgraphs: true ,
}
)) {
console . log ( chunk );
}
设置 subgraphs: true 以流式传输子图的输出。
[[], { node1: { foo: 'hi! foo' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode1: { bar: 'bar' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode2: { foo: 'hi! foobar' } }]
[[], { node2: { foo: 'hi! foobar' } }]