from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]# create a streaming runasync for chunk in client.runs.stream( thread_id, assistant_id, input=inputs, stream_mode="updates"): print(chunk.data)
import { Client } from "@langchain/langgraph-sdk";const client = new Client({ apiUrl: <DEPLOYMENT_URL>, apiKey: <API_KEY> });// Using the graph deployed with the name "agent"const assistantID = "agent";// create a threadconst thread = await client.threads.create();const threadID = thread["thread_id"];// create a streaming runconst streamResponse = client.runs.stream( threadID, assistantID, { input, streamMode: "updates" });for await (const chunk of streamResponse) { console.log(chunk.data);}
Create a thread:
# Create a thread by POSTing an empty JSON body to the /threads endpoint.
curl --request POST \
  --url <DEPLOYMENT_URL>/threads \
  --header 'Content-Type: application/json' \
  --data '{}'
Once you have a running Agent Server, you can interact with it using the LangGraph SDK.
Python
JavaScript
cURL
from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]# create a streaming runasync for chunk in client.runs.stream( # (1)! thread_id, assistant_id, input={"topic": "ice cream"}, stream_mode="updates" # (2)!): print(chunk.data)
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately.
from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]
import { Client } from "@langchain/langgraph-sdk";const client = new Client({ apiUrl: <DEPLOYMENT_URL> });// Using the graph deployed with the name "agent"const assistantID = "agent";// create a threadconst thread = await client.threads.create();const threadID = thread["thread_id"]
# Create a thread by POSTing an empty JSON body to the /threads endpoint.
curl --request POST \
  --url <DEPLOYMENT_URL>/threads \
  --header 'Content-Type: application/json' \
  --data '{}'
async for chunk in client.runs.stream( thread_id, assistant_id, input={"foo": "foo"}, stream_subgraphs=True, # (1)! stream_mode="updates",): print(chunk)
设置 stream_subgraphs=True 以流式传输子图的输出。
扩展示例:从子图流式传输
这是一个可以在 Agent Server 中运行的示例图。
有关更多详细信息,请参阅 LangSmith 快速入门。
# graph.py
"""Example parent graph that embeds a compiled subgraph as one of its nodes."""
from typing import TypedDict

from langgraph.graph import START, StateGraph


# Define subgraph
class SubgraphState(TypedDict):
    """State schema used by the subgraph."""

    foo: str  # note that this key is shared with the parent graph state
    bar: str


def subgraph_node_1(state: SubgraphState):
    """Return an update that sets ``bar`` to the literal string "bar"."""
    return {"bar": "bar"}


def subgraph_node_2(state: SubgraphState):
    """Return an update that sets ``foo`` to the current ``foo`` followed by ``bar``."""
    return {"foo": state["foo"] + state["bar"]}


subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()


# Define parent graph
class ParentState(TypedDict):
    """State schema used by the parent graph."""

    foo: str


def node_1(state: ParentState):
    """Return an update that prefixes ``foo`` with "hi! "."""
    return {"foo": "hi! " + state["foo"]}


builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
# The compiled subgraph is registered as the parent's second node.
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
Once you have a running Agent Server, you can interact with it using the LangGraph SDK.
Python
JavaScript
cURL
from langgraph_sdk import get_clientclient = get_client(url=<DEPLOYMENT_URL>)# Using the graph deployed with the name "agent"assistant_id = "agent"# create a threadthread = await client.threads.create()thread_id = thread["thread_id"]async for chunk in client.runs.stream( thread_id, assistant_id, input={"foo": "foo"}, stream_subgraphs=True, # (1)! stream_mode="updates",): print(chunk)
设置 stream_subgraphs=True 以流式传输子图的输出。
import { Client } from "@langchain/langgraph-sdk";const client = new Client({ apiUrl: <DEPLOYMENT_URL> });// Using the graph deployed with the name "agent"const assistantID = "agent";// create a threadconst thread = await client.threads.create();const threadID = thread["thread_id"];// create a streaming runconst streamResponse = client.runs.stream( threadID, assistantID, { input: { foo: "foo" }, streamSubgraphs: true, // (1)! streamMode: "updates" });for await (const chunk of streamResponse) { console.log(chunk);}
Set streamSubgraphs: true to stream outputs from subgraphs.
创建线程:
# Create a thread by POSTing an empty JSON body to the /threads endpoint.
curl --request POST \
  --url <DEPLOYMENT_URL>/threads \
  --header 'Content-Type: application/json' \
  --data '{}'