# message_middleware
**Repository Path**: zhoukai1992/message_middleware
## Basic Information
- **Project Name**: message_middleware
- **Description**: 支持Kafka、ActiveMQ等多种主流消息中间件的无缝动态切换
- **Primary Language**: Unknown
- **License**: AGPL-3.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 3
- **Forks**: 0
- **Created**: 2025-11-27
- **Last Updated**: 2026-02-10
## Categories & Tags
**Categories**: Uncategorized
**Tags**: Java, Kafka, ActiveMq, SpringBoot, RabbitMQ
## README
# 消息中间件平台
## 介绍
这是一个基于 Spring Boot 开发的消息中间件平台,提供了统一的消息发送和接收接口,支持 Kafka、ActiveMQ 和 RabbitMQ
等多种消息中间件的动态切换。通过简单的配置和注解,开发者可以轻松实现消息的生产和消费,无需关心底层消息中间件的实现细节。
### 核心功能
- **统一消息接口**:提供统一的消息发送和接收接口,屏蔽不同消息中间件的差异
- **动态中间件切换**:支持在运行时动态切换 Kafka、ActiveMQ 和 RabbitMQ 等消息中间件
- **基于注解的开发**:使用简单的注解即可实现消息的生产和消费
- **异步消息处理**:支持异步消息发送和接收
- **动态监听器注册**:支持动态注册和注销消息监听器
- **消息增强机制**:支持自定义消息增强器,可在消息发送前对消息进行增强处理
- **自定义错误处理**:支持灵活的错误处理机制,可配置重试、死信队列、告警通知等策略
- **全局异常处理**:提供统一的异常处理框架,支持多种错误处理策略
## 软件架构

## 工作原理
### Kafka 实现方式
在 Kafka
中,消费者组是一个核心概念。每个消费者组通过 [group.id](message/src/main/java/com/zhoukai/message/config/KafkaConfig.java#L_44-L_44)
进行标识,组内的消费者共同消费主题的数据。Kafka 保证每条消息只会被同一个消费者组内的一个消费者消费,从而实现负载均衡。
当有多个消费者组订阅同一个主题时,每个消费者组都会收到完整的消息副本,实现广播语义。
### RabbitMQ 实现方式
本项目中,为了实现类似 Kafka 的消费者组概念,采用了以下策略:
1. **队列设计**:为每个消费者组创建独立的队列,例如 `ORDER_CREATED_GROUP_1` 和 `ORDER_CREATED_GROUP_2`
2. **交换器绑定**:所有队列都绑定到同一个 DirectExchange 上,并使用相同的路由键
3. **消息发送**:当发送消息到某个主题时,会查找所有订阅该主题的消费者组,并将消息发送到每个组对应的队列中
4. **消费行为**:
- 相同组内:多个消费者监听同一队列,实现轮询消费(竞争消费者)
- 不同组间:各自监听自己的队列,实现广播消费(每个组都能收到完整消息)
核心实现类包括:
- [RabbitMQConfig](message/src/main/java/com/zhoukai/message/config/RabbitMQConfig.java):配置交换器、队列和绑定关系
- [RabbitMessageProcessor](message/src/main/java/com/zhoukai/message/processor/RabbitMessageProcessor.java)
:处理消息发送逻辑,确保消息能发送到所有相关消费者组
### ActiveMQ 实现方式
ActiveMQ 通过 Virtual Topics(虚拟主题)实现消费者组语义:
1. 生产者发送消息到虚拟主题(如 `VirtualTopic.Orders`)
2. 消费者从各自的队列中消费消息(如 `Consumer.Group1.VirtualTopic.Orders`)
3. 同一组内的消费者共享队列实现负载均衡
4. 不同组的消费者使用不同的队列实现消息广播
## 快速开始
### 环境要求
- JDK 1.8+
- Maven 3.5+
- Spring Boot 2.1.6.RELEASE
- Kafka 2.0.1+ (可选)
- ActiveMQ 2.6.4+ (可选)
- RabbitMQ 3.8+ (可选)
### 安装步骤
1. **克隆项目**
```bash
git clone <项目地址>
cd message_middleware
```
2. **构建项目**
```bash
mvn clean install
```
3. **运行示例**
```bash
cd use-samples
mvn spring-boot:run
```
## 使用说明
### 1. 添加依赖
```xml
com.zhoukai
message
2.1.0-SNAPSHOT
```
### 2. 添加消息中间件依赖(重要!)
根据你要使用的消息中间件,在你的项目中添加相应的依赖:
**Kafka 依赖:**
```xml
org.springframework.kafka
spring-kafka
```
**ActiveMQ 依赖:**
```xml
org.springframework.boot
spring-boot-starter-artemis
```
**RabbitMQ 依赖:**
```xml
org.springframework.boot
spring-boot-starter-amqp
```
> ⚠️ **重要提示**:
> - message 模块本身不传递这些依赖(设置为 provided 作用域)
> - 使用方必须显式添加所需的消息中间件依赖
> - 如果未添加任何依赖,应用启动时会报错并提示具体解决方案
### 3. 配置文件
在 `application.yml` 中配置消息中间件:
#### Kafka 配置
```yaml
app:
message:
middleware:
active: kafka
async-enabled: true
spring:
application:
name: message-middleware
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-consumer-group
producer:
retries: 3
```
#### ActiveMQ 配置
```yaml
app:
message:
middleware:
active: activemq
async-enabled: true
spring:
application:
name: message-middleware
artemis:
host: localhost
port: 61616
user: admin
password: admin
```
#### RabbitMQ 配置
```yaml
app:
message:
middleware:
active: rabbitmq
async-enabled: true
spring:
application:
name: message-middleware
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
```
### 3. 消息生产者
使用 `@MessageEvent` 注解标记方法,当方法执行时,会自动将返回结果发送到指定主题:
```java
@Service
@Slf4j
public class OrderService {
@MessageEvent(topic = "ORDER_CREATED")
public Order createOrder(CreateOrderRequest request) {
Order order = new Order();
order.setOrderId("ORDER_" + UUID.randomUUID().toString().substring(0, 8));
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setProductName(request.getProductName());
order.setQuantity(request.getQuantity());
order.setStatus("CREATED");
order.setCreateTime(new Date());
log.info("创建订单: {}", order.getOrderId());
return order;
}
}
```
### 4. 消息消费者
使用 `@MessageEventListener` 注解标记方法,当有消息到达指定主题时,会自动调用该方法:
```java
@Component
@Slf4j
public class OrderCreatedListener {
@MessageEventListener(topic = "ORDER_CREATED")
public void orderCreatedGroup1(Order order) {
log.info("收到订单创建消息-GROUP: {}, message: {}", "GROUP_1", JSONUtil.toJsonStr(order));
}
@MessageEventListener(topic = "ORDER_CREATED", groupId = "GROUP_2")
public void orderCreatedGroup2(BusinessMessage message) {
Order order = message.getMessageContent();
log.info("收到订单创建消息-GROUP: {}, message: {}", "GROUP_2", JSONUtil.toJsonStr(message));
}
}
```
### 5. 自定义错误处理
系统支持为消息生产者和消费者配置自定义错误处理器,当消息处理失败时会自动调用指定的错误处理逻辑。
#### 生产者错误处理
在 [@MessageEvent](message/src/main/java/com/zhoukai/message/annotation/MessageEvent.java) 注解中指定 `errorHandler` 属性:
```java
@Service
@Slf4j
public class OrderService {
@MessageEvent(
topic = "ORDER_CREATED",
errorHandler = CustomProducerOrderErrorHandler.class // 指定自定义错误处理器
)
public Order createOrder(CreateOrderRequest request) {
Order order = new Order();
order.setOrderId("ORDER_" + UUID.randomUUID().toString().substring(0, 8));
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
log.info("创建订单: {}", order.getOrderId());
return order;
}
}
```
#### 消费者错误处理
在 [@MessageEventListener](message/src/main/java/com/zhoukai/message/annotation/MessageEventListener.java)
注解中指定 `errorHandler` 属性:
```java
@Component
@Slf4j
public class OrderCreatedListener {
@MessageEventListener(
topic = "ORDER_CREATED",
errorHandler = CustomOrderErrorHandler.class // 指定自定义错误处理器
)
public void orderCreatedGroup1(Order order) {
// 故意抛出异常来测试错误处理
if (order.getAmount().compareTo(BigDecimal.valueOf(1000)) > 0) {
throw new RuntimeException("订单金额过高: " + order.getAmount());
}
log.info("收到订单创建消息: {}", JSONUtil.toJsonStr(order));
}
}
```
#### 自定义错误处理器实现
创建实现对应错误处理器接口的类:
##### 生产者错误处理
```java
@Component
public class CustomProducerOrderErrorHandler implements MessageProducerErrorHandler {
@Override
public void handleException(BusinessMessage> message, Exception exception,
String className, String methodName) throws Exception {
log.error("订单消息处理失败 - 类: {}, 方法: {}, 错误: {}",
className, methodName, exception.getMessage());
// 实现具体的错误处理逻辑
if (message.getMessageContent() instanceof Order) {
Order failedOrder = (Order) message.getMessageContent();
log.info("失败的订单信息: 订单ID={}, 金额={}",
failedOrder.getOrderId(), failedOrder.getAmount());
}
// 可以实现重试逻辑、死信队列、告警通知等
handleWithRetry(message, exception);
sendAlertNotification(exception.getMessage());
// 重新抛出异常让上层处理
throw exception;
}
private void handleWithRetry(BusinessMessage> message, Exception exception) {
// 实现重试逻辑
log.info("执行重试逻辑: messageId={}", message.getMessageId());
}
private void sendAlertNotification(String errorMessage) {
// 发送告警通知逻辑
log.info("发送告警通知: {}", errorMessage);
}
}
```
##### 消费者错误处理
```java
@Component
public class CustomConsumerOrderErrorHandler implements MessageConsumerErrorHandler {
@Override
public void handleException(BusinessMessage> message, Exception exception,
String className, String methodName) throws Exception {
log.error("订单消息处理失败 - 类: {}, 方法: {}, 错误: {}",
className, methodName, exception.getMessage());
// 实现具体的错误处理逻辑
if (message.getMessageContent() instanceof Order) {
Order failedOrder = (Order) message.getMessageContent();
log.info("失败的订单信息: 订单ID={}, 金额={}",
failedOrder.getOrderId(), failedOrder.getAmount());
}
// 可以实现重试逻辑、死信队列、告警通知等
handleWithRetry(message, exception);
sendAlertNotification(exception.getMessage());
// 重新抛出异常让上层处理
throw exception;
}
private void handleWithRetry(BusinessMessage> message, Exception exception) {
// 实现重试逻辑
log.info("执行重试逻辑: messageId={}", message.getMessageId());
}
private void sendAlertNotification(String errorMessage) {
// 发送告警通知逻辑
log.info("发送告警通知: {}", errorMessage);
}
}
```
#### 默认错误处理
如果不指定 `errorHandler`,系统将使用默认的重试机制:
```java
@Component
@Slf4j
public class OrderListener {
@MessageEventListener(topic = "ORDER_CREATED")
// 使用默认错误处理器(包含3次重试机制)
public void orderCreatedDefaultHandler(Order order) {
// 消费逻辑
log.info("处理订单消息: {}", order.getOrderId());
}
}
```
#### 错误处理机制特点
1. **生产者错误处理**:支持消息发送失败时的重试和异常处理
2. **消费者错误处理**:支持消息消费失败时的自定义处理逻辑
3. **重试机制**:默认提供3次重试,支持指数退避算法
4. **异常传播**:处理完错误后可以选择重新抛出异常
### 6. 消息增强功能
系统支持灵活的消息增强机制,可以在消息发送前对消息进行自定义增强处理。
#### 内置增强器
**主题前缀增强器**:为所有消息主题添加统一前缀
```yaml
app:
message:
enhancer:
topic-prefix:
enabled: true
value: "prod_"
order: 0
```
**消息追踪增强器**:为消息添加追踪信息
```yaml
app:
message:
enhancer:
tracking:
enabled: true
order: 50
```
#### 自定义增强器
开发者可以实现 `MessageEnhancer` 接口创建自定义消息增强器:
```java
@Component
public class CustomMessageEnhancer implements MessageEnhancer {
@Override
public BusinessMessage> enhanceMessage(BusinessMessage> message) {
if (message != null) {
// 添加自定义头部信息
message.addHeader("environment", "production");
}
return message;
}
@Override
public int getOrder() {
return 150; // 执行顺序
}
@Override
public boolean isEnabled() {
return true; // 是否开启增强
}
}
```
详细使用文档请参考:[消息增强接口使用指南](MessageEnhancer-Guide.md)
### 6. Controller 示例
```java
@Slf4j
@RestController
@RequestMapping("/api/orders")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping
public Order createOrder(@RequestBody CreateOrderRequest request) {
log.info("收到创建订单请求 - 用户: {}", request.getUserId());
return orderService.createOrder(request);
}
}
```
## 核心功能详解
### 1. 注解说明
#### @MessageEvent
- **用途**:标记方法为消息生产者
- **参数**:
- `topic`:消息主题
- `middleware`:指定使用的中间件(可选)
#### @MessageEventListener
- **用途**:标记方法为消息消费者
- **参数**:
- `topic`:消息主题
- `groupId`:消费者组ID(可选)
- `middleware`:指定使用的中间件(可选)
- `concurrency`:并发消费者数量(可选)
- `errorHandler`:自定义错误处理器类(可选)
- `async`:是否异步处理(可选,默认false)
### 2. 动态中间件切换
可以通过配置文件或代码动态切换消息中间件:
```yaml
spring:
message-middleware:
type: activemq # 切换为 ActiveMQ
```
## 启动示例
### ActiveMQ
1. **启动 ActiveMQ**

2. **运行应用**


### Kafka
1. **启动 Zookeeper**
```bash
zookeeper-server-start.bat config\zookeeper.properties
```
2. **启动 Kafka**
```bash
kafka-server-start.bat config\server.properties
```
3. **运行应用**



## 版本说明
- **1.0-SNAPSHOT** (2025-11)
- 初始版本,实现 Kafka 和 ActiveMQ 消息中间件基本功能
- **1.0.1-SNAPSHOT** (2025-12)
- 统一消息接口设计
- 支持动态中间件切换
- **2.0.0-SNAPSHOT** (2025-12)
- 基于注解实现消息生产和消费
- 优化监听器管理
- **2.1.0-SNAPSHOT** (2026-02)
- 消息增强机制实现
- 依赖管理优化(中间件依赖改为 provided scope)
- 自定义错误处理机制
- 支持重试、死信队列、告警通知等多种错误处理策略
## 贡献指南
1. Fork 项目
2. 创建特性分支 (`git checkout -b feature/AmazingFeature`)
3. 提交更改 (`git commit -m 'Add some AmazingFeature'`)
4. 推送到分支 (`git push origin feature/AmazingFeature`)
5. 提交 Pull Request
## 许可证
本项目采用 Apache License 2.0 许可证。详情请见 [LICENSE](LICENSE) 文件。
## 联系方式
- 项目地址:https://gitee.com/zhoukai1992/message_middleware
- 作者:ZhouKai
- 邮箱:zhoukai1219@126.com
- Issues:https://gitee.com/zhoukai1992/message_middleware/issues
## 开发进度计划
1. ✅ 实现 Kafka 和 ActiveMQ 动态切换基本功能
2. ✅ 基于注解实现消息生产和消费
3. ✅ 支持自定义消息增强器
4. ✅ 自定义错误处理
5. 🔄 批量消费(计划中)
6. 🔄 支持更多消息中间件(计划中)
7. 🔄 实现消息监控和管理界面(计划中)