# mqtt **Repository Path**: lfyainr/mqtt ## Basic Information - **Project Name**: mqtt - **Description**: mqtt 连接池 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 1 - **Created**: 2018-11-05 - **Last Updated**: 2022-10-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # mqtt #### 项目介绍 mqtt 单例和连接池 使用的版本是 ActiveMQ Apollo 1.7.1,下载地址:http://activemq.apache.org/apollo/download.html。 该项目提供了 mqtt 连接池,提高了消息发送的效率, 但是, 请注意, 但是这样以来就不能保证自身发布消息的时序性了,如果需要保证时序,则只能单例执行。 单例模式:mqtt1包下 连接池模式:mqtt2包下 #### 软件架构 项目中使用了lombok插件和Slf4j插件。如果你需要你没有安装的话,可以先安装一下;如果不想安装的话,可以将代码里面的 @Slf4j @Setter @Getter @Data等注解去掉,换上相应的代码实现。 #### 安装教程 1. windows环境下安装教程:https://blog.csdn.net/jiangcsc/article/details/74025672 2. linux环境下安装教程:https://blog.csdn.net/Bluechalk/article/details/78571561 #### 使用说明 1. 如果想看测试效果的话,可以参考 com.htby.bksw.imcmqtt.mqtt 包下的 Server 和 Client 例子;如果是想使用线程池模式的话,可以使用 com.htby.bksw.imcmqtt.mqtt2 包下的 MqttUtils 类提供的相关方法。 2. 配置文件 -- appliction.yml # mq com: mqtt: host: tcp://1p8988h861.iask.in:37546 clientid: servertpye_serverid username: admin password: password timeout: 10 keepalive: 20 # mqttpool mqttclientpool: maximal: 100 minimum: 5 initial: 5 step: 10 3. 需要的依赖 org.projectlombok lombok true org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-stream org.springframework.integration spring-integration-mqtt 4. 消息推送与订阅 mqtt 工具类 package com.htby.bksw.imcmqtt.mqtt2; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; /** * mqtt 工具类 * @author 周西栋 * @date * @param * @return */ @Slf4j public class MqttUtils { private MqttClientPool pool = MqttClientPool.instance(); /** * 发布消息 MqttMessage * @author 周西栋 * @date * @param * @return */ public boolean publish(String topic, MqttMessage msg){ MqttClient client = pool.getClient(); MqttTopic mqttTopic = client.getTopic(topic); try { mqttTopic.publish(msg); log.info("消息发送成功!"); log.info("消息内容是:{}",msg.getPayload()); } catch (MqttException e) { log.error("消息发送失败!"); log.error("异常信息为:{}",e.getMessage()); return false; }finally { pool.close(client); } return true; } /** * 订阅消息 * @author 周西栋 * @date * @param * @return */ public void subscrib(String topic){ MqttClient client = pool.getClient(); //订阅消息 int[] Qos = {1}; String[] array_topic = {topic}; try { client.subscribe(array_topic, Qos); } catch (MqttException e) { log.error("订阅消息异常!"); log.error("异常的topic有:{}",topic.toString()); log.error("异常信息为:{}",e.getMessage()); }finally { pool.close(client); } } } mqtt 线程池 package com.htby.bksw.imcmqtt.mqtt2; import lombok.Data; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * MqttClient 连接池 * @author 周西栋 * @date * @param * @return */ @Slf4j @Configuration public class MqttClientPool { private static MqttClientPool MQTTCLIENTPOOL = new MqttClientPool(); private MqttClientPool(){} public static MqttClientPool instance(){ return MQTTCLIENTPOOL; } // 连接池的最大容量 @Value("${mqttclientpool.maximal:#{200}}") private static int maximal; // 连接池的最小容量 @Value("${mqttclientpool.minimum:#{20}}") private static int minimum; // 连接池的初始容量 @Value("${mqttclientpool.initial:#{20}}") private static int initial; // 连接池的扩充容量 @Value("${mqttclientpool.step:#{10}}") private static int step; // 空闲连接池 private static LinkedList pool; // 已用连接池 private static LinkedList pool_used; // 静态代码块 static { pool = new LinkedList<>(); pool_used = new LinkedList<>(); AutoMqttProperties properties = new AutoMqttProperties(); MqttClientFactory factory = new MqttClientFactory(); // 初始化连接池 for (int i=0; i= maximal){ return false; } AutoMqttProperties properties = new AutoMqttProperties(); MqttClientFactory factory = new MqttClientFactory(); int limitnum = (pool.size() + step > maximal) ? (pool.size() + step - maximal) : step; // 扩容连接池 for (int i=0; i minimum || pool.size() > 0){ client = pool.get(0); pool.remove(0); pool_used.add(client); } else if (pool.size() <= minimum || pool.size() == 0){ dilatation(); try { Thread.sleep(1000); } catch (InterruptedException e) { log.error("线程睡眠异常,异常信息为:{}",e.getMessage()); } } else { log.error("实在抱歉,我是mqtt连接池,我已经没有连接可以提供了!"); throw new RuntimeException("实在抱歉,连接池里已经没有连接了!"); } return client; } // 归还连接 public boolean close(MqttClient client){ if (client == null){ return false; } boolean b = pool_used.remove(client); if (b){ if (pool.add(client)){ return true; }else { pool_used.add(client); return false; } }else { return false; } } } 订阅回调 package com.htby.bksw.imcmqtt.mqtt2; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; /** * 发布消息的回调类 * * 必须实现MqttCallback的接口并实现对应的相关接口方法 * ◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。 * ◦必须在回调类中实现三个方法: * * public void messageArrived(MqttTopic topic, MqttMessage message) * 接收已经预订的发布。 * * public void connectionLost(Throwable cause) * 在断开连接时调用。 * * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * ◦由 MqttClient.connect 激活此回调。 * */ @Slf4j public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { log.error("连接断开,正在重试连接。。。"); } @Override public void messageArrived( String topic, MqttMessage mqttMessage ) throws Exception { // subscribe后得到的消息会执行到这里面 log.info("接收消息主题:{}",topic); log.info("接收消息Qos:{}",mqttMessage.getQos()); log.info("接收消息内容:{}",new String(mqttMessage.getPayload())); } public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 log.info("接收消息主题:{}",topic.getName()); log.info("接收消息Qos:{}",message.getQos()); log.info("接收消息内容:{}",new String(message.getPayload())); } @Override public void deliveryComplete( IMqttDeliveryToken iMqttDeliveryToken ) { // publish后会执行到这里 log.info("deliveryComplete: {}",iMqttDeliveryToken.isComplete()); } } 测试代码 package com.htby.bksw.imcmqtt.testMqtt; import com.htby.bksw.imcmqtt.mqtt2.MqttUtils; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.junit.Test; import java.util.Date; /** * 测试mqtt2 下的类 * @author 周西栋 * @date * @param * @return */ @Slf4j public class Test2 { @Test public void test1() throws InterruptedException { MqttUtils mqttUtils = new MqttUtils(); MqttMessage message = new MqttMessage(); message.setQos(1); message.setRetained(false); long t1 = System.currentTimeMillis(); for(int i=0; i<2000;i++){ Thread.sleep(1); message.setPayload(("你好啊" + " " +i).getBytes()); mqttUtils.publish("abc",message); } long t2 = System.currentTimeMillis(); log.info("用时:{}",(t2 - t1)/1000); } } #### 参与贡献 1. 由于作者水平有限,若有不足之处,欢迎指正。 2. 若有更好改进提议,请自行提交代码,并提供更改说明。 3. 也可以将相关修改意见发送到 2050576675@qq.com ,作者收到后,及时更新项目 #### 码云特技 1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md 2. 码云官方博客 [blog.gitee.com](https://blog.gitee.com) 3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解码云上的优秀开源项目 4. [GVP](https://gitee.com/gvp) 全称是码云最有价值开源项目,是码云综合评定出的优秀开源项目 5. 码云官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) 6. 码云封面人物是一档用来展示码云会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)