LangGraph 可以改变您对所构建智能体的思考方式。当您使用 LangGraph 构建智能体时,您首先将其分解为称为节点的离散步骤。然后,您将描述每个节点的不同决策和转换。最后,您将通过每个节点可以读取和写入的共享状态将节点连接在一起。在本教程中,我们将引导您完成使用 LangGraph 构建客户支持电子邮件智能体的思考过程。

从您想要自动化的过程开始

假设您需要构建一个处理客户支持电子邮件的 AI 智能体。您的产品团队为您提供了以下要求: 智能体应该:
  • 阅读传入的客户电子邮件
  • 按紧急程度和主题对它们进行分类
  • 搜索相关文档以回答问题
  • 起草适当的响应
  • 将复杂问题升级给人类智能体
  • 在需要时安排后续跟进
要处理的示例场景:
  1. 简单的产品问题:“如何重置我的密码?”
  2. 错误报告:“当我选择 PDF 格式时,导出功能崩溃”
  3. 紧急计费问题:“我的订阅被收费两次!”
  4. 功能请求:“您能在移动应用程序中添加暗模式吗?”
  5. 复杂的技术问题:“我们的 API 集成间歇性地失败,出现 504 错误”
要在 LangGraph 中实现智能体,您通常会遵循相同的五个步骤。

步骤 1:将您的工作流程映射为离散步骤

首先识别过程中的不同步骤。每个步骤将成为一个节点(执行一件特定事情的函数)。然后勾勒这些步骤如何相互连接。 箭头显示可能的路径,但采取哪条路径的实际决策在每个节点内部发生。 现在您已经识别了工作流程中的组件,让我们了解每个节点需要做什么:
  • 阅读电子邮件:提取和解析电子邮件内容
  • 分类意图:使用 LLM 对紧急程度和主题进行分类,然后路由到适当的操作
  • 文档搜索:查询您的知识库以获取相关信息
  • 错误跟踪:在跟踪系统中创建或更新问题
  • 起草回复:生成适当的响应
  • 人工审查:升级给人类智能体以获得批准或处理
  • 发送回复:发送电子邮件响应
请注意,某些节点决定下一步去哪里(分类意图、起草回复、人工审查),而其他节点总是进入相同的下一步(阅读电子邮件总是进入分类意图,文档搜索总是进入起草回复)。

步骤 2:确定每个步骤需要做什么

对于图中的每个节点,确定它代表什么类型的操作以及它需要什么上下文才能正常工作。

LLM 步骤

当步骤需要理解、分析、生成文本或做出推理决策时:
  • 静态上下文(提示):分类类别、紧急程度定义、响应格式
  • 动态上下文(来自状态):电子邮件内容、发件人信息
  • 期望结果:确定路由的结构化分类
  • 静态上下文(提示):语气指南、公司政策、响应模板
  • 动态上下文(来自状态):分类结果、搜索结果、客户历史
  • 期望结果:准备好审查的专业电子邮件响应

数据步骤

当步骤需要从外部源检索信息时:
  • 参数:从意图和主题构建的查询
  • 重试策略:是,对于瞬态故障使用指数退避
  • 缓存:可以缓存常见查询以减少 API 调用
  • 参数:来自状态的客户电子邮件或 ID
  • 重试策略:是,但如果不可用则回退到基本信息
  • 缓存:是,使用生存时间来平衡新鲜度和性能

操作步骤

当步骤需要执行外部操作时:
  • 何时执行:批准后(人工或自动)
  • 重试策略:是,对于网络问题使用指数退避
  • 不应缓存:每次发送都是唯一的操作
  • 何时执行:当意图为”bug”时始终执行
  • 重试策略:是,不能丢失错误报告至关重要
  • 返回:要包含在响应中的工单 ID

用户输入步骤

当步骤需要人工干预时:
  • 决策上下文:原始电子邮件、草稿响应、紧急程度、分类
  • 预期输入格式:批准布尔值加可选的编辑响应
  • 触发时机:高紧急程度、复杂问题或质量问题

步骤 3:设计您的状态

状态是智能体中所有节点可访问的共享内存。将其视为智能体在完成过程时用来跟踪它学习和决定的一切的笔记本。

什么属于状态?

对每条数据提出以下问题:

包含在状态中

它需要在步骤之间持久化吗?如果是,则放入状态。

不要存储

您可以从其他数据派生它吗?如果是,则在需要时计算它,而不是将其存储在状态中。
对于我们的电子邮件智能体,我们需要跟踪:
  • 原始电子邮件和发件人信息(无法重建这些)
  • 分类结果(多个下游节点需要)
  • 搜索结果和客户数据(重新获取成本高)
  • 草稿响应(需要在审查过程中持久化)
  • 执行元数据(用于调试和恢复)

保持状态原始,按需格式化提示

关键原则:您的状态应该存储原始数据,而不是格式化的文本。在需要时在节点内部格式化提示。
这种分离意味着:
  • 不同的节点可以根据需要以不同方式格式化相同的数据
  • 您可以更改提示模板而无需修改状态模式
  • 调试更清晰 - 您可以准确看到每个节点接收到的数据
  • 您的智能体可以演进而不会破坏现有状态
让我们定义我们的状态:
from typing import TypedDict, Literal

# 定义电子邮件分类的结构
class EmailClassification(TypedDict):
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    urgency: Literal["low", "medium", "high", "critical"]
    topic: str
    summary: str

class EmailAgentState(TypedDict):
    # 原始电子邮件数据
    email_content: str
    sender_email: str
    email_id: str

    # 分类结果
    classification: EmailClassification | None

    # 原始搜索/API 结果
    search_results: list[str] | None  # 原始文档块列表
    customer_history: dict | None  # 来自 CRM 的原始客户数据

    # 生成的内容
    draft_response: str | None
    messages: list[str] | None
请注意,状态仅包含原始数据 - 没有提示模板、没有格式化的字符串、没有指令。分类输出存储为单个字典,直接来自 LLM。

步骤 4:构建您的节点

现在我们将每个步骤实现为一个函数。LangGraph 中的节点只是一个接受当前状态并返回更新的 Python 函数。

适当处理错误

不同的错误需要不同的处理策略:
错误类型谁修复它策略何时使用
瞬态错误(网络问题、速率限制)系统(自动)重试策略通常在重试时解决的临时故障
LLM可恢复错误(工具故障、解析问题)LLM将错误存储在状态中并循环回去LLM 可以看到错误并调整其方法
用户可修复错误(缺少信息、不清楚的说明)人类使用 interrupt() 暂停需要用户输入才能继续
意外错误开发者让它们冒泡需要调试的未知问题
  • 瞬态错误
  • LLM可恢复
  • 用户可修复
  • 意外错误
添加重试策略以自动重试网络问题和速率限制:
from langgraph.types import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)

实现邮件智能体节点

我们将每个节点实现为一个简单的函数。记住:节点接收状态、执行工作,并返回更新。
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(model="gpt-4")

def read_email(state: EmailAgentState) -> dict:
    """Extract and parse email content"""
    # In production, this would connect to your email service
    return {
        "messages": [HumanMessage(content=f"Processing email: {state['email_content']}")]
    }

def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
    """Use LLM to classify email intent and urgency, then route accordingly"""

    # Create structured LLM that returns EmailClassification dict
    structured_llm = llm.with_structured_output(EmailClassification)

    # Format the prompt on-demand, not stored in state
    classification_prompt = f"""
    Analyze this customer email and classify it:

    Email: {state['email_content']}
    From: {state['sender_email']}

    Provide classification including intent, urgency, topic, and summary.
    """

    # Get structured response directly as dict
    classification = structured_llm.invoke(classification_prompt)

    # Determine next node based on classification
    if classification['intent'] == 'billing' or classification['urgency'] == 'critical':
        goto = "human_review"
    elif classification['intent'] in ['question', 'feature']:
        goto = "search_documentation"
    elif classification['intent'] == 'bug':
        goto = "bug_tracking"
    else:
        goto = "draft_response"

    # Store classification as a single dict in state
    return Command(
        update={"classification": classification},
        goto=goto
    )
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Search knowledge base for relevant information"""

    # Build search query from classification
    classification = state.get('classification', {})
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"

    try:
        # Implement your search logic here
        # Store raw search results, not formatted text
        search_results = [
            "Reset password via Settings > Security > Change Password",
            "Password must be at least 12 characters",
            "Include uppercase, lowercase, numbers, and symbols"
        ]
    except SearchAPIError as e:
        # For recoverable search errors, store error and continue
        search_results = [f"Search temporarily unavailable: {str(e)}"]

    return Command(
        update={"search_results": search_results},  # Store raw results or error
        goto="draft_response"
    )

def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Create or update bug tracking ticket"""

    # Create ticket in your bug tracking system
    ticket_id = "BUG-12345"  # Would be created via API

    return Command(
        update={
            "search_results": [f"Bug ticket {ticket_id} created"],
            "current_step": "bug_tracked"
        },
        goto="draft_response"
    )
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """Generate response using context and route based on quality"""

    classification = state.get('classification', {})

    # Format context from raw state data on-demand
    context_sections = []

    if state.get('search_results'):
        # Format search results for the prompt
        formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_sections.append(f"Relevant documentation:\n{formatted_docs}")

    if state.get('customer_history'):
        # Format customer data for the prompt
        context_sections.append(f"Customer tier: {state['customer_history'].get('tier', 'standard')}")

    # Build the prompt with formatted context
    draft_prompt = f"""
    Draft a response to this customer email:
    {state['email_content']}

    Email intent: {classification.get('intent', 'unknown')}
    Urgency level: {classification.get('urgency', 'medium')}

    {chr(10).join(context_sections)}

    Guidelines:
    - Be professional and helpful
    - Address their specific concern
    - Use the provided documentation when relevant
    """

    response = llm.invoke(draft_prompt)

    # Determine if human review needed based on urgency and intent
    needs_review = (
        classification.get('urgency') in ['high', 'critical'] or
        classification.get('intent') == 'complex'
    )

    # Route to appropriate next node
    goto = "human_review" if needs_review else "send_reply"

    return Command(
        update={"draft_response": response.content},  # Store only the raw response
        goto=goto
    )

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """Pause for human review using interrupt and route based on decision"""

    classification = state.get('classification', {})

    # interrupt() must come first - any code before it will re-run on resume
    human_decision = interrupt({
        "email_id": state.get('email_id',''),
        "original_email": state.get('email_content',''),
        "draft_response": state.get('draft_response',''),
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "Please review and approve/edit this response"
    })

    # Now process the human's decision
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get('draft_response',''))},
            goto="send_reply"
        )
    else:
        # Rejection means human will handle directly
        return Command(update={}, goto=END)

def send_reply(state: EmailAgentState) -> dict:
    """Send the email response"""
    # Integrate with email service
    print(f"Sending reply: {state['draft_response'][:100]}...")
    return {}

步骤 5:将其连接起来

现在我们将节点连接成一个工作图。由于我们的节点处理自己的路由决策,我们只需要几个基本的边。 要使用 interrupt() 启用人机回路,我们需要使用检查点器编译以在运行之间保存状态:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import RetryPolicy

# 创建图
workflow = StateGraph(EmailAgentState)

# 添加具有适当错误处理的节点
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)

# 为可能有瞬态故障的节点添加重试策略
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# 仅添加基本边
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# 使用检查点器编译以实现持久化,如果使用 Local_Server 运行图 --> 请不使用检查点器编译
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
图结构是最小的,因为路由通过 Command 对象在节点内部发生。每个节点使用类型提示(如 Command[Literal["node1", "node2"]])声明它可以去哪里,使流程明确且可追溯。

试用您的智能体

让我们使用需要人工审查的紧急计费问题运行我们的智能体:
# 使用紧急计费问题进行测试
initial_state = {
    "email_content": "我因为订阅被重复扣款,这非常紧急!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# 使用 thread_id 运行以实现持久化
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
# 图将在 human_review 暂停
print(f"Draft ready for review: {result['draft_response'][:100]}...")

# 准备好后,提供人工输入以恢复
from langgraph.types import Command

human_response = Command(
    resume={
        "approved": True,
        "edited_response": "我们对重复收费深表歉意,已经立即为您启动退款流程……"
    }
)

# 恢复执行
final_result = app.invoke(human_response, config)
print(f"Email sent successfully!")
当图遇到 interrupt() 时会暂停,将所有内容保存到检查点器并等待。它可以在几天后恢复,从中断的地方准确继续。thread_id 确保此对话的所有状态一起保留。

总结和后续步骤

关键见解

构建这个电子邮件智能体向我们展示了 LangGraph 的思维方式:

高级考虑事项

本节探讨节点粒度设计中的权衡。大多数应用程序可以跳过这部分并使用上面显示的模式。
您可能想知道:为什么不将 Read EmailClassify Intent 合并为一个节点?或者为什么将 Doc Search 与 Draft Reply 分开?答案涉及弹性和可观测性之间的权衡。弹性考虑因素: LangGraph 的持久化执行在节点边界创建检查点。当工作流程在中断或故障后恢复时,它从执行停止的节点的开头开始。更小的节点意味着更频繁的检查点,这意味着如果出现问题需要重复的工作更少。如果您将多个操作组合成一个大节点,接近末尾的故障意味着从该节点的开头重新执行所有内容。我们为电子邮件智能体选择此分解的原因:
  • 外部服务隔离: Doc Search 和 Bug Track 是单独的节点,因为它们调用外部 API。如果搜索服务缓慢或失败,我们希望将其与 LLM 调用隔离。我们可以向这些特定节点添加重试策略而不影响其他节点。
  • 中间可见性:Classify Intent 作为自己的节点让我们在采取行动之前检查 LLM 决定了什么。这对于调试和监控很有价值——您可以准确看到智能体何时以及为什么路由到人工审查。
  • 不同的故障模式: LLM 调用、数据库查找和电子邮件发送具有不同的重试策略。单独的节点让您独立配置这些。
  • 可重用性和测试: 更小的节点更容易在隔离状态下测试并在其他工作流程中重用。
另一种有效方法:您可以将 Read EmailClassify Intent 合并为单个节点。您将失去在分类之前检查原始电子邮件的能力,并且在该节点的任何故障时将重复两个操作。对于大多数应用程序,单独节点的可观测性和调试优势值得权衡。应用程序级关注点:步骤 2 中的缓存讨论(是否缓存搜索结果)是应用程序级决策,而不是 LangGraph 框架功能。您根据特定要求在节点函数中实现缓存——LangGraph 不规定这一点。性能考虑因素:更多节点并不意味着更慢的执行。LangGraph 默认在后台写入检查点(异步持久性模式),因此您的图继续运行而无需等待检查点完成。这意味着您获得频繁的检查点而对性能影响最小。如果需要,您可以调整此行为——使用 "exit" 模式仅在完成时进行检查点,或使用 "sync" 模式阻止执行直到写入每个检查点。

从这里去哪里

这是关于使用 LangGraph 构建智能体的思考的介绍。您可以使用以下内容扩展此基础:
Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.