# IOT Manager **Repository Path**: laterlove/iot-manager ## Basic Information - **Project Name**: IOT Manager - **Description**: 使用cursor ai实现的IOT mqtt管理服务端,包含前端和后端 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2025-05-19 - **Last Updated**: 2026-03-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # utf8 encode # MQTT Broker 技术规格文档 ## 1. 项目概述 ### 1.1 项目目标 开发一个企业级MQTT Broker,完整支持MQTT 3.1.1协议规范,参考EMQX架构设计,提供高性能、高可靠的消息传输服务。 ### 1.2 技术栈 - **语言**: C++20 - **持久化存储**: SQLite(嵌入式数据库,无需额外部署) - **网络框架**: Asio(独立版或 Boost.Asio)+ 线程池/协程 - **日志**: spdlog - **配置管理**: yaml-cpp ### 1.3 设计原则 - 模块化设计,各组件低耦合 - 接口优先,便于扩展和测试 - 持久化优先,确保消息不丢失 - 资源可控,适合单机部署 --- ## 2. 系统架构 ### 2.1 整体架构图 ``` ┌─────────────────────────────────────────────────────────────────┐ │ MQTT Broker │ ├─────────────────────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │ │ TCP/WS │ │ TLS/WSS │ │ HTTP API / Metrics │ │ │ │ Listener │ │ Listener │ │ Server │ │ │ └──────┬──────┘ └──────┬──────┘ └───────────┬─────────────┘ │ │ │ │ │ │ │ ┌──────┴────────────────┴──────────────────────┴─────────────┐ │ │ │ Connection Manager │ │ │ │ - 连接生命周期管理 │ │ │ │ - Keep-Alive检测 │ │ │ │ - 连接状态追踪 │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ │ │ ┌──────────────────────────┴──────────────────────────────────┐ │ │ │ Session Manager │ │ │ │ - 会话创建/销毁 │ │ │ │ - 订阅状态管理 │ │ │ │ - 消息队列(QoS 1/2) │ │ │ │ - 离线消息存储 │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ │ │ ┌──────────────────────────┴──────────────────────────────────┐ │ │ │ MQTT Protocol Handler │ │ │ │ - 报文解析/编码 │ │ │ │ - 协议状态机 │ │ │ │ - QoS处理 │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ │ │ ┌──────────────────────────┴──────────────────────────────────┐ │ │ │ Pub/Sub Engine │ │ │ │ - Topic匹配(通配符支持) │ │ │ │ - 消息路由 │ │ │ │ - 保留消息处理 │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ │ │ ┌──────────────────────────┴──────────────────────────────────┐ │ │ │ Storage Layer │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ Session │ │ Message │ │ Retained │ │ │ │ │ │ Store │ │ Store │ │ Message Store │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │ │ │ SQLite (Embedded) │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ Plugin System (Hooks) │ │ │ │ - 认证钩子 │ │ │ │ - 授权钩子 │ │ │ │ - 消息钩子 │ │ │ └──────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 2.2 核心模块说明 #### 2.2.1 网络层 (Network Layer) - **TCP Listener**: 标准MQTT over TCP - **WebSocket Listener**: MQTT over WebSocket - **TLS支持**: 基于证书的加密连接 #### 2.2.2 连接管理器 (Connection Manager) - 管理所有客户端连接 - 实现Keep-Alive检测机制 - 连接状态追踪和统计 #### 2.2.3 会话管理器 (Session Manager) - 客户端会话生命周期管理 - Clean Session / Clean Start支持 - 订阅状态持久化 - 离线消息队列 #### 2.2.4 协议处理器 (Protocol Handler) - MQTT报文解析与编码 - 协议状态机管理 - QoS流程处理 #### 2.2.5 发布订阅引擎 (Pub/Sub Engine) - Topic匹配算法 - 消息路由分发 - 保留消息管理 #### 2.2.6 存储层 (Storage Layer) - 会话持久化 - 消息持久化(QoS 1/2) - 保留消息存储 --- ## 3. MQTT 3.1.1 协议实现规格 ### 3.1 支持的报文类型 | 报文类型 | 方向 | 描述 | 支持状态 | |---------|------|------|---------| | CONNECT | Client → Server | 连接请求 | ✅ | | CONNACK | Server → Client | 连接确认 | ✅ | | PUBLISH | 双向 | 发布消息 | ✅ | | PUBACK | 双向 | QoS 1确认 | ✅ | | PUBREC | 双向 | QoS 2接收确认 | ✅ | | PUBREL | 双向 | QoS 2释放 | ✅ | | PUBCOMP | 双向 | QoS 2完成 | ✅ | | SUBSCRIBE | Client → Server | 订阅请求 | ✅ | | SUBACK | Server → Client | 订阅确认 | ✅ | | UNSUBSCRIBE | Client → Server | 取消订阅 | ✅ | | UNSUBACK | Server → Client | 取消订阅确认 | ✅ | | PINGREQ | Client → Server | 心跳请求 | ✅ | | PINGRESP | Server → Client | 心跳响应 | ✅ | | DISCONNECT | Client → Server | 断开连接 | ✅ | ### 3.2 QoS级别支持 #### QoS 0 - At Most Once - 消息最多传递一次 - 无确认机制 - 适用于可丢失消息 #### QoS 1 - At Least Once - 消息至少传递一次 - PUBACK确认机制 - 需要持久化存储 #### QoS 2 - Exactly Once - 消息确保传递一次 - 四次握手(PUBREC/PUBREL/PUBCOMP) - 需要状态管理 ### 3.3 Topic支持 #### 3.3.1 Topic层级 - 使用 `/` 分隔层级 - 支持多级Topic(如 `a/b/c/d`) #### 3.3.2 通配符 - `+`: 单级通配符 - `sensor/+/temperature` 匹配 `sensor/1/temperature` - `#`: 多级通配符 - `sensor/#` 匹配 `sensor/1/temperature` 和 `sensor/1/humidity` ### 3.4 保留消息 (Retained Messages) - 每个Topic保留最新一条保留消息 - 新订阅者立即收到保留消息 - 空payload删除保留消息 ### 3.5 遗嘱消息 (Will Message) - 客户端异常断开时发送 - 支持Will QoS和Will Retain - 连接时指定遗嘱配置 ### 3.6 Clean Session - Clean Session = true: 断开后清除会话 - Clean Session = false: 保持会话,保留订阅和离线消息 --- ## 4. 数据模型设计 ### 4.1 数据库Schema (SQLite) #### 4.1.1 会话表 (sessions) ```sql CREATE TABLE sessions ( client_id TEXT PRIMARY KEY, clean_session INTEGER NOT NULL, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, will_topic TEXT, will_payload BLOB, will_qos INTEGER, will_retain INTEGER, expiry_interval INTEGER ); ``` #### 4.1.2 订阅表 (subscriptions) ```sql CREATE TABLE subscriptions ( id INTEGER PRIMARY KEY AUTOINCREMENT, client_id TEXT NOT NULL, topic_filter TEXT NOT NULL, qos INTEGER NOT NULL, created_at INTEGER NOT NULL, UNIQUE(client_id, topic_filter), FOREIGN KEY (client_id) REFERENCES sessions(client_id) ON DELETE CASCADE ); CREATE INDEX idx_subscriptions_client ON subscriptions(client_id); CREATE INDEX idx_subscriptions_topic ON subscriptions(topic_filter); ``` #### 4.1.3 消息存储表 (messages) ```sql CREATE TABLE messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, message_id INTEGER NOT NULL, client_id TEXT NOT NULL, topic TEXT NOT NULL, payload BLOB NOT NULL, qos INTEGER NOT NULL, retain INTEGER NOT NULL, dup INTEGER NOT NULL, created_at INTEGER NOT NULL, status TEXT NOT NULL, UNIQUE(client_id, message_id) ); CREATE INDEX idx_messages_client ON messages(client_id); CREATE INDEX idx_messages_status ON messages(status); ``` #### 4.1.4 保留消息表 (retained_messages) ```sql CREATE TABLE retained_messages ( topic TEXT PRIMARY KEY, payload BLOB NOT NULL, qos INTEGER NOT NULL, updated_at INTEGER NOT NULL ); ``` #### 4.1.5 QoS 2状态表 (qos2_state) ```sql CREATE TABLE qos2_state ( id INTEGER PRIMARY KEY AUTOINCREMENT, client_id TEXT NOT NULL, message_id INTEGER NOT NULL, direction TEXT NOT NULL, status TEXT NOT NULL, created_at INTEGER NOT NULL, UNIQUE(client_id, message_id, direction) ); CREATE INDEX idx_qos2_client ON qos2_state(client_id); ``` ### 4.2 内存数据结构 #### 4.2.1 连接信息 ```cpp struct WillMessage { std::string topic; std::vector payload; uint8_t qos {0}; bool retain {false}; }; struct Connection { std::string clientId; std::shared_ptr socket; bool cleanSession {true}; uint16_t keepAlive {60}; std::chrono::steady_clock::time_point lastActivity; std::optional will; std::string username; std::chrono::system_clock::time_point connectedAt; std::shared_ptr session; }; ``` #### 4.2.2 会话信息 ```cpp struct InflightMessage { uint16_t messageId {0}; std::string topic; std::vector payload; uint8_t qos {0}; bool retain {false}; bool dup {false}; MessageStatus status; std::chrono::system_clock::time_point timestamp; }; struct Session { std::string clientId; bool cleanSession {true}; std::unordered_map subscriptions; std::unordered_map inflightQueue; uint16_t nextMessageId {1}; std::chrono::system_clock::time_point createdAt; }; ``` #### 4.2.3 订阅树 ```cpp struct TopicNode { std::string level; std::unordered_map> children; std::unordered_map subscribers; }; struct TopicTree { std::unique_ptr root; }; ``` --- ## 5. 接口设计 ### 5.1 核心接口定义 #### 5.1.1 Broker接口 ```cpp class IBroker { public: virtual ~IBroker() = default; virtual Result start() = 0; virtual Result stop() = 0; virtual Result addListener(std::shared_ptr listener) = 0; virtual Result removeListener(const std::string& name) = 0; }; ``` #### 5.1.2 Session Manager接口 ```cpp class ISessionManager { public: virtual ~ISessionManager() = default; virtual Result> create(const std::string& clientId, bool cleanSession) = 0; virtual Result> get(const std::string& clientId) = 0; virtual Result remove(const std::string& clientId) = 0; virtual Result addSubscription(const std::string& clientId, const std::string& topic, uint8_t qos) = 0; virtual Result removeSubscription(const std::string& clientId, const std::string& topic) = 0; virtual Result> getSubscriptions(const std::string& clientId) = 0; }; ``` #### 5.1.3 Storage接口 ```cpp class IMessageStore { public: virtual ~IMessageStore() = default; virtual Result storeMessage(const Message& msg) = 0; virtual Result getMessage(const std::string& clientId, uint16_t messageId) = 0; virtual Result deleteMessage(const std::string& clientId, uint16_t messageId) = 0; virtual Result> getPendingMessages(const std::string& clientId) = 0; }; class IRetainedStore { public: virtual ~IRetainedStore() = default; virtual Result storeRetained(const std::string& topic, const std::vector& payload, uint8_t qos) = 0; virtual Result getRetained(const std::string& topic) = 0; virtual Result> getMatchingRetained(const std::string& topicFilter) = 0; virtual Result deleteRetained(const std::string& topic) = 0; }; ``` #### 5.1.4 Hook接口 ```cpp class IHook { public: virtual ~IHook() = default; virtual std::string name() const = 0; virtual Result onConnect(const std::string& clientId, const std::string& username) = 0; virtual void onDisconnect(const std::string& clientId) = 0; virtual Result onSubscribe(const std::string& clientId, const std::string& topic, uint8_t qos) = 0; virtual void onUnsubscribe(const std::string& clientId, const std::string& topic) = 0; virtual Result onPublish(const std::string& clientId, const std::string& topic, const std::vector& payload, uint8_t qos, bool retain) = 0; virtual Result onDeliver(const std::string& clientId, const std::string& topic, const std::vector& payload, uint8_t qos, bool retain, const std::string& fromClientId) = 0; }; ``` --- ## 6. 配置规格 ### 6.1 配置文件格式 (YAML) ```yaml broker: id: "mqtt-broker-1" version: "1.0.0" listeners: tcp: enabled: true bind: "0.0.0.0:1883" max_connections: 10000 tls: enabled: false bind: "0.0.0.0:8883" cert_file: "" key_file: "" websocket: enabled: true bind: "0.0.0.0:8083" path: "/mqtt" http: enabled: true bind: "0.0.0.0:8080" rate_limit_per_minute: 120 session: max_session_expiry: 86400 max_message_queue: 1000 max_inflight: 32 message_expiry: 86400 message: max_packet_size: 1048576 max_topic_length: 65535 max_payload_size: 268435455 storage: type: "sqlite" path: "./data/mqtt.db" pool_size: 10 log: level: "info" format: "json" output: "stdout" hooks: auth: enabled: true type: "anonymous" acl: enabled: false app_auth: admin_username: "admin" admin_password: "admin123" ``` --- ## 7. 性能指标 ### 7.1 目标性能 - 并发连接: 10,000+ - 消息吞吐: 50,000+ msg/sec - 消息延迟: < 10ms (P99) - 内存占用: < 500MB (10k连接) ### 7.2 资源限制 - 单连接内存: ~50KB - 消息队列大小: 可配置 - 最大包大小: 256MB --- ## 8. 安全设计 ### 8.1 认证机制 - 用户名/密码认证 - 匿名访问控制 - 可扩展认证钩子 ### 8.2 授权机制 - Topic级别ACL - 发布/订阅权限分离 - 通配符权限支持 ### 8.3 TLS支持 - TLS 1.2+ - 证书验证 - 双向认证支持 --- ## 9. 监控与运维 ### 9.1 指标暴露 - Prometheus格式指标 - HTTP `/metrics` 端点 ### 9.2 HTTP API(整理版) **统一说明** - 除 `/auth/*` 外,接口需要 `Authorization: Bearer ` - 角色权限 - `admin`:可创建/删除客户端、管理激活码与 ACL,查看全部数据 - `user`:仅可查看“已绑定设备”的数据,可绑定/解绑设备 - 管理员账号由 `app_auth.admin_username/admin_password` 配置,服务启动时会确保账号存在并同步密码 - 服务器地址123.56.226.74:8080 #### 9.2.1 认证 ``` POST /auth/register Body: { "username": "admin", "password": "secret" } Response: { "id": 1, "username": "admin", "role": "user", "created_at": 1700000000 } ``` ``` POST /auth/login Body: { "username": "admin", "password": "secret", "ttl_seconds": 86400 } Response: { "token": "...", "expires_at": 1700086400, "role": "admin" } ``` ``` GET /auth/me Response: { "id": 1, "username": "admin", "role": "admin", "created_at": 1700000000 } ``` #### 9.2.2 客户端(设备) ``` GET /managed-clients GET /managed-clients/{client_id} POST /managed-clients PUT /managed-clients/{client_id} DELETE /managed-clients/{client_id} ``` 示例: ``` POST /managed-clients Body: { "client_id": "device001", "username": "u1", "password": "p1", "enabled": true, "description": "IoT Device" } ``` #### 9.2.3 激活码与绑定 ``` POST /activation-codes GET /activation-codes GET /activation-codes/download ``` ``` POST /device-bindings/activate GET /device-bindings DELETE /device-bindings/{client_id} ``` 示例: ``` POST /device-bindings/activate Body: { "code": "激活码" } ``` #### 9.2.4 消息记录与订阅 ``` GET /message-records?client_id=device001&start_time=1700000000&end_time=1700086399&limit=100&offset=0 GET /subscriptions?client_id=device001 ``` 参数: - `client_id`: 可选,按客户端过滤 - `start_time/end_time`: 秒级时间戳 - `limit/offset`: 分页 #### 9.2.5 ACL(仅 admin) ``` GET /acl-rules GET /acl-rules/{client_id} POST /acl-rules DELETE /acl-rules/{client_id} ``` #### 9.2.6 实时推送(SSE) ``` GET /events?client_id=device001 Header: Authorization: Bearer ``` #### 9.2.7 会话 ``` GET /sessions ``` #### 9.2.8 快速流程示例 1. 管理员创建客户端:`POST /managed-clients` 2. 管理员生成激活码:`POST /activation-codes` 3. App 登录后扫码绑定:`POST /device-bindings/activate` 4. App 查询设备数据:`GET /managed-clients` / `GET /message-records` #### 9.2.9 API 速查 认证 - `POST /auth/register` - `POST /auth/login` - `GET /auth/me` 设备 - `GET /managed-clients` - `GET /managed-clients/{client_id}` - `POST /managed-clients` (admin) - `PUT /managed-clients/{client_id}` (admin) - `DELETE /managed-clients/{client_id}` (admin) 激活码与绑定 - `POST /activation-codes` (admin) - `GET /activation-codes` (admin) - `GET /activation-codes/download` (admin) - `POST /device-bindings/activate` - `GET /device-bindings` - `DELETE /device-bindings/{client_id}` 消息与订阅 - `GET /message-records` - `GET /subscriptions` - `GET /events` (SSE) ACL - `GET /acl-rules` (admin) - `GET /acl-rules/{client_id}` (admin) - `POST /acl-rules` (admin) - `DELETE /acl-rules/{client_id}` (admin) 会话 - `GET /sessions` #### 9.2.10 详细使用示例与说明 基础说明 所有非 `/auth/*` 接口需要 `Authorization: Bearer `。普通用户仅能访问已绑定设备数据。 认证 ```bash curl -X POST http://127.0.0.1:8080/auth/register \ -H "Content-Type: application/json" \ -d '{"username":"u1","password":"p1"}' ``` 响应字段说明 - `id`: 用户 ID - `username`: 用户名 - `role`: 角色(`user` 或 `admin`) - `created_at`: 创建时间(秒级时间戳) ```bash curl -X POST http://127.0.0.1:8080/auth/login \ -H "Content-Type: application/json" \ -d '{"username":"admin","password":"admin123","ttl_seconds":86400}' ``` 响应字段说明 - `token`: Bearer Token - `expires_at`: 过期时间(秒级时间戳) - `role`: 角色 ```bash curl http://127.0.0.1:8080/auth/me \ -H "Authorization: Bearer $TOKEN" ``` 设备(managed clients) ```bash curl http://127.0.0.1:8080/managed-clients?limit=200&offset=0 \ -H "Authorization: Bearer $TOKEN" ``` 返回字段说明(每条设备) - `id` - `client_id` - `username` - `enabled` - `description` - `created_at` - `updated_at` ```bash curl -X POST http://127.0.0.1:8080/managed-clients \ -H "Authorization: Bearer $ADMIN_TOKEN" \ -H "Content-Type: application/json" \ -d '{"client_id":"device001","username":"dev1","password":"pwd1","enabled":true,"description":"sensor"}' ``` ```bash curl -X PUT http://127.0.0.1:8080/managed-clients/device001 \ -H "Authorization: Bearer $ADMIN_TOKEN" \ -H "Content-Type: application/json" \ -d '{"username":"dev1","password":"pwd2","enabled":true,"description":"sensor-v2"}' ``` ```bash curl -X DELETE http://127.0.0.1:8080/managed-clients/device001 \ -H "Authorization: Bearer $ADMIN_TOKEN" ``` 激活码 ```bash curl -X POST http://127.0.0.1:8080/activation-codes \ -H "Authorization: Bearer $ADMIN_TOKEN" \ -H "Content-Type: application/json" \ -d '{"client_id":"device001","force":false}' ``` 字段说明 - `force`: 为 `true` 时会覆盖已有激活码 返回字段说明 - `id` - `client_id` - `code` - `enabled` - `created_at` - `updated_at` - `bound` / `bound_user_id` / `bound_at` ```bash curl http://127.0.0.1:8080/activation-codes \ -H "Authorization: Bearer $ADMIN_TOKEN" ``` ```bash curl -o activation_codes.csv \ -H "Authorization: Bearer $ADMIN_TOKEN" \ http://127.0.0.1:8080/activation-codes/download ``` 设备绑定 ```bash curl -X POST http://127.0.0.1:8080/device-bindings/activate \ -H "Authorization: Bearer $TOKEN" \ -H "Content-Type: application/json" \ -d '{"code":"abcd1234ef567890"}' ``` 绑定失败错误码 - `INVALID_CODE`:激活码不存在 - `CODE_DISABLED`:激活码禁用 - `ALREADY_BOUND`:激活码已绑定 - `BIND_FAILED`:绑定失败(如已绑定其他用户) ```bash curl http://127.0.0.1:8080/device-bindings \ -H "Authorization: Bearer $TOKEN" ``` ```bash curl -X DELETE http://127.0.0.1:8080/device-bindings/device001 \ -H "Authorization: Bearer $TOKEN" ``` 消息记录 ```bash curl "http://127.0.0.1:8080/message-records?client_id=device001&start_time=1700000000&end_time=1700086399&limit=100&offset=0" \ -H "Authorization: Bearer $TOKEN" ``` 时间参数说明 - `start_time` / `end_time` 支持秒级时间戳 - 也支持 UTC 时间串 `YYYY-MM-DDTHH:mm:ssZ` 返回字段说明(每条记录) - `id` - `client_id` - `topic` - `payload`(十六进制字符串) - `qos` - `retain` - `direction`(`inbound`/`outbound`) - `created_at` 订阅 ```bash curl "http://127.0.0.1:8080/subscriptions?client_id=device001" \ -H "Authorization: Bearer $TOKEN" ``` 返回字段说明(每条订阅) - `client_id` - `topic_filter` - `qos` 会话 ```bash curl "http://127.0.0.1:8080/sessions?connected=true" \ -H "Authorization: Bearer $TOKEN" ``` 返回字段说明(每条会话) - `client_id` - `clean_session` - `connected` - `updated_at` ACL(仅 admin) ```bash curl http://127.0.0.1:8080/acl-rules?limit=200&offset=0 \ -H "Authorization: Bearer $ADMIN_TOKEN" ``` ```bash curl http://127.0.0.1:8080/acl-rules/device001 \ -H "Authorization: Bearer $ADMIN_TOKEN" ``` ```bash curl -X POST http://127.0.0.1:8080/acl-rules \ -H "Authorization: Bearer $ADMIN_TOKEN" \ -H "Content-Type: application/json" \ -d '{"client_id":"device001","topic_filter":"sensors/+","can_publish":true,"can_subscribe":true}' ``` ```bash curl -X DELETE http://127.0.0.1:8080/acl-rules/device001 \ -H "Authorization: Bearer $ADMIN_TOKEN" ``` 实时事件(SSE) ```bash curl -N "http://127.0.0.1:8080/events?client_id=device001" \ -H "Authorization: Bearer $TOKEN" ``` 事件推送内容为消息记录 JSON,字段与 `GET /message-records` 一致。 指标 ```bash curl http://127.0.0.1:8080/metrics ``` ### 9.3 日志 - 结构化日志 - 可配置级别 - 请求追踪 --- ## 10. 项目结构 ``` mqtt-broker/ ├── src/ │ ├── broker/ │ │ ├── broker.cpp │ │ └── options.hpp │ ├── connection/ │ │ ├── manager.cpp │ │ └── connection.hpp │ ├── session/ │ │ ├── manager.cpp │ │ └── session.hpp │ ├── protocol/ │ │ ├── packet/ │ │ │ ├── packet.hpp │ │ │ ├── connect.cpp │ │ │ ├── publish.cpp │ │ │ ├── subscribe.cpp │ │ │ └── ... │ │ └── handler.cpp │ ├── pubsub/ │ │ ├── engine.cpp │ │ ├── topic.cpp │ │ └── router.cpp │ ├── storage/ │ │ ├── store.hpp │ │ ├── sqlite/ │ │ │ ├── session_store.cpp │ │ │ ├── message_store.cpp │ │ │ └── retained_store.cpp │ │ └── interface.hpp │ ├── hooks/ │ │ ├── hook.hpp │ │ ├── auth.cpp │ │ └── acl.cpp │ └── listener/ │ ├── tcp_listener.cpp │ ├── tls_listener.cpp │ └── websocket_listener.cpp ├── include/ │ ├── broker/ │ ├── protocol/ │ └── common/ ├── app/ │ └── main.cpp ├── third_party/ ├── config/ │ └── config.yaml ├── test/ │ ├── integration/ │ └── unit/ ├── CMakeLists.txt └── conanfile.txt ``` --- ## 11. 测试策略 ### 11.1 单元测试 - 协议解析测试 - Topic匹配测试 - 存储层测试 ### 11.2 集成测试 - 端到端消息流程 - QoS流程验证 - 会话持久化验证 ### 11.3 压力测试 - 并发连接测试 - 消息吞吐测试 - 长时间稳定性测试 --- ## 12. 依赖库 ```cmake # 推荐通过 Conan 管理依赖 find_package(asio CONFIG REQUIRED) # 或 Boost::asio find_package(spdlog CONFIG REQUIRED) find_package(yaml-cpp CONFIG REQUIRED) find_package(SQLite3 REQUIRED) find_package(OpenSSL REQUIRED) find_package(websocketpp CONFIG REQUIRED) # 或 Boost.Beast ``` ## 13. 前端ui 支持前端注册登录 登录成功后才能和后端进行交互 参考EMQX的实现 ### 13.1 App 开发方案(激活码绑定) **核心页面** - 登录/注册:使用 `/auth/register`、`/auth/login` 获取 Token - 设备绑定:输入激活码,调用 `/device-bindings/activate` - 设备列表:调用 `/managed-clients`,展示绑定设备 - 设备详情:设备基础信息 + 最近消息 - 消息历史:调用 `/message-records?client_id=...` **App 侧基础流程** 1. 用户注册/登录,保存 Token(本地安全存储) 2. 输入激活码或扫描二维码绑定设备 3. 拉取绑定设备列表 4. 进入设备详情,拉取消息记录与订阅状态 5. 解绑设备后,从列表移除 **权限与安全** - 所有 App 请求带 `Authorization: Bearer ` - Token 过期后重新登录 - 未绑定设备不允许访问其数据(服务端已强制过滤) **数据刷新建议** - 设备列表:进入页面时刷新 - 设备消息:分页/按时间区间查询 - 需要实时展示时,使用 SSE:`GET /events?client_id=...` --- ## 14. C++实现落地说明(MVP) ### 14.1 当前已实现模块 - 基础工程结构(`CMakeLists.txt` + `conanfile.txt`) - 配置加载(YAML) - 日志初始化(spdlog) - TCP Listener(1883) - TLS Listener(8883,证书配置) - MQTT 报文基础解析(CONNECT / PUBLISH / SUBSCRIBE / PINGREQ / DISCONNECT) - MQTT 报文基础解析(CONNECT / PUBLISH / SUBSCRIBE / UNSUBSCRIBE / PINGREQ / DISCONNECT) - QoS1/QoS2 入站流程(PUBACK / PUBREC / PUBREL / PUBCOMP) - QoS1/QoS2 出站 inflight 跟踪与超时重传(含 DUP/PUBREL) - QoS inflight SQLite 持久化与重连恢复发送 - 订阅关系 SQLite 持久化与重连恢复 - Retained 消息持久化与订阅后下发 - ACL 规则持久化与发布/订阅鉴权(`/acl-rules`) - 会话管理(内存) - Topic 订阅路由(支持 `+` / `#`) - 消息转发(在线转发 + 离线队列暂存) - SQLite 持久化(托管客户端、消息记录) - 认证模式:`anonymous` / `managed` - HTTP 管理 API(`/managed-clients`、`/message-records`、`/metrics`) - Prometheus 指标扩展(发布吞吐、ACL拒绝、重传计数) - 管理 API 安全加固(Bearer Token、密码哈希、基础限流) - 前端注册登录 API(`/auth/register`、`/auth/login`) ### 14.2 代码入口 - 启动入口:`app/main.cpp` - 核心 Broker:`src/broker/broker.cpp` - 协议编解码:`src/protocol/packet.cpp` - 配置模块:`src/common/config.cpp` ### 14.3 构建方式(Linux + Conan) ```bash # 1) 生成默认 profile(可选) conan profile detect --force # 2) 使用 Linux profile 安装依赖和生成工具链 conan install . \ --output-folder=build \ --build=missing \ -pr:h=profiles/linux-gcc-release # 3) 配置 cmake -S . -B build \ -DCMAKE_TOOLCHAIN_FILE=build/build/Release/generators/conan_toolchain.cmake \ -DCMAKE_BUILD_TYPE=Release # 4) 编译 cmake --build build -j # 5) 运行 ./build/mqtt-broker config/config.yaml ``` ### 14.4 下一步建议 - 补全 QoS1/QoS2 状态机与 ACK 流程 - 增加 TLS/WSS 监听 - 接入 SQLite 持久化会话/离线消息/保留消息 - 补充 HTTP 管理 API 与 Prometheus 指标 ### 14.5 管理API安全说明 - 托管客户端密码以 `sha256` 哈希形式存储(接口返回不包含密码字段) - 管理 API 启用基础限流:`listeners.http.rate_limit_per_minute` ### 14.6 功能验收命令(Linux) ```bash # 0) 启动 Broker ./build/mqtt-broker config/config.yaml ``` ```bash # 1) 注册与登录(获取 API token) curl -s -X POST http://127.0.0.1:8080/auth/register \ -H "Content-Type: application/json" \ -d '{"username":"admin","password":"123456"}' TOKEN=$(curl -s -X POST http://127.0.0.1:8080/auth/login \ -H "Content-Type: application/json" \ -d '{"username":"admin","password":"123456","ttl_seconds":86400}' | jq -r .token) echo "$TOKEN" ``` ```bash # 2) 托管客户端与 ACL 配置 curl -s -X POST http://127.0.0.1:8080/managed-clients \ -H "Authorization: Bearer $TOKEN" \ -H "Content-Type: application/json" \ -d '{"client_id":"device001","username":"u1","password":"p1","enabled":true}' curl -s -X POST http://127.0.0.1:8080/acl-rules \ -H "Authorization: Bearer $TOKEN" \ -H "Content-Type: application/json" \ -d '{"client_id":"device001","topic_filter":"demo/#","can_publish":true,"can_subscribe":true}' ``` ```bash # 3) MQTT TCP 连接与收发(mosquitto-clients) # 终端A:订阅 mosquitto_sub -h 127.0.0.1 -p 1883 -i device001 -u u1 -P p1 -t demo/test -q 1 -d # 终端B:发布 mosquitto_pub -h 127.0.0.1 -p 1883 -i device001 -u u1 -P p1 -t demo/test -m "hello" -q 1 -d ``` ```bash # 4) Retained 消息验证 mosquitto_pub -h 127.0.0.1 -p 1883 -i device001 -u u1 -P p1 -t demo/retain -m "retained" -r -q 1 mosquitto_sub -h 127.0.0.1 -p 1883 -i device001 -u u1 -P p1 -t demo/# -q 1 -C 1 -v ``` ```bash # 5) TLS 监听验证(先在 config 启用 listeners.tls,并配置 cert_file/key_file) mosquitto_sub -h 127.0.0.1 -p 8883 --cafile server.crt -i device001 -u u1 -P p1 -t demo/tls -q 1 -d mosquitto_pub -h 127.0.0.1 -p 8883 --cafile server.crt -i device001 -u u1 -P p1 -t demo/tls -m "tls-ok" -q 1 -d ``` ```bash # 6) 管理 API 查询 curl -s -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8080/managed-clients curl -s -H "Authorization: Bearer $TOKEN" "http://127.0.0.1:8080/message-records?limit=20&offset=0" curl -s -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8080/metrics ``` ## 15. Frontend��EMQX ��� - ��̬��ԴĿ¼Ĭ�ϣ�`./web` - HTTP ����������`listeners.http.web_root` - �������ʣ�`http://127.0.0.1:8080/` - ҳ�������� - ע��/��¼��`/auth/register`��`/auth/login`�� - ����ָ�꿴�壨`/metrics`�� - �йܿͻ��˹��� - ACL ������� - ��Ϣ��¼��ѯ ```bash # Linux ������֤ ./scripts/build_linux.sh ./build/mqtt-broker config/config.yaml # ��������� xdg-open http://127.0.0.1:8080/ ``` ## 16. 最新实现进展(2026-02-26) - 已实现:离线消息 SQLite 持久化(`offline_messages` 表) - 行为:客户端离线时消息落库;客户端重连后自动回放并删除已投递记录 - 清理:`clean session=true` 时会清理该客户端离线消息 - 额外:已实现 KeepAlive 超时踢线 + Will Message 异常断开发布 ## 17. WebSocket / WSS 监听实现 - 已实现 MQTT over WebSocket (`listeners.websocket`) - 已实现 MQTT over Secure WebSocket (`listeners.wss`) - `path` 可配置(默认 `/mqtt`) - WSS 需配置证书:`listeners.wss.cert_file` 与 `listeners.wss.key_file` - WS/WSS 连接已接入现有鉴权、ACL、QoS、离线消息回放、KeepAlive、Will Message 处理流程 ## 18. Hook 插件系统 支持动态加载 Hook 插件,用于认证、授权、消息处理、审计等扩展逻辑。 ### 18.1 配置 ```yaml hooks: plugins: - "./plugins/libsample_hook.so" ``` ### 18.2 IHook 方法含义(速查) - `onConnect(clientId, username)`:客户端连接鉴权,返回失败会拒绝连接 - `onDisconnect(clientId)`:客户端断开通知 - `onSubscribe(clientId, topic, qos)`:订阅鉴权 - `onUnsubscribe(clientId, topic)`:取消订阅通知 - `onPublish(clientId, topic, payload, qos, retain)`:发布鉴权 - `onDeliver(clientId, topic, payload, qos, retain, fromClientId)`:服务端向订阅者投递前的鉴权 ### 18.3 插件导出符号 插件需导出以下 C 符号: - `mqtt_create_hook()` - `mqtt_destroy_hook(IHook*)` - `mqtt_hook_version()`(返回 `1`) 接口定义见: - `include/hooks/hook.hpp` - `include/hooks/plugin_api.hpp` ### 18.4 示例插件构建 ```bash # 生成 build cmake -S . -B build -DCMAKE_BUILD_TYPE=Release # 构建 broker + sample_hook cmake --build build -j # sample_hook 输出位置: # build/plugins/libsample_hook.so ``` ## 19. OTA 固件升级 ### 19.1 设计说明 - 管理员通过 Web 控制台上传固件,**按 client_id 绑定固件**(同一 client_id 内版本号唯一) - Client 端定期调用公开接口查询**指定 client_id** 的最新版本,与本地版本对比,决定是否下载升级 - 下载接口**无需 token**,Client 直接 HTTP GET 即可 - 固件文件存储在服务器 `ota//` 目录(与 `web/` 同级) ### 19.2 管理员操作(Web 控制台) 登录后在左侧菜单选择 **OTA 固件**(仅 admin 角色可见): - **上传固件**:选择 `client_id`,填写版本号(如 `1.2.0`)、描述,选择固件文件,点击上传 - **下载固件**:点击列表中的"下载"按钮 - **删除固件**:点击"删除",同时删除服务器上的文件 ### 19.3 API 说明 | 方法 | 路径 | 认证 | 说明 | |------|------|------|------| | GET | `/ota/latest?client_id=...` | 无 | 查询指定 client_id 最新固件信息 | | GET | `/ota/firmware/:id/download` | 无 | 下载固件文件 | | GET | `/ota/firmwares` | admin token | 列出所有固件(可选 `client_id` 过滤) | | POST | `/ota/firmwares` | admin token | 上传固件(multipart/form-data) | | DELETE | `/ota/firmwares/:id` | admin token | 删除固件及文件 | 上传固件的 multipart 字段: | 字段 | 类型 | 必填 | 说明 | |------|------|------|------| | `client_id` | text | 是 | 绑定的 client_id | | `version` | text | 是 | 版本号字符串,需唯一 | | `file` | file | 是 | 固件二进制文件 | | `description` | text | 否 | 版本描述 | #### 查询最新固件 ```bash curl "http://127.0.0.1:8080/ota/latest?client_id=device-001" ``` 响应示例: ```json { "id": 3, "client_id": "device-001", "version": "1.2.0", "filename": "firmware_v1.2.0.bin", "filesize": 204800, "description": "修复连接稳定性问题", "created_at": 1741132800 } ``` 无固件时返回 `404`。 #### 下载固件 ```bash curl -O http://127.0.0.1:8080/ota/firmware/3/download ``` #### 上传固件(admin) ```bash curl -X POST http://127.0.0.1:8080/ota/firmwares \ -H "Authorization: Bearer $ADMIN_TOKEN" \ -F "client_id=device-001" \ -F "version=1.2.0" \ -F "description=修复连接稳定性问题" \ -F "file=@firmware_v1.2.0.bin" ``` #### 删除固件(admin) ```bash curl -X DELETE http://127.0.0.1:8080/ota/firmwares/3 \ -H "Authorization: Bearer $ADMIN_TOKEN" ``` ### 19.4 Client 端升级流程 ```c // 1. 查询最新版本(无需认证) response = http_get("http://server/ota/latest?client_id=DEVICE_ID"); if (response.status == 404) return; // 无固件 latest = json_parse(response.body); // 2. 对比版本号,相同则跳过 if (strcmp(latest.version, CURRENT_VERSION) == 0) return; // 3. 下载固件 fw_data = http_get("http://server/ota/firmware/" + latest.id + "/download"); // 4. 校验并写入 flash,重启 ota_write_and_reboot(fw_data); ``` ## 20. Android App 自动更新 ### 20.1 设计说明 - 管理员通过 Web 控制台上传 APK,支持标记"强制更新" - App 每次登录成功后自动在后台查询 `/app/latest`,对比当前版本号 - 有新版本时弹出对话框提示用户,强制更新时对话框不可取消 - 下载和安装均在 SDK 内部完成,业务层无需关心 ### 20.2 服务器 API | 方法 | 路径 | 认证 | 说明 | |------|------|------|------| | GET | `/app/latest` | 无 | 查询最新 App 版本信息 | | GET | `/app/release/:id/download` | 无 | 下载 APK 文件 | | GET | `/app/releases` | admin token | 列出所有版本 | | POST | `/app/releases` | admin token | 上传 APK(multipart/form-data) | | DELETE | `/app/releases/:id` | admin token | 删除版本及文件 | 上传字段: | 字段 | 类型 | 必填 | 说明 | |------|------|------|------| | `version` | text | 是 | 版本号,需与 APK versionName 一致 | | `file` | file | 是 | APK 文件 | | `description` | text | 否 | 更新说明 | | `force_update` | text | 否 | `true` 表示强制更新 | `/app/latest` 响应示例: ```json { "id": 5, "version": "1.2.0", "filename": "iotapp-1.2.0.apk", "filesize": 5242880, "description": "新增 OTA 升级功能", "force_update": false, "created_at": 1741132800 } ``` APK 文件存储在服务器 `apk/` 目录(与 `web/` 同级)。 ### 20.3 管理员操作(Web 控制台) 登录后在左侧菜单选择 **App 版本**(仅 admin 可见): - **上传 APK**:填写版本号、描述,勾选是否强制更新,选择 APK 文件,点击上传 - **下载 APK**:点击列表中的"下载" - **删除版本**:点击"删除",同时删除服务器文件 ### 20.4 Android SDK 使用 #### 自动检查(推荐) 登录成功后 SDK 自动触发,无需额外代码。`LoginActivity.openMain()` 中已调用: ```java AppUpdateManager.check(context, sdk.apiClient, true); ``` #### 手动触发检查 ```java // silent=false 时若无更新也会打印日志 AppUpdateManager.check(context, SdkFacade.get(context).apiClient, false); ``` #### 仅查询版本信息(不弹框) ```java AppExecutors.io().execute(() -> { try { AppUpdateInfo info = AppUpdateManager.fetchLatest(sdk.apiClient); if (info != null) { // 对比 info.version 与当前版本 } } catch (Exception e) { ... } }); ``` #### 涉及文件 | 文件 | 说明 | |------|------| | `sdk/update/AppUpdateInfo.java` | 版本信息数据模型 | | `sdk/update/AppUpdateManager.java` | 检查、下载、安装逻辑 | | `res/xml/file_provider_paths.xml` | FileProvider 路径配置 | #### AndroidManifest 已添加 ```xml ``` ### 20.5 curl 示例 ```bash # 上传 APK(admin) curl -X POST http://127.0.0.1:8080/app/releases \ -H "Authorization: Bearer $ADMIN_TOKEN" \ -F "version=1.2.0" \ -F "description=新增 OTA 升级功能" \ -F "force_update=false" \ -F "file=@iotapp-1.2.0.apk" # 查询最新版本(无需认证) curl http://127.0.0.1:8080/app/latest # 下载 APK(无需认证) curl -O http://127.0.0.1:8080/app/release/5/download # 删除版本(admin) curl -X DELETE http://127.0.0.1:8080/app/releases/5 \ -H "Authorization: Bearer $ADMIN_TOKEN" ```