推荐阅读在深入了解此内容之前,阅读以下内容可能会有所帮助:
如果您要导出大量跟踪,我们建议您使用批量数据导出功能,因为它可以更好地处理大量数据,并支持跨分区的自动重试和并行化。
查询运行(LangSmith 跟踪中的跨度数据)的推荐方法是使用 SDK 中的 list_runs 方法或 API 中的 /runs/query 端点。 LangSmith 以运行(跨度)数据格式中指定的简单格式存储跟踪。

使用过滤器参数

对于简单查询,您不必依赖我们的查询语法。您可以使用过滤器参数参考中指定的过滤器参数。
前置条件在运行下方示例前,请先初始化客户端。
from langsmith import Client

client = Client()
下面是若干通过关键字参数列出运行的示例:

列出项目内所有运行

project_runs = client.list_runs(project_name="<your_project>")

列出最近 24 小时的 LLM/Chat 运行

todays_llm_runs = client.list_runs(
    project_name="<your_project>",
    start_time=datetime.now() - timedelta(days=1),
    run_type="llm",
)

列出项目中的根运行

根运行没有父运行,is_root 会被设置为 True,可据此筛选。
root_runs = client.list_runs(
    project_name="<your_project>",
    is_root=True
)

列出未报错的运行

correct_runs = client.list_runs(project_name="<your_project>", error=False)

按运行 ID 列出

忽略其他参数按以下方式传入 run ID 列表时,其余过滤参数(project_namerun_type 等)都会被忽略,直接返回匹配的运行。
如果手头有 run ID 列表,可以直接查询:
run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

使用过滤查询语言

对于更复杂的需求,可以使用过滤查询语言参考中描述的查询语法。

列出对话线程中的全部根运行

以下示例展示如何获取单个对话线程内的运行。关于线程设置,请参阅线程使用指南。 将线程 ID 设为共享值即可将运行归为一组。LangSmith UI 支持以下三个元数据键:session_idconversation_idthread_id(其中 session ID 也叫 tracing project ID)。下面的查询会匹配任意一个键。
group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
    project_name="<your_project>",
    filter=filter_string,
    is_root=True
)

列出名为 “extractor” 且根运行获得 user_score=1 反馈的运行

client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "extractor")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

列出 star_rating 评分大于 4 的运行

client.list_runs(
    project_name="<your_project>",
    filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

列出耗时超过 5 秒的运行

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

列出 error 不为 null 的运行

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

列出 start_time 晚于特定时间戳的运行

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

列出包含字符串 “substring” 的运行

client.list_runs(project_name="<your_project>", filter='search("substring")')

列出带有 git hash “2aa1cf4” 标签的运行

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

列出在指定时间后开始,且满足 “error 不为 null” 或 “Correctness 反馈为 0” 的运行

client.list_runs(
  project_name="<your_project>",
  filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

复杂示例:标签包含 “experimental”/“beta” 且延迟大于 2 秒

client.list_runs(
  project_name="<your_project>",
  filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

按全文搜索跟踪树

search() 在不指定字段时会在运行的所有字符串字段中执行全文检索,便于快速定位包含某个关键字的跟踪。
client.list_runs(
  project_name="<your_project>",
  filter='search("image classification")'
)

检查元数据是否存在

若要判断元数据是否存在,可使用 eq 运算符,并可配合 and 指定特定值。这在记录结构化信息时非常实用。
to_search = {
    "user_id": ""
}

# 检查是否存在带有 "user_id" 元数据键的运行
client.list_runs(
  project_name="default",
  filter="eq(metadata_key, 'user_id')"
)
# 检查 user_id=4070f233-f61e-44eb-bff1-da3c163895a3 的运行
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

检查元数据中的环境信息

常见做法是把环境信息写入元数据。需要筛选包含环境字段的运行时,可复用上述模式:
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

检查元数据中的 conversation ID

另一种串联同一会话跟踪的方式是共享 conversation_id。若需按该 ID 筛选,可在元数据中搜索对应值。
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

针对键值对的负向过滤

可以对元数据、输入或输出的键值对执行负向过滤,以排除特定运行。以下示例基于元数据,输入/输出同理。
# 查找元数据中不含 "conversation_id" 键的运行
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'))"
)

# 查找 conversation_id 不等于 a1b2c3d4-e5f6-7890 的运行
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# 查找不存在 "conversation_id" 键且值不为 a1b2c3d4-e5f6-7890 的运行
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# 查找没有 "conversation_id" 键但值等于 a1b2c3d4-e5f6-7890 的运行
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

组合多个过滤条件

可通过 and 配合其他函数组合条件。如下示例同时要求运行名为 “ChatOpenAI” 且元数据包含特定 conversation_id
client.list_runs(
  project_name="default",
  filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

树级过滤

列出名称为 “RetrieveDocs”、其根运行的 user_score 反馈为 1,且整条跟踪中存在名为 “ExpandQuery” 的运行。 当需要在满足多个状态/步骤的前提下筛选特定运行时,此模式非常有用。
client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "RetrieveDocs")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    tree_filter='eq(name, "ExpandQuery")'
)

高阶:导出包含子工具信息的扁平化跟踪视图

下面的 Python 示例展示如何导出扁平化的跟踪视图,包含代理在每条跟踪中调用的工具(嵌套运行)。可用于分析代理在多条跟踪中的行为。 示例会在指定时间范围内查询所有工具运行,并按父(根)运行 ID 分组,再获取各根运行的名称、输入、输出等信息,与子运行数据合并。 为优化查询,示例采取了:
  1. 查询工具运行时仅选择必要字段,减少耗时;
  2. 在处理工具运行的同时分批获取根运行。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# 列出所有工具运行
tool_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="tool",
    # 无需获取输入、输出等可能增加查询时间的字段
    select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# 避免触发速率限制
with ThreadPoolExecutor(max_workers=2) as executor:
    # 按父运行 ID 对工具运行分组
    for run in tqdm(tool_runs):
        # 收集同一条跟踪中调用过的所有工具
        tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
        # 根据需要批量发送父运行 ID
        # 这样可以在处理工具运行的同时按批查询根运行
        if len(tool_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(tool_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=["name", "inputs", "outputs", "run_type"],
                    )
                )
    if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = tool_runs_by_parent[root_run.id]
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                "run_type": root_run.run_type,
                "inputs": root_run.inputs,
                "outputs": root_run.outputs,
                "tools_involved": list(root_data["tools_involved"]),
            }
        )

# (可选)转换为 pandas DataFrame
import pandas as pd

df = pd.DataFrame(data)
df.head()

高阶:导出含反馈的检索器输入输出

如果需要依据检索器行为微调向量或诊断端到端系统问题,可以使用该查询。下面的 Python 样例展示如何导出具有特定反馈分数的跟踪中,检索器的输入与输出。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# 列出所有工具运行
retriever_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="retriever",
    # 这次需要保留输入和输出,因为它们可能被查询扩展步骤修改
    select=["trace_id", "name", "run_type", "inputs", "outputs"],
    trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# 避免触发速率限制
with ThreadPoolExecutor(max_workers=2) as executor:
    # 按父运行 ID 对检索器运行分组
    for run in tqdm(retriever_runs):
        # 收集同一条跟踪中的全部检索调用
        for k, v in run.inputs.items():
            retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
        for k, v in (run.outputs or {}).items():
            # 展开输出的文档列表
            retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
        # 根据需要批量发送父运行 ID
        # 这样可以在处理检索器运行的同时按批查询根运行
        if len(retriever_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(retriever_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=[
                            "name",
                            "inputs",
                            "outputs",
                            "run_type",
                            "feedback_stats",
                        ],
                    )
                )
    if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = retriever_runs_by_parent[root_run.id]
        feedback = {
            f"feedback.{k}": v.get("avg")
            for k, v in (root_run.feedback_stats or {}).items()
        }
        inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
        outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                **inputs,
                **outputs,
                **feedback,
                **root_data,
            }
        )

# (可选)转换为 pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.