# spring-boot-rabbitmq **Repository Path**: ouzhenxing/spring-boot-rabbitmq ## Basic Information - **Project Name**: spring-boot-rabbitmq - **Description**: springboot整合rabbitmq,使用rabbitmq解决高并发访问问题 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 3 - **Forks**: 2 - **Created**: 2020-09-30 - **Last Updated**: 2023-02-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## SpringBoot集成RabbitMQ实现延迟消息 ### 环境说明 erlang 版本:21.3 RabbitMQ 版本:3.7.14 SpringBoot 版本:2.3.3.RELEASE #### rabbitmq_delayed_message_exchange插件 下载地址:http://www.rabbitmq.com/community-plugins.html ![输入图片说明](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6ebb6b61c1c948da9899cbc712b41423~tplv-k3u1fbpfcp-zoom-1.image "image-20210623114425902.png") rabbitmq-delayed-message-exchange v3.8版本适用于RabbitMQ3.7.X版本,插件要与RabbitMQ版本对应,不然使用延迟消息,会遇到各种版本不兼容的问题。 ![输入图片说明](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ec99d17e7483459ba96dd15093c8bb0c~tplv-k3u1fbpfcp-zoom-1.image "image-20210623171448293.png") 下载插件后放在RabbitMQ安装目录下plugins目录,执行如下命令启动该插件: ```yml rabbitmq-plugins enable rabbitmq_delayed_message_exchange ``` ```yml The following plugins have been configured: rabbitmq_delayed_message_exchange ``` 启动插件后需要重启RabbitMQ使插件生效 重启RabbitMQ服务通过两个命令来实现: ```yml rabbitmqctl stop:停止RabbitMQ rabbitmq-server restart:重启RabbitMQ 注意点:rabbitmqctl是没有restart命令,所哟重启RabbitMQ需要执行以上两条命令 ``` #### 集成RabbitMQ 在pom.xml文件中加入RabbitMQ依赖 ```xml org.springframework.boot spring-boot-starter-amqp ``` #### 配置RabbitMQ连接信息 ```yml spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / ``` #### 定义ConnectionFactory和RabbitTemplate ```java package com.ozx.rabbitmqconsumer.config; import lombok.Data; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ClassName: RabbitMqConfig * @Description: RabbitMQ相关配置 * @Author Gxin * @Date 2021/6/24 16:06 * @Version: 1.0 **/ @Data @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMqConfig { private String host; private int port; private String userName; private String password; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port); cachingConnectionFactory.setUsername(userName); cachingConnectionFactory.setPassword(password); cachingConnectionFactory.setVirtualHost("/"); cachingConnectionFactory.setPublisherConfirms(true); return cachingConnectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); return rabbitTemplate; } } ``` #### 配置Queue、交换机、路由键等信息 ```java package com.ozx.rabbitmqconsumer.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @ClassName: QueueConfig * @Description: 定义队列、路由键、交换机,路由键绑定交换机、交换机分派消息对应队列 * @Author Gxin * @Date 2021/6/23 16:49 * @Version: 2.0 **/ @Configuration public class QueueConfig { /** * 分派普通消息交换机 */ @Bean public TopicExchange topicExchange(){ return new TopicExchange("ordinary_exchange",true,false); } @Bean public Queue queue() { Queue queue = new Queue("ordinary_queue", true); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(topicExchange()).with("ordinary_queue"); } /** * 分派延迟消息交换机 */ @Bean public CustomExchange delayExchange(){ Map paramMap = new HashMap(); paramMap.put("x-delayed-type","direct"); return new CustomExchange("delay_exchange","x-delayed-message",true,false,paramMap); } @Bean public Queue delayQueue(){ Queue delayQueue = new Queue("delay_queue", true); return delayQueue; } @Bean public Binding delayMessagebinding(){ return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay_queue").noargs(); } } ``` ### 注意 延迟消息使用CustomExchange,而不是使用DirectExchange、TopicExchange,此外CustomExchange的类型必须是x-delayed-message #### 实现延迟消息 ```java package com.ozx.rabbitmqconsumer.service.impl; import com.ozx.rabbitmqconsumer.service.MessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; /** * @ClassName: MessageServiceImpl * @Description: 生产者生产消息 * @Author Gxin * @Date 2021/6/23 17:01 * @Version: 2.0 **/ @Service @Slf4j public class MessageServiceImpl implements MessageService { @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendMsg(String queueName,String msg) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送时间:"+sdf.format(new Date())); rabbitTemplate.convertAndSend("ordinary_exchange", queueName, msg); } /** * 实现延迟消息 * @param queueName * @param message */ @Override public void sendDelayMessage(String queueName, String message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.debug("消息发送时间:{}",sdf.format(new Date())); rabbitTemplate.convertAndSend("delay_exchange", queueName, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setHeader("x-delay",3000); return message; } }); } } ``` #### 注意 发送消息需添加一个Header请求头,x-delay设置延迟时间是3s #### 消息者消费消息 ```java package com.ozx.rabbitmqconsumer.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; /** * @ClassName: MessageReceiver * @Description: 消费者接收并处理消息 * @Author Gxin * @Date 2021/6/23 16:49 * @Version: 2.0 **/ @Component @Slf4j public class MessageReceiver { @RabbitListener(queues = "ordinary_queue") public void receive(String msg) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),msg); } @RabbitListener(queues = "delay_queue") public void receiveDelayMessage(String message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),message); } } ``` #### Controller层 ```java package com.ozx.rabbitmqconsumer.controller; import com.ozx.rabbitmqconsumer.common.ApiRest; import com.ozx.rabbitmqconsumer.common.BaseController; import com.ozx.rabbitmqconsumer.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @ClassName: MessageController * @Description: 集成RabbitMQ实现延迟消息 * @Author Gxin * @Date 2021/6/23 16:32 * @Version: 2.0 **/ @RestController public class MessageController extends BaseController { @Autowired private MessageService messageService; @GetMapping("send") public ApiRest sendMessage(String queueName, String msg){ messageService.sendMsg(queueName,msg); return this.success(); } @GetMapping("delaySend") public ApiRest sendDelayMessage(String queueName,String message){ messageService.sendDelayMessage(queueName, message); return this.success(); } } ``` ##### 使用postman调试发送消息接口,结果如下 ![image-20210625160009544](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7ba907481ec84743a880459a11e2e393~tplv-k3u1fbpfcp-zoom-1.image) ##### 控制台输出日志如下: ![image-20210625160051494](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f1c82163c41c4b25962bfdd784938566~tplv-k3u1fbpfcp-zoom-1.image) ##### 实现延迟消息结果如下: ![image-20210625160147518](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/bcc6a34851124085bad3ca9928b2f8a0~tplv-k3u1fbpfcp-zoom-1.image) ![image-20210625160210246](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c4d6a9642f3a4611aff4d3deb424002b~tplv-k3u1fbpfcp-zoom-1.image) 消息延迟3S后,被消息者接收并处理