Skip to content
你将构建:一个 Orchestrator-Worker 多 Agent 系统,支持任务拆解、并行执行与结果聚合
高阶时长:60 min
前置:P1P11
技术:Multi-AgentOrchestratorParallelTypeScriptOpenAI SDK

开始前先看:实践环境准备。本章对应示例文件位于 practice/ 目录,可直接按命令运行。

前置准备

开始本章前,请先确认:

  • 已阅读 实践环境准备
  • 基础依赖已就绪:openai
  • 环境变量已配置:OPENAI_API_KEY
  • 建议先完成前置章节:P1P11
  • 本章建议入口命令:bun run p15-multi-agent.ts
  • 示例文件位置:practice/p15-multi-agent.ts

背景与目标

P1 到 P14 的所有 Agent 都是单体的:一个 Agent 实例从头到尾处理一个任务。这在任务可以线性分解时够用,但面对某些场景你会撞上天花板:

一个 Agent 做不了的事,多个也做不了吗?

举一个具体的例子:你要对一个开源项目做全面的代码审查,涉及安全性、性能、代码风格三个维度。单 Agent 的做法是在一次对话里依次检查三个维度,但每个维度都需要不同的 system prompt 和评审标准。塞在一个 prompt 里,维度之间会互相干扰——模型在思考安全问题时容易忽略性能细节,反之亦然。

更实际的限制是 上下文窗口。三个维度的审查结果可能各自占用数千 Token,单次对话的上下文很快被填满。

解决方案直觉上很自然:拆成多个 Agent,各管一摊,最后汇总。这就是多 Agent 编排。

多 Agent 编排已经成为很多 Agent 产品与框架里的常见模式。不同系统对"编排器""专家 Agent""控制权移交"的实现差异很大,但核心目标一致:把复杂任务拆成更小、更专注的子问题,再把结果汇总回来。

时效说明:本段涉及的产品和框架仅用于帮助你建立生态感知,不代表唯一实现路线;真正落地时,应优先回到你当前使用框架的官方文档。

本章目标

  1. 理解三种主流多 Agent 编排模式的适用场景
  2. 实现 Orchestrator-Worker 架构:编排器拆解任务,Worker 并行执行
  3. 实现结果聚合:将多个 Worker 的输出整合为最终回答

核心概念

三种编排模式

多 Agent 系统的编排方式可以归纳为三种基本模式:

模式结构适用场景复杂度
Sequential(顺序链)A → B → C流水线,前一步的输出是下一步的输入
Parallel(并行扇出)A → [B, C, D] → A独立子任务,可以同时执行
Orchestrator-WorkerO → [W1, W2, ...] → O编排器动态决定任务拆分和 Worker 分配

Sequential 最简单:Agent A 处理完交给 Agent B,B 处理完交给 Agent C。适合确定的流水线(比如"翻译 → 校对 → 排版")。

Parallel 是 Sequential 的扩展:一个任务被拆成多个独立子任务,分别交给不同 Agent 并行执行,结果汇总。适合子任务之间没有依赖关系的场景(比如"同时检查安全、性能、风格")。

Orchestrator-Worker 是最灵活的:编排器(Orchestrator)本身也是一个 LLM Agent,它动态决定怎么拆任务、分配给谁、执行几轮。适合任务结构事先不确定的场景。

本章实现 Orchestrator-Worker 模式,因为它包含了前两种模式的能力——如果编排器决定顺序执行,它就退化成 Sequential;如果决定并行执行,它就是 Parallel。

Orchestrator 的职责

Orchestrator 不直接完成任何具体工作,它的职责是三件事:

  1. 任务拆解:接收用户的原始请求,将其分解为多个独立的子任务
  2. Worker 分配:为每个子任务指定执行者(Worker),包括 Worker 的专业领域和 system prompt
  3. 结果聚合:收集所有 Worker 的输出,综合整理为最终回答

Orchestrator 通过工具调用来分派任务。我们给它注册一个 dispatch_workers 工具,输入是子任务列表,输出是所有 Worker 的执行结果。这样 Orchestrator 的决策过程对模型来说就是一次普通的工具调用。

Worker 的设计

每个 Worker 是一个独立的 LLM 调用,有自己的 system prompt 和上下文。Worker 之间互不可见——Worker A 不知道 Worker B 的存在,也看不到 Worker B 的输出。这种隔离是刻意的:

  • 避免干扰:专注于自己的子任务,不被其他维度的信息分散注意力
  • 并行友好:没有依赖关系,可以用 Promise.all 同时执行
  • 可替换:每个 Worker 的实现可以独立调整,换 prompt、换模型,不影响其他 Worker

Worker 的 system prompt 需要高度聚焦。一个负责"安全审查"的 Worker 不应该评论代码风格,即使它看到了风格问题。范围越窄,输出质量越高。

任务拆解的结构化输出

让 Orchestrator 返回结构化的任务列表,而不是自由文本。我们用工具调用的 input schema 来约束输出格式:

ts
// Orchestrator 调用 dispatch_workers 工具时的输入结构
interface DispatchInput {
  tasks: Array<{
    id: string           // 子任务标识,如 "security", "performance"
    title: string        // 子任务名称
    description: string  // 详细的执行指令
    expertise: string    // Worker 的专业领域(用于生成 system prompt)
  }>
}

id 而不是数组下标来标识子任务,因为聚合阶段需要按语义引用("安全审查发现了……"),而不是按序号。

动手实现

$bun run p15-multi-agent.ts

当前仓库已提供对应文件,完成前置准备后可直接执行。

运行与验证

  • 先按前置准备完成依赖和环境变量配置
  • 执行上面的推荐入口命令
  • 将输出与下文的“运行结果”或章节描述对照,确认主链路已经跑通
  • 如果遇到命令、依赖、环境变量或样例输入问题,先回到 实践环境准备 排查

第一步:定义类型和配置

ts
// p15-multi-agent.ts
import OpenAI from 'openai'

const client = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
  baseURL: process.env.OPENAI_BASE_URL,
})

// 子任务描述(Orchestrator 输出)
interface SubTask {
  id: string
  title: string
  description: string
  expertise: string
}

// Worker 执行结果
interface WorkerResult {
  taskId: string
  title: string
  output: string
}

第二步:实现 Worker

Worker 是一个纯函数:接收子任务描述,返回执行结果。每个 Worker 是一次独立的 LLM 调用,system prompt 根据 expertise 动态生成。

ts
// p15-multi-agent.ts(续)

async function runWorker(task: SubTask): Promise<WorkerResult> {
  console.log(`[Worker:${task.id}] 开始执行: ${task.title}`)

  const response = await client.chat.completions.create({
    model: process.env.OPENAI_MODEL || 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: [
          `你是一位专注于「${task.expertise}」的专家。`,
          '请严格围绕分配给你的任务进行分析,不要涉及其他维度。',
          '输出格式:先给出核心结论(1-2句),再列出具体发现(要点列表)。',
        ].join('\n'),
      },
      { role: 'user', content: task.description },
    ],
  })

  const output = response.choices[0].message.content ?? ''

  console.log(`[Worker:${task.id}] 完成 (${output.length} chars)\n`)

  return {
    taskId: task.id,
    title: task.title,
    output,
  }
}

注意 Worker 用的是示例 Sonnet 型号而不是 Opus——Worker 执行的是聚焦子任务,通常更适合性价比模型;Orchestrator 则更适合承担任务拆解和聚合。这种分层用模型的策略在生产中非常常见,但具体型号应以你当前账号可用模型为准。

第三步:实现 dispatch 工具

这个工具是 Orchestrator 和 Worker 之间的桥梁:接收 Orchestrator 拆解出的子任务列表,并行启动所有 Worker,收集结果后返回。

ts
// p15-multi-agent.ts(续)

// dispatch_workers 工具的 schema(OpenAI ChatCompletionTool 格式)
const dispatchToolSchema: OpenAI.ChatCompletionTool = {
  type: 'function',
  function: {
    name: 'dispatch_workers',
    description:
      '将任务分派给多个专家 Worker 并行执行。每个 Worker 独立工作,互不可见。执行完成后返回所有 Worker 的结果。',
    parameters: {
      type: 'object',
      properties: {
        tasks: {
          type: 'array',
          items: {
            type: 'object',
            properties: {
              id: {
                type: 'string',
                description: '子任务唯一标识,如 "security"、"performance"',
              },
              title: {
                type: 'string',
                description: '子任务名称',
              },
              description: {
                type: 'string',
                description: '给 Worker 的详细执行指令,要包含足够的上下文',
              },
              expertise: {
                type: 'string',
                description: 'Worker 的专业领域,如 "安全审计"、"性能优化"',
              },
            },
            required: ['id', 'title', 'description', 'expertise'],
          },
          description: '要分派的子任务列表',
        },
      },
      required: ['tasks'],
    },
  },
}

// 执行 dispatch:并行启动所有 Worker
async function executeDispatch(tasks: SubTask[]): Promise<string> {
  console.log(`\n[Orchestrator] 分派 ${tasks.length} 个子任务,并行执行...\n`)

  // Promise.all 并行执行所有 Worker
  const results = await Promise.all(tasks.map(runWorker))

  // 将结果格式化为文本,供 Orchestrator 聚合
  return results
    .map((result) => [`### ${result.title} (${result.taskId})`, '', result.output].join('\n'))
    .join('\n\n---\n\n')
}

Promise.all 是关键——所有 Worker 的 LLM 调用同时发出,总耗时等于最慢的那个 Worker,而不是所有 Worker 耗时之和。如果有 3 个 Worker 各需要 3 秒,串行要 9 秒,并行只要 3 秒。

第四步:实现 Orchestrator 主循环

Orchestrator 本身也是一个 Agent 循环:发送用户请求,模型决定是否调用 dispatch_workers,调用后拿到结果再做聚合。

ts
// p15-multi-agent.ts(续)

// 类型守卫:验证子任务结构
function isSubTask(value: unknown): value is SubTask {
  if (typeof value !== 'object' || value === null) return false
  const obj = value as Record<string, unknown>
  return (
    typeof obj['id'] === 'string' &&
    typeof obj['title'] === 'string' &&
    typeof obj['description'] === 'string' &&
    typeof obj['expertise'] === 'string'
  )
}

async function orchestrate(userMessage: string): Promise<string> {
  console.log(`用户: ${userMessage}\n`)

  const messages: OpenAI.ChatCompletionMessageParam[] = [
    {
      role: 'system',
      content: [
        '你是一个任务编排器(Orchestrator)。你的职责是:',
        '1. 分析用户的请求,将其拆解为多个独立的子任务',
        '2. 使用 dispatch_workers 工具将子任务分派给专家 Worker',
        '3. 收到 Worker 的结果后,综合整理为一份完整、连贯的最终回答',
        '',
        '拆解原则:',
        '- 每个子任务应该是独立的,Worker 之间互不可见',
        '- 子任务的 description 要包含足够的上下文,Worker 只能看到自己的任务描述',
        '- 为每个子任务指定准确的 expertise,确保 Worker 聚焦于自己的专业领域',
        '- 如果任务不需要拆解(太简单),直接回答即可,不必强行分派',
        '',
        '聚合原则:',
        '- 不要简单拼接 Worker 的输出,要提炼和整合',
        '- 如果多个 Worker 的发现有冲突,指出分歧并给出你的判断',
        '- 最终回答应该是一份结构清晰的综合报告',
      ].join('\n'),
    },
    { role: 'user', content: userMessage },
  ]

  // Orchestrator 的 Agent 循环
  while (true) {
    const response = await client.chat.completions.create({
      model: process.env.OPENAI_MODEL || 'gpt-4o',
      tools: [dispatchToolSchema],
      messages,
    })

    const message = response.choices[0].message
    const toolCalls = message.tool_calls ?? []

    if (response.choices[0].finish_reason === 'stop' || toolCalls.length === 0) {
      // 模型完成了聚合,返回最终文本
      return message.content ?? ''
    }

    // 将 assistant 消息加入历史
    messages.push(message)

    // 处理 dispatch_workers 工具调用
    for (const toolCall of toolCalls) {
      if (toolCall.type !== 'function' || toolCall.function.name !== 'dispatch_workers') continue

      const input = JSON.parse(toolCall.function.arguments) as Record<string, unknown>
      const tasksRaw = input['tasks']
      const tasks = Array.isArray(tasksRaw) ? tasksRaw.filter((item) => isSubTask(item)) : []
      const result = await executeDispatch(tasks)

      messages.push({
        role: 'tool',
        tool_call_id: toolCall.id,
        content: result,
      })
    }
  }
}

第五步:入口与测试

ts
// p15-multi-agent.ts(续)

async function main(): Promise<void> {
  const codeToReview = `
function processUserData(data: any) {
  const result = eval(data.expression)

  const allUsers = []
  for (let i = 0; i < 1000000; i++) {
    allUsers.push(fetch('/api/user/' + i))
  }

  let output = ''
  for (const user of allUsers) {
    output = output + JSON.stringify(user) + '\\n'
  }

  return result + output
}
  `.trim()

  const answer = await orchestrate(
    `请对以下 TypeScript 代码进行全面审查,涵盖安全性、性能和代码质量三个维度:\n\n\`\`\`ts\n${codeToReview}\n\`\`\``
  )

  console.log('\n========== 最终报告 ==========\n')
  console.log(answer)
}

main().catch((error) => {
  console.error(error)
  process.exitCode = 1
})

运行结果

用户: 请对以下 TypeScript 代码进行全面审查...

[Orchestrator] 分派 3 个子任务,并行执行...

[Worker:security] 开始执行: 安全性审查
[Worker:performance] 开始执行: 性能审查
[Worker:quality] 开始执行: 代码质量审查
[Worker:security] 完成 (487 chars)
[Worker:performance] 完成 (523 chars)
[Worker:quality] 完成 (412 chars)

========== 最终报告 ==========

## 代码审查综合报告

### 严重问题

1. **eval() 注入漏洞(安全)**:直接对用户输入执行 eval(),攻击者可执行任意代码。
   必须移除 eval,改用安全的表达式解析库。

2. **百万次串行 HTTP 请求(性能)**:循环发起 1,000,000 次 fetch 请求,
   会耗尽内存和网络连接。应改为分页查询或批量 API。

### 中等问题

3. **字符串拼接(性能)**:循环内用 += 拼接字符串,每次创建新字符串对象。
   应使用数组 push + join。

4. **any 类型(质量)**:参数类型为 any,失去类型安全保护。
   应定义明确的接口类型。

### 建议

5. **缺少错误处理**:fetch 请求没有 catch,任一请求失败会导致整体崩溃。
6. **缺少输入验证**:data 参数没有校验,应在入口处验证数据结构。

关键点梳理

概念说明
Orchestrator-Worker 模式Orchestrator 负责任务拆解和结果聚合,Worker 负责具体执行,职责分离
dispatch_workers 工具把多 Agent 分派抽象为一次工具调用,对 Orchestrator 来说就是"调用工具,拿到结果"
Worker 隔离每个 Worker 是独立的 LLM 调用,有自己的 system prompt 和上下文,互不可见
Promise.all 并行所有 Worker 同时执行,总耗时 = max(各 Worker 耗时),而非 sum
分层用模型Orchestrator 用强模型(Opus)做推理和聚合,Worker 用性价比模型(Sonnet)做执行
结构化任务拆解通过工具 input schema 约束 Orchestrator 的输出格式,确保可解析
动态拆解Orchestrator 根据请求内容动态决定拆成几个子任务、每个子任务的专业方向
聚合不是拼接Orchestrator 收到 Worker 结果后要整合、去重、排序、解决冲突,不是简单粘贴

常见问题

Q: 什么时候该用多 Agent,什么时候单 Agent 就够了?

判断标准是子任务之间是否需要不同的专业视角。如果一个任务的所有部分都需要同样的上下文和推理模式,单 Agent 更高效——多 Agent 的通信开销(编排 + 聚合各消耗一次 LLM 调用)是实打实的成本。

多 Agent 的优势出现在三种场景:

  • 维度正交:子任务之间需要不同的评估标准(安全 vs 性能 vs 风格)
  • 上下文隔离:子任务的上下文互相干扰,分开处理效果更好
  • 并行加速:子任务独立,可以同时执行节省时间

Q: Worker 用不同的模型可以吗?

完全可以,这正是多 Agent 的优势之一。你可以根据子任务的特性选择最合适的模型:

ts
async function runWorker(task: SubTask): Promise<WorkerResult> {
  // 根据任务类型选择模型
  const model = task.expertise.includes('代码')
    ? 'gpt-4o'       // 代码任务:用更强的模型
    : 'gpt-4o-mini'  // 简单分析:用性价比模型

  const response = await client.chat.completions.create({
    model,
    // ...
  })
  // ...
}

在 P18(模型路由)中会系统地讨论这个话题。

Q: 如果某个 Worker 执行失败了怎么办?

当前实现用 Promise.all,一个失败全部失败。生产环境通常用 Promise.allSettled 来容错:

ts
const settled = await Promise.allSettled(tasks.map(runWorker))

const results: WorkerResult[] = []
const failures: Array<{ taskId: string; error: string }> = []

for (let i = 0; i < settled.length; i++) {
  const outcome = settled[i]
  if (outcome.status === 'fulfilled') {
    results.push(outcome.value)
  } else {
    failures.push({
      taskId: tasks[i].id,
      error: String(outcome.reason),
    })
  }
}

// 把成功结果和失败信息一起返回给 Orchestrator
// Orchestrator 可以决定是否重试失败的子任务

P4 的重试策略在这里也适用——可以对失败的 Worker 做指数退避重试。

Q: Orchestrator 可以做多轮分派吗?比如先分派一轮,看到结果后再分派第二轮?

可以。因为 Orchestrator 本身是一个 Agent 循环,只要模型在第一轮聚合后觉得信息不够,它可以再次调用 dispatch_workers 启动新的 Worker。这在需要迭代深入的场景中很有用:第一轮做粗粒度扫描,发现问题后第二轮针对具体问题深入分析。

小结与延伸

你现在有了一个 Orchestrator-Worker 多 Agent 系统:

  • Orchestrator 接收用户请求,用 LLM 动态拆解为多个子任务
  • dispatch_workers 工具并行启动所有 Worker,隔离执行
  • Worker 各自独立完成分析,Orchestrator 收到结果后整合为最终报告
  • 分层用模型策略降低总成本

这是多 Agent 编排的基础架构。接下来两章会在此基础上扩展:

  • P16 子 Agent 与任务分解:Worker 自身也是一个完整的 Agent(带工具调用和多轮循环),而不只是单次 LLM 调用
  • P17 Agent 间通信:Worker 之间可以共享中间状态,支持协作而非纯并行

如果本章对你有帮助

给本书仓库点一个 Star,是对作者最直接的支持。

Star 支持本书