# message_middleware **Repository Path**: zhoukai1992/message_middleware ## Basic Information - **Project Name**: message_middleware - **Description**: 支持Kafka、ActiveMQ等多种主流消息中间件的无缝动态切换 - **Primary Language**: Unknown - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 3 - **Forks**: 0 - **Created**: 2025-11-27 - **Last Updated**: 2026-02-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: Java, Kafka, ActiveMq, SpringBoot, RabbitMQ ## README # 消息中间件平台 ## 介绍 这是一个基于 Spring Boot 开发的消息中间件平台,提供了统一的消息发送和接收接口,支持 Kafka、ActiveMQ 和 RabbitMQ 等多种消息中间件的动态切换。通过简单的配置和注解,开发者可以轻松实现消息的生产和消费,无需关心底层消息中间件的实现细节。 ### 核心功能 - **统一消息接口**:提供统一的消息发送和接收接口,屏蔽不同消息中间件的差异 - **动态中间件切换**:支持在运行时动态切换 Kafka、ActiveMQ 和 RabbitMQ 等消息中间件 - **基于注解的开发**:使用简单的注解即可实现消息的生产和消费 - **异步消息处理**:支持异步消息发送和接收 - **动态监听器注册**:支持动态注册和注销消息监听器 - **消息增强机制**:支持自定义消息增强器,可在消息发送前对消息进行增强处理 - **自定义错误处理**:支持灵活的错误处理机制,可配置重试、死信队列、告警通知等策略 - **全局异常处理**:提供统一的异常处理框架,支持多种错误处理策略 ## 软件架构 ![输入图片说明](images/screenshot_2025-11-27_15-58-00.png) ## 工作原理 ### Kafka 实现方式 在 Kafka 中,消费者组是一个核心概念。每个消费者组通过 [group.id](message/src/main/java/com/zhoukai/message/config/KafkaConfig.java#L_44-L_44) 进行标识,组内的消费者共同消费主题的数据。Kafka 保证每条消息只会被同一个消费者组内的一个消费者消费,从而实现负载均衡。 当有多个消费者组订阅同一个主题时,每个消费者组都会收到完整的消息副本,实现广播语义。 ### RabbitMQ 实现方式 本项目中,为了实现类似 Kafka 的消费者组概念,采用了以下策略: 1. **队列设计**:为每个消费者组创建独立的队列,例如 `ORDER_CREATED_GROUP_1` 和 `ORDER_CREATED_GROUP_2` 2. **交换器绑定**:所有队列都绑定到同一个 DirectExchange 上,并使用相同的路由键 3. **消息发送**:当发送消息到某个主题时,会查找所有订阅该主题的消费者组,并将消息发送到每个组对应的队列中 4. **消费行为**: - 相同组内:多个消费者监听同一队列,实现轮询消费(竞争消费者) - 不同组间:各自监听自己的队列,实现广播消费(每个组都能收到完整消息) 核心实现类包括: - [RabbitMQConfig](message/src/main/java/com/zhoukai/message/config/RabbitMQConfig.java):配置交换器、队列和绑定关系 - [RabbitMessageProcessor](message/src/main/java/com/zhoukai/message/processor/RabbitMessageProcessor.java) :处理消息发送逻辑,确保消息能发送到所有相关消费者组 ### ActiveMQ 实现方式 ActiveMQ 通过 Virtual Topics(虚拟主题)实现消费者组语义: 1. 生产者发送消息到虚拟主题(如 `VirtualTopic.Orders`) 2. 消费者从各自的队列中消费消息(如 `Consumer.Group1.VirtualTopic.Orders`) 3. 同一组内的消费者共享队列实现负载均衡 4. 不同组的消费者使用不同的队列实现消息广播 ## 快速开始 ### 环境要求 - JDK 1.8+ - Maven 3.5+ - Spring Boot 2.1.6.RELEASE - Kafka 2.0.1+ (可选) - ActiveMQ 2.6.4+ (可选) - RabbitMQ 3.8+ (可选) ### 安装步骤 1. **克隆项目** ```bash git clone <项目地址> cd message_middleware ``` 2. **构建项目** ```bash mvn clean install ``` 3. **运行示例** ```bash cd use-samples mvn spring-boot:run ``` ## 使用说明 ### 1. 添加依赖 ```xml com.zhoukai message 2.1.0-SNAPSHOT ``` ### 2. 添加消息中间件依赖(重要!) 根据你要使用的消息中间件,在你的项目中添加相应的依赖: **Kafka 依赖:** ```xml org.springframework.kafka spring-kafka ``` **ActiveMQ 依赖:** ```xml org.springframework.boot spring-boot-starter-artemis ``` **RabbitMQ 依赖:** ```xml org.springframework.boot spring-boot-starter-amqp ``` > ⚠️ **重要提示**: > - message 模块本身不传递这些依赖(设置为 provided 作用域) > - 使用方必须显式添加所需的消息中间件依赖 > - 如果未添加任何依赖,应用启动时会报错并提示具体解决方案 ### 3. 配置文件 在 `application.yml` 中配置消息中间件: #### Kafka 配置 ```yaml app: message: middleware: active: kafka async-enabled: true spring: application: name: message-middleware kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-consumer-group producer: retries: 3 ``` #### ActiveMQ 配置 ```yaml app: message: middleware: active: activemq async-enabled: true spring: application: name: message-middleware artemis: host: localhost port: 61616 user: admin password: admin ``` #### RabbitMQ 配置 ```yaml app: message: middleware: active: rabbitmq async-enabled: true spring: application: name: message-middleware rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / ``` ### 3. 消息生产者 使用 `@MessageEvent` 注解标记方法,当方法执行时,会自动将返回结果发送到指定主题: ```java @Service @Slf4j public class OrderService { @MessageEvent(topic = "ORDER_CREATED") public Order createOrder(CreateOrderRequest request) { Order order = new Order(); order.setOrderId("ORDER_" + UUID.randomUUID().toString().substring(0, 8)); order.setUserId(request.getUserId()); order.setAmount(request.getAmount()); order.setProductName(request.getProductName()); order.setQuantity(request.getQuantity()); order.setStatus("CREATED"); order.setCreateTime(new Date()); log.info("创建订单: {}", order.getOrderId()); return order; } } ``` ### 4. 消息消费者 使用 `@MessageEventListener` 注解标记方法,当有消息到达指定主题时,会自动调用该方法: ```java @Component @Slf4j public class OrderCreatedListener { @MessageEventListener(topic = "ORDER_CREATED") public void orderCreatedGroup1(Order order) { log.info("收到订单创建消息-GROUP: {}, message: {}", "GROUP_1", JSONUtil.toJsonStr(order)); } @MessageEventListener(topic = "ORDER_CREATED", groupId = "GROUP_2") public void orderCreatedGroup2(BusinessMessage message) { Order order = message.getMessageContent(); log.info("收到订单创建消息-GROUP: {}, message: {}", "GROUP_2", JSONUtil.toJsonStr(message)); } } ``` ### 5. 自定义错误处理 系统支持为消息生产者和消费者配置自定义错误处理器,当消息处理失败时会自动调用指定的错误处理逻辑。 #### 生产者错误处理 在 [@MessageEvent](message/src/main/java/com/zhoukai/message/annotation/MessageEvent.java) 注解中指定 `errorHandler` 属性: ```java @Service @Slf4j public class OrderService { @MessageEvent( topic = "ORDER_CREATED", errorHandler = CustomProducerOrderErrorHandler.class // 指定自定义错误处理器 ) public Order createOrder(CreateOrderRequest request) { Order order = new Order(); order.setOrderId("ORDER_" + UUID.randomUUID().toString().substring(0, 8)); order.setUserId(request.getUserId()); order.setAmount(request.getAmount()); log.info("创建订单: {}", order.getOrderId()); return order; } } ``` #### 消费者错误处理 在 [@MessageEventListener](message/src/main/java/com/zhoukai/message/annotation/MessageEventListener.java) 注解中指定 `errorHandler` 属性: ```java @Component @Slf4j public class OrderCreatedListener { @MessageEventListener( topic = "ORDER_CREATED", errorHandler = CustomOrderErrorHandler.class // 指定自定义错误处理器 ) public void orderCreatedGroup1(Order order) { // 故意抛出异常来测试错误处理 if (order.getAmount().compareTo(BigDecimal.valueOf(1000)) > 0) { throw new RuntimeException("订单金额过高: " + order.getAmount()); } log.info("收到订单创建消息: {}", JSONUtil.toJsonStr(order)); } } ``` #### 自定义错误处理器实现 创建实现对应错误处理器接口的类: ##### 生产者错误处理 ```java @Component public class CustomProducerOrderErrorHandler implements MessageProducerErrorHandler { @Override public void handleException(BusinessMessage message, Exception exception, String className, String methodName) throws Exception { log.error("订单消息处理失败 - 类: {}, 方法: {}, 错误: {}", className, methodName, exception.getMessage()); // 实现具体的错误处理逻辑 if (message.getMessageContent() instanceof Order) { Order failedOrder = (Order) message.getMessageContent(); log.info("失败的订单信息: 订单ID={}, 金额={}", failedOrder.getOrderId(), failedOrder.getAmount()); } // 可以实现重试逻辑、死信队列、告警通知等 handleWithRetry(message, exception); sendAlertNotification(exception.getMessage()); // 重新抛出异常让上层处理 throw exception; } private void handleWithRetry(BusinessMessage message, Exception exception) { // 实现重试逻辑 log.info("执行重试逻辑: messageId={}", message.getMessageId()); } private void sendAlertNotification(String errorMessage) { // 发送告警通知逻辑 log.info("发送告警通知: {}", errorMessage); } } ``` ##### 消费者错误处理 ```java @Component public class CustomConsumerOrderErrorHandler implements MessageConsumerErrorHandler { @Override public void handleException(BusinessMessage message, Exception exception, String className, String methodName) throws Exception { log.error("订单消息处理失败 - 类: {}, 方法: {}, 错误: {}", className, methodName, exception.getMessage()); // 实现具体的错误处理逻辑 if (message.getMessageContent() instanceof Order) { Order failedOrder = (Order) message.getMessageContent(); log.info("失败的订单信息: 订单ID={}, 金额={}", failedOrder.getOrderId(), failedOrder.getAmount()); } // 可以实现重试逻辑、死信队列、告警通知等 handleWithRetry(message, exception); sendAlertNotification(exception.getMessage()); // 重新抛出异常让上层处理 throw exception; } private void handleWithRetry(BusinessMessage message, Exception exception) { // 实现重试逻辑 log.info("执行重试逻辑: messageId={}", message.getMessageId()); } private void sendAlertNotification(String errorMessage) { // 发送告警通知逻辑 log.info("发送告警通知: {}", errorMessage); } } ``` #### 默认错误处理 如果不指定 `errorHandler`,系统将使用默认的重试机制: ```java @Component @Slf4j public class OrderListener { @MessageEventListener(topic = "ORDER_CREATED") // 使用默认错误处理器(包含3次重试机制) public void orderCreatedDefaultHandler(Order order) { // 消费逻辑 log.info("处理订单消息: {}", order.getOrderId()); } } ``` #### 错误处理机制特点 1. **生产者错误处理**:支持消息发送失败时的重试和异常处理 2. **消费者错误处理**:支持消息消费失败时的自定义处理逻辑 3. **重试机制**:默认提供3次重试,支持指数退避算法 4. **异常传播**:处理完错误后可以选择重新抛出异常 ### 6. 消息增强功能 系统支持灵活的消息增强机制,可以在消息发送前对消息进行自定义增强处理。 #### 内置增强器 **主题前缀增强器**:为所有消息主题添加统一前缀 ```yaml app: message: enhancer: topic-prefix: enabled: true value: "prod_" order: 0 ``` **消息追踪增强器**:为消息添加追踪信息 ```yaml app: message: enhancer: tracking: enabled: true order: 50 ``` #### 自定义增强器 开发者可以实现 `MessageEnhancer` 接口创建自定义消息增强器: ```java @Component public class CustomMessageEnhancer implements MessageEnhancer { @Override public BusinessMessage enhanceMessage(BusinessMessage message) { if (message != null) { // 添加自定义头部信息 message.addHeader("environment", "production"); } return message; } @Override public int getOrder() { return 150; // 执行顺序 } @Override public boolean isEnabled() { return true; // 是否开启增强 } } ``` 详细使用文档请参考:[消息增强接口使用指南](MessageEnhancer-Guide.md) ### 6. Controller 示例 ```java @Slf4j @RestController @RequestMapping("/api/orders") public class OrderController { @Autowired private OrderService orderService; @PostMapping public Order createOrder(@RequestBody CreateOrderRequest request) { log.info("收到创建订单请求 - 用户: {}", request.getUserId()); return orderService.createOrder(request); } } ``` ## 核心功能详解 ### 1. 注解说明 #### @MessageEvent - **用途**:标记方法为消息生产者 - **参数**: - `topic`:消息主题 - `middleware`:指定使用的中间件(可选) #### @MessageEventListener - **用途**:标记方法为消息消费者 - **参数**: - `topic`:消息主题 - `groupId`:消费者组ID(可选) - `middleware`:指定使用的中间件(可选) - `concurrency`:并发消费者数量(可选) - `errorHandler`:自定义错误处理器类(可选) - `async`:是否异步处理(可选,默认false) ### 2. 动态中间件切换 可以通过配置文件或代码动态切换消息中间件: ```yaml spring: message-middleware: type: activemq # 切换为 ActiveMQ ``` ## 启动示例 ### ActiveMQ 1. **启动 ActiveMQ** ![ActiveMQ 配置](images/配置1.png) 2. **运行应用** ![ActiveMQ 启动1](images/启动1.png) ![ActiveMQ 启动2](images/启动2.png) ### Kafka 1. **启动 Zookeeper** ```bash zookeeper-server-start.bat config\zookeeper.properties ``` 2. **启动 Kafka** ```bash kafka-server-start.bat config\server.properties ``` 3. **运行应用** ![Kafka 启动1](images/4.png) ![Kafka 启动2](images/5.png) ![Kafka 启动3](images/6.png) ## 版本说明 - **1.0-SNAPSHOT** (2025-11) - 初始版本,实现 Kafka 和 ActiveMQ 消息中间件基本功能 - **1.0.1-SNAPSHOT** (2025-12) - 统一消息接口设计 - 支持动态中间件切换 - **2.0.0-SNAPSHOT** (2025-12) - 基于注解实现消息生产和消费 - 优化监听器管理 - **2.1.0-SNAPSHOT** (2026-02) - 消息增强机制实现 - 依赖管理优化(中间件依赖改为 provided scope) - 自定义错误处理机制 - 支持重试、死信队列、告警通知等多种错误处理策略 ## 贡献指南 1. Fork 项目 2. 创建特性分支 (`git checkout -b feature/AmazingFeature`) 3. 提交更改 (`git commit -m 'Add some AmazingFeature'`) 4. 推送到分支 (`git push origin feature/AmazingFeature`) 5. 提交 Pull Request ## 许可证 本项目采用 Apache License 2.0 许可证。详情请见 [LICENSE](LICENSE) 文件。 ## 联系方式 - 项目地址:https://gitee.com/zhoukai1992/message_middleware - 作者:ZhouKai - 邮箱:zhoukai1219@126.com - Issues:https://gitee.com/zhoukai1992/message_middleware/issues ## 开发进度计划 1. ✅ 实现 Kafka 和 ActiveMQ 动态切换基本功能 2. ✅ 基于注解实现消息生产和消费 3. ✅ 支持自定义消息增强器 4. ✅ 自定义错误处理 5. 🔄 批量消费(计划中) 6. 🔄 支持更多消息中间件(计划中) 7. 🔄 实现消息监控和管理界面(计划中)