# mpush **Repository Path**: jeecp/mpush ## Basic Information - **Project Name**: mpush - **Description**: kiro重构消息推送 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-03-23 - **Last Updated**: 2026-03-23 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MPush 技术手册 > 版本:1.0 | 日期:2026-03-21 | 适用模块:mpush-api / mpush-common / mpush-core / mpush-tools / mpush-client / alloc / mpush-client-js --- ## 目录 1. [项目概述](#1-项目概述) 2. [与开源原版架构对比](#2-与开源原版架构对比) - 2.1 [服务发现机制](#21-服务发现机制) - 2.2 [消息推送可靠性](#22-消息推送可靠性) - 2.3 [性能架构](#23-性能架构) - 2.4 [整体架构结构差异](#24-整体架构结构差异) 3. [整体架构设计](#3-整体架构设计) 4. [模块划分与职责](#4-模块划分与职责) - 4.1 [模块职责与依赖关系](#41-模块职责与依赖关系) - 4.2 [协议消息体系(ByteBufMessage)](#42-协议消息体系bytebufmessage) - 5.1 [整体设计](#51-整体设计) - 5.2 [Redis 数据结构](#52-redis-数据结构) - 5.3 [节点注册流程](#53-节点注册流程) - 5.4 [心跳续约机制](#54-心跳续约机制) - 5.5 [服务发现与变更订阅](#55-服务发现与变更订阅) - 5.6 [k8s 重启场景下的节点存活校验](#56-k8s-重启场景下的节点存活校验) - 5.7 [分布式锁](#57-分布式锁) - 5.8 [RedissonManager 初始化](#58-redissonmanager-初始化) 6. [关键技术方案](#6-关键技术方案) - 6.1 [Disruptor 推送任务调度](#61-disruptor-推送任务调度) - 6.2 [全链路消息追踪(traceId)](#62-全链路消息追踪traceid) - 6.3 [ACK 可靠投递与重试](#63-ack-可靠投递与重试) - 6.4 [双层路由管理](#64-双层路由管理) - 6.5 [Topic 异步推送](#65-topic-异步推送) - 6.6 [流量控制](#66-流量控制) - 6.7 [Fast Connect 快速重连](#67-fast-connect-快速重连) - 6.8 [FastFlowControl 广播流控](#68-fastflowcontrol-广播流控) 7. [消息推送全链路](#7-消息推送全链路) - 7.1 [链路总览](#71-链路总览) - 7.2 [HTTP 接收层](#72-http-接收层) - 7.3 [任务入队:Disruptor Producer](#73-任务入队disruptor-producer) - 7.4 [Disruptor 消费与分发](#74-disruptor-消费与分发) - 7.5 [路由查找与消息发送](#75-路由查找与消息发送) - 7.6 [Netty Channel 写出](#76-netty-channel-写出) - 7.7 [ACK 回调](#77-ack-回调) - 7.8 [线程模型总览](#78-线程模型总览) 8. [核心数据流](#8-核心数据流) - 8.1 [单播推送完整链路](#81-单播推送完整链路) - 8.2 [消息状态流转](#82-消息状态流转) 9. [百万并发能力评估](#9-百万并发能力评估) 10. [架构改善空间](#10-架构改善空间) 11. [配置参考](#11-配置参考) --- ## 1. 项目概述 MPush 是一套面向移动端和 IoT 设备的高性能长连接推送平台,支持单播、广播、Topic 订阅三种推送模式,具备端到端消息追踪、ACK 可靠投递、Disruptor 无锁调度等核心能力。 ### 核心能力 | 能力 | 说明 | |------|------| | 单播推送 | 按 userId + clientType 精确投递,支持本地路由优先 | | 广播推送 | 遍历全部本地路由,令牌桶流控分批发送 | | Topic 推送 | Redis Set 维护订阅关系,Disruptor 异步解耦 HTTP 线程 | | 全链路追踪 | 每条消息携带 traceId,各节点写 Redis Hash 记录状态 | | ACK 可靠投递 | 客户端 ACK 超时后服务端自动重试,最多 3 次 | | 消息去重 | 客户端基于 traceId 的 MsgDedup 防止重复消费 | | 弹性容错 | Disruptor 满时降级直接执行,Redis 异常不阻塞推送主路径 | --- ## 2. 与开源原版架构对比 MPush 基于开源项目 [mpush](https://github.com/mpusher/mpush) 二次开发,在保留其核心长连接推送能力的基础上,替换了服务发现组件、升级了 Redis 客户端、引入了 Disruptor 无锁调度、增加了全链路消息追踪与 ACK 可靠投递等能力。以下从四个维度说明本版本与开源原版的主要差异。 ### 2.1 服务发现机制 | 维度 | 开源原版(ZooKeeper) | 本版本(Redisson + Redis) | |------|----------------------|--------------------------| | 依赖组件 | ZooKeeper 集群 | Redis(已有依赖,无新增) | | 节点注册 | ZNode 临时节点 | `RMapCache` TTL 30s | | 心跳续约 | ZK Session 自动维持 | 独立 `srd-heartbeat` 线程每 15s 续约 | | 变更通知 | ZK Watcher | Redis `RTopic` Pub/Sub | | IP 漂移处理 | 无 | 心跳时检测 IP 变化,自动重注册 | | 节点恢复 | ZK Session 重连 | Redis 重启后心跳自动补注册 | ### 2.2 消息推送可靠性 | 维度 | 开源原版 | 本版本 | |------|----------|--------| | 消息追踪 | 无 | 全链路 traceId,Redis Hash 记录各节点状态 | | ACK 重试 | 无 | 最多 3 次,线性退避(2s/4s/6s) | | 消息去重 | 无 | 客户端 `MsgDedup`(基于 traceId) | | 脏路由清理 | 手动 | 自动检测并清除 | | k8s 节点保护 | 无 | redirect 前 SRD 存活校验 | ### 2.3 性能架构 | 维度 | 开源原版 | 本版本 | |------|----------|--------| | 推送调度 | 传统线程池 | Disruptor WorkerPool(无锁) | | Redis 客户端 | Jedis(同步阻塞) | Redisson(异步,支持集群/哨兵) | | 追踪写入 | 同步阻塞 | 异步非阻塞,不影响推送主路径 | | Topic 解耦 | 同步 | alloc 侧独立 Disruptor 异步解耦 | ### 2.4 整体架构结构差异 | 维度 | 开源原版 | 本版本 | |------|----------|--------| | 服务发现 | ZooKeeper | Redis(Redisson RMapCache + RTopic) | | 分布式锁 | ZooKeeper | Redisson RLock | | 启动入口 | mpush-boot | alloc(Spring Boot) | | 客户端 JS | 无 | mpush-client-js(ThPush WebSocket) | --- ## 3. 整体架构设计 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 前端 / 业务方 │ │ mpush-client-js (ThPush WebSocket) / HTTP REST API │ └──────────────────────────┬──────────────────────────────────────┘ │ HTTP POST /exec ▼ ┌─────────────────────────────────────────────────────────────────┐ │ alloc 接入层 (Spring Boot) │ │ AllocController → PushHandler / TopicPushHandler │ │ PushService (单播/广播) │ TopicPushExecutor (Topic) │ │ PushWorkerPool (Disruptor RingBuffer) │ └──────────────────────────┬──────────────────────────────────────┘ │ PushSender.send(PushContext) ▼ ┌─────────────────────────────────────────────────────────────────┐ │ mpush-client (PushRequest) │ │ 查询 RemoteRouter → GatewayPushMessage 编码 → 发往 Gateway │ └──────────────────────────┬──────────────────────────────────────┘ │ TCP/WebSocket (GatewayPushMessage 二进制协议) ▼ ┌─────────────────────────────────────────────────────────────────┐ │ mpush-core Gateway 层 │ │ PushCenter → DisruptorPushTaskExecutor │ │ SingleUserPushTask / BroadcastPushTask │ │ GatewayPushListener (回调:DELIVERED/ACK/TIMEOUT/FAILED) │ │ AckTaskQueue + AckRetryScheduler │ └──────────────────────────┬──────────────────────────────────────┘ │ Netty Channel (PushMessage 二进制协议) ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 移动端 / IoT 设备 / 浏览器 │ │ ThPush (WebSocket) / Android SDK / iOS SDK │ └─────────────────────────────────────────────────────────────────┘ ┌──────────────┐ │ Redis │ 路由表 / 消息状态 / Topic 订阅 └──────────────┘ ``` ### 关键设计原则 1. **推送主路径零阻塞**:所有 Redis 写操作(消息追踪)使用异步 API,不阻塞 Netty EventLoop。 2. **无锁调度**:Gateway 层使用 Disruptor WorkerPool 替代传统线程池,消除锁竞争。 3. **消息不丢失**:RingBuffer 满时降级直接执行(fallback),ACK 超时后服务端重试。 4. **per-message trace 控制**:追踪开关随消息协议传递,不依赖全局配置,精确控制。 5. **SPI 可扩展**:MessagePusher、PushListener、CacheManager 等核心组件均支持 SPI 替换。 --- ## 4. 模块划分与职责 ### 4.1 模块职责与依赖关系 | 模块 | 职责 | |------|------| | `mpush-api` | 接口定义层:Connection、IPushMessage、PushContext、事件体系、SPI 接口 | | `mpush-common` | 公共实现:协议编解码(GatewayPushMessage、AckMessage、PushMessage)、条件过滤、流控基类 | | `mpush-tools` | 工具层:DisruptorConfig、AbstractDisruptorWorkerPool、CC 配置读取、Jsons、日志 | | `mpush-core` | 核心推送引擎:PushCenter、DisruptorPushTaskExecutor、SingleUserPushTask、BroadcastPushTask、GatewayPushListener、AckTaskQueue、AckRetryScheduler | | `mpush-client` | 推送客户端:PushRequest、GatewayConnectionFactory,负责将 PushContext 转换为 GatewayPushMessage 并发往 Gateway | | `mpush-cache` | Redis 操作封装:RedisMessageTracer(消息状态写入)、RedissonManager | | `mpush-boot` | Gateway 服务启动入口:ServerLauncher 按序编排 CacheManager → ServiceRegistry/Discovery → ConnectionServer / WebSocketServer / GatewayServer → RouterCenter → PushCenter → Monitor,注册 JVM ShutdownHook 优雅停机 | | `alloc` | HTTP 接入层(Spring Boot):AllocController、PushService、TopicPushExecutor、PushWorkerPool、消息状态查询 REST API | | `mpush-client-js` | 浏览器 WebSocket 客户端:ThPush、XhrPool、OfflineQueue、MsgDedup | ### 模块依赖关系 ``` mpush-api ↑ mpush-common ← mpush-tools ↑ ↑ mpush-cache (disruptor) ↑ mpush-core ← mpush-client ↑ ↑ │ │ mpush-boot alloc (Spring Boot 启动入口) (Gateway 服务 (HTTP 接入层,推送请求入口) 独立进程) ↑ │ │ HTTP POST /exec(ThPush 发起推送请求) │ ◄───────────┘ │ │ ↕ WebSocket 长连接(下行:PUSH 帧;上行:ACK / BIND / HANDSHAKE) │ mpush-client-js(ThPush / XhrPool / OfflineQueue / MsgDedup) ``` > `mpush-boot` 与 `alloc` 是两个独立进程,分别承担不同职责: > - `mpush-boot`:启动 Gateway Server / Connection Server / WebSocket Server,负责维持设备长连接、消息下发、路由管理,通过 `ServerLauncher` 按顺序初始化 CacheManager → ServiceRegistry → 各 Server → RouterCenter → PushCenter → Monitor。 > - `alloc`:Spring Boot HTTP 服务,接收业务方推送请求,通过 `mpush-client` 将消息转发给 Gateway 节点。 --- ### 4.2 协议消息体系(ByteBufMessage) #### 设计思路 所有在 Netty Channel 上传输的消息均继承自 `BaseMessage`,其中需要自定义二进制编解码的子类进一步继承 `ByteBufMessage`。 ``` BaseMessage(抽象基类) │ 持有 Packet + Connection │ 统一处理:加密/解密、压缩/解压、JSON/Binary 两种体格式 │ send() / sendRaw() → connection.send(packet, listener) │ └─ ByteBufMessage(抽象,二进制编解码基类) │ decode(byte[]) → Unpooled.wrappedBuffer → decode(ByteBuf) │ encode() → channel.alloc().heapBuffer() → encode(ByteBuf) │ 提供 encodeString/decodeString、encodeBytes/decodeBytes、 │ encodeInt/decodeInt、encodeLong/decodeLong 等工具方法 │ 字符串/字节数组采用 2 字节 length 前缀(超过 Short.MAX_VALUE 时扩展 4 字节) │ ├─ 连接握手类 │ HandshakeMessage 客户端 → 服务端,首次握手(RSA 加密) │ HandshakeOkMessage 服务端 → 客户端,握手成功响应(含 sessionId + AES serverKey) │ FastConnectMessage 客户端 → 服务端,快速重连(复用 sessionId,跳过 RSA) │ FastConnectOkMessage 服务端 → 客户端,快速重连成功响应 │ ├─ 用户绑定类 │ BindUserMessage 客户端 → 服务端,绑定 userId + token + tags │ ├─ 推送消息类(核心) │ GatewayPushMessage alloc → Gateway,单播/广播/Topic 推送载体 │ AckMessage 客户端 → 服务端,消息确认回执 │ ├─ 踢人类 │ KickUserMessage 服务端 → 客户端,通知设备被踢下线 │ GatewayKickUserMessage Gateway 节点间,跨节点踢人指令 │ └─ 通用响应类 OkMessage 服务端 → 客户端,通用成功响应(cmd + code + data) ErrorMessage 服务端 → 客户端,通用错误响应(cmd + code + reason + data) ``` `Packet` 是底层传输单元,携带 `cmd`(Command 枚举)、`sessionId`、`flags`(加密/压缩/ACK/JSON 标志位)和 `body`(序列化后的消息体)。`ByteBufMessage` 的 `encode/decode` 只负责业务字段的序列化,加密和压缩由 `BaseMessage` 统一处理。 --- #### 各消息在三种推送模式中的作用 #### 单播推送(点对点) ``` alloc(Tomcat 线程) │ 构建 GatewayPushMessage │ userId = "user123" ← 非 null,标识单播目标 │ clientType = 1 ← 指定设备类型(Android/iOS/Web) │ content = byte[] ← 消息体 │ traceId / trace ← 追踪控制 │ tags / condition ← 条件过滤(可选) │ FLAG_AUTO_ACK ← 需要 ACK 时设置 ▼ Gateway(Netty EventLoop) │ SingleUserPushTask.doSend() │ → PushMessage.build(conn).setContent().send() │ → channel.writeAndFlush() ▼ 客户端收到 PUSH 命令帧 │ → 回 AckMessage { traceId, userId, trace } │ sessionId 与 GatewayPushMessage.sessionId 对应,用于 AckTaskQueue 匹配 ``` `GatewayPushMessage.isBroadcast()` 返回 `userId == null`,单播时 userId 非空,Gateway 通过 `LocalRouterManager.lookup(userId, clientType)` 精确定位连接。 #### Topic 推送 ``` alloc(Tomcat 线程) │ TopicPushHandler 从 Redis SMEMBERS topic:{topic} 获取订阅用户列表 │ for each userId → 构建 GatewayPushMessage(userId 非 null) │ → PushWorkerPool(alloc Disruptor)异步分发 ▼ (每个 userId 走独立的单播推送链路,GatewayPushMessage 结构与单播相同) ``` Topic 推送本质上是批量单播,每条 `GatewayPushMessage` 都携带具体的 `userId`,不使用广播标志。`taskId` 字段在 Topic 场景下为 null(每条独立计数),流控使用 `FastFlowControl`。 #### 广播推送 ``` alloc(Tomcat 线程) │ 构建 GatewayPushMessage │ userId = null ← null 标识广播 │ taskId = "broadcast-xxx" ← 广播任务 ID,用于 RedisFlowControl 跨节点计数 │ tags / condition ← 条件过滤,只推送满足条件的连接 ▼ Gateway(push-disruptor-worker 线程) │ BroadcastPushTask.run() │ 遍历 LocalRouterManager 全部连接 │ → GatewayPushMessage.getCondition().test(conn.getSessionContext()) │ TagsCondition:匹配 BindUserMessage 中注册的 tags │ ScriptCondition:执行 condition 脚本 │ AwaysPassCondition:无条件全量推送 │ → 通过条件的连接 → doSend(conn) │ → FastFlowControl.checkQps() 控制发送速率 ``` 广播时 `GatewayPushMessage.userId == null`,`isBroadcast()` 返回 `true`,`BroadcastPushTask` 不做路由查找,直接遍历本地所有连接。`tags` 和 `condition` 字段在广播场景下用于精准过滤,避免全量推送。 --- #### 消息字段与 Command 对照 | 消息类 | Command | 方向 | 关键字段 | |--------|---------|------|----------| | `HandshakeMessage` | `HANDSHAKE(2)` | Client→Server | deviceId, clientKey(RSA公钥加密的AES密钥), iv, timestamp | | `HandshakeOkMessage` | `HANDSHAKE(2)` | Server→Client | serverKey(AES密钥), sessionId, heartbeat, expireTime | | `FastConnectMessage` | `FAST_CONNECT(7)` | Client→Server | sessionId, deviceId, deviceToken | | `FastConnectOkMessage` | `FAST_CONNECT(7)` | Server→Client | heartbeat | | `BindUserMessage` | `BIND(5)` | Client→Server | userId, token, tags, clientId | | `GatewayPushMessage` | `GATEWAY_PUSH(16)` | alloc→Gateway | userId(null=广播), content, taskId, tags, condition, traceId, trace | | `AckMessage` | `ACK(23)` | Client→Server | traceId, userId, trace | | `KickUserMessage` | `KICK(13)` | Server→Client | deviceId, userId | | `GatewayKickUserMessage` | `GATEWAY_KICK(14)` | Gateway→Gateway | userId, deviceId, connId, targetServer, targetPort | | `OkMessage` | `OK(11)` | Server→Client | cmd(原命令), code, data | | `ErrorMessage` | `ERROR(10)` | Server→Client | cmd, code, reason, data | --- ## 5. Redisson 服务发现 ### 5.1 整体设计 本版本以 Redis(Redisson 客户端)替代 ZooKeeper 实现服务注册与发现(SRD),无需引入额外组件,复用已有 Redis 基础设施。 ``` ┌─────────────────────────────────────────────────────────────────┐ │ ServiceRegistry / ServiceDiscovery │ │ (SPI 接口层) │ └──────────────────────────┬──────────────────────────────────────┘ │ SPI order=1 ▼ ┌─────────────────────────────────────────────────────────────────┐ │ RedisServiceRegistryAndDiscovery (单例) │ │ │ │ register() → RMapCache.put(nodeId, nodeJson, TTL=30s) │ │ deregister() → RMapCache.remove(nodeId) │ │ lookup() → RMapCache.values() → List │ │ subscribe() → RTopic.addListener() │ │ renewAll() → 每 15s 刷新 TTL(srd-heartbeat 线程) │ └──────────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ RedissonManager │ │ 单节点 / 集群 / 哨兵 三种模式,统一 RedissonClient 入口 │ └─────────────────────────────────────────────────────────────────┘ ``` **SPI 工厂**:`RedisServiceRegistryFactory` 和 `RedisServiceDiscoveryFactory` 均标注 `@Spi(order=1)`,优先级高于默认实现,无需修改调用方代码。 --- ### 5.2 Redis 数据结构 | 用途 | Key 格式 | 类型 | TTL | |------|----------|------|-----| | 节点注册表 | `srd:{serviceName}` | `RMapCache` | 30s(非持久节点) | | 变更事件 | `srd:event:{serviceName}` | `RTopic` | 无(Pub/Sub) | **服务名常量**(`ServiceNames`): | 常量 | 值 | 说明 | |------|----|------| | `CONN_SERVER` | `/cluster/cs` | 长连接服务节点 | | `WS_SERVER` | `/cluster/ws` | WebSocket 服务节点 | | `GATEWAY_SERVER` | `/cluster/gs` | Gateway 服务节点 | **nodeJson 示例**: ```json { "host": "10.0.0.1", "port": 3001, "name": "/cluster/gs", "nodeId": "550e8400-e29b-41d4-a716-446655440000", "persistent": false } ``` --- ### 5.3 节点注册流程 ``` MPushServer 启动 │ ▼ ServerBoot.start() │ server.start() 成功后 ▼ ServiceRegistryFactory.create().register(node) │ node = ServerNodes.gs() │ host = gateway-server-register-ip(配置)或本机 IP │ port = gateway-server-port(默认 3001) │ persistent = false(临时节点,依赖 TTL) ▼ RedisServiceRegistryAndDiscovery.register(node) │ RMapCache.put(nodeId, nodeJson, 30s) │ RTopic.publish(SrdEvent{ADDED, nodeJson}) │ localNodes.put(nodeId, node) ← 记录本地节点,供心跳续约 ``` **关闭时注销**: ``` ServerBoot.stop() │ ▼ ServiceRegistryFactory.create().deregister(node) │ RMapCache.remove(nodeId) │ RTopic.publish(SrdEvent{REMOVED, nodeJson}) │ localNodes.remove(nodeId) ``` --- ### 5.4 心跳续约机制 `srd-heartbeat` 守护线程每 **15s**(= TTL/2)执行一次 `renewAll()`: ``` renewAll() │ for each (nodeId, node) in localNodes │ ├─ resolveCurrentIp(node) │ 优先读 gateway-server-register-ip / connect-server-register-ip │ 无配置则 ConfigTools.getLocalIp() │ ├─ IP 未变化 │ RMapCache.get(nodeId) != null │ → RMapCache.put(nodeId, existing, 30s) ← 刷新 TTL │ RMapCache.get(nodeId) == null(Redis 重启等导致丢失) │ → RMapCache.put(nodeId, nodeJson, 30s) ← 重新注册 │ → RTopic.publish(SrdEvent{ADDED, nodeJson}) │ └─ IP 已变化(k8s IP 漂移) deregister(oldNode) node.setHost(newIp) register(node) ← 以新 IP 重注册 ``` --- ### 5.5 服务发现与变更订阅 **主动查询**(`lookup`): ```java // 调用方示例:SingleUserPushTask.isGatewayNodeAlive() List nodes = mPushServer.getDiscovery().lookup(ServiceNames.GATEWAY_SERVER); ``` 内部实现: ```java // RedisServiceRegistryAndDiscovery.lookup() RMapCache map = r.getMapCache("srd:/cluster/gs", StringCodec.INSTANCE); for (String json : map.values()) { CommonServiceNode node = Jsons.fromJson(json, CommonServiceNode.class); node.setServiceName(serviceName); result.add(node); } ``` **变更订阅**(`subscribe`): ```java // 订阅 Gateway 节点变更 discovery.subscribe(ServiceNames.GATEWAY_SERVER, new ServiceListener() { public void onServiceAdded(String name, ServiceNode node) { /* 新节点上线 */ } public void onServiceUpdated(String name, ServiceNode node) { /* 节点信息更新 */ } public void onServiceRemoved(String name, ServiceNode node) { /* 节点下线 */ } }); ``` 底层通过 `RTopic` 接收 `SrdEvent` JSON,反序列化后分发给监听器。 --- ### 5.6 k8s 重启场景下的节点存活校验 在 k8s 环境中,Pod 重启后 IP 可能复用,旧路由可能指向已销毁的 Pod。`SingleUserPushTask.pushRemote()` 在 redirect 前执行存活校验: ```java private boolean isGatewayNodeAlive(String host, int port) { try { List nodes = mPushServer.getDiscovery() .lookup(ServiceNames.GATEWAY_SERVER); if (nodes == null || nodes.isEmpty()) return true; // 发现服务不可用时放行 for (ServiceNode node : nodes) { if (node.getHost().equals(host) && node.getPort() == port) return true; } return false; } catch (Exception e) { return true; // 发现服务异常时放行,避免误杀 } } ``` **校验失败处理**: ``` isGatewayNodeAlive() == false │ ▼ RemoteRouterManager.unRegister(userId, clientType) ← 清除脏路由 │ ▼ GatewayPushListener.onOffline(message) ← 通知消息离线 ``` --- ### 5.7 分布式锁 基于 Redisson `RLock` 实现,供需要跨节点互斥的场景使用: ```java // 使用示例 try (DistributedLockHandle handle = RedissonDistributedLock.I .tryLock("my-resource", 3, 30, TimeUnit.SECONDS, false)) { if (handle != null) { // 临界区 } } ``` | 参数 | 说明 | |------|------| | `key` | 锁名称,实际 Redis key = `mpush:lock:{key}` | | `waitTime` | 最长等待时间 | | `leaseTime` | 锁持有时间(-1 表示 watchdog 自动续期) | | `isFair` | `true` 使用 `RFairLock`,保证 FIFO 顺序 | --- ### 5.8 RedissonManager 初始化 `RedissonManager` 在系统启动时根据 `mp.redis.cluster-model` 配置选择连接模式: ```java // 单节点 config.useSingleServer().setAddress("redis://127.0.0.1:6379"); // 集群 config.useClusterServers().addNodeAddress("redis://node1:6379", "redis://node2:6379"); // 哨兵 config.useSentinelServers() .setMasterName("mymaster") .addSentinelAddress("redis://sentinel1:26379"); ``` 密码支持 DES 加密存储,启动时自动解密。 --- ## 6. 关键技术方案 ### 6.1 Disruptor 推送任务调度 Gateway 层使用 Disruptor WorkerPool 替代传统线程池,彻底消除锁竞争。 ``` Tomcat 线程(Producer) │ ringBuffer.tryNext() — CAS,纳秒级 ▼ RingBuffer(大小 1M,2 的幂次) │ ▼ push-disruptor-worker-{0..N}(WorkHandler,数量 = CPU 核数) │ onEvent() 仅做一件事:task.getExecutor().execute(task) ▼ Netty EventLoop(与目标 Channel 绑定) │ run() 执行路由查找 + 消息发送 ``` **降级策略**:RingBuffer 满时(`InsufficientCapacityException`)直接将 task 提交到对应 EventLoop,不丢消息。 **等待策略**(可配置): | 策略 | 延迟 | CPU 占用 | 适用场景 | |------|------|----------|----------| | `sleeping` | 低 | 低 | 默认,推荐生产 | | `yielding` | 极低 | 中 | 低延迟场景 | | `busy-spin` | 最低 | 高 | 极致低延迟,独占 CPU | | `blocking` | 高 | 最低 | 低吞吐量场景 | --- ### 6.2 全链路消息追踪(traceId) 每条消息携带 `traceId`(由调用方传入或系统生成),各节点异步写 Redis Hash 记录状态。 ``` Redis Key: trace:{traceId} Redis Field: {userId} Redis Value: ACCEPTED | DISPATCHED | DELIVERED | ACK | TIMEOUT | FAILED ``` 追踪开关随消息协议传递(`PushContext.trace` 字段),不依赖全局配置,可按消息粒度控制。 所有 Redis 写操作使用异步 API(`RMapAsync`),不阻塞 Netty EventLoop 推送主路径。 **查询接口**:`GET /exec?method=queryTrace&traceId={traceId}` 返回该消息在各节点的状态快照。 --- ### 6.3 ACK 可靠投递与重试 ``` doSend(conn) │ channel.writeAndFlush() 成功 ▼ AckTaskQueue.add(messageId, timeoutMills) │ ScheduledExecutorService.schedule(ackTask, timeout) │ ├─ 客户端在超时前回 AckMessage │ AckTask.onResponse() → record(ACK) → cancel scheduled task │ └─ 超时触发 AckRetryScheduler.retry() attemptNumber < maxRetries(3) → delayTask(attemptNumber * 2s, newSingleUserPushTask) attemptNumber >= maxRetries → record(TIMEOUT) → GatewayPushListener.onTimeout() ``` 重试间隔采用线性退避:第 1 次 2s,第 2 次 4s,第 3 次 6s。 --- ### 6.4 双层路由管理 | 层次 | 实现 | 存储 | 查找复杂度 | |------|------|------|------------| | 本地路由 | `LocalRouterManager` | JVM 内 `ConcurrentHashMap` | O(1) | | 远程路由 | `RemoteRouterManager` | Redis Hash | 一次网络 RTT | **查找策略**:优先本地路由,未命中再查 Redis 远程路由。命中本地路由时整个推送路径不产生任何 Redis 读操作。 **脏路由清理**:远程路由指向本机但本地无连接时,自动调用 `unRegister()` 清除脏数据,防止消息循环重定向。 **k8s 重启保护**:redirect 前通过 Redisson SRD 校验目标节点是否仍在线,避免消息投递到已销毁的 Pod(详见第 5 章)。 --- ### 6.5 Topic 异步推送 Topic 推送通过 alloc 侧独立 Disruptor 与 HTTP 线程解耦: ``` HTTP POST /exec { method:topicPush, topic, msgbody } │ ▼ TopicPushHandler → PushWorkerPool.addTask(TopicPushTask) │ alloc Disruptor RingBuffer(大小 1M,消费者 4 线程) ▼ TopicPushExecutor.execute(TopicPushTask) │ Redis SMEMBERS topic:{topic} → 获取订阅用户列表 │ for each userId → PushSender.send(PushContext) ▼ (后续走单播推送链路) ``` 订阅关系存储:`Redis Set key=topic:{topic} members={userId}` --- ### 6.6 流量控制 系统在两个层面实施流控: **Gateway 层(per-connection 令牌桶)** ```java // SingleUserPushTask.tryPushLocal() if (!flowControl.checkQps()) { delayTask(100ms, this); // 延迟重试,不丢消息 return; } ``` **全局流控配置**(`reference.conf`): | 类型 | 默认 QPS | 说明 | |------|----------|------| | 全局单播 | 5000/s | 非广播推送全局限速 | | 广播 | 3000/s,上限 10w | 单次广播任务内限速 | --- ### 6.7 Fast Connect 快速重连 #### 背景与价值 移动端网络切换(WiFi ↔ 4G)、短暂断网、App 切换到前台等场景下,TCP 连接会频繁断开重建。标准握手流程需要完整的 RSA 密钥交换 + 身份认证,耗时较长且消耗服务端 CPU。 **Fast Connect** 通过复用上次握手生成的 `ReusableSession`(含 AES 会话密钥),跳过 RSA 协商阶段,将重连耗时从数百毫秒降低到一次 Redis 查询的 RTT 级别,同时大幅减少服务端 CPU 开销。 #### 流程 ``` 客户端断线重连 │ ├─ 本地有缓存的 sessionId + AES 密钥? │ 是 → 发送 FAST_CONNECT 帧 │ 否 → 走完整 HANDSHAKE 流程 │ ▼ [FastConnectMessage] { sessionId, deviceId, deviceToken, appName, minHeartbeat, maxHeartbeat } │ ▼ FastConnectHandler.handle() │ ├─ ReusableSessionManager.querySession(sessionId) │ → Redis GET session:{sessionId} │ ├─ session == null → SESSION_EXPIRED 错误,客户端降级走完整握手 ├─ deviceId 不匹配 → INVALID_DEVICE 错误 │ └─ 校验通过 ├─ 重新计算心跳间隔 ├─ 发送 FastConnectOkMessage(明文,无需 RSA 加密) ├─ connection.setSessionContext(session.context) ← 恢复 AES 会话密钥 └─ ReusableSessionManager.cacheSession(session) ← 刷新 TTL,防止频繁重连耗尽有效期 ``` #### Session 存储结构 ``` Redis Key: session:{sessionId} Redis Value: {osName},{osVersion},{clientVersion},{deviceId},{aesKey},{aesIv} TTL: mp.core.session-expired-time(默认 30 分钟) ``` `sessionId` 由首次握手成功后服务端生成(`MD5(deviceId + timestamp)`),随 `HandshakeOkMessage` 下发给客户端,客户端持久化到本地存储。 #### 对项目的价值 | 维度 | 无 Fast Connect | 有 Fast Connect | |------|----------------|----------------| | 重连耗时 | RSA 解密 + 完整握手,数百 ms | 一次 Redis GET,< 10ms | | 服务端 CPU | RSA 运算(高开销) | 无 RSA,仅 Redis 查询 | | 用户体验 | 网络切换后明显卡顿 | 近乎无感知重连 | | 适用场景 | 首次连接 | 30 分钟内断线重连 | --- ### 6.8 FastFlowControl 广播流控 广播推送需要遍历所有本地连接逐一发送,单次任务可能涉及数十万条消息。`FastFlowControl` 是专为广播场景设计的纯内存令牌桶,相比 `RedisFlowControl` 无任何网络开销。 ```java // PushCenter.push() — 广播任务创建时初始化 FlowControl flowControl = (message.getTaskId() == null) ? new FastFlowControl(limit, max, duration) // 新广播任务,纯内存令牌桶 : new RedisFlowControl(message.getTaskId(), max); // 续传任务,Redis 计数保证跨节点一致 ``` **`FastFlowControl.checkQps()` 逻辑**: ``` checkQps() ├─ count < limit → 直接放行,count++,total++ ├─ total > maxLimit → 抛出 OverFlowException,终止广播任务 └─ 当前窗口已满 elapsed > duration? 是 → reset()(count=0,刷新窗口起点),放行 否 → 返回 false,调用方 delayTask(getDelay(), task) 延迟重试 ``` **与 `RedisFlowControl` 对比**: | 维度 | `FastFlowControl` | `RedisFlowControl` | |------|-------------------|--------------------| | 存储 | JVM 内存 | Redis 原子计数器 | | 网络开销 | 无 | 每次 checkQps 一次 Redis INCR | | 跨节点一致 | 否(单节点有效) | 是 | | 适用场景 | 新广播任务(单节点发起) | 续传任务(需跨节点限速) | | 精度 | 纳秒级窗口 | 秒级窗口 | --- ## 7. 消息推送全链路 ### 7.1 链路总览 从 HTTP 接口收到推送请求,到消息通过 WebSocket 到达前端,完整经过以下层次: ``` 外部调用方 / mpush-client-js(ThPush.push()) │ HTTP POST /exec { method:push, userId, msgbody, ack, trace, traceId } │ ── 或 ── │ WebSocket 帧(ThPush 内部通过 XhrPool 封装后发往 alloc) ▼ [alloc 模块 — Tomcat 线程] │ AllocController → PushHandler → PushService → PushSender.send() ▼ [mpush-core PushCenter — Tomcat 线程] │ new SingleUserPushTask() → DisruptorPushTaskExecutor.addTask() ▼ [Disruptor RingBuffer — push-disruptor-worker 线程] │ PushTaskEventHandler.onEvent() → task.getExecutor().execute(task) ▼ [SingleUserPushTask.run() — Netty EventLoop 线程] │ LocalRouterManager.lookup() → 找到 Connection │ flowControl.checkQps() → 流控通过 │ PushMessage.build() + send() → channel.writeAndFlush() ▼ [Netty Pipeline — Netty EventLoop 线程] │ PacketEncoder 编码(序列化 + 加密/压缩)→ 写入 TCP 发送缓冲区 ▼ mpush-client-js 浏览器端收到 PUSH 命令帧 │ ThPush 触发 onMessage 回调 │ MsgDedup 基于 traceId 去重 │ OfflineQueue 处理离线期间积压消息 ``` --- ### 7.2 HTTP 接收层 线程:Tomcat 线程池 ``` POST /exec └─ AllocController.exec() ├─ token 校验(TokenAuthenticator) ├─ HandlerRegistry.dispatch("push") └─ PushHandler.handler() ├─ PushRequest.ofPush(params) // 解析 userId / msgbody / ack / trace / traceId └─ PushService.push(req, params) ├─ trace=true → RedisMessageTracer.record(traceId, "_meta", ACCEPTED) └─ pushSender.send(PushContext) // 非阻塞,立即返回 FutureTask ``` `PushSender.send()` 提交完成后 HTTP 线程立即返回 200,不等待推送结果。 --- ### 7.3 任务入队:Disruptor Producer 线程:Tomcat 线程(仍在 `PushSender.send()` 调用栈内) ```java // PushCenter.push() addTask(new SingleUserPushTask(mPushServer, message, globalFlowControl)); // DisruptorPushTaskExecutor.addTask() try { sequence = ringBuffer.tryNext(); // CAS 无锁写入,纳秒级 ringBuffer.get(sequence).task = task; ringBuffer.publish(sequence); } catch (InsufficientCapacityException e) { task.getExecutor().execute(task); // RingBuffer 满时降级:直接提交 EventLoop,不丢消息 } ``` `SingleUserPushTask` 构造时记录 `DISPATCHED` 状态(trace=true 时)并初始化 `TimeLine`。 --- ### 7.4 Disruptor 消费与分发 线程:push-disruptor-worker-{n}(独立线程池,数量 = CPU 核数) ```java // PushTaskEventHandler.onEvent() — WorkHandler,执行时间纳秒级 public void onEvent(PushTaskEvent event) throws Exception { PushTask task = event.task; event.clear(); task.getExecutor().execute(task); // 提交到 task 绑定的 Netty EventLoop,自己不执行业务 } ``` `task.getExecutor()` 返回的是消息对应 `Connection` 所绑定的那个 Netty EventLoop: ```java // SingleUserPushTask.getExecutor() return ((Message) message).getConnection().getChannel().eventLoop(); ``` 这保证同一个 Channel 的所有操作始终在同一个 EventLoop 线程上执行,彻底无锁。 --- ### 7.5 路由查找与消息发送 线程:Netty EventLoop(与目标 Channel 绑定的那个) `SingleUserPushTask.run()` 执行双层路由查找: ``` run() ├─ isTimedOut() // 超时检查 │ ├─ tryPushLocal() // 优先查本地路由 │ ├─ LocalRouterManager.lookup(userId, clientType) │ │ └─ ConcurrentHashMap 查找 → LocalRouter(Connection) │ ├─ conn.isConnected() // 连接有效性 │ ├─ channel.isWritable() // TCP 缓冲区背压检查,满则直接 fail │ ├─ flowControl.checkQps() // 令牌桶流控,超限则 delayTask 延迟重试 │ └─ doSend(conn) // ✅ 命中本机,直接发送 │ └─ pushRemote() // 本地未命中,查远程路由 ├─ RemoteRouterManager.lookup() // 查 Redis,得到目标节点 host:port ├─ isThisMachine() // 指向本机但无本地连接 → 清理脏路由 → offline ├─ isGatewayNodeAlive() // 服务发现校验节点存活(k8s 重启场景) └─ onRedirect() // 通知 redirect,由目标节点处理 ``` `doSend()` 构建并发送 `PushMessage`: ```java private void doSend(Connection conn) { PushMessage pushMessage = PushMessage.build(conn) .setContent(message.getContent()) .setTraceId(message.getTraceId()); pushMessage.getPacket().addFlag(message.getFlags()); messageId = pushMessage.getSessionId(); pushMessage.send(this); // this 是 ChannelFutureListener,TCP 写完后回调 } ``` --- ### 7.6 Netty Channel 写出 线程:Netty EventLoop(同上) ```java // BaseMessage.send() public void send(ChannelFutureListener listener) { encode(); // 序列化消息体,按 session 加密/压缩 connection.send(packet, listener); } // NettyConnection.send() public ChannelFuture send(Packet packet, ChannelFutureListener listener) { ChannelFuture future = channel.writeAndFlush(packet.toFrame(channel)) .addListener(this); if (listener != null) future.addListener(listener); // 背压处理:TCP 写缓冲区满时,非 EventLoop 线程短暂等待 100ms if (!channel.isWritable() && !channel.eventLoop().inEventLoop()) { future.awaitUninterruptibly(100); } return future; } ``` `channel.writeAndFlush()` 是 Netty 的异步写,数据经过 Pipeline 的 `PacketEncoder` 编码后写入内核 TCP 发送缓冲区,由内核负责实际网络传输。 --- ### 7.7 ACK 回调 线程:Netty EventLoop(ChannelFuture 回调,仍在同一 EventLoop) `SingleUserPushTask` 实现 `ChannelFutureListener`,TCP 写入完成后触发: ``` operationComplete(future) ├─ future.isSuccess() == false │ └─ GatewayPushListener.onFailure() → record(FAILED) │ ├─ !message.isNeedAck() │ └─ GatewayPushListener.onSuccess() → record(DELIVERED) → 结束 │ └─ 需要 ACK ├─ record(traceId, userId, DELIVERED) └─ addAckTask(messageId) └─ AckTaskQueue.add(task, timeoutMills) └─ ScheduledExecutorService.schedule(task, timeout) ├─ 客户端回 AckMessage → AckTask.onResponse() → record(ACK) └─ 超时 → AckRetryScheduler → delayTask(attemptNumber*2s, newTask) ``` --- ### 7.8 线程模型总览 | 阶段 | 线程 | 说明 | |------|------|------| | HTTP 接收 | Tomcat 线程池 | 接收请求、token 校验、构建 PushContext | | 任务入队 | Tomcat 线程(同上) | `ringBuffer.tryNext()` CAS 写入,纳秒级 | | 任务分发 | push-disruptor-worker-{n} | 从 RingBuffer 取任务,提交到 EventLoop,纳秒级 | | 路由查找 + 发送 | Netty EventLoop | 查本地/远程路由、`channel.writeAndFlush()` | | ACK 回调 | Netty EventLoop(同上) | `ChannelFuture` 回调、ACK 等待注册 | | ACK 超时/重试 | AckTaskQueue 定时线程 | 超时检测、重试调度 | 关键设计:**同一个 Channel 的路由查找、消息编码、writeAndFlush、ChannelFuture 回调全部在同一个 Netty EventLoop 线程上串行执行,无任何锁竞争。** --- ## 8. 核心数据流 ### 8.1 单播推送完整链路 ``` 业务方 HTTP POST /exec │ ▼ AllocController → PushHandler → PushService │ trace=true: RedisMessageTracer.record(traceId, ACCEPTED) │ ▼ PushSender.send(PushContext) │ 构建 SingleUserPushTask │ ▼ PushCenter.push() │ DisruptorPushTaskExecutor.addTask() │ ringBuffer.tryNext() → publish [或降级直接 execute] │ trace: record(traceId, DISPATCHED) │ ▼ PushTaskEventHandler.onEvent() [push-disruptor-worker-N] │ task.getExecutor().execute(task) → 提交到目标 Channel 的 EventLoop │ ▼ SingleUserPushTask.run() [Netty EventLoop] ├─ tryPushLocal() │ LocalRouterManager.lookup(userId, clientType) │ → 命中 → flowControl.checkQps() → doSend(conn) │ └─ pushRemote() [本地未命中] RemoteRouterManager.lookup() → Redis 查路由 isGatewayNodeAlive() → Redisson SRD 校验节点存活 → redirect 到目标 Gateway 节点 │ ▼ channel.writeAndFlush(PushMessage) [Netty EventLoop] │ PacketEncoder 编码 → TCP 发送缓冲区 │ ▼ ChannelFutureListener.operationComplete() ├─ 失败 → record(FAILED) ├─ 无需 ACK → record(DELIVERED) → 结束 └─ 需要 ACK → record(DELIVERED) → AckTaskQueue.add(timeout) ├─ 客户端回 AckMessage → record(ACK) → 结束 └─ 超时 → AckRetryScheduler → 重试(最多 3 次)→ record(TIMEOUT) ``` --- ### 8.2 消息状态流转 ``` ACCEPTED HTTP 层收到推送请求 │ ▼ DISPATCHED 任务写入 Disruptor RingBuffer │ ▼ DELIVERED TCP 写入成功(channel.writeAndFlush 回调) │ ├──────────────────────────────────────┐ ▼ ▼ ACK TIMEOUT 客户端回 AckMessage ACK 等待超时(重试耗尽) │ ▼ FAILED(任意阶段写出失败) ``` 所有状态均写入 Redis Hash:`key = trace:{traceId} field = {userId} value = {status}` --- ## 9. 百万并发能力评估 | 资源 | 单机上限 | 说明 | |------|----------|------| | TCP 长连接 | ~100 万 | 受限于 fd 数量(`ulimit -n`)和内存,每连接约 10KB | | 推送 QPS | ~5 万/s | Disruptor + Netty EventLoop 无锁路径,单核约 5k QPS | | Redis 路由查询 | ~10 万/s | 远程路由命中时每条消息一次 Redis GET | | ACK 并发等待 | ~50 万 | `ScheduledExecutorService` 定时任务,内存约 1KB/task | **水平扩展**:Gateway 节点无状态,通过 Redisson SRD 自动注册/发现,alloc 层通过远程路由将消息路由到正确节点,线性扩容。 --- ## 10. 架构改善空间 | 方向 | 现状 | 建议 | |------|------|------| | 离线消息 | 配置项存在但默认关闭 | 接入 MQ(Kafka/RocketMQ)持久化离线消息 | | 服务发现订阅 | `unsubscribe` 未实现 | 补全 RTopic listener 生命周期管理 | | 分布式锁 watchdog | leaseTime=-1 时无自动续期 | 使用 Redisson 内置 watchdog 机制 | | 消息追踪存储 | Redis Hash,无过期 | 为 trace key 设置 TTL(如 7 天) | | 广播推送 | 遍历本地路由,单节点 | 结合 SRD 节点列表实现跨节点广播 | | 监控指标 | 日志为主 | 接入 Micrometer + Prometheus | --- ## 11. 配置参考 ### 核心推送配置 ```hocon mp.push { ack-max-retries = 3 # ACK 最大重试次数 ack-status-record-enabled = true # 是否将 ACK 状态写入 Redis executor-type = disruptor # 推送执行器:netty / disruptor / jdk disruptor { ring-buffer-size = 1048576 # RingBuffer 容量(必须为 2 的幂次) wait-strategy = "sleeping" # sleeping / yielding / busy-spin / blocking thread-count = 0 # 0 = CPU 核数 } flow-control { global { limit = 5000, duration = 1s } broadcast { limit = 3000, max = 100000, duration = 1s } } } ``` ### Redis / Redisson 配置 ```hocon mp.redis { cluster-model = single # single / cluster / sentinel sentinel-master = "" # 哨兵模式主节点名 nodes = ["127.0.0.1:6379"] password = "" # 支持 DES 加密 } ``` ### 网络配置 ```hocon mp.net { connect-server-port = 3000 # 长连接服务端口(公网) gateway-server-port = 3001 # Gateway 服务端口(内网) ws-server-port = 8088 # WebSocket 端口(0 = 禁用) admin-server-port = 3002 # 管理控制台端口 gateway-server-register-ip = "" # Gateway 注册到 SRD 的 IP(默认本机 IP) connect-server-register-ip = "" # ConnServer 注册到 SRD 的 IP(默认公网 IP) } ``` ### alloc Disruptor 配置 ```hocon mp.alloc.disruptor { ring-buffer-size = 1048576 # Topic 推送 RingBuffer 容量 wait-strategy = "sleeping" consumer-count = 4 # 消费者线程数 } ``` ### 日志说明 | Logger | 说明 | |--------|------| | `srd-mpush.log` | 服务注册/发现事件(注册、注销、心跳、IP 变化) | | `push-mpush.log` | 推送链路日志(路由查找、redirect、存活校验) | | `cache-mpush.log` | Redis 操作日志 | | `info-mpush.log` | 系统启动、连接事件 | | `heartbeat-mpush.log` | 心跳超时检测 |