模块 74: AI 编排层系统 (chainlesschain orchestrate)
版本: v5.0.2.4 状态: ✅ 完成 测试: 106 个测试(72 单元 + 15 集成 + 19 E2E) 新增文件:
packages/cli/src/lib/claude-code-bridge.jspackages/cli/src/lib/orchestrator.jspackages/cli/src/lib/agent-router.jspackages/cli/src/lib/notifiers/telegram.jspackages/cli/src/lib/notifiers/wecom.jspackages/cli/src/lib/notifiers/dingtalk.jspackages/cli/src/lib/notifiers/feishu.jspackages/cli/src/lib/notifiers/websocket.jspackages/cli/src/lib/notifiers/index.jspackages/cli/src/commands/orchestrate.js新增测试:__tests__/unit/claude-code-bridge.test.js__tests__/unit/notifiers.test.js__tests__/unit/agent-router.test.js__tests__/integration/orchestrator-workflow.test.js__tests__/e2e/orchestrate-command.test.js
一、背景与目标
1.1 问题陈述
ChainlessChain 此前已具备完善的 AI 对话与 Agent 能力,但缺少面向工程任务自动化的编排层:
- 无法将一个复杂编程任务分解并并行分发给多个 AI 执行器
- 无法对接 Claude Code、Codex 等专业 AI 编程 CLI 工具作为执行后端
- 无法在代理执行完成后自动运行 CI/CD 并根据结果决策是否重试
- 无法将任务进度和结果通知到企业微信、钉钉、飞书、Telegram 等即时通讯平台
- 无法通过即时通讯平台发送指令来触发编排任务
1.2 设计目标
参考 OpenClaw 架构(编排层 → 并行 AI 代理 → CI/CD 验证 → 通知),实现:
- ChainlessChain 作为编排层:任务分解、Agent 调度、CI 验证、重试策略
- Claude Code / Codex 作为执行层:调用
claude -p "task" --output-format stream-json - 多路 Agent 路由:支持 claude/codex/gemini/openai/ollama 后端,5 种路由策略
- 多渠道通知:Telegram、企业微信、钉钉、飞书、WebSocket 同时推送
- 双向 IM 集成:不仅推送通知,还支持从企业微信/钉钉/飞书接收指令
1.3 设计原则
- 可测性优先:所有模块通过
_deps注入模式解耦外部依赖 - 渐进降级:无 Claude Code 时自动退化到 ollama 等可用后端
- 零配置运行:
cc orchestrate "task"自动检测可用工具和通知渠道 - 非阻塞通知:IM 平台通知失败不影响主流程
二、架构设计
2.1 整体流程
用户指令 (CLI / WS / IM Webhook)
│
▼
Orchestrator.addTask()
│
├─ LLM 分解任务 (_chat → subtasks[])
│
├─ AgentRouter.dispatch(subtasks)
│ │
│ ├─ ClaudeCodePool (claude CLI)
│ ├─ ClaudeCodePool (codex CLI)
│ ├─ API Backend (gemini/openai/ollama)
│ └─ 路由策略: round-robin / primary / parallel-all / by-type
│
├─ CI 检查 (execSync ciCommand)
│ │
│ ├─ 通过 → 完成
│ └─ 失败 → 携带错误上下文重试 (maxRetries)
│
└─ NotificationManager.broadcast()
│
├─ TelegramNotifier
├─ WeComNotifier
├─ DingTalkNotifier
├─ FeishuNotifier
└─ WebSocketNotifier (实时推送给 WS 客户端)2.2 模块结构
packages/cli/src/
├── commands/
│ └── orchestrate.js # CLI 命令注册,--status/--watch/--webhook 模式
└── lib/
├── claude-code-bridge.js # ClaudeCodeAgent, ClaudeCodePool, detectClaudeCode/Codex
├── orchestrator.js # Orchestrator, TASK_STATUS, TASK_SOURCE
├── agent-router.js # AgentRouter, BACKEND_TYPE, 5 种路由策略
└── notifiers/
├── index.js # NotificationManager, 3 个 incoming 解析器
├── telegram.js # TelegramNotifier
├── wecom.js # WeComNotifier (企业微信)
├── dingtalk.js # DingTalkNotifier (钉钉,含加签)
├── feishu.js # FeishuNotifier (飞书,含签名校验)
└── websocket.js # WebSocketNotifier (实时事件推送)三、核心模块详解
3.1 ClaudeCodeBridge
3.1.1 ClaudeCodeAgent
负责调用单个 AI CLI 进程执行任务:
javascript
const agent = new ClaudeCodeAgent({ id: "a1", cliCommand: "claude", model: "claude-opus-4-6" });
const result = await agent.executeTask("Fix the auth bug", {
cwd: "/path/to/project",
timeout: 300_000,
context: "Error: null reference at auth.js:42",
});
// result: { success, output, exitCode, duration, timedOut?, error? }关键实现:
- 调用
spawn("claude", ["-p", prompt, "--output-format", "stream-json"]) - 通过
_parseStreamJson()解析流式 JSON 输出,提取{ type: "result" }行的result字段 - 支持
timeout(发送 SIGTERM)和abort()(主动终止) _deps = { spawn, execSync }支持测试注入
3.1.2 ClaudeCodePool
批量管理并行 Agent:
javascript
const pool = new ClaudeCodePool({ maxParallel: 3, cliCommand: "claude" });
const results = await pool.dispatch(subtasks, { cwd });
// 自动以 maxParallel 为批次并发执行,返回 results[]3.2 AgentRouter
3.2.1 后端类型
| BACKEND_TYPE | 类型 | 触发条件 |
|---|---|---|
claude | CLI | claude --version 可执行 |
codex | CLI | codex --version 可执行 |
gemini | API | GEMINI_API_KEY 环境变量 |
openai | API | OPENAI_API_KEY 环境变量 |
anthropic | API | ANTHROPIC_API_KEY 环境变量 |
ollama | API | 始终作为本地 fallback |
3.2.2 路由策略
| 策略 | 说明 |
|---|---|
round-robin | 加权轮询,weight 越高分配越多任务 |
primary | 优先使用第一个后端,失败自动 fallback |
parallel-all | 所有后端同时执行,选取最优结果(成功 > 失败,最短耗时) |
by-type | 根据任务 type 字段(code-generation/analysis/review)路由到最适合的后端 |
javascript
const router = AgentRouter.autoDetect({ maxParallel: 3, strategy: "round-robin" });
const results = await router.dispatch(subtasks, { cwd });3.3 Orchestrator
3.3.1 任务生命周期
pending → decomposing → dispatched → ci-checking → completed
→ retrying → ... → failed3.3.2 关键配置
javascript
const orch = new Orchestrator({
cwd: "/project",
maxParallel: 3, // 并发 Agent 数
maxRetries: 3, // CI 失败最大重试次数
ciCommand: "npm test",
verbose: false,
});3.3.3 CI 重试机制
当 CI 失败时,Orchestrator 会:
- 调用
notifier.notifyFailure()发送失败通知 - 将 CI 错误输出作为
context传给下一轮 Agent 执行 - 重试 prompt 包含
"Previous attempt failed CI: <error output>" - 重试次数达到
maxRetries后标记任务为FAILED
3.4 NotificationManager
3.4.1 通知渠道配置
通过环境变量自动启用:
| 渠道 | 所需环境变量 |
|---|---|
| Telegram | TELEGRAM_BOT_TOKEN + TELEGRAM_CHAT_ID |
| 企业微信 | WECOM_WEBHOOK_URL |
| 钉钉 | DINGTALK_WEBHOOK_URL (+ DINGTALK_SECRET 可选) |
| 飞书 | FEISHU_WEBHOOK_URL (+ FEISHU_SECRET 可选) |
| WebSocket | 通过 addWebSocketChannel({ send, requestId }) 动态注入 |
3.4.2 扇出机制
javascript
// 所有渠道并发通知,单个失败不阻塞其他渠道
await notifier.notifySuccess({ taskId, description, agentCount, duration });内部使用 Promise.allSettled() 确保所有渠道都尝试发送,单个失败仅记录日志。
3.5 Incoming IM 解析器
支持从三个 IM 平台接收指令并触发编排任务:
javascript
// 企业微信 XML 解析
parseWeComIncoming('<xml><Content><![CDATA[Fix auth bug]]></Content></xml>')
// → "Fix auth bug"
// 钉钉 JSON 解析
parseDingTalkIncoming({ msgtype: "text", text: { content: " Deploy to prod " } })
// → "Deploy to prod"
// 飞书事件解析(含 challenge 验证)
parseFeishuIncoming({ event: { message: { content: '{"text":"Fix bug"}' } } })
// → "Fix bug"Webhook 服务器启动:cc orchestrate --webhook [--webhook-port 18820]
| 端点 | 平台 | 说明 |
|---|---|---|
POST /wecom | 企业微信 | XML body,提取 <Content> |
POST /dingtalk | 钉钉 | JSON body,提取 text.content |
POST /feishu | 飞书 | JSON body,支持 challenge 验证 |
四、CLI 命令参考
4.1 基本用法
bash
# 执行编排任务
cc orchestrate "Fix the authentication bug in login.ts"
# 多路后端,parallel-all 策略
cc orchestrate "Refactor payment module" --backends claude,gemini --strategy parallel-all
# 指定 CI 命令,最多 5 次重试
cc orchestrate "Add unit tests" --ci "npm run test:unit" --retries 5
# JSON 输出(适合脚本集成)
cc orchestrate "task" --json
# 跳过 CI 检查
cc orchestrate "task" --no-ci4.2 状态与检测
bash
cc orchestrate detect # 检测已安装的 AI CLI 工具
cc orchestrate --status # 查看编排器状态
cc orchestrate --status --json # JSON 格式状态(含 backends 列表)4.3 Webhook 模式
bash
# 启动 HTTP webhook 服务器(默认端口 18820)
cc orchestrate --webhook
# 自定义端口
cc orchestrate --webhook --webhook-port 90904.4 通过 WebSocket 触发
json
{
"type": "orchestrate",
"id": "req-001",
"task": "Fix the failing test in auth.spec.ts",
"cwd": "/path/to/project",
"ciCommand": "npm test"
}响应类型:orchestrate:event(实时进度)、orchestrate:done(最终结果)
五、测试覆盖
5.1 单元测试(72 个)
| 文件 | 测试数 | 覆盖内容 |
|---|---|---|
claude-code-bridge.test.js | 20 | detectClaudeCode/Codex, Agent 生命周期, Pool 批次, stream-json 解析 |
notifiers.test.js | 39 | 5 个通知器 + NotificationManager 扇出 + 3 个 incoming 解析器 |
agent-router.test.js | 13 | autoDetect, 4 种路由策略, 加权轮询, 边界情况 |
5.2 集成测试(15 个)
| 描述 | 测试数 |
|---|---|
| 任务生命周期(完成、事件、通知、跳过 CI、时间戳) | 5 |
| LLM 分解(多子任务、fallback、decomposed 事件) | 3 |
| CI 失败与重试(成功重试、耗尽重试、多次失败通知、错误上下文传递) | 4 |
| status() 和 cron watch | 3 |
5.3 E2E 测试(19 个)
| 描述 | 测试数 |
|---|---|
--help 选项展示 | 7 |
detect 子命令 | 3 |
--status 输出结构 | 4 |
| 无任务时显示用法 | 1 |
--no-ci --provider ollama 优雅失败 | 1 |
| Webhook 服务器(DingTalk POST、Feishu challenge) | 2 |
自动修复: --status --json 包含 backends 字段 | 1(包含在 status 中) |
六、安全考虑
6.1 Webhook 安全
- 默认只监听
127.0.0.1(本机),不对外暴露 - 企业微信、钉钉、飞书均支持签名验证(
WECOM_TOKEN、DINGTALK_SECRET、FEISHU_SECRET) - 飞书 challenge 验证防止伪造订阅
6.2 CLI 执行安全
claude -p使用spawn()而非exec(),避免 shell 注入- 任务描述不插入 shell 命令字符串,始终作为独立参数传递
6.3 通知内容安全
- 通知消息中的任务描述截断为前 120 字符,避免大量数据泄露
- Telegram 使用 HTML parse mode,输出需做 HTML 转义(当前版本由上层保证)
七、已知限制
- Codex CLI:OpenAI Codex CLI 已停止维护,实际使用以 claude 为主
- API 后端:gemini/openai/anthropic API 后端当前依赖
cowork-adapter.js的createChatFn,大型任务可能受 context 窗口限制 - 并发控制:
ClaudeCodePool.maxParallel控制进程数,但未对系统资源(内存/CPU)做额外限制 - Webhook 认证:当前 WeCom webhook 仅通过内容解析,无 IP 白名单验证
八、版本历史
| 版本 | 内容 |
|---|---|
| v5.0.2.4 | 初始实现:ClaudeCodeBridge + AgentRouter + Orchestrator + 5 通知渠道 + Webhook 接收 + 106 个测试 |
