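For conceptual information on the Functional API, see Functional API.
Creating a simple workflow
When defining an entrypoint, input is restricted to the first argument of the function. To pass multiple inputs, you can use a dictionary.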
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = inputs["value"]
    another_value = inputs["another_value"]
    ...

my_workflow.invoke({"value": 1, "another_value": 2})
Extended example: simple workflow
import uuid
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# Task that checks if a number is even
@task
def is_even(number: int) -> bool:
    return number % 2 == 0

# Task that formats a message
@task
def format_message(is_even: bool) -> str:
    return "The number is even." if is_even else "The number is odd."

# Create a checkpointer for persistence
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
    """Simple workflow to classify a number."""
    even = is_even(inputs["number"]).result()
    return format_message(even).result()

# Run the workflow with a unique thread ID
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
Extended example: Compose an essay with an LLM
This example demonstrates how to use the @task and @entrypoint decorators. Because a checkpointer is provided, the workflow results are persisted in the checkpointer.
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

model = init_chat_model('gpt-3.5-turbo')

# Task: generate essay using an LLM
@task
def compose_essay(topic: str) -> str:
    """Generate an essay about the given topic."""
    return model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes essays."},
        {"role": "user", "content": f"Write an essay about {topic}."}
    ]).content

# Create a checkpointer for persistence
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
    """Simple workflow that generates an essay with an LLM."""
    return compose_essay(topic).result()

# Execute the workflow
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke("the history of flight", config=config)
print(result)
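Parallel execution
Tasks can be executed in parallel by invoking them concurrently and then waiting for the results. This is useful for improving the performance of IO-bound tasks (for example, calling LLM APIs).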
@task
def add_one(number: int) -> int:
    return number + 1

@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[int]:
    # Start every task first, then wait for the results
    futures = [add_one(i) for i in numbers]
    return [f.result() for f in futures]
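The launch-then-collect pattern above can also be illustrated without LangGraph. The following is a minimal sketch using only the standard library's `concurrent.futures`, offered as an analogy rather than as LangGraph's actual implementation; `slow_add_one` and `run_parallel` are hypothetical names introduced for illustration. It shows that submitting all the work before calling `.result()` lets the simulated I/O waits overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add_one(number: int) -> int:
    """Hypothetical stand-in for an I/O-bound task (e.g. an LLM API call)."""
    time.sleep(0.2)  # simulate network latency
    return number + 1


def run_parallel(numbers: list[int]) -> list[int]:
    """Submit every call first, then wait for the results in input order."""
    with ThreadPoolExecutor(max_workers=len(numbers) or 1) as pool:
        futures = [pool.submit(slow_add_one, n) for n in numbers]
        return [f.result() for f in futures]


results = run_parallel([1, 2, 3])
print(results)  # [2, 3, 4]
```

Calling `.result()` inside the same loop that submits the work would instead serialize the calls, because each call would block before the next one starts.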
Extended example: parallel LLM calls
This example demonstrates how to run multiple LLM calls in parallel using @task. Each call generates a paragraph on a different topic, and the results are joined into a single text output. The example uses LangGraph's concurrency model to improve execution time, especially when tasks involve I/O such as LLM completions.
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# Initialize the LLM model
model = init_chat_model("gpt-3.5-turbo")

# Task that generates a paragraph about a given topic
@task
def generate_paragraph(topic: str) -> str:
    response = model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes educational paragraphs."},
        {"role": "user", "content": f"Write a paragraph about {topic}."}
    ])
    return response.content

# Create a checkpointer for persistence
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
    """Generates multiple paragraphs in parallel and combines them."""
    futures = [generate_paragraph(topic) for topic in topics]
    paragraphs = [f.result() for f in futures]
    return "\n\n".join(paragraphs)

# Run the workflow
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
print(result)
Calling graphs
The Functional API and the Graph API can be used together in the same application, as they share the same underlying runtime.
from langgraph.func import entrypoint
from langgraph.graph import StateGraph

builder = StateGraph()
...
some_graph = builder.compile()

@entrypoint()
def some_workflow(some_input: dict) -> dict:
    # Call a graph defined using the graph API
    result_1 = some_graph.invoke(...)
    # Call another graph defined using the graph API
    result_2 = another_graph.invoke(...)
    return {
        "result_1": result_1,
        "result_2": result_2
    }
Extended example: calling a simple graph from the functional API
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph

# Define the shared state type
class State(TypedDict):
    foo: int

# Define a simple transformation node
def double(state: State) -> State:
    return {"foo": state["foo"] * 2}

# Build the graph using the Graph API
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()

# Define the functional API workflow
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
    result = graph.invoke({"foo": x})
    return {"bar": result["foo"]}

# Execute the workflow
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(workflow.invoke(5, config=config))  # Output: {'bar': 10}
Calling other entrypoints
You can call other entrypoints from within an entrypoint or a task.
@entrypoint()  # Will automatically use the checkpointer from the parent entrypoint
def some_other_workflow(inputs: dict) -> int:
    return inputs["value"]

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = some_other_workflow.invoke({"value": 1})
    return value
Extended example: calling another entrypoint
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

# Initialize a checkpointer
checkpointer = InMemorySaver()

# A reusable sub-workflow that multiplies a number
@entrypoint()
def multiply(inputs: dict) -> int:
    return inputs["a"] * inputs["b"]

# Main workflow that invokes the sub-workflow
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
    result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
    return {"product": result}

# Execute the main workflow
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(main.invoke({"x": 6, "y": 7}, config=config))  # Output: {'product': 42}
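Streaming
The Functional API uses the same streaming mechanism as the Graph API. See the streaming guide for more details. The following example uses the streaming API to stream both updates and custom data.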
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
    writer = get_stream_writer()
    writer("Started processing")
    result = inputs["x"] * 2
    writer(f"Result is {result}")
    return result

config = {"configurable": {"thread_id": "abc"}}

for mode, chunk in main.stream(
    {"x": 5},
    stream_mode=["custom", "updates"],
    config=config
):
    print(f"{mode}: {chunk}")
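- Import get_stream_writer from langgraph.config.
- Obtain a stream writer instance inside the entrypoint.
- Emit custom data before the computation begins.
- Emit another custom message after computing the result.
- Use .stream() to process the streamed output.
- Specify which streaming modes to use.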
custom: Started processing
custom: Result is 10
updates: {'main': 10}
Async with Python < 3.11
If you are using Python < 3.11 and writing async code, get_stream_writer will not work. Instead, use the StreamWriter class directly. See Async with Python < 3.11 for more details.
from langgraph.func import entrypoint
from langgraph.types import StreamWriter

@entrypoint(checkpointer=checkpointer)
async def main(inputs: dict, writer: StreamWriter) -> int:
    ...
Retry policy
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy

# This variable is just used for demonstration purposes to simulate a network failure.
# It's not something you will have in your actual code.
attempts = 0

# Let's configure the RetryPolicy to retry on ValueError.
# The default RetryPolicy is optimized for retrying specific network errors.
retry_policy = RetryPolicy(retry_on=ValueError)

@task(retry_policy=retry_policy)
def get_info():
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError('Failure')
    return "OK"

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
    return get_info().result()

config = {
    "configurable": {
        "thread_id": "1"
    }
}

main.invoke({'any_input': 'foobar'}, config=config)
'OK'
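To make the retry behavior concrete, here is a minimal standard-library sketch of retrying only on a chosen exception type. It is an analogy, not LangGraph's RetryPolicy implementation: `call_with_retry`, its `max_attempts` default, and `flaky_get_info` are hypothetical names and values chosen for illustration, and backoff between attempts is deliberately omitted.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    retry_on: type[Exception],
    max_attempts: int = 3,  # assumed default for this sketch only
) -> T:
    """Call fn, retrying only when it raises retry_on, up to max_attempts times."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
    raise RuntimeError("max_attempts must be at least 1")


attempts = 0


def flaky_get_info() -> str:
    """Fails on the first call, then succeeds, like get_info above."""
    global attempts
    attempts += 1
    if attempts < 2:
        raise ValueError("Failure")
    return "OK"


result = call_with_retry(flaky_get_info, retry_on=ValueError)
print(result, attempts)  # OK 2
```

Exceptions of any other type propagate immediately, which is why the choice of `retry_on` matters: retrying on errors that are not transient only delays the failure.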
Caching tasks
import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy

@task(cache_policy=CachePolicy(ttl=120))
def slow_add(x: int) -> int:
    time.sleep(1)
    return x * 2

@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
    result1 = slow_add(inputs["x"]).result()
    result2 = slow_add(inputs["x"]).result()
    return {"result1": result1, "result2": result2}

for chunk in main.stream({"x": 5}, stream_mode="updates"):
    print(chunk)

#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
ttl is specified in seconds. The cache entry is invalidated after this time.
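The TTL semantics can be sketched with a small standard-library cache. This is an illustrative analogy only, not how InMemoryCache is implemented; the `TTLCache` class, its `get_or_compute` method, and the injectable `clock` are hypothetical names introduced here so the example runs instantly instead of sleeping.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Hypothetical in-memory cache whose entries expire after ttl seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._entries: dict[Any, tuple[float, Any]] = {}

    def get_or_compute(self, key: Any, compute: Callable[[], Any]) -> tuple[Any, bool]:
        """Return (value, cached); recompute when the entry is missing or expired."""
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1], True
        value = compute()
        self._entries[key] = (now, value)
        return value, False


fake_now = [0.0]  # controllable clock for the demonstration
cache = TTLCache(ttl=120, clock=lambda: fake_now[0])

first = cache.get_or_compute(5, lambda: 5 * 2)   # computed
second = cache.get_or_compute(5, lambda: 5 * 2)  # served from the cache
fake_now[0] = 121.0                              # advance past the TTL
third = cache.get_or_compute(5, lambda: 5 * 2)   # expired, so recomputed
print(first, second, third)  # (10, False) (10, True) (10, False)
```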
Resuming after an error
import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter

# This variable is just used for demonstration purposes to simulate a network failure.
# It's not something you will have in your actual code.
attempts = 0

@task()
def get_info():
    """
    Simulates a task that fails once before succeeding.
    Raises an exception on the first attempt, then returns "OK" on subsequent tries.
    """
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError("Failure")  # Simulate a failure on the first attempt
    return "OK"

# Initialize an in-memory checkpointer for persistence
checkpointer = InMemorySaver()

@task
def slow_task():
    """
    Simulates a slow-running task by introducing a 1-second delay.
    """
    time.sleep(1)
    return "Ran slow task."

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
    """
    Main workflow function that runs the slow_task and get_info tasks sequentially.

    Parameters:
    - inputs: Dictionary containing workflow input values.
    - writer: StreamWriter for streaming custom data.

    The workflow first executes `slow_task` and then attempts to execute `get_info`,
    which will fail on the first invocation.
    """
    slow_task_result = slow_task().result()  # Blocking call to slow_task
    get_info().result()  # Exception will be raised here on the first attempt
    return slow_task_result

# Workflow execution configuration with a unique thread identifier
config = {
    "configurable": {
        "thread_id": "1"  # Unique identifier to track workflow execution
    }
}

# This invocation will take ~1 second due to the slow_task execution
try:
    # First invocation will raise an exception due to the `get_info` task failing
    main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
    pass  # Handle the failure gracefully
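The idea behind resuming can be sketched with the standard library alone. The sketch assumes that results of tasks completed before the failure are saved per thread, so that a second run on the same thread reuses them rather than recomputing. It is an analogy, not LangGraph's checkpointer; `checkpoints`, `run_task`, and `calls` are hypothetical names introduced for illustration.

```python
from typing import Any, Callable

# Hypothetical checkpoint store: thread_id -> {task name -> saved result}
checkpoints: dict[str, dict[str, Any]] = {}
calls = {"slow_task": 0, "get_info": 0}


def run_task(thread_id: str, name: str, fn: Callable[[], Any]) -> Any:
    """Run fn at most once per thread; later runs reuse the saved result."""
    saved = checkpoints.setdefault(thread_id, {})
    if name not in saved:
        saved[name] = fn()  # stored only if fn returns without raising
    return saved[name]


def slow_task() -> str:
    calls["slow_task"] += 1
    return "Ran slow task."


def get_info() -> str:
    calls["get_info"] += 1
    if calls["get_info"] < 2:
        raise ValueError("Failure")  # fail on the first attempt only
    return "OK"


def main(thread_id: str) -> str:
    slow_result = run_task(thread_id, "slow_task", slow_task)
    run_task(thread_id, "get_info", get_info)
    return slow_result


try:
    main("1")  # first run: slow_task succeeds, get_info raises
except ValueError:
    pass

result = main("1")  # second run on the same thread: slow_task is not re-run
print(result, calls)  # Ran slow task. {'slow_task': 1, 'get_info': 2}
```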
Execution was paused by an interrupt after step_1. The interrupt provides the instructions needed to resume the run. To resume, issue a Command containing the data that the human_feedback task expects.
from langgraph.types import Command

# Continue execution
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
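Review tool calls
To review tool calls before they are executed, add a review_tool_call function that calls interrupt. When this function is called, execution pauses until a command is issued to resume it.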