# mqtt
**Repository Path**: lfyainr/mqtt
## Basic Information
- **Project Name**: mqtt
- **Description**: mqtt 连接池
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 1
- **Created**: 2018-11-05
- **Last Updated**: 2022-10-27
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# mqtt
#### 项目介绍
mqtt 单例和连接池
使用的版本是 ActiveMQ Apollo 1.7.1,下载地址:http://activemq.apache.org/apollo/download.html。
该项目提供了 mqtt 连接池,提高了消息发送的效率,
但是,
请注意,
但是这样以来就不能保证自身发布消息的时序性了,如果需要保证时序,则只能单例执行。
单例模式:mqtt1包下
连接池模式:mqtt2包下
#### 软件架构
项目中使用了lombok插件和Slf4j插件。如果你需要你没有安装的话,可以先安装一下;如果不想安装的话,可以将代码里面的 @Slf4j @Setter @Getter @Data等注解去掉,换上相应的代码实现。
#### 安装教程
1. windows环境下安装教程:https://blog.csdn.net/jiangcsc/article/details/74025672
2. linux环境下安装教程:https://blog.csdn.net/Bluechalk/article/details/78571561
#### 使用说明
1. 如果想看测试效果的话,可以参考 com.htby.bksw.imcmqtt.mqtt 包下的 Server 和 Client 例子;如果是想使用线程池模式的话,可以使用 com.htby.bksw.imcmqtt.mqtt2 包下的
MqttUtils 类提供的相关方法。
2. 配置文件 -- appliction.yml
# mq
com:
mqtt:
host: tcp://1p8988h861.iask.in:37546
clientid: servertpye_serverid
username: admin
password: password
timeout: 10
keepalive: 20
# mqttpool
mqttclientpool:
maximal: 100
minimum: 5
initial: 5
step: 10
3. 需要的依赖
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-integration
org.springframework.integration
spring-integration-stream
org.springframework.integration
spring-integration-mqtt
4. 消息推送与订阅
mqtt 工具类
package com.htby.bksw.imcmqtt.mqtt2;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
/**
* mqtt 工具类
* @author 周西栋
* @date
* @param
* @return
*/
@Slf4j
public class MqttUtils {
private MqttClientPool pool = MqttClientPool.instance();
/**
* 发布消息 MqttMessage
* @author 周西栋
* @date
* @param
* @return
*/
public boolean publish(String topic, MqttMessage msg){
MqttClient client = pool.getClient();
MqttTopic mqttTopic = client.getTopic(topic);
try {
mqttTopic.publish(msg);
log.info("消息发送成功!");
log.info("消息内容是:{}",msg.getPayload());
} catch (MqttException e) {
log.error("消息发送失败!");
log.error("异常信息为:{}",e.getMessage());
return false;
}finally {
pool.close(client);
}
return true;
}
/**
* 订阅消息
* @author 周西栋
* @date
* @param
* @return
*/
public void subscrib(String topic){
MqttClient client = pool.getClient();
//订阅消息
int[] Qos = {1};
String[] array_topic = {topic};
try {
client.subscribe(array_topic, Qos);
} catch (MqttException e) {
log.error("订阅消息异常!");
log.error("异常的topic有:{}",topic.toString());
log.error("异常信息为:{}",e.getMessage());
}finally {
pool.close(client);
}
}
}
mqtt 线程池
package com.htby.bksw.imcmqtt.mqtt2;
import lombok.Data;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* MqttClient 连接池
* @author 周西栋
* @date
* @param
* @return
*/
@Slf4j
@Configuration
public class MqttClientPool {
private static MqttClientPool MQTTCLIENTPOOL = new MqttClientPool();
private MqttClientPool(){}
public static MqttClientPool instance(){
return MQTTCLIENTPOOL;
}
// 连接池的最大容量
@Value("${mqttclientpool.maximal:#{200}}")
private static int maximal;
// 连接池的最小容量
@Value("${mqttclientpool.minimum:#{20}}")
private static int minimum;
// 连接池的初始容量
@Value("${mqttclientpool.initial:#{20}}")
private static int initial;
// 连接池的扩充容量
@Value("${mqttclientpool.step:#{10}}")
private static int step;
// 空闲连接池
private static LinkedList pool;
// 已用连接池
private static LinkedList pool_used;
// 静态代码块
static {
pool = new LinkedList<>();
pool_used = new LinkedList<>();
AutoMqttProperties properties = new AutoMqttProperties();
MqttClientFactory factory = new MqttClientFactory();
// 初始化连接池
for (int i=0; i= maximal){
return false;
}
AutoMqttProperties properties = new AutoMqttProperties();
MqttClientFactory factory = new MqttClientFactory();
int limitnum = (pool.size() + step > maximal) ? (pool.size() + step - maximal) : step;
// 扩容连接池
for (int i=0; i minimum || pool.size() > 0){
client = pool.get(0);
pool.remove(0);
pool_used.add(client);
} else if (pool.size() <= minimum || pool.size() == 0){
dilatation();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("线程睡眠异常,异常信息为:{}",e.getMessage());
}
} else {
log.error("实在抱歉,我是mqtt连接池,我已经没有连接可以提供了!");
throw new RuntimeException("实在抱歉,连接池里已经没有连接了!");
}
return client;
}
// 归还连接
public boolean close(MqttClient client){
if (client == null){
return false;
}
boolean b = pool_used.remove(client);
if (b){
if (pool.add(client)){
return true;
}else {
pool_used.add(client);
return false;
}
}else {
return false;
}
}
}
订阅回调
package com.htby.bksw.imcmqtt.mqtt2;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法
* ◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。
* ◦必须在回调类中实现三个方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)
* 接收已经预订的发布。
*
* public void connectionLost(Throwable cause)
* 在断开连接时调用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
* ◦由 MqttClient.connect 激活此回调。
*
*/
@Slf4j
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
log.error("连接断开,正在重试连接。。。");
}
@Override
public void messageArrived( String topic, MqttMessage mqttMessage ) throws Exception {
// subscribe后得到的消息会执行到这里面
log.info("接收消息主题:{}",topic);
log.info("接收消息Qos:{}",mqttMessage.getQos());
log.info("接收消息内容:{}",new String(mqttMessage.getPayload()));
}
public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
log.info("接收消息主题:{}",topic.getName());
log.info("接收消息Qos:{}",message.getQos());
log.info("接收消息内容:{}",new String(message.getPayload()));
}
@Override
public void deliveryComplete( IMqttDeliveryToken iMqttDeliveryToken ) {
// publish后会执行到这里
log.info("deliveryComplete: {}",iMqttDeliveryToken.isComplete());
}
}
测试代码
package com.htby.bksw.imcmqtt.testMqtt;
import com.htby.bksw.imcmqtt.mqtt2.MqttUtils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;
import java.util.Date;
/**
* 测试mqtt2 下的类
* @author 周西栋
* @date
* @param
* @return
*/
@Slf4j
public class Test2 {
@Test
public void test1() throws InterruptedException {
MqttUtils mqttUtils = new MqttUtils();
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
long t1 = System.currentTimeMillis();
for(int i=0; i<2000;i++){
Thread.sleep(1);
message.setPayload(("你好啊" + " " +i).getBytes());
mqttUtils.publish("abc",message);
}
long t2 = System.currentTimeMillis();
log.info("用时:{}",(t2 - t1)/1000);
}
}
#### 参与贡献
1. 由于作者水平有限,若有不足之处,欢迎指正。
2. 若有更好改进提议,请自行提交代码,并提供更改说明。
3. 也可以将相关修改意见发送到 2050576675@qq.com ,作者收到后,及时更新项目
#### 码云特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. 码云官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解码云上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是码云最有价值开源项目,是码云综合评定出的优秀开源项目
5. 码云官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. 码云封面人物是一档用来展示码云会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)