在本指南中,我们将介绍如何使用 UpstashRatelimitHandler 基于请求数量或 token 数量添加速率限制。此处理器使用 Upstash 的速率限制库,它利用 Upstash Redis Upstash Ratelimit 的工作原理是每次调用 limit 方法时向 Upstash Redis 发送 HTTP 请求。检查并更新用户的剩余 token/请求。根据剩余的 token,我们可以停止执行昂贵的操作,例如调用 LLM 或查询向量存储:
const response = await ratelimit.limit();
if (response.success) {
  execute_costly_operation();
}
UpstashRatelimitHandler 允许您在几分钟内将此速率限制逻辑集成到您的链中。

设置

首先,您需要访问 Upstash 控制台 并创建一个 redis 数据库(参见我们的文档)。创建数据库后,您需要设置环境变量:
UPSTASH_REDIS_REST_URL="****"
UPSTASH_REDIS_REST_TOKEN="****"
接下来,您需要安装 Upstash Ratelimit 和 @langchain/community
有关安装 LangChain 包的通用说明,请参阅此部分
npm
npm install @upstash/ratelimit @langchain/community @langchain/core
现在您已准备好向链添加速率限制!

按请求速率限制

假设我们希望允许用户每分钟调用链 10 次。实现这一点很简单:
const UPSTASH_REDIS_REST_URL = "****";
const UPSTASH_REDIS_REST_TOKEN = "****";

import {
  UpstashRatelimitHandler,
  UpstashRatelimitError,
} from "@langchain/community/callbacks/handlers/upstash_ratelimit";
import { RunnableLambda } from "@langchain/core/runnables";
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";

// 创建速率限制器
const ratelimit = new Ratelimit({
  redis: new Redis({
    url: UPSTASH_REDIS_REST_URL,
    token: UPSTASH_REDIS_REST_TOKEN,
  }),
  // 每个窗口 10 个请求,窗口大小为 60 秒:
  limiter: Ratelimit.fixedWindow(10, "60 s"),
});

// 创建处理器
const user_id = "user_id"; // 应该是获取用户 id 的方法
const handler = new UpstashRatelimitHandler(user_id, {
  requestRatelimit: ratelimit,
});

// 创建模拟链
const chain = new RunnableLambda({ func: (str: string): string => str });

try {
  const response = await chain.invoke("hello world", {
    callbacks: [handler],
  });
  console.log(response);
} catch (err) {
  if (err instanceof UpstashRatelimitError) {
    console.log("处理速率限制。");
  }
}
请注意,我们将处理器传递给 invoke 方法,而不是在定义链时传递处理器。 对于 FixedWindow 以外的速率限制算法,请参阅 upstash-ratelimit 文档 在执行管道中的任何步骤之前,速率限制器将检查用户是否超过了请求限制。如果是,将引发 UpstashRatelimitError

按 Token 速率限制

另一种选择是基于以下内容对链调用进行速率限制:
  1. 提示中的 token 数量
  2. 提示和 LLM 完成中的 token 数量
这仅在您的链中有 LLM 时有效。另一个要求是您使用的 LLM 应在其 LLMOutput 中返回 token 使用情况。返回的 token 使用情况字典的格式取决于 LLM。要了解如何根据您的 LLM 配置处理器,请参阅下面配置部分的末尾。

工作原理

处理器将在调用 LLM 之前获取剩余的 token。如果剩余 token 大于 0,将调用 LLM。否则将引发 UpstashRatelimitError 调用 LLM 后,token 使用信息将用于从用户的剩余 token 中减去。在此阶段不会引发错误。

配置

对于第一种配置,只需像这样初始化处理器:
const user_id = "user_id"; // 应该是获取用户 id 的方法
const handler = new UpstashRatelimitHandler(user_id, {
  requestRatelimit: ratelimit,
});
对于第二种配置,以下是初始化处理器的方法:
const user_id = "user_id"; // 应该是获取用户 id 的方法
const handler = new UpstashRatelimitHandler(user_id, {
  tokenRatelimit: ratelimit,
});
您还可以同时使用基于请求和 token 的速率限制,只需传递 request_ratelimittoken_ratelimit 参数。 为了使 token 使用情况正常工作,LangChain.js 中的 LLM 步骤应返回以下格式的 token 使用情况字段:
{
  "tokenUsage": {
    "totalTokens": 123,
    "promptTokens": 456,
    "otherFields: "..."
  },
  "otherFields: "..."
}
但是,并非 LangChain.js 中的所有 LLM 都符合此格式。如果您的 LLM 使用不同的键返回相同的值,您可以通过将它们传递给处理器来使用参数 llmOutputTokenUsageFieldllmOutputTotalTokenFieldllmOutputPromptTokenField
const handler = new UpstashRatelimitHandler(
  user_id,
  {
    requestRatelimit: ratelimit
    llmOutputTokenUsageField: "usage",
    llmOutputTotalTokenField: "total",
    llmOutputPromptTokenField: "prompt"
  }
)
以下是一个使用 LLM 的链的示例:
const UPSTASH_REDIS_REST_URL = "****";
const UPSTASH_REDIS_REST_TOKEN = "****";
const OPENAI_API_KEY = "****";

import {
  UpstashRatelimitHandler,
  UpstashRatelimitError,
} from "@langchain/community/callbacks/handlers/upstash_ratelimit";
import { RunnableLambda, RunnableSequence } from "@langchain/core/runnables";
import { OpenAI } from "@langchain/openai";
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";

// 创建速率限制器
const ratelimit = new Ratelimit({
  redis: new Redis({
    url: UPSTASH_REDIS_REST_URL,
    token: UPSTASH_REDIS_REST_TOKEN,
  }),
  // 每个窗口 500 个 token,窗口大小为 60 秒:
  limiter: Ratelimit.fixedWindow(500, "60 s"),
});

// 创建处理器
const user_id = "user_id"; // 应该是获取用户 id 的方法
const handler = new UpstashRatelimitHandler(user_id, {
  tokenRatelimit: ratelimit,
});

// 创建模拟链
const asStr = new RunnableLambda({ func: (str: string): string => str });
const model = new OpenAI({
  apiKey: OPENAI_API_KEY,
});
const chain = RunnableSequence.from([asStr, model]);

// 使用处理器调用链:
try {
  const response = await chain.invoke("hello world", {
    callbacks: [handler],
  });
  console.log(response);
} catch (err) {
  if (err instanceof UpstashRatelimitError) {
    console.log("处理速率限制。");
  }
}

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.