Skip to content

复杂 Agent 系统解析

单个 Agent 在有明确工具和清晰任务边界的情况下工作得很好。但当任务复杂度上升——需要并行执行、长程规划、人工介入、不同专业能力组合——单 Agent 的局限就开始显现。

这页讨论复杂 Agent 系统的典型架构模式、工程实现中真正棘手的问题,以及一些判断"什么时候需要复杂架构"的实用标准。

单 Agent 的局限在哪里

不是所有任务都需要多 Agent。单 Agent 的局限通常在这几个地方:

上下文窗口压力:一个 Agent 同时处理太多事情,历史对话和工具返回会快速消耗 context,导致早期信息被截断。

专业能力碎片化:一个 Agent 什么都能做,但哪个领域都不深。比如代码审查需要安全视角、性能视角、可维护性视角,同一个 Agent 很难在每个维度都给出高质量的判断。

错误传播:单 Agent 中途出错,往往需要从头重跑。没有隔离,没有检查点。

并行限制:单 Agent 是顺序执行的,不能同时做多件事。

主流架构模式

Orchestrator-Worker 模式

最常见的多 Agent 架构。一个 Orchestrator(协调者)负责任务拆分和结果聚合,多个 Worker Agent 各自负责一个专门领域。

python
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from typing import TypedDict, List

class OrchestratorState(TypedDict):
    original_task: str
    subtasks: List[str]
    results: List[str]
    final_output: str

llm = ChatOpenAI(model="gpt-4o")

def orchestrator_node(state: OrchestratorState) -> dict:
    """拆分任务,决定分配给哪些 Worker"""
    response = llm.invoke(
        f"把以下任务拆成 2-3 个具体子任务,每个子任务一行:\n{state['original_task']}"
    )
    subtasks = [line.strip() for line in response.content.split('\n') if line.strip()]
    return {"subtasks": subtasks}

def worker_code_review(state: OrchestratorState) -> dict:
    """代码质量 Worker"""
    task = state['subtasks'][0] if state['subtasks'] else state['original_task']
    response = llm.invoke(f"从代码质量角度分析:{task}")
    return {"results": state.get('results', []) + [response.content]}

def worker_security(state: OrchestratorState) -> dict:
    """安全审查 Worker"""
    task = state['subtasks'][1] if len(state['subtasks']) > 1 else state['original_task']
    response = llm.invoke(f"从安全角度审查:{task}")
    return {"results": state.get('results', []) + [response.content]}

def aggregator_node(state: OrchestratorState) -> dict:
    """汇总所有 Worker 的结果"""
    combined = "\n\n".join(state.get('results', []))
    response = llm.invoke(f"整合以下分析结果,生成综合报告:\n{combined}")
    return {"final_output": response.content}

Orchestrator 本身也是一个 LLM,负责理解任务、制定计划、分配工作、最终整合。Worker 各自专注,互不干扰。

Plan-Execute 模式

先让 Agent 制定完整计划,再按计划逐步执行。和 ReAct 的区别是:ReAct 是边想边做,Plan-Execute 是先想清楚再动手。

适合任务目标确定、步骤可以提前列出的场景:

python
def planning_node(state):
    """制定完整执行计划"""
    response = llm.invoke(
        f"任务:{state['task']}\n"
        "请制定详细的执行步骤(每步一行,用数字编号):"
    )
    steps = parse_numbered_steps(response.content)
    return {"plan": steps, "current_step": 0}

def execution_node(state):
    """按计划执行当前步骤"""
    step = state['plan'][state['current_step']]
    result = execute_with_tools(step, state['tools'])
    return {
        "step_results": state.get('step_results', []) + [result],
        "current_step": state['current_step'] + 1
    }

def should_continue(state):
    if state['current_step'] >= len(state['plan']):
        return END
    return "execute"

Plan-Execute 的问题是计划可能不适应中途出现的新信息,需要配合重新规划的机制。

Hierarchical Agent(层级 Agent)

多层嵌套结构:顶层 Agent 做战略决策,中层 Agent 协调子领域,底层 Agent 执行具体操作。

在大型企业应用里常见,比如:

CEO Agent(战略)
├── Engineering Agent(工程决策)
│   ├── Backend Worker(具体实现)
│   └── Frontend Worker(具体实现)
└── Research Agent(信息收集)
    ├── Web Search Worker
    └── Document Analysis Worker

层级越深,响应延迟越高,出错时排查路径也越长。非必要不要引入太多层次。

状态管理:最难的工程问题

多 Agent 系统里最难的不是"让多个 Agent 跑起来",而是"怎么在多个 Agent 之间安全地共享和传递状态"。

共享状态的问题

如果多个 Worker 并发写同一个状态对象,会有竞态条件。LangGraph 通过 Annotated 类型和 reducer 函数处理并发写入:

python
from typing import TypedDict, Annotated
import operator

class SharedState(TypedDict):
    messages: Annotated[list, operator.add]  # 追加语义,安全并发
    results: Annotated[dict, lambda a, b: {**a, **b}]  # 合并语义
    step_count: int  # 不加注解,只能单写

operator.add 告诉 LangGraph:多个节点同时写 messages 时,把它们的结果合并追加,而不是覆盖。

持久化与中断恢复

Agent 执行到一半崩溃,或者需要等待人工审批,需要能保存当前状态并恢复:

python
from langgraph.checkpoint.sqlite import SqliteSaver

# 用 SQLite 持久化状态
with SqliteSaver.from_conn_string("agent_state.db") as memory:
    graph = compiled_graph.with_config({"checkpointer": memory})

    # 第一次运行
    config = {"configurable": {"thread_id": "task-001"}}
    result = graph.invoke({"task": "分析这份报告"}, config=config)

    # 中断后恢复(从上次保存点继续)
    result = graph.invoke(None, config=config)

这在需要"Human-in-the-loop"的场景里特别有用:Agent 执行到需要人工确认的节点时暂停,人工审批后继续。

常见失控点

Prompt 链路太长,质量衰减

每经过一个 Agent,任务描述被重新诠释一次。三层下来,执行的和最初要求的可能已经不一样了。保持 Orchestrator 的指令简洁、明确,Worker 的任务范围聚焦,减少转述损耗。

工具调用不加限制

Agent 可以无限循环调用工具,或者调用了一个副作用很大的操作(删文件、发邮件)。生产环境里必须:

  • 设置最大步骤数(max_iterations
  • 对危险操作加确认节点
  • 关键操作前做 Human-in-the-loop 审批

调试困难

多个 Agent 并发运行,日志交叉,出了问题很难追踪是哪一步出的问题。LangSmith 这类工具可以可视化整个调用链,是复杂 Agent 系统的标配。

过度设计

很多任务用单 Agent + 好工具就能解决,不需要复杂的多 Agent 架构。引入复杂性之前,先问:单 Agent 真的达到瓶颈了吗?

什么时候需要复杂架构

以下情况才真正需要多 Agent 架构:

  • 任务需要真正的并行执行(不是顺序执行能优化的)
  • 不同子任务需要不同的专业深度,单一 Agent 质量不满足要求
  • 需要 Human-in-the-loop 审批机制
  • 跨系统集成,不同 Agent 由不同团队维护(这时候需要 A2A 等协议)

大多数"感觉需要多 Agent"的场景,实际上是单 Agent + 更好的工具设计 + 更清晰的 prompt。架构复杂度是成本,不是荣誉。

子 Agent Spawn 的边界问题

当 Orchestrator 动态创建子 Agent 时,边界设计是最容易出问题的地方。

资源边界

子 Agent 能访问哪些工具、调用哪些 API、读哪些文件,要在 spawn 时明确传入,而不是让子 Agent 自己决定:

python
def spawn_research_agent(topic: str, allowed_sources: list[str]) -> dict:
    """
    明确传入资源边界:
    只能搜索 allowed_sources 里的域名,最多调用 10 次工具,超时 60 秒
    """
    return research_agent.invoke(
        {"task": f"研究:{topic}"},
        config={
            "allowed_domains": allowed_sources,
            "max_tool_calls": 10,
            "timeout": 60,
        }
    )

没有资源边界的子 Agent,一旦触发错误路径,可能会无限调用工具或把大量费用花在无关的 API 调用上。

权限边界

子 Agent 不应该自动继承父 Agent 的所有权限。最小权限原则:只给子 Agent 完成当前任务所需的工具,不多一个。代码审查 Agent 不需要数据库写权限;信息检索 Agent 不需要发邮件的能力。权限过宽会增加出错时的爆炸半径。

超时边界

子 Agent 卡住了,必须有机制让它超时退出:

python
import asyncio

async def spawn_with_timeout(agent, input_data, timeout_seconds=30):
    try:
        result = await asyncio.wait_for(
            asyncio.to_thread(agent.invoke, input_data),
            timeout=timeout_seconds
        )
        return {"status": "success", "result": result}
    except asyncio.TimeoutError:
        return {"status": "timeout", "result": None, "error": f"超时({timeout_seconds}s)"}
    except Exception as e:
        return {"status": "error", "result": None, "error": str(e)}

Orchestrator 收到 status: timeout 时,可以决定用部分结果继续、重试,或者整体中止。

幂等性设计

幂等性是指:同一个操作执行多次,和执行一次的结果相同。Agent 调用工具可能失败,失败后重试是常见策略。但如果工具调用不是幂等的,重试会导致重复操作——发两封确认邮件、重复创建数据库记录、向第三方 API 发起两次付款。

幂等键(Idempotency Key)

python
import uuid

def send_notification(user_id: str, message: str, idempotency_key: str = None) -> dict:
    if idempotency_key is None:
        idempotency_key = str(uuid.uuid4())
    
    # 检查是否已执行过
    existing = db.get_notification_by_key(idempotency_key)
    if existing:
        return {"status": "already_sent", "notification_id": existing.id}
    
    # 执行并记录
    notification = db.create_notification(user_id, message, idempotency_key)
    return {"status": "sent", "notification_id": notification.id}

状态检查 + 条件执行

python
def create_order_if_not_exists(order_id: str, order_data: dict) -> dict:
    existing = db.get_order(order_id)
    if existing:
        return {"status": "exists", "order": existing}
    new_order = db.create_order(order_id, order_data)
    return {"status": "created", "order": new_order}

LangGraph 的 Checkpointer 机制在整个工作流层面实现幂等性:它记录每个节点的执行状态,重启后跳过已成功的节点,只从失败点继续。

会话隔离与并发

会话隔离

多用户场景下,不同用户的 Agent 实例不能共享状态,否则会出现数据泄漏:

python
def handle_user_request(user_id: str, message: str):
    # thread_id 区分不同用户的会话,每个用户的状态完全隔离
    config = {"configurable": {"thread_id": f"user-{user_id}"}}
    result = graph.invoke({"message": message}, config=config)
    return result

并发节点

LangGraph 支持并行节点执行,两个没有依赖关系的节点可以同时跑:

python
# 两个搜索节点并行执行,都完成后才进入聚合
workflow.add_edge(START, "search_web")
workflow.add_edge(START, "search_database")
workflow.add_edge("search_web", "aggregate")
workflow.add_edge("search_database", "aggregate")

并行节点对共享状态的写入需要 reducer 函数处理冲突(参见前面的 SharedState 示例)。

错误处理与故障转移

复杂 Agent 系统的错误处理需要分层设计:

工具级别:有最大重试次数和退避策略;记录失败原因,不静默失败。

Agent 级别:子 Agent 失败时,Orchestrator 能感知;根据任务重要性决定是整体中止还是降级处理(用部分结果继续)。

系统级别:整个工作流有超时保护;记录完整的执行轨迹,方便事后审查;对外暴露明确的错误响应格式。

python
class AgentExecutionError(Exception):
    def __init__(self, agent_name: str, step: str, cause: str):
        self.agent_name = agent_name
        self.step = step
        self.cause = cause
        super().__init__(f"Agent {agent_name} failed at step '{step}': {cause}")

显式错误类型让 Orchestrator 能根据错误种类做不同的降级策略,而不是用一个通用的 Exception 处理所有失败。


A2A 协议提供了多 Agent 系统跨服务通信的标准方案,参考 A2A 协议。LangGraph 的有状态编排是这里提到的很多模式的具体实现框架,参考 LangChain 与 LangGraph

面向开发者系统学习 AI 应用开发、RAG、Agent 与 Vibe Coding。