# agentmesh **Repository Path**: Huiyun-Co_admin/agentmesh ## Basic Information - **Project Name**: agentmesh - **Description**: No description available - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-05-02 - **Last Updated**: 2026-05-02 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MCP AgentMesh MCP AgentMesh 是一个 Python MCP Server,用 **Streamable HTTP** 暴露 agent 协作接口,用 **ZooKeeper** 维护机器节点在线 lease,用 **Redis Streams** 存储任务、结果和事件。 它同时提供: - MCP 远程服务:`POST/GET/DELETE /mcp` - 运维控制台:`GET /` - Dashboard API:`/api/*` - Python Agent SDK:自动注册、心跳、拉取任务、ack/fail ## 1. 功能概览 节点能力: - agent 启动后在 ZooKeeper 创建 ephemeral lease。 - agent 连接 MCP Server 后调用 `node.register` 自动注册。 - MCP Server 可查看全部机器节点、节点在线状态、标签、能力、心跳、指标和当前任务。 - 节点状态包括 `online`、`stale`、`offline`。 任务能力: - 控制方可向指定节点发送 `message`、`command`、`custom` JSON 任务。 - worker agent 通过 `tasks.poll` 拉取任务。 - worker 执行后调用 `tasks.ack` 或 `tasks.fail`。 - 运行中任务超时后会重新投递。 - 任务投递语义是 at-least-once,业务侧需要基于 `idempotency_key` 做幂等。 前端控制台能力: - 查看所有机器节点。 - 按状态、能力、标签筛选节点。 - 查看节点详情 JSON。 - 向指定节点发送任务。 - 查询、拉取、Ack、Fail、取消任务。 - 查看 MCP resources。 - 查看前端和 MCP endpoint 配置。 ## 2. 项目结构 ```text src/agentmesh_mcp/ agent.py # Python Agent SDK 和 sample-agent 入口 dashboard.py # 前端页面和 /api/* Dashboard API models.py # 节点、任务、事件模型 redis_store.py # Redis Streams / Hash / Set 适配 server.py # FastMCP + Dashboard 统一 ASGI 入口 service.py # 核心业务状态机 zookeeper.py # ZooKeeper lease 读取适配 web/static/ # 控制台 HTML/CSS/JS tests/ # 单元测试和接口测试 docker-compose.yml # Redis + ZooKeeper + MCP Server + sample-agent ``` ## 3. 环境准备 需要: - Python 3.12+ - `uv` - Docker / Docker Compose 安装开发依赖: ```bash uv sync --extra dev ``` 运行验证: ```bash uv run pytest uv run ruff check . uv run ruff format --check . ``` ## 4. 使用 Docker Compose 启动 启动完整环境: ```bash docker compose up --build ``` 默认启动 4 个服务: - `redis`:任务和事件后端 - `zookeeper`:节点在线 lease - `mcp-server`:MCP Server + 前端控制台 + Dashboard API - `sample-agent`:演示 agent,自动注册并拉取任务 启动后访问: - 控制台:`http://127.0.0.1:8000/` - MCP endpoint:`http://127.0.0.1:8000/mcp` - Redis:`127.0.0.1:6379` - ZooKeeper:`127.0.0.1:2181` 查看日志: ```bash docker compose logs -f mcp-server docker compose logs -f sample-agent ``` 停止服务: ```bash docker compose down ``` ## 5. 前端配置教程 前端控制台不需要单独启动 Node.js,也不需要单独构建。它是静态 HTML/CSS/JS,随 Python 包一起发布,由 MCP Server 同进程提供。 默认路由: - `GET /`:控制台入口页面。 - `GET /assets/app.js`:前端逻辑。 - `GET /assets/styles.css`:前端样式。 - `/api/*`:控制台调用的维护 API。 - `/mcp`:MCP Streamable HTTP endpoint。 本地访问配置: ```text 前端地址: http://127.0.0.1:8000/ API 地址: 同源 /api/* MCP 地址: http://127.0.0.1:8000/mcp ``` 浏览器前端默认使用同源请求,所以 `app.js` 里不需要配置 API base URL。只要页面从 `http://127.0.0.1:8000/` 打开,它会自动请求 `http://127.0.0.1:8000/api/*`。 Docker Compose 默认配置: ```yaml mcp-server: environment: AGENTMESH_HOST: 0.0.0.0 AGENTMESH_PORT: "8000" AGENTMESH_MCP_PATH: /mcp ports: - "8000:8000" ``` 如果要从其他机器访问前端,例如 `http://192.168.1.20:8000/`,需要确认: 1. Docker 或服务器防火墙放行 `8000` 端口。 2. `AGENTMESH_HOST=0.0.0.0`。 3. `AGENTMESH_ALLOWED_HOSTS` 包含访问域名或 IP。 示例: ```bash AGENTMESH_ALLOWED_HOSTS='["127.0.0.1","127.0.0.1:*","localhost","localhost:*","192.168.1.20","192.168.1.20:*"]' ``` 如果前端通过域名或反向代理访问,例如 `https://agentmesh.example.com/`,建议配置: ```bash AGENTMESH_ALLOWED_HOSTS='["agentmesh.example.com","agentmesh.example.com:*"]' AGENTMESH_ALLOWED_ORIGINS='["https://agentmesh.example.com"]' ``` 反向代理需要保留这些路径: ```text / -> mcp-server:8000 /assets/* -> mcp-server:8000 /api/* -> mcp-server:8000 /mcp -> mcp-server:8000 ``` ## 6. 控制台使用教程 打开 `http://127.0.0.1:8000/` 后,左侧是功能菜单: - 节点管理 - 任务管理 - 消息发送 - MCP 资源查看 - 系统配置 ### 节点管理 用于查看和筛选所有机器节点。 - 页面会自动请求 `GET /api/nodes`。 - 表格展示节点 ID、状态、能力、标签、当前任务和最近心跳。 - 点击某一行会调用 `GET /api/nodes/{node_id}` 并显示完整 JSON。 筛选节点: - 状态:选择 `online`、`stale`、`offline`。 - 能力:输入 `message` 或 `command`。 - 标签:输入 `role=worker,region=cn` 这类逗号分隔条件。 ### 任务管理 用于维护任务生命周期。 可执行操作: - 查询任务:输入 `task_id` 后点击“查询”。 - 取消任务:输入 `task_id` 后点击“取消”。 - 拉取任务:输入 `node_id`、`lease_id`、数量和等待秒数后点击“拉取”。 - Ack 任务:输入 `task_id`、`node_id`、`lease_id` 和 result JSON 后点击“Ack”。 - Fail 任务:输入 `task_id`、`node_id`、`lease_id` 和 error JSON 后点击“Fail”。 如果从“节点管理”点击了某个节点,控制台会自动把该节点的 `node_id` 和 `lease_id` 带入任务管理表单。 ### 消息发送 用于向指定节点发送 `message`、`command` 或 `custom` JSON 任务。 1. 在“目标节点”里填写节点 ID,例如 `sample-agent-1`。 2. 选择任务类型:`message`、`command` 或 `custom`。 3. 填写 payload JSON,例如: ```json {"text":"hello"} ``` 4. 可选填写优先级、超时时间和幂等键。 5. 点击“发送任务”。 发送成功后,页面会显示返回的 `task_id` 和任务详情。`sample-agent` 收到任务后会打印日志并自动 ack。 ### MCP 资源查看 用于查看 MCP resource 风格的数据视图。 支持: - `agentmesh://nodes` - `agentmesh://nodes/{node_id}` - `agentmesh://tasks/{task_id}` 选择资源类型后,填写需要的 `node_id` 或 `task_id`,点击“读取资源”。 ### 系统配置 用于查看前端运行时配置,包括: - 控制台入口 - Dashboard API 前缀 - MCP endpoint - 静态资源路径 - 已启用功能菜单 ## 7. Dashboard API 使用教程 健康检查: ```bash curl http://127.0.0.1:8000/api/health ``` 查看全部节点: ```bash curl http://127.0.0.1:8000/api/nodes ``` 按状态筛选节点: ```bash curl "http://127.0.0.1:8000/api/nodes?status=online" ``` 按能力筛选节点: ```bash curl "http://127.0.0.1:8000/api/nodes?capability=message" ``` 按标签筛选节点: ```bash curl "http://127.0.0.1:8000/api/nodes?labels=role=worker,region=cn" ``` 查看单个节点: ```bash curl http://127.0.0.1:8000/api/nodes/sample-agent-1 ``` 发送任务: ```bash curl -X POST http://127.0.0.1:8000/api/tasks \ -H "content-type: application/json" \ -d '{ "target_node_id": "sample-agent-1", "task_type": "message", "payload": {"text": "hello"}, "priority": 5, "timeout_seconds": 300, "idempotency_key": "hello-1" }' ``` 查询任务: ```bash curl http://127.0.0.1:8000/api/tasks/ ``` 取消任务: ```bash curl -X POST http://127.0.0.1:8000/api/tasks//cancel \ -H "content-type: application/json" \ -d '{"reason":"operator cancelled"}' ``` 模拟 worker 拉取任务: ```bash curl -X POST http://127.0.0.1:8000/api/tasks/poll \ -H "content-type: application/json" \ -d '{ "node_id": "sample-agent-1", "lease_id": "", "max_items": 1, "wait_seconds": 0 }' ``` 确认任务成功: ```bash curl -X POST http://127.0.0.1:8000/api/tasks//ack \ -H "content-type: application/json" \ -d '{ "node_id": "sample-agent-1", "lease_id": "", "result": {"ok": true} }' ``` 标记任务失败: ```bash curl -X POST http://127.0.0.1:8000/api/tasks//fail \ -H "content-type: application/json" \ -d '{ "node_id": "sample-agent-1", "lease_id": "", "error": {"message": "failed"}, "retryable": true }' ``` ## 8. MCP 客户端配置教程 任何支持 MCP Streamable HTTP Transport 的客户端,都连接同一个 MCP endpoint: ```text http://127.0.0.1:8000/mcp ``` 如果修改了 `AGENTMESH_MCP_PATH`,客户端 URL 也要同步修改。例如: ```bash AGENTMESH_MCP_PATH=/agentmesh-mcp ``` 对应客户端 URL: ```text http://127.0.0.1:8000/agentmesh-mcp ``` 通用 MCP 客户端配置示例: ```json { "mcpServers": { "agentmesh": { "transport": "streamable-http", "url": "http://127.0.0.1:8000/mcp" } } } ``` 不同 MCP 客户端的配置文件字段名可能略有差异;关键点是: - transport 选择 `streamable-http` 或客户端等价的 HTTP/Streamable HTTP 选项。 - URL 填 `http://:`。 - v1 暂无鉴权,通常不需要额外 headers。 - 如果客户端会发送 `Origin`,需要把该 Origin 加入 `AGENTMESH_ALLOWED_ORIGINS`。 - 如果客户端使用域名、IP 或 Docker 服务名访问,需要把对应 Host 加入 `AGENTMESH_ALLOWED_HOSTS`。 常见访问场景: ```text 本机客户端 -> http://127.0.0.1:8000/mcp Docker 内部 sample-agent -> http://mcp-server:8000/mcp 局域网客户端 -> http://192.168.1.20:8000/mcp 反向代理 HTTPS -> https://agentmesh.example.com/mcp ``` 对应 Host 白名单示例: ```bash # 本机 AGENTMESH_ALLOWED_HOSTS='["127.0.0.1","127.0.0.1:*","localhost","localhost:*"]' # Docker Compose 内部 AGENTMESH_ALLOWED_HOSTS='["mcp-server","mcp-server:*"]' # 局域网 IP AGENTMESH_ALLOWED_HOSTS='["192.168.1.20","192.168.1.20:*"]' # 域名 AGENTMESH_ALLOWED_HOSTS='["agentmesh.example.com","agentmesh.example.com:*"]' AGENTMESH_ALLOWED_ORIGINS='["https://agentmesh.example.com"]' ``` Python MCP SDK 最小连接示例: ```python import asyncio from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client async def main(): async with streamablehttp_client("http://127.0.0.1:8000/mcp") as ( read_stream, write_stream, _, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() tools = await session.list_tools() print([tool.name for tool in tools.tools]) asyncio.run(main()) ``` 通过 Python MCP SDK 发送任务: ```python result = await session.call_tool( "tasks.send", { "target_node_id": "sample-agent-1", "task_type": "message", "payload": {"text": "hello"}, "idempotency_key": "hello-1", }, ) print(result.structuredContent) ``` 如果 MCP 客户端报 `421 Misdirected Request`,说明 Host header 没有通过白名单校验。 把客户端实际访问的 host 加入 `AGENTMESH_ALLOWED_HOSTS`,带端口访问时建议使用 `:*` 通配,例如 `mcp-server:*` 或 `192.168.1.20:*`。 ## 9. MCP 工具和资源 MCP tools: - `node.register(node_id, lease_id, zk_path, labels, capabilities, metadata)` - `node.heartbeat(node_id, lease_id, metrics, current_task_id?)` - `nodes.list(status?, label_selector?, capability?)` - `nodes.get(node_id)` - `tasks.send(target_node_id, task_type, payload, priority?, timeout_seconds?, idempotency_key?)` - `tasks.poll(node_id, lease_id, max_items?, wait_seconds?)` - `tasks.ack(node_id, lease_id, task_id, result?)` - `tasks.fail(node_id, lease_id, task_id, error, retryable?)` - `tasks.cancel(task_id, reason?)` MCP resources: - `agentmesh://nodes` - `agentmesh://nodes/{node_id}` - `agentmesh://tasks/{task_id}` ## 10. Python Agent SDK 使用教程 `sample-agent` 已经在 `docker-compose.yml` 里配置好。它会自动: 1. 连接 ZooKeeper。 2. 创建 `/mcp/nodes/{node_id}/leases/{lease_id}` ephemeral lease。 3. 连接 MCP Server。 4. 调用 `node.register`。 5. 周期心跳。 6. 通过 `tasks.poll` 长轮询任务。 7. 收到任务后打印并 `tasks.ack`。 手动运行 sample agent: ```bash uv run agentmesh-sample-agent \ --mcp-url http://127.0.0.1:8000/mcp \ --zookeeper-hosts 127.0.0.1:2181 \ --node-id local-agent-1 ``` 自定义 agent 示例: ```python import asyncio from agentmesh_mcp.agent import AgentMeshClient, AgentNodeConfig, ZooKeeperNodeLease from kazoo.client import KazooClient from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client async def main(): config = AgentNodeConfig( node_id="custom-agent-1", labels={"role": "worker"}, capabilities=["message", "command"], metadata={"version": "0.1.0"}, ) zk_client = KazooClient(hosts="127.0.0.1:2181") lease = ZooKeeperNodeLease(client=zk_client, config=config) zk_path = await lease.start() try: async with streamablehttp_client("http://127.0.0.1:8000/mcp") as ( read_stream, write_stream, _, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() client = AgentMeshClient(session) await client.register( node_id=config.node_id, lease_id=lease.lease_id, zk_path=zk_path, labels=config.labels, capabilities=config.capabilities, metadata=config.metadata, ) while True: await client.heartbeat(config.node_id, lease.lease_id, {}, None) tasks = await client.poll(config.node_id, lease.lease_id) for task in tasks: try: print("received", task) await client.ack( config.node_id, lease.lease_id, task["task_id"], {"ok": True}, ) except Exception as exc: await client.fail( config.node_id, lease.lease_id, task["task_id"], {"message": str(exc)}, retryable=True, ) finally: await lease.stop() asyncio.run(main()) ``` ## 11. 配置说明 可通过 `.env` 或环境变量配置: ```bash AGENTMESH_REDIS_URL=redis://127.0.0.1:6379/0 AGENTMESH_ZOOKEEPER_HOSTS=127.0.0.1:2181 AGENTMESH_ZOOKEEPER_ROOT_PATH=/mcp/nodes AGENTMESH_HOST=0.0.0.0 AGENTMESH_PORT=8000 AGENTMESH_MCP_PATH=/mcp AGENTMESH_HEARTBEAT_STALE_AFTER_SECONDS=60 AGENTMESH_ALLOWED_HOSTS='["127.0.0.1","127.0.0.1:*","localhost","localhost:*","0.0.0.0","0.0.0.0:*","mcp-server","mcp-server:*"]' AGENTMESH_ALLOWED_ORIGINS='[]' ``` 常用项: - `AGENTMESH_REDIS_URL`:Redis 地址。 - `AGENTMESH_ZOOKEEPER_HOSTS`:ZooKeeper 地址。 - `AGENTMESH_ZOOKEEPER_ROOT_PATH`:节点 lease 根路径。 - `AGENTMESH_PORT`:服务端口。 - `AGENTMESH_MCP_PATH`:MCP Streamable HTTP 路径。 - `AGENTMESH_HEARTBEAT_STALE_AFTER_SECONDS`:节点多久没心跳后显示为 `stale`。 - `AGENTMESH_ALLOWED_HOSTS`:Host header 白名单;跨容器访问 MCP 时需要包含 `mcp-server:*`,否则 MCP SDK 会对 `Host: mcp-server:8000` 返回 `421 Misdirected Request`。 - `AGENTMESH_ALLOWED_ORIGINS`:Origin 白名单。 ## 12. Redis 和 ZooKeeper 数据布局 Redis: - `stream:tasks:{node_id}`:目标节点任务流。 - `hash:task:{task_id}`:任务详情。 - `hash:node:{node_id}`:节点最近运行数据。 - `stream:events`:节点和任务事件。 - `set:nodes:known`:历史出现过的节点。 ZooKeeper: - `/mcp/nodes/{node_id}/leases/{lease_id}`:agent ephemeral lease。 ZooKeeper 不存任务队列,也不存大 payload。 ## 13. 常见问题 看不到节点: - 确认 `sample-agent` 是否启动:`docker compose logs -f sample-agent` - 确认 ZooKeeper 可用:`docker compose logs -f zookeeper` - 确认 agent 创建了 lease,路径应类似 `/mcp/nodes/sample-agent-1/leases/`。 节点显示 `stale`: - 说明 ZooKeeper lease 仍在线,但 Redis 里的 heartbeat 太旧。 - 检查 agent 是否仍在调用 `node.heartbeat`。 任务一直是 `queued`: - 说明目标节点还没有 poll 到任务。 - 检查目标节点 ID 是否正确。 - 检查 worker 是否在调用 `tasks.poll`。 任务重复执行: - 当前系统是 at-least-once 语义。 - 业务任务需要使用 `idempotency_key` 或自己的业务幂等逻辑。 Docker 镜像构建失败: - 如果报错发生在拉取 `python:3.12-slim`、`redis:7.4-alpine` 或 `zookeeper:3.9`,通常是 Docker Hub 网络问题。 - 网络恢复后重新运行: ```bash docker compose up --build ``` ## 14. 开发和测试 运行全部测试: ```bash uv run pytest -q ``` 运行 lint: ```bash uv run ruff check . ``` 检查格式: ```bash uv run ruff format --check . ``` 构建 Python 包: ```bash uv build ```