写在前面
-
prompt是怎么写的? -
benchmark是如何做的? -
有一个奇怪的多轮对话的报错,是为什么?
背景
-
用户输入 (User Message):对话的起点。 -
AI 的思考过程 (Thought):Agent 的中间推理步骤。 -
工具调用 (Tool Call):Agent 决定使用哪个工具,以及传入的参数。 -
工具执行结果 (Tool Output):工具返回给 Agent 的信息。 -
AI 的最终响应 (AI Message):Agent 回复给用户的内容。
-
调试与可观测性 (Debugging & Observability):当 Agent 行为不符合预期时,Trajectory 是定位问题的最直接、最有效的工具。开发者可以清晰地看到每一步的输入输出,快速诊断是模型幻觉、工具错误还是逻辑流问题。 -
审计与归档 (Audit & Archive):在金融、法务、客服等需要合规和追溯的场景下,Trajectory 提供了一份不可篡改的、详细的交互历史。这既可以作为审计凭证,也可以作为历史案例进行归档,计与归档 (Audit & Archive):在金融、法务、客服等需要合规和追溯的场景下,Trajectory 提供了一份不可篡改的、详细的交互历史。这既可以作为审计凭证,也可以作为历史案例进行归档,用于后续的分析和复盘。 -
评估与优化 (Evaluation & Optimization):通过分析大量的 Trajectory 数据,我们可以评估 Agent 在不同任务上的表现,发现其能力的边界和常见的失败模式,为后续的模型微调(Fine-tuning)或 Prompt Engineering 提供数据支持。
实现目标
-
调试与审计:开发者可以清晰地回溯 Agent 的思考链、工具调用和模型响应,快速定位问题。 -
归档与分析:将 Agent 的完整交互历史永久化存储,用于后续的行为分析和模型优化。 -
可观测性与分布式追踪:借鉴 OpenTelemetry 等分布式追踪系统的理念,为每一次用户交互(Trace)和其中的每一个步骤(Span)分配唯一 ID,实现跨组件、跨服务的行为链路追踪。 -
多轮对话分组:能准确地将一次完整的端到端对话(从用户输入到最终回复)划分为一个独立的 Trace,便于分组查看和分析。 -
高可扩展性:系统设计应与具体存储后端解耦,支持从本地文件轻松扩展到 Kafka、数据库或专业日志系统。 -
灵活集成:既能无缝集成到 LangGraph 的 ReAct Agent 中,也能作为一个独立的节点(Node)在任何 LangGraph 图中即插即用。
设计与实现
设计
-
节点装饰器 (Node Decorator):最初,想为 LangGraph 中的每一个 Node(节点)都包裹一个装饰器。这个装饰器会在节点执行前后自动记录日志。
-
代码类似
-
优点:逻辑和业务分离,看起来很优雅。 -
缺点:实现起来非常复杂。LangGraph 的节点功能各异,有的调用 LLM,有的执行工具,有的只是简单的逻辑判断。为这些异构的节点设计一个通用的、能提取所有关键信息的装饰器,成本很高,且容易与 LangGraph 的内部机制产生冲突。 class TrajectoryHook:
"""Hook for automatically recording LangGraph execution."""
def __init__(self, recorder: TrajectoryRecorder):
self.recorder = recorder
self._session_id: Optional[str] = None
def wrap_node(self, node_name: str, node_func: Callable) -> Callable:
"""Wrap a node function to record its execution."""
@wraps(node_func)
async def wrapped_node(state: Dict[str, Any]) -> Any:
if not self._session_id:
return await node_func(state)
# Record node start
await self.recorder.record_event(
self._session_id,
node_name=node_name,
event_type="node_start",
data={"state_keys": list(state.keys()) if isinstance(state, dict) else None}
)
try:
# Execute node
if asyncio.iscoroutinefunction(node_func):
result = await node_func(state)
else:
result = node_func(state)
# Record node end
await self.recorder.record_event(
self._session_id,
node_name=node_name,
event_type="node_end",
data={"has_result": result is not None}
)
# Record messages if present
if isinstance(result, dict) and "messages" in result:
messages = result["messages"]
if isinstance(messages, list):
for msg in messages:
if hasattr(msg, "content"): # 确保是消息对象
await self.recorder.record_message(self._session_id, msg)
# Record node output
await self.recorder.record_node_output(
self._session_id,
node_name,
result
)
return result
except Exception as e:
# Record error
await self.recorder.record_error(
self._session_id,
error_type=type(e).__name__,
error_message=str(e),
node_name=node_name
)
raise
return wrapped_node
async def __aenter__(self):
"""Start recording session."""
self._session_id = await self.recorder.start_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""End recording session."""
if self._session_id:
success = exc_type is None
await self.recorder.end_session(self._session_id, success=success)
self._session_id = None
-
优点:这是一个非常可行的方案。Checkpoint 是状态变更的关键枢纽,在这里做文章能确保捕捉到所有重要的变化。 -
缺点:它将轨迹记录与状态持久化逻辑强耦合。如果我们未来想更换 Checkpoint 的后端(比如从内存换到 Redis),或者在某些场景下禁用 Checkpoint,可能会影响到轨迹记录功能。当然也可以做一个组合的checkpoint,也是一个可行的方案!
-
我们设计了一个 MessageProcessor,它的唯一职责就是对比上一次的状态和当前状态,找出 messages 列表中的**新消息**。 -
一旦发现新消息,就将其标准化为一条或多条轨迹事件(Event),并赋予它们正确的 Trace ID 和 Span ID。 -
这种方式不关心这些消息是哪个 Node 产生的,也不关心 Checkpoint 是否启用。它只关心最终的结果,从而实现了与 LangGraph 内部执行逻辑的解耦。
实现
-
TraceContext: 追踪上下文,一个轻量级的数据容器,负责在整个 LangGraph 的执行流程中传递追踪状态,主要包含 trace_id、span_id和 parent_sntext: 追踪上下文,一个轻量级的数据容器,负责在整个 LangGraph 的执行流程中传递追踪状态,主要包含 trace_id、span_id 和 parent_span_id。 -
TrajectoryRecorder: 轨迹记录器,是系统的核心协调者。它本身是无状态的,接收处理过的事件数据,并通过一个可插拔的 TrajectoryBackend 将数据写入到指定的存ecorder: 轨迹记录器,是系统的核心协调者。它本身是无状态的,接收处理过的事件数据,并通过一个可插拔的 TrajectoryBackend 将数据写入到指定的存储中。 -
TrajectoryBackend: 存储后端,定义了数据写入的接口。LocalFileBackendend: 存储后端,定义了数据写入的接口。LocalFileBackend 是其默认实现,将轨迹数据以 JSONL 格式写入本地文件,每一行代表一个事件。这种设计使得更换后端(如 Kafka、Redis、PostgreSQL)变得非常简单。 -
MessageProcessor: 消息处理器,负责将 LangGraph state[‘messages’] 中的原始消息转换为结构化的、可记录的 Trajectory 事件。它能识别消息类型(human, ai, tool)并生成相应的事件 payload。 -
ReactTrajectoryHook **TrajectoryNode**: 集成层,负责将轨迹记录功能接入 LangGraph。
-
ReactTrajectoryHook 作为 ReAct Agent 的 post_model_hook,在每次模型调用后触发,自动管理 Trace 的生命周期(开始与结束)。 -
TrajectoryNode 作为一个独立的 LangGraph 节点,可以在图的任意位置被调用,手动记录轨迹。
分布式追踪机制
-
当前实现:我们的策略相对直接——**通过新的用户输入来开启一个新的 Trace**。当一个 HumanMessage(用户消息)在上一轮对话结束后出现时,系统会生成一个新的 trace_id。本轮中所有的后续事件(AIMessage、ToolCall、ToolMessage)都会沿用这个 trace_id,但会拥有各自独立的 span_id,并通过 parent_span_id 建立父子关系。
-
langgraph_hook.py -
提供了 TrajectoryNode,一个可以被添加到任何 LangGraph 图中的独立节点。它提供了与 ReactTrajectoryHook 类似的功能,但给予开发者更大的控制权,可以在图的任意位置手动记录状态。 -
trajectory_viewer.py -
TrajectoryViewer 是一个离线分析工具。它读取 JSONL 文件,使用 itertools.groupby **按 ****trace_id**用 itertools.groupby **按 ****trace_id**** 对所有事件进行分组**,然后格式化输出每一次完整的交互,清晰地展示了思考链和工具调用过程。
调用过程
效果
-
实现了完整的轨迹记录与追踪:成功构建了一个从数据捕获、处理、存储到展示的端到端轨迹系统。 -
高度解耦和可扩展:通过 TrajectoryBackend 抽象,系统可以轻松适配不同的生产环境存储方案。 -
优雅的 Trace 划分机制:通过监控用户输入来判定新 Trace 的开始,符合直觉且实现简洁,准确地将多轮对话划分为独立追踪单元。 -
双重集成模式:提供了 Hook 和 Node 两种集成方式,兼顾了自动化和灵活性,适用于不同类型的 LangGraph 应用。 -
简化的实现逻辑:从最初复杂的节点装饰器和状态快照方案,演进到最终“扫描消息增量”的模式,代码更简洁、鲁棒性更强。 -
实用的可视化工具:TrajectoryViewer 提供了类似 LangSmith 的分组展示功能,极大地提升了调试和分析的效率。
未来展望
-
丰富后端支持:实现更多 TrajectoryBackend,如 Kafka(用于实时数据流)、Redis(用于快速缓存)或 ClickHouse/PostgreSQL(用于持久化存储和复杂查询)。 -
增强可视化界面:将 TrajectoryViewer 从命令行工具升级为交互式 Web 应用,提供更丰富的过滤、搜索和可视化功能。 -
与日志/数据管道集成:在生产环境中,将 TrajectoryBackend 对接到公司统一的日志系统和数据管道(如 Flink、Spark),实现企业级的监控和分析。 -
优化 Trace 结束判定:探索更复杂的 Trace 结束逻辑,例如基于特定事件或超时机制,以适应更复杂的业务场景。
-
要满足数据合规的要求,有的不可以存明文,可能就需要大模型改写信息来脱敏 -
关联用户的点赞,不点赞的行为,分析上下文,来改写模型 -
将数据进行冷热分离,持久归档冷数据到便宜的存储,省钱
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END