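The Functional API allows you to add LangGraph's key features (persistence, memory, human-in-the-loop, and streaming) to your applications with minimal changes to your existing code. It is designed to integrate these features into existing code that may use standard language primitives for branching and control flow, such as if statements, for loops, and function calls. Unlike many data orchestration frameworks that require restructuring code into an explicit pipeline or DAG, the Functional API allows you to incorporate these capabilities without enforcing a rigid execution model.
The Functional API uses two key building blocks:
  • entrypoint: An entrypoint encapsulates workflow logic and manages execution flow, including handling long-running tasks and interrupts.
  • task: Represents a discrete unit of work, such as an API call or data processing step, that can be executed asynchronously within an entrypoint. Tasks return a future-like object (a Promise) that can be awaited.
This provides a minimal abstraction for building workflows with state management and streaming.
For information on how to use the Functional API, see Use the Functional API.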

Functional API vs. Graph API

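For users who prefer a more declarative approach, LangGraph's Graph API allows you to define workflows using a graph paradigm. Both APIs share the same underlying runtime, so you can use them together in the same application.
Here are some key differences:
  • Control flow: The Functional API does not require thinking about graph structure. You can use standard language constructs (such as if statements and loops) to define workflows. This usually reduces the amount of code you need to write.
  • Short-term memory: The Graph API requires declaring a State and may require defining reducers to manage updates to the graph state. entrypoint and task do not require explicit state management, because their state is scoped to the function and is not shared across functions.
  • Checkpointing: Both APIs generate and use checkpoints. In the Graph API, a new checkpoint is generated after every superstep. In the Functional API, when tasks are executed, their results are saved to an existing checkpoint associated with the given entrypoint instead of creating a new checkpoint.
  • Visualization: The Graph API makes it easy to visualize the workflow as a graph, which is useful for debugging, understanding the workflow, and sharing it with others. The Functional API does not support visualization because the graph is generated dynamically at runtime.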

Example

Below we demonstrate a simple application that writes an essay and interrupts to request human review.
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // A placeholder for a long-running task.
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);
This workflow will write an essay about the topic “cat” and then pause to get a review from a human. The workflow can be interrupted for an indefinite amount of time until a review is provided. When the workflow is resumed, it executes from the very start, but because the result of the writeEssay task was already saved, the task result will be loaded from the checkpoint instead of being recomputed.
import { v4 as uuidv4 } from "uuid";
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // This is a placeholder for a long-running task.
  await new Promise(resolve => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);

const threadId = uuidv4();

const config = {
  configurable: {
    thread_id: threadId
  }
};

for await (const item of workflow.stream("cat", config)) {
  console.log(item);
}
{ writeEssay: 'An essay about topic: cat' }
{
  __interrupt__: [{
    value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
    resumable: true,
    ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
    when: 'during'
  }]
}
An essay has been written and is ready for review. Once the review is provided, we can resume the workflow:
import { Command } from "@langchain/langgraph";

// Get review from a user (e.g., via a UI)
// In this case, we're using a bool, but this can be any json-serializable value.
const humanReview = true;

for await (const item of workflow.stream(new Command({ resume: humanReview }), config)) {
  console.log(item);
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
The workflow has been completed and the review has been added to the essay.

Entrypoint

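The entrypoint function can be used to create a workflow from a function. It encapsulates workflow logic and manages execution flow, including handling long-running tasks and interrupts.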

Definition

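An entrypoint is defined by calling the entrypoint function with a configuration and a function. The function must accept a single positional argument, which serves as the workflow input. If you need to pass multiple pieces of data, use an object as the input type for the first argument. Creating an entrypoint with a function produces a workflow instance that helps manage the execution of the workflow (e.g., handles streaming, resumption, and checkpointing). You will usually want to pass a checkpointer to the entrypoint function to enable persistence and use features like human-in-the-loop.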
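import { MemorySaver, entrypoint } from "@langchain/langgraph";

// An in-memory checkpointer; later snippets reuse this `checkpointer`.
const checkpointer = new MemorySaver();

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: Record<string, any>): Promise<number> => {
    // some logic that may involve long-running tasks like API calls,
    // and may be interrupted for human-in-the-loop
    const result = Object.keys(someInput).length; // placeholder result
    return result;
  }
);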
Serialization: The inputs and outputs of entrypoints must be JSON-serializable to support checkpointing. Please see the serialization section for more details.

Execution

Calling the entrypoint function returns an object that can be executed using the invoke and stream methods.
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};
await myWorkflow.invoke(someInput, config); // Wait for the result

Resuming

Resuming an execution after an interrupt can be done by passing a resume value to the Command primitive.
import { Command } from "@langchain/langgraph";

const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(new Command({ resume: someResumeValue }), config);
Resuming after an error: To resume after an error, run the entrypoint with null and the same thread ID (config). This assumes that the underlying error has been resolved and execution can proceed successfully.
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(null, config);

Short-term memory

When an entrypoint is defined with a checkpointer, it stores information between successive invocations on the same thread id in checkpoints. This allows accessing the state from the previous invocation using the getPreviousState function. By default, the getPreviousState function returns the return value of the previous invocation.
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    return number + previous;
  }
);

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(1, config); // 1 (previous was undefined)
await myWorkflow.invoke(2, config); // 3 (previous was 1 from the previous invocation)

entrypoint.final

entrypoint.final is a special primitive that can be returned from an entrypoint and allows decoupling the value that is saved in the checkpoint from the return value of the entrypoint. The first value is the return value of the entrypoint, and the second value is the value that will be saved in the checkpoint.
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    // This will return the previous value to the caller and save
    // 2 * number to the checkpoint; the next invocation reads that
    // saved value via getPreviousState.
    return entrypoint.final({
      value: previous,
      save: 2 * number,
    });
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await myWorkflow.invoke(3, config); // 0 (previous was undefined)
await myWorkflow.invoke(1, config); // 6 (previous was 3 * 2 from the previous invocation)
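
To make the split between the returned value and the saved value concrete, here is a library-free toy model of that behavior. It is only a sketch: `makeWorkflow` and `Final` are hypothetical names introduced for illustration, the saved value lives in a plain variable rather than in a checkpoint, and nothing here reflects how LangGraph implements entrypoint.final internally.

```typescript
// Hypothetical shape mirroring the two fields described above.
type Final<V, S> = { value: V; save: S };

// Toy stand-in for an entrypoint with a checkpointer: `saved` plays the
// role of the checkpointed state for a single thread.
function makeWorkflow<I, V, S>(
  body: (input: I, previous: S | undefined) => Final<V, S>
): (input: I) => V {
  let saved: S | undefined = undefined;
  return (input: I): V => {
    const { value, save } = body(input, saved);
    saved = save; // kept for the next invocation
    return value; // handed back to the caller
  };
}

const toyWorkflow = makeWorkflow<number, number, number>((n, previous) => ({
  value: previous ?? 0,
  save: 2 * n,
}));

const first = toyWorkflow(3); // 0, nothing saved yet
const second = toyWorkflow(1); // 6, the value saved by the first call
console.log(first, second);
```

The two calls mirror the invocations in the example above: the caller sees the previously saved value, while the new saved value only becomes visible on the next call.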

Task

A task represents a discrete unit of work, such as an API call or data processing step. It has two key characteristics:
  • Asynchronous Execution: Tasks are designed to be executed asynchronously, allowing multiple operations to run concurrently without blocking.
  • Checkpointing: Task results are saved to a checkpoint, enabling resumption of the workflow from the last saved state. (See persistence for more details).

Definition

Tasks are defined using the task function, which wraps a regular function.
import { task } from "@langchain/langgraph";

const slowComputation = task("slowComputation", async (inputValue: number) => {
  // Simulate a long-running operation
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return inputValue * 2; // placeholder result
});
Serialization: The outputs of tasks must be JSON-serializable to support checkpointing.

Execution

Tasks can only be called from within an entrypoint, another task, or a state graph node. Tasks cannot be called directly from the main application code. When you call a task, it returns a Promise that can be awaited.
const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: number): Promise<number> => {
    return await slowComputation(someInput);
  }
);

When to use a task

Tasks are useful in the following scenarios:
  • Checkpointing: When you need to save the result of a long-running operation to a checkpoint, so you don’t need to recompute it when resuming the workflow.
  • Human-in-the-loop: If you’re building a workflow that requires human intervention, you MUST use tasks to encapsulate any randomness (e.g., API calls) to ensure that the workflow can be resumed correctly. See the determinism section for more details.
  • Parallel Execution: For I/O-bound tasks, tasks enable parallel execution, allowing multiple operations to run concurrently without blocking (e.g., calling multiple APIs).
  • Observability: Wrapping operations in tasks provides a way to track the progress of the workflow and monitor the execution of individual operations using LangSmith.
  • Retryable Work: When work needs to be retried to handle failures or inconsistencies, tasks provide a way to encapsulate and manage the retry logic.
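
The parallel-execution point rests on ordinary Promise behavior, so it can be sketched without LangGraph. In the sketch below, `fakeFetch` and `fetchAll` are hypothetical names, and `fakeFetch` merely simulates an I/O-bound call with a timer. Inside a real workflow, the same start-everything-then-await pattern would presumably be applied to the Promises returned by tasks.

```typescript
// Hypothetical stand-in for an I/O-bound call such as an HTTP request.
function fakeFetch(id: number, delayMs: number): Promise<string> {
  return new Promise((resolve) =>
    setTimeout(() => resolve(`result-${id}`), delayMs)
  );
}

// Start every call first, then wait for all of them together.
// Promise.all keeps the results in the same order as the inputs.
async function fetchAll(ids: number[]): Promise<string[]> {
  const pending = ids.map((id) => fakeFetch(id, 50));
  return Promise.all(pending);
}

fetchAll([1, 2, 3]).then((results) => console.log(results));
```

Because every call is started before any of them is awaited, the total wait is roughly that of the slowest call rather than the sum of all calls.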

Serialization

There are two key aspects to serialization in LangGraph:
  1. entrypoint inputs and outputs must be JSON-serializable.
  2. task outputs must be JSON-serializable.
These requirements are necessary for enabling checkpointing and workflow resumption. Use primitives like objects, arrays, strings, numbers, and booleans to ensure that your inputs and outputs are serializable. Serialization ensures that workflow state, such as task results and intermediate values, can be reliably saved and restored. This is critical for enabling human-in-the-loop interactions, fault tolerance, and parallel execution. Providing non-serializable inputs or outputs will result in a runtime error when a workflow is configured with a checkpointer.
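
As a rough illustration of what "JSON-serializable" means in practice, the following sketch checks a value before it is passed to a workflow. `isJsonSerializable` is a hypothetical helper, not part of LangGraph, and it is deliberately conservative: it accepts only plain objects, arrays, strings, finite numbers, booleans, and null. The serializer LangGraph actually uses may accept a different set of types.

```typescript
// Conservative structural check: true only for values built from plain
// objects, arrays, strings, finite numbers, booleans, and null.
function isJsonSerializable(value: unknown): boolean {
  if (value === null) return true;
  switch (typeof value) {
    case "string":
    case "boolean":
      return true;
    case "number":
      return Number.isFinite(value); // NaN and Infinity do not survive JSON
    case "object": {
      if (Array.isArray(value)) {
        return value.every((item) => isJsonSerializable(item));
      }
      // Reject class instances such as Map, Set, and Date.
      const proto = Object.getPrototypeOf(value);
      if (proto !== Object.prototype && proto !== null) return false;
      return Object.values(value as Record<string, unknown>).every((item) =>
        isJsonSerializable(item)
      );
    }
    default:
      return false; // undefined, functions, symbols, bigint
  }
}

console.log(isJsonSerializable({ topic: "cat", tags: ["a", "b"], draft: null })); // true
console.log(isJsonSerializable({ createdAt: new Date() })); // false
console.log(isJsonSerializable({ callback: () => 1 })); // false
```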

Determinism

To utilize features like human-in-the-loop, any randomness should be encapsulated inside tasks. This guarantees that when execution is halted (e.g., for human-in-the-loop) and then resumed, it will follow the same sequence of steps, even if task results are non-deterministic. LangGraph achieves this behavior by persisting task and subgraph results as they execute. A well-designed workflow ensures that resuming execution follows the same sequence of steps, allowing previously computed results to be retrieved correctly without having to re-execute them. This is particularly useful for long-running tasks or tasks with non-deterministic results, as it avoids repeating previously done work and allows resuming from essentially the same point. While different runs of a workflow can produce different results, resuming a specific run should always follow the same sequence of recorded steps. This allows LangGraph to efficiently look up task and subgraph results that were executed prior to the graph being interrupted and avoid recomputing them.
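
The record-and-replay idea described above can be modeled in a few lines. This is a toy sketch, not LangGraph's implementation: `TaskLog` is a hypothetical class that stores results in memory by call order, whereas LangGraph persists them through a checkpointer. It shows why a random value produced inside a "task" is stable across a resume, while one produced directly in the workflow body is not.

```typescript
// Toy model: results are recorded by call order on the first run and
// replayed from the log on later runs of the same workflow.
class TaskLog {
  private results: unknown[] = [];
  private cursor = 0;

  // Start a (re)run of the workflow from the top.
  beginRun(): void {
    this.cursor = 0;
  }

  // Execute fn only if nothing is recorded at this position; otherwise
  // return the recorded result without calling fn.
  runTask<T>(fn: () => T): T {
    const index = this.cursor++;
    if (index < this.results.length) return this.results[index] as T;
    const result = fn();
    this.results.push(result);
    return result;
  }
}

const log = new TaskLog();

function toyRun(): { insideTask: number; outsideTask: number } {
  log.beginRun();
  const insideTask = log.runTask(() => Math.random()); // recorded
  const outsideTask = Math.random(); // recomputed on every run
  return { insideTask, outsideTask };
}

const firstRun = toyRun();
const resumedRun = toyRun(); // models re-executing from the start on resume

console.log(firstRun.insideTask === resumedRun.insideTask); // true
console.log(firstRun.outsideTask, resumedRun.outsideTask); // two independent draws
```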

Idempotency

Idempotency ensures that running the same operation multiple times produces the same result. This helps prevent duplicate API calls and redundant processing if a step is rerun due to a failure. Always place API calls inside task functions for checkpointing, and design them to be idempotent in case of re-execution. Re-execution can occur if a task starts but does not complete successfully; if the workflow is then resumed, the task will run again. Use idempotency keys or verify existing results to avoid duplication.
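
One way to picture the idempotency-key approach is the sketch below. Everything in it is hypothetical: `chargeOnce` stands in for an external API call, and the in-memory `processed` map stands in for whatever durable store or server-side deduplication a real system would use.

```typescript
// Receipts already issued, indexed by idempotency key.
const processed = new Map<string, string>();
let chargeCalls = 0; // counts how often the real operation actually ran

// Perform the charge only if this key has not been seen before;
// otherwise return the receipt recorded for the earlier attempt.
function chargeOnce(idempotencyKey: string, amountCents: number): string {
  const existing = processed.get(idempotencyKey);
  if (existing !== undefined) return existing;
  chargeCalls += 1;
  const receipt = `receipt-${chargeCalls}-${amountCents}`;
  processed.set(idempotencyKey, receipt);
  return receipt;
}

const firstAttempt = chargeOnce("order-42", 1999);
const retriedAttempt = chargeOnce("order-42", 1999); // e.g. after a re-executed task

console.log(firstAttempt === retriedAttempt); // true
console.log(chargeCalls); // 1
```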

Common Pitfalls

Handling side effects

Encapsulate side effects (e.g., writing to a file, sending an email) in tasks to ensure they are not executed multiple times when resuming a workflow.
Incorrect: In this example, a side effect (writing to a file) is directly included in the workflow, so it will be executed a second time when resuming the workflow.
import { entrypoint, interrupt } from "@langchain/langgraph";
import fs from "fs";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: Record<string, any>) => {
    // This code will be executed a second time when resuming the workflow.
    // Which is likely not what you want.
    fs.writeFileSync("output.txt", "Side effect executed");
    const value = interrupt("question");
    return value;
  }
);

Non-deterministic control flow

Operations that might give different results each time (like getting current time or random numbers) should be encapsulated in tasks to ensure that on resume, the same result is returned.
  • In a task: Get random number (5) → interrupt → resume → (returns 5 again) → …
  • Not in a task: Get random number (5) → interrupt → resume → get new random number (7) → …
This is especially important when using human-in-the-loop workflows with multiple interrupt calls. LangGraph keeps a list of resume values for each task/entrypoint. When an interrupt is encountered, it is matched with the corresponding resume value. This matching is strictly index-based, so the order of the resume values should match the order of the interrupts. If the order of execution is not maintained when resuming, one interrupt call may be matched with the wrong resume value, leading to incorrect results. Please read the section on determinism for more details.
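
The consequence of index-based matching can be illustrated with a small toy model. The `replay` function below is hypothetical and does not use LangGraph: its local `interrupt` simply hands out resume values by position, which is the matching rule described above. When the branch taken on resume differs from the original run, an answer ends up attached to the wrong question.

```typescript
type Answered = { question: string; answer: string };

// Toy model: each interrupt call consumes the next resume value by position.
function replay(titleFirst: boolean, resumeValues: string[]): Answered[] {
  let next = 0;
  const interrupt = (question: string): Answered => ({
    question,
    answer: resumeValues[next++],
  });
  return titleFirst
    ? [interrupt("Approve the title?"), interrupt("Approve the body?")]
    : [interrupt("Approve the body?"), interrupt("Approve the title?")];
}

// Resume values collected while the title question came first.
const resumeValues = ["title approved", "body rejected"];

const sameOrder = replay(true, resumeValues);
const flippedOrder = replay(false, resumeValues);

console.log(sameOrder[0].question, "->", sameOrder[0].answer);
// Approve the title? -> title approved
console.log(flippedOrder[0].question, "->", flippedOrder[0].answer);
// Approve the body? -> title approved (mismatched)
```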
Incorrect: In this example, the workflow uses the current time to determine which task to execute. This is non-deterministic because the result of the workflow depends on the time at which it is executed.
import { entrypoint, interrupt } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: { t0: number }) => {
    const t1 = Date.now();

    const deltaT = t1 - inputs.t0;

    if (deltaT > 1000) {
      const result = await slowTask(1);
      const value = interrupt("question");
      return { result, value };
    } else {
      const result = await slowTask(2);
      const value = interrupt("question");
      return { result, value };
    }
  }
);
