多智能体消息转发与上下文共享中间件 — 基于 MCP 协议实现 agent 间通信与任务协同。支持多 Agent 接入,提供标准 MCP 工具接口和本地 SQLite 存储。
---
name: agent-comm-hub
description: "多智能体消息转发与上下文共享中间件 — 基于 MCP 协议实现 agent 间通信与任务协同。支持多 Agent 接入,提供标准 MCP 工具接口和本地 SQLite 存储。"
version: "2.4.5"
category: autonomous-ai-agents
triggers:
- "hub"
- "Hub"
- "agent-comm-hub"
- "通信"
- "消息"
- "workbuddy"
- "QClaw"
- "send_message"
- "assign_task"
env:
HUB_KEY:
description: "stdio mode connection key for agent authentication"
required: true
secret: true
---
# Agent Communication Hub
> 多智能体消息转发与上下文共享中间件 — **v2.4.5**
让两个或多个独立 AI 智能体之间实现**实时双向通信**和**上下文自动同步**。基于 MCP 协议 + stdio 模式,消息本地持久化,延迟 < 50ms。
## 架构概览
```
┌──────────────┐ ┌──────────────────────────────┐ ┌──────────────┐
│ Agent A │ SSE │ Agent Communication Hub │ SSE │ Agent B │
│ (Hermes) │◄───────►│ (stdio) │◄───────►│ (WorkBuddy) │
│ │ MCP │ │ MCP │ │
└──────────────┘◄───────►│ SQLite WAL + 30 表 │◄───────►└──────────────┘
│ 53 MCP 工具 + 4 级权限 │
│ 上下文暂存 + 建议闭环 │
└──────────────┬──────────────┘
│
SQLite (WAL)
```
**三层协议**:
| 层 | 协议 | 用途 | 延迟 |
|----|------|------|------|
| MCP 工具层 | stdio JSON-RPC | 结构化操作(发消息、分配任务、查状态) | <50ms |
| SSE 推送层 | Server-Sent Events | 实时事件通知(新消息、新任务、建议确认) | <50ms |
## 快速上手 (5 分钟)
从零到完成第一次 Agent 间通信的编号流程:
### Step 1: 确认 Hub 运行状态
确认 Agent Communication Hub 服务器正在运行。如果通过 stdio 模式接入,检查 MCP 配置是否正确加载:
```
调用: get_online_agents()
期望: 返回在线 Agent 列表(至少含自己)
失败: Hub 未运行 → 先启动 Hub 服务器
```
**[检查点] 用户确认**:如果 Hub 未运行,询问用户是否要启动 Hub 服务器。
### Step 2: 注册或确认身份
检查自己是否已在 Hub 注册,如果没有则注册:
```
1. 调用: query_agents(status='all') → 查看所有 Agent
2. 如果自己的 Agent ID 不在列表中
→ register_agent(invite_code, name, capabilities)
3. 如果已注册 → 记下自己的 agent_id 供后续使用
```
**[检查点] 用户确认**:注册新 Agent 需要 invite_code,先问用户是否有可用的邀请码。
### Step 3: 维持在线状态
启动心跳维持在线,确保能接收实时消息推送:
```
调用: heartbeat(agent_id='你的ID')
频率: 每 30 秒一次(超过 90 秒无心跳则自动标记为离线)
```
### Step 4: 检查未读消息
上线后第一时间检查是否有离线期间缓存的消息:
```
1. 调用: search_messages(query='你的ID', limit=20)
2. 筛选 status='unread' 的消息
3. 按时间顺序处理,先 acknowledge_message 确认收到,再回复
```
**[检查点] 用户确认**:找到未读消息后,逐条向用户摘要汇报,请用户确认如何处理。
### Step 5: 发送第一条消息
向另一个 Agent 发送消息,验证双向通信:
```
调用: send_message(from='你的ID', to='目标AgentID', content='通信链路确认畅通')
检查返回: delivered_realtime — true=对方在线, false=对方离线
```
**[检查点] 用户确认**:发送前向用户确认消息内容和目标 Agent。broadcast_message 必须逐条确认。
### 完整闭环示例
```
场景:Hub 在线 → 检查 WorkBuddy 是否有未读消息 → 处理并回复
1. get_online_agents() # 确认自己和对方在线
2. search_messages(limit=10) # 查最近消息
3. acknowledge_message(msg_id, agent_id) # 标记已读
4. send_message(to='workbuddy', content='已收到,正在处理') # 回复
5. mark_consumed(resource=msg_id, action='replied') # 消费水位线
```
## 核心能力
### 53 个 MCP 工具(v2.4.0)
#### Identity 身份 (6)
| 工具 | 功能 |
|------|------|
| `register_agent` | 注册新 Agent,需提供 HUB_KEY 认证 |
| `heartbeat` | Agent 心跳上报,维持在线状态,每 3 次连续心跳记录 +1 |
| `query_agents` | 查询 Agent 列表,支持状态/角色筛选 |
| `get_online_agents` | 获取当前在线 Agent 列表 |
#### Message 消息 (5)
| 工具 | 功能 |
|------|------|
| `send_message` | Agent 间点对点消息,支持 Markdown,自动去重(sha256) |
| `broadcast_message` | (需逐条确认后发送) |
| `acknowledge_message` | 确认已读消息,防止重复出现 |
| `search_messages` | 全文搜索消息历史 |
| `batch_acknowledge_messages` | 批量确认消息(1-500 条/次),用于清理消息积压 |
#### File 文件 (3)
| 工具 | 功能 |
|------|------|
| `send_file` | 发送文件附件(Base64,10MB 限制),关联到消息 |
| `receive_file` | 接收附件,返回 Base64 编码内容 |
| `list_attachments` | 列出附件,支持按消息/Agent 筛选 |
#### Task 任务 (3)
| 工具 | 功能 |
|------|------|
| `assign_task` | 创建并分配任务,支持上下文传递 |
| `update_task_status` | 更新任务状态(inbox→assigned→in_progress→completed/failed) |
| `get_task_status` | 查询任务详情,含依赖、Pipeline、交接信息 |
#### Context 上下文暂存 (5)
| 工具 | 功能 |
|------|------|
| `store_memory` | 临时暂存当前任务参考信息 |
| `recall_memory` | 检索已暂存的上下文 |
| `list_memories` | 列出当前 Agent 的暂存条目 |
| `delete_memory` | 删除暂存条目(仅 creator) |
| `search_memories` | 检索当前 Agent 的暂存内容 |
#### 经验记录
经验记录和策略管理需特定权限配置。
#### 任务协同
| 工具 | 功能 |
|------|------|
| `add_dependency` | 添加任务依赖关系(依赖检查) |
| `remove_dependency` | 删除任务依赖关系 |
| `get_task_dependencies` | 查询任务上下游依赖 |
| `create_parallel_group` | 创建并行任务组(2-10 个任务) |
| `request_handoff` | 请求任务交接 |
| `accept_handoff` | 接受任务交接 |
| `reject_handoff` | 拒绝任务交接(含理由) |
| `add_quality_gate` | 在 Pipeline 中添加质量门 |
| `evaluate_quality_gate` | 评估质量门(passed/failed) |
| `recalculate_trust_scores` | 按调度执行分数维护 |
| `create_pipeline` | 创建 Pipeline 流水线 |
| `get_pipeline` | 查询 Pipeline 详情 |
| `list_pipelines` | 列出 Pipeline |
| `add_task_to_pipeline` | 向 Pipeline 添加任务 |
#### 运维工具 (4)
| 工具 | 功能 |
|------|------|
| `get_db_stats` | 数据库统计信息(表行数、大小、Agent 数等) |
| `archive_data` | 数据维护工具 |
| (其余 2 个内部工具) | 权限验证与控制 |
| (其余 2 个内部工具) | 权限验证与控制 |
#### 消费水位线 (2)
| 工具 | 功能 |
|------|------|
| `mark_consumed` | 标记任务/消息为已消费,防止重复处理 |
| `check_consumed` | 查询资源是否已被消费 |
> 所有工具内置 try-catch + 3 次指数退避重试(100ms → 200ms → 400ms)。v2.4.0 统一错误格式:`HubError` 错误码 + `mcpError()`/`mcpFail()` 标准返回。`check_consumed` 查询失败时降级返回 `consumed=false`(不阻塞业务)。
### 任务状态机
```
inbox → assigned → [waiting] → in_progress → completed / failed / cancelled
```
## 用户确认检查点
以下操作必须在执行前暂停,向用户摘要说明并等待确认:
| # | 操作 | 检查点说明 | 风险 |
|---|------|-----------|------|
| 1 | **broadcast_message** | 广播消息会发送给多个 Agent,逐条确认内容和接收列表 | 高 |
| 2 | **assign_task** | 分配任务前确认:描述是否清晰、目标 Agent 是否合适、Priority 正确 | 中 |
| 3 | **batch_acknowledge_messages** | 批量确认会一次性标记多条消息为已处理,确认不会遗漏重要信息 | 中 |
| 4 | **create_pipeline** | 创建流水线前确认任务顺序、质量门设置、参与 Agent | 中 |
| 5 | **add_quality_gate** | 质量门失败会阻塞后续任务,确认评估标准合理 | 高 |
| 6 | **request_handoff** | 交接任务前确认目标 Agent 有能力接手、理由充分 | 中 |
| 7 | **archive_data** | 归档操作会移动数据到归档表,确认归档范围和天数 | 高 |
| 8 | **store_memory(scope='group')** | 写入组内共享记忆前确认内容适当,不会泄露敏感信息 | 中 |
| 9 | **propose_strategy** | 提议策略前确认内容准确、分类正确、有实际价值 | 低 |
| 10 | **reject_handoff** | 拒绝交接需提供理由,确认不会导致任务阻塞 | 中 |
> **规则**:LLM 遇到上表操作时,先向用户输出摘要说明,明确询问"是否继续?",得到肯定答复后再执行。用户可随时跳过检查点。
## 数据隔离与安全边界
| 边界 | 实现方式 |
|------|---------|
| **接收方校验** | `send_message`/`assign_task` 中的 `to_agent` 必须为已注册 Agent,未注册 Agent 被拒绝 |
| **Per-Agent 数据隔离** | 每个 Agent 仅可见自身消息、任务和暂存条目;跨 Agent 查询受 4 级权限控制 |
| **暂存内容保护** | `store_memory` 创建的条目仅 creator 可检索和删除,不会自动暴露给其他 Agent |
| **经验记录审批** | `share_experience` 提交的记录需经 `full` 权限确认后才对其他 Agent 可见 |
## 接入配置(stdio 模式)
在 MCP 配置文件中添加 Hub 为 stdio 服务器,提供 `HUB_KEY` 环境变量进行认证。Hub 通过 stdio 传输 MCP 协议,Agent 的 LLM 可直接调用 Hub 工具。
```json
{
"mcpServers": {
"agent-comm-hub": {
"command": "node",
"args": ["<hub-install-path>/stdio.js"],
"env": {
"HUB_KEY": "your-connection-key"
}
}
}
}
```
## 资源索引
此 skill 目录下已有 Hub 完整源码,可直接参考:
### 本地源文件
| 文件 | 用途 |
|------|------|
| `src/server.ts` | 服务端入口,Express + MCP/SSE 双通道 |
| `src/tools.ts` | 全部 MCP 工具的 TypeScript 实现 |
| `src/db.ts` | SQLite WAL 数据库初始化与连接 |
| `src/identity.ts` | Agent 注册、认证、4 级权限控制 |
| `src/dedup.ts` | SHA256 消息去重实现 |
| `src/errors.ts` | HubError 统一错误码(v2.4.0+) |
| `src/stdio.ts` | Stdio 模式传输层 |
| `src/sse.ts` | SSE 推送通道 |
| `src/orchestrator.ts` | 任务编排、Pipeline、质量门 |
| `src/evolution.ts` | Evolution Engine:策略/经验/信任分 |
| `src/memory.ts` | 上下文暂存与管理 |
| `src/security.ts` | 安全验证、CORS、Token 管理 |
| `src/metrics.ts` | 统计指标收集 |
| `src/repo/` | 数据访问层(repository pattern) |
| `package.json` | Node.js 依赖与版本定义 |
### 参考链接
- GitHub 仓库: https://github.com/liuboacean/agent-comm-hub
- MCP 协议规范: https://spec.modelcontextprotocol.io
## 权限说明(4 级)
| 级别 | 说明 | 可用工具范围 |
|------|------|----------|
| **authenticated** | 已认证(HUB_KEY) | register_agent(初始注册) |
| **member** | 已注册 Agent | 消息 `send_message`/`acknowledge_message` + 任务 `assign_task`/`get_task_status` |
| **group_manager** | 并行组管理 | 任务协同 + Pipeline 工具(不含暂存/经验) |
| **full** | 完整权限 | 全部工具(含运维与建议管理) |
> 部分管理类工具仅特定权限可调用,具体以实际角色配置为准。
> 初始分数 50,公式:`base(50) + verified_capabilities*3 + approved_strategies*2 + positive_feedback*1 - negative_feedback*2`,clamp(0,100)。
## v2.4.0 更新要点
| Phase | 内容 | 变更 |
|-------|------|------|
| **A** | tools.ts 拆分 | 2687 行 → 8 模块 + 30 行入口 + utils.ts |
| **B** | 单元测试 | 100 用例,role-control >= 70% / dedup branches>=60, functions>=70 / utils 100% |
| **C** | CI/CD | GitHub Actions:typecheck + test + coverage 3 Jobs |
| **D** | 类型健壮 | any 归零 + HubError 统一错误码 + MCP 返回格式标准化 |
## 踩坑经验速查
| # | 场景 | 要点 |
|---|------|------|
| 1 | MCP 多 Client | 必须用 Stateless 模式,Stateful 只允许一个 Client |
| 2 | MCP Accept Header | 必须带 `Accept: application/json, text/event-stream` |
| 3 | MCP 响应格式 | SDK 返回 SSE 格式(`data: {...}`),不是纯 JSON |
| 4 | ESM 兼容 | 不能用 `require()`,用 `import()` 动态导入 |
| 5 | UTF-8 块读取 | httpx `resp.read(1)` 会截断多字节字符,用 `read(4096)` |
| 6 | SSE 心跳 | 10 秒间隔,服务端发 `: ping` |
| 7 | MCP != SSE | MCP 是工具调用通道(Agent→Hub),SSE 是推送通道(Hub→Agent) |
| 8 | 离线补发 | 消息/任务存 SQLite,上线后 SSE 自动批量推送 |
| 9 | stdio 模式 | 所有日志走 stderr,stdout 保留给 JSON-RPC |
| 10 | better-sqlite3 boolean | 绑定参数必须用 1/0,不能用 true/false |
| 11 | HubError 错误码 | v2.4.0 统一用 mcpError()/mcpFail(),不要手动构造错误响应 |
## 安全配置
| 配置项 | 说明 |
|--------|------|
| `HUB_KEY` | stdio 模式连接密钥,所有 Agent 接入必须提供,用于身份认证与消息完整性校验 |
| 4 级权限模型 | authenticated → member → group_manager → full,逐级授权 |
| CORS 白名单 | 默认拒绝跨域,通过 `CORS_LIST` 显式配置允许的来源 |
## 环境变量
| 变量 | 默认值 | 说明 |
|------|--------|------|
| `HUB_KEY` | — | stdio 模式连接密钥(必填) |
| `DB_PATH` | ./comm_hub.db | SQLite 数据库路径 |
| `LOG_LEVEL` | info | 日志级别:debug / info / warn / error |
| `CORS_LIST` | (空) | CORS 白名单(逗号分隔),空=拒绝所有跨域 |
## 技术依赖
**Hub 服务器**:
- Node.js 18+
- @modelcontextprotocol/sdk ^1.10.2(支持 StdioServerTransport)
- express ^4.19
- better-sqlite3 ^11.9
- zod ^3.23
**Python 客户端(零外部依赖)**:
- Python 3.9+(纯标准库:http.client / json / asyncio)
don't have the plugin yet? install it then click "run inline in claude" again.