from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]# create a streaming runasync for chunk in client.runs.stream( thread_id, assistant_id, input=inputs, stream_mode="updates"): print(chunk.data)
扩展示例:流式传输更新
这是一个可以在 Agent Server 中运行的示例图。
有关更多详细信息,请参阅 LangSmith 快速入门。
Once you have a running Agent Server, you can interact with it using the LangGraph SDK:
Python
JavaScript
cURL
Copy
from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]# create a streaming runasync for chunk in client.runs.stream( # (1)! thread_id, assistant_id, input={"topic": "ice cream"}, stream_mode="updates" # (2)!): print(chunk.data)
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately.
from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]
async for chunk in client.runs.stream( thread_id, assistant_id, input={"foo": "foo"}, stream_subgraphs=True, # (1)! stream_mode="updates",): print(chunk)
设置 stream_subgraphs=True 以流式传输子图的输出。
扩展示例:从子图流式传输
这是一个可以在 Agent Server 中运行的示例图。
有关更多详细信息,请参阅 LangSmith 快速入门。
Copy
# graph.py
"""Example parent graph that embeds a compiled subgraph as one of its nodes."""
from typing import TypedDict

from langgraph.graph import START, StateGraph


# Define subgraph
class SubgraphState(TypedDict):
    """State schema used by the subgraph."""

    foo: str  # note that this key is shared with the parent graph state
    bar: str


def subgraph_node_1(state: SubgraphState):
    """Return an update that sets ``bar`` to the constant ``"bar"``."""
    return {"bar": "bar"}


def subgraph_node_2(state: SubgraphState):
    """Return an update that sets ``foo`` to ``foo`` followed by ``bar``."""
    return {"foo": state["foo"] + state["bar"]}


subgraph_builder = StateGraph(SubgraphState)
# Nodes are added without explicit names; the edges below refer to them by
# their function names.
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()


# Define parent graph
class ParentState(TypedDict):
    """State schema used by the parent graph."""

    foo: str


def node_1(state: ParentState):
    """Return an update that prefixes ``foo`` with ``"hi! "``."""
    return {"foo": "hi! " + state["foo"]}


builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
# The compiled subgraph is registered as a regular node of the parent graph.
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
Once you have a running Agent Server, you can interact with it using the LangGraph SDK:
Python
JavaScript
cURL
Copy
from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]async for chunk in client.runs.stream( thread_id, assistant_id, input={"foo": "foo"}, stream_subgraphs=True, # (1)! stream_mode="updates",): print(chunk)