The LangGraph SDK lets you stream output from the LangSmith Deployment API.
The LangGraph SDK and Agent Server are part of LangSmith.

Basic usage

Basic usage example:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,  # inputs matching your graph's state, e.g. {"topic": "ice cream"}
    stream_mode="updates"
):
    print(chunk.data)
Here is an example graph that can be run in the Agent Server. For more details, see the LangSmith quickstart.
# graph.py
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
Once you have a running Agent Server, you can interact with it using the LangGraph SDK:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(  # (1)!
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"  # (2)!
):
    print(chunk.data)
  1. The client.runs.stream() method returns an iterator that yields streamed outputs.
  2. Set stream_mode="updates" to stream only the updates to the graph state after each node executes. Other stream modes are also available; see Supported stream modes for details.
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

Supported stream modes

Each mode corresponds to a method in the LangGraph library:
  • values: Stream the full graph state after each super-step. Library: .stream() / .astream() with stream_mode="values"
  • updates: Stream the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately. Library: .stream() / .astream() with stream_mode="updates"
  • messages-tuple: Stream LLM tokens and metadata for the graph node where the LLM is invoked (useful for chat apps). Library: .stream() / .astream() with stream_mode="messages"
  • debug: Stream as much information as possible throughout the execution of the graph. Library: .stream() / .astream() with stream_mode="debug"
  • custom: Stream custom data from inside your graph. Library: .stream() / .astream() with stream_mode="custom"
  • events: Stream all events (including the state of the graph); mainly useful when migrating large LCEL apps. Library: .astream_events()
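For reference, the library methods listed above apply when running a graph locally rather than through the SDK. A minimal sketch using the example graph above:
# Local (library) equivalent: stream the full state values directly
for chunk in graph.stream({"topic": "ice cream"}, stream_mode="values"):
    print(chunk)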

Stream multiple modes

You can pass a list as the stream_mode parameter to stream multiple modes at once. The streamed outputs will be (mode, chunk) tuples, where mode is the name of the stream mode and chunk is the data streamed by that mode.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    print(chunk)
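With the SDK, the mode name arrives on each chunk's event field, so you can branch on it to handle each mode separately. A minimal sketch, assuming the same thread_id and assistant_id as in the earlier examples:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode=["updates", "custom"]
):
    # chunk.event names the stream mode; chunk.data carries its payload
    if chunk.event == "updates":
        print("state update:", chunk.data)
    elif chunk.event == "custom":
        print("custom data:", chunk.data)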

Stream graph state

Use the stream modes updates and values to stream the graph's state as it executes.
  • updates streams the updates to the state after each step of the graph
  • values streams the full value of the state after each step of the graph
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
Stateful runs: The examples below assume that you want to persist the outputs of a streaming run in the checkpointer database and that you have already created a thread. To create a thread:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
If you don't need to persist the outputs of a run, you can pass None instead of a thread_id when streaming.

Stream mode: updates

Use this mode to stream only the state updates returned by each node after each step. The streamed outputs include the name of the node as well as the update.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"
):
    print(chunk.data)

Stream mode: values

Use this mode to stream the full graph state after each step.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="values"
):
    print(chunk.data)
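With the example graph above, this yields the full state after each step, along these lines (preceded, as in the updates example, by a metadata chunk containing the run_id):
{'topic': 'ice cream'}
{'topic': 'ice cream and cats'}
{'topic': 'ice cream and cats', 'joke': 'This is a joke about ice cream and cats'}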

Subgraphs

To include outputs from subgraphs in the streamed outputs, set stream_subgraphs=True when creating the streaming run. This streams outputs from both the parent graph and any subgraphs.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. Set stream_subgraphs=True to stream outputs from subgraphs.
Here is an example graph that can be run in the Agent Server. For more details, see the LangSmith quickstart.
# graph.py
from langgraph.graph import START, StateGraph
from typing import TypedDict

# Define subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# Define parent graph
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
Once you have a running Agent Server, you can interact with it using the LangGraph SDK:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. Set stream_subgraphs=True to stream outputs from subgraphs.
Note that we receive not just the node updates but also namespaces, which tell us which graph (or subgraph) we are streaming from.
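For comparison, the LangGraph library exposes the same behavior via subgraphs=True on .stream(), where the namespace is returned explicitly. A minimal local sketch using the graph above:
# Local (library) equivalent: yields (namespace, update) pairs
for namespace, chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    subgraphs=True,
):
    # namespace is () for the parent graph, or e.g. ('node_2:<task-id>',)
    # for updates produced inside the subgraph
    print(namespace, chunk)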

Debug

Use the debug stream mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug"
):
    print(chunk.data)
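Debug payloads describe execution stages via a type field (checkpoint, task, and task_result events, among others), so you can filter for what you need. A minimal sketch that keeps only completed node results:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug"
):
    # keep only the results of completed node tasks
    if chunk.data.get("type") == "task_result":
        print(chunk.data)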

LLM tokens

Use the messages-tuple stream mode to stream large language model (LLM) output token by token from any part of your graph, including nodes, tools, subgraphs, or tasks. The streamed output from messages-tuple mode is a tuple (message_chunk, metadata), where:
  • message_chunk: the token or message segment from the LLM.
  • metadata: a dictionary containing details about the graph node and the LLM invocation.
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="gpt-4o-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    model_response = model.invoke( # (1)!
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)
  1. Note that message events are emitted even when the LLM is run via invoke rather than stream.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data  # (1)!
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
  1. The "messages-tuple" stream mode returns an iterator of (message_chunk, metadata) tuples, where message_chunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was called, along with other details.

Filter LLM tokens

The metadata dictionary streamed alongside each token identifies where the token came from, for example via its langgraph_node field, which lets you keep only the tokens you care about.
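A minimal sketch that keeps only tokens emitted from the call_model node of the joke graph above, assuming the node name is carried under the langgraph_node metadata key:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data
    # drop tokens coming from any node other than call_model
    if metadata.get("langgraph_node") != "call_model":
        continue
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)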

Stream custom data

To stream custom user-defined data sent from inside your graph:
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"query": "example"},
    stream_mode="custom"
):
    print(chunk.data)
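On the graph side, a node emits this data through the stream writer. A minimal sketch, assuming a langgraph version where get_stream_writer is importable from langgraph.config:
# inside your graph code
from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer({"progress": "fetching results..."})  # arrives as a "custom" chunk
    return {"query": state["query"]}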

Stream events

To stream all events, including the state of the graph:
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="events"
):
    print(chunk.data)
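Each chunk mirrors the payloads of the library's .astream_events(): dictionaries with an event name such as on_chain_start or on_chat_model_stream. A minimal sketch filtering for LLM token events, assuming that payload shape:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="events"
):
    # keep only token chunks emitted by the chat model
    if chunk.data.get("event") == "on_chat_model_stream":
        print(chunk.data["data"]["chunk"])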

Stateless runs

If you don't want to persist the outputs of a streaming run in the checkpointer database, you can create a stateless run without creating a thread:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.stream(
    None,  # (1)!
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
  1. We pass None instead of a thread_id UUID.

Join and stream

LangSmith allows you to join an active background run and stream output from it. To do so, use the LangGraph SDK's client.runs.join_stream method:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.join_stream(
    thread_id,
    run_id,  # (1)!
):
    print(chunk)
  1. This is the run_id of the existing run you want to join.
Outputs not buffered: when you use .join_stream, output is not buffered, so any output produced before you join will not be received.
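The run_id typically comes from a background run created earlier with client.runs.create. A minimal sketch, assuming the same client and thread as above:
# create a background run, then attach to its output stream
run = await client.runs.create(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
)

async for chunk in client.runs.join_stream(
    thread_id,
    run["run_id"],
):
    print(chunk)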

API reference

For API usage and implementation details, refer to the API reference.