- 一、MQTT协议概念
- 发布/订阅机制
- MQTT客户端
- Broker代理(服务器)
- MQTT消息结构
- 二、MQTT协议实现原理
- MQTT连接
- MQTT消息发布
- MQTT订阅机制
- MQTT订阅确认
- MQTT取消订阅
- MQTT确认取消订阅
- 三、MQTT基本功能
- 持久会话
- 四、MQTT Demo
- 搭建MQTT服务器
- 搭建MQTT消息推送客户端
- 搭建MQTT消息订阅客户端
- 环境测试
- 五、MQTT常见问题
- MQTT消息持久化
- MQTT订阅恢复机制
- MQTT和消息队列的区别
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),它是一个极其轻量级的发布/订阅
消息传输协议,轻量级指的是较少的代码和带宽。因为在物联网行业有类似充电桩、娃娃机、遥控飞行器等等这样的设备,它们的网络可能存在不稳定的情况并且只需要传输少量的数据,MQTT就应运而生专为受限设备和低带宽、高延迟或不可靠的网络而设计。
发布/订阅模型将发送消息的客户端(发布者)与接收消息的客户端(订阅者)分离。发布者和订阅者从不直接联系。他们甚至不知道对方的存在,它们之间由一个第三方组件(代理)处理帮助筛选所有传入消息,并将其正确分发给订阅者。消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者
这个机制最重要的是将发布者和订阅者进行解耦
- 发布者、订阅者不需要交换端口知道对方的主机,只需要知道代理的主机和端口
- 发布者、订阅者不需要同时都运行,哪怕一方下线
- 发布或接收期间,这两个组件上的操作都不需要中断
发布者和订阅者都是客户端,可以是设备也可以是服务器,简单来说就是网络连接到MQTT代理的任何设备。
Broker代理(服务器)代理负责接收所有消息、过滤消息、确定谁订阅了每条消息,并将消息发送到这些订阅的客户端。代理还保存具有持久会话的所有客户端的会话数据,包括订阅和丢失的消息。代理的另一个职责是客户端的身份验证和授权。通常,代理是可扩展的,这有助于自定义身份验证、授权和集成到后端系统中。
MQTT消息结构MQTT消息包含三个部分:
-
固定头(Fixed header)
-
可变头(Variable header)
-
消息体(payload)
MQTT 客户端需要连接到代理后立即发布消息,然后订阅者从里面订阅数据,这里涉及到六个部分:CONNECT
、Publish
、Subscribe
、Unsubscribe
、SUBACK
、Unsuback
客户端向代理发送CONNECT消息。代理响应一个CONNACK消息和一个状态码。连接建立后,代理将保持连接打开,直到客户端发送断开连接命令或连接断开
CONNECT消息主要包含以下内容:
-
ClientId:代理使用ClientId来标识客户端和客户端当前状态,对于每个客户端和代理ClientId是唯一的
-
Clean Session:标志告诉代理客户端是否想要建立一个持久会话。如果为false代理会存储客户端的所有订阅以及使用服务质量(QoS)级别1或2进行订阅的客户端的所有错过的消息。如果为true代理不为客户端存储任何内容,并清除以前任何持久会话中的所有信息
-
Username/Password:用户名和密码用于客户端身份验证和授权。强烈建议用户名和密码与安全传输使用SSL证书验证客户端,因此不需要用户名和密码
-
Will Message:遗嘱,当客户端断开连接时,此消息通知其他客户端
-
KeepAlive:客户端指定并在连接建立时与代理通信。这个间隔定义了代理和客户端在不发送消息的情况下可以忍受的最长时间
-
LWT字段:包含lastWillTopic、lastWillMessage、lastWillRetain、lastWillQos
这个字段可以帮助了解客户端是正常断开连接(使用 MQTT 断开连接消息)还是不正常断开连接(没有断开连接消息),检测到客户端已不正常地断开连接。为了响应不正常的断开连接,代理将最后一个将消息发送到最后一个将消息主题的所有订阅客户端。如果客户端使用正确的断开连接消息正常断开连接,那么代理将丢弃存储的 LWT 消息
代理收到 CONNECT 消息时,返回连接确认标志
MQTT消息发布每条消息都必须包含一个主题,代理可以使用该主题将消息转发给感兴趣的客户端
Publish消息包含以下内容:
-
packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息。数据包标识符仅与大于零的 QoS 级别相关
-
topicName:主题名称,主题区分大小写
主题格式就像URL:deviceName/1638791867
- +:表示任意匹配某一级主题,例如
deviceName/+/weaved
可以匹配deviceName/1638791867/weaved
,但是无法匹配deviceName/1638791867/weaving
- #:表示匹配多级,例如
deviceName/#
可以匹配deviceName/1638791867/weaved
- $:是为 MQTT 代理的内部统计信息保留的,客户端无法向这些主题发布消息
- +:表示任意匹配某一级主题,例如
-
QOS:服务级别质量,有3 个 QoS 级别
-
最多一次 (0)
只会传输一次,不能保证对方一定会收到
-
至少一次 (1)常用
至少保证对方能够收到一次消息,获得接收方发来的 PUBACK数据包,如果发送方在合理的时间内未收到 PUBACK 数据包,则发送方将重新发送 PUBLISH 数据包
-
正好一次 (2)
QoS 2 是最安全、最慢的服务质量级别,由发送方和接收方之间的至少两个请求/响应流(四部分握手)提供。
(1)、当接收方从发送方获取 QoS 2 PUBLISH 数据包时,它会相应地处理发布消息,并使用确认 PUBLISH 数据包的PUBREC 数据包回复发送方。如果发送方未从接收方获取 PUBREC 数据包,它将再次发送带有重复 (DUP) 标志的 PUBLISH 数据包,直到收到确认。
(2)、接收方收到
PUBREC
数据包,发送方就可以安全地丢弃初始PUBLISH
数据包。(3)、发送方存储来自接收方的
PUBREC
数据包,并使用PUBREL
数据包进行响应(4)、接收方获得
PUBREL
数据包后,它可以丢弃所有存储的状态并使用PUBCOMP
数据包进行应答
如果数据包在此过程中丢失,发件人负责在合理的时间内重新传输消息
-
-
retainFlag:消息是否由代理保存为指定主题的最后一个已知正确值。当新客户端订阅某个主题时,它们会收到保留在该主题上的最后一条消息
保留的消息可帮助新订阅的客户端在订阅主题后立即获取状态更新,而不需要等到客户端下一次推送消息。保留的消息消除了等待发布客户端发送下一个更新的时间
-
payload:消息的实际内容包含图像,任何编码的文本,加密数据以及二进制的数据
-
dupFlag:标志指示邮件是重复的,这个重复发送跟QoS大于0的时候有关
客户端将消息发送到 MQTT代理进行发布时,代理将读取消息,确认消息(根据 QoS 级别),并处理消息。代理的处理包括确定哪些客户端订阅了主题并向它们发送消息
MQTT订阅机制MQTT客户端发送了消息。如果没人接收消息将毫无意义,所以也会有客户端来订阅消息,客户端会向 MQTT 代理发送一条 SUBSCRIBE消息
Subscribe消息包含以下内容:
-
packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息。数据包标识符仅与大于零的 QoS 级别相关
-
订阅列表:一个 SUBSCRIBE 消息可以包含一个客户端的多个订阅,每个订阅都由一个主题和一个 QoS 级别组成
为了确认每个订阅,代理向客户端发送 SUBACK确认消息
SUBACK消息包含以下内容:
- packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息
- rerurnCode:每订阅一个主题发送一个返回代码
客户端成功发送 SUBSCRIBE 消息并接收 SUBACK 消息后,它将获取与 SUBSCRIBE 消息包含的订阅中的主题匹配的每个已发布消息
MQTT取消订阅消息可以订阅那么也可以取消订阅,会删除代理上客户端的现有预订
Unsubscribe消息包含以下内容:
- packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息
- List of Topic(主题列表):主题列表可以包含多个客户要取消订阅的主题。只需发送主题
要确认取消订阅,代理会向客户端发送 Unsuback确认消息
Unsuback消息包含以下内容:
- packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息,这与取消订阅消息中的数据包标识符相同
客户端需要连接到代理并且订阅主题,但是客户端和代理之间如果连接在非持久会话中中断,那么主题会丢失,需要在重新连接时再次订阅。为了避免这个问题可以使用持久会话功能,它主要是在代理中存储了:
- 客户端的会话以及订阅
- QOS为1和2中没有确认的消息
- 客户端在断联时候错过的消息
- 客户端接收到的所有尚未完全确认的 QoS 2 消息
为了开启代理上的持久会话,在MQTT客户端连接到代理服务器的时候有个cleanSession
字段设置为false表示开启持久会话,所有信息和消息都将保留,代理存储会话,直到客户端重新联机并收到消息,如果长时间不联机,那么会消耗内存
客户端上的持久会话,当客户端请求服务器保存会话数据时,客户端负责存储以下信息:
- QoS 1 或 2 流中尚未由代理确认的所有消息
- 从代理接收到的所有尚未完全确认的 QoS 2 消息
官方文档:产品概览 | EMQX 文档
EMQX (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。
Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。
MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。
EMQX 设计目标是实现高可靠,并支持承载海量物联网终端的 MQTT 连接,支持在海量物联网设备间低延时消息路由:
- 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持 200 万连接。
- 分布式节点集群,快速低延时的消息路由。
- 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
- 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持
使用Docker安装EMQX
1、获取Docker镜像
docker pull emqx/emqx:4.4.3
2、启动Docker
docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.4.3
3、访问Web管理控制台
控制台地址: http://XXXXXX:18083,默认用户: admin,密码:public
搭建MQTT消息推送客户端各个服务端口说明:
1883:MQTT 协议端口
8883:MQTT/SSL 端口
8083:MQTT/WebSocket 端口
8080:HTTP API 端口
18083:Dashboard 管理控制台端口
引入相关依赖包
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<optional>true</optional>
</dependency>
</dependencies>
MQTT客户端
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
* 消息推送客户端
*
* @author yanglingcong
*/
@Slf4j
@Component
public class MyMqttClient {
private final static int QOS_1 = 1;
private final static String USER_NAME = "ylc";
private final static int PASSWORLD = 123456;
private final static int KEEP_ALIVE = 60;
/**
* 连接地址
* */
public static final String HOST = "tcp://XXXXX:1883";
/**
* 订阅主题
* */
public static final String TOPIC = "deviceName/";
//客户端唯一ID
private static final String clientid = "pubClient";
public static void main(String[] args) {
MqttClient mqtt = createMqtt();
publishMessage("Hello", TOPIC, mqtt);
}
public static MqttClient createMqtt() {
MqttClient client = null;
MqttConnectOptions connectOptions = new MqttConnectOptions();
//断开之后自动重联
connectOptions.setAutomaticReconnect(true);
//设置会话心跳时间 代理和客户端在不发送消息的情况下可以忍受的最长时间
connectOptions.setKeepAliveInterval(KEEP_ALIVE);
//不建立持久会话
connectOptions.setCleanSession(true);
//用户名
connectOptions.setUserName(USER_NAME);
//密码
connectOptions.setPassword(String.valueOf(PASSWORLD).toCharArray());
try {
client = new MqttClient(HOST, clientid, new MemoryPersistence());
//MQTT连接
client.connect(connectOptions);
//消息回调
client.setCallback(new MqttCallBackHandle(client));
} catch (MqttException e) {
log.warn("MQTT消息异常{}", e);
}
return client;
}
/**
* 消息推送
*
* @param message 消息内容
* @param topic 发送的主题
* @author yanglingcong
* @date 2022/4/18 21:25
*/
public static void publishMessage(String message, String topic, MqttClient mqttClient) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(QOS_1);
//保留在该主题上的最后一条消息
//mqttMessage.setRetained(true);
mqttMessage.setPayload(message.getBytes());
try {
mqttClient.publish(topic, mqttMessage);
log.info("MQTT消息发送成功:{}", message);
} catch (MqttException e) {
log.warn("MQTT消息推送失败");
e.printStackTrace();
}
}
}
MQTT回调接口
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttClient;
/**
* MQTT消息回调方法
*/
@Slf4j
public class MqttCallBackHandle implements MqttCallbackExtended {
private MqttClient client;
public MqttCallBackHandle(MqttClient client){
this.client=client;
}
//订阅主题
private final static String CMD_TOP_FORMAT = "deviceName/";
/**
* 连接成功后调用该方法
* @param reconnect
* @param serverURI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
try {
//重新订阅主题
client.subscribe(CMD_TOP_FORMAT);
log.info("=====MQTT重联成功=====");
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 断开连接后回调方法
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("=====MQTT连接断开=====");
}
/**
* 接收订阅到的消息
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("=====MQTT消息订阅成功=====");
log.info("主题:{},内容:{}",topic,message);
}
/**
* 发送完成
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("=====MQTT消息发送完毕=====");
}
}
搭建MQTT消息订阅客户端
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
* 消息订阅客户端
*
* @author yanglingcong*/
@Component
@Slf4j
public class MyMqttSubClient {
private final static int QOS_1 = 1;
private final static String USER_NAME = "ylc";
private final static int PASSWORLD = 123456;
private final static int KEEP_ALIVE = 60;
//连接地址
public static final String HOST = "tcp://xxxx:1883";
// 订阅主题
public static final String TOPIC = "deviceName/";
//客户端唯一ID
private static final String clientid = "subClient";
public static void main(String[] args) {
subscribe();
}
public MyMqttSubClient() throws MqttException {
//订阅
subscribe();
}
public static void subscribe() {
MqttClient client=null;
MqttConnectOptions connectOptions=new MqttConnectOptions();
//断开之后自动重联
connectOptions.setAutomaticReconnect(true);
//设置会话心跳时间 代理和客户端在不发送消息的情况下可以忍受的最长时间
connectOptions.setKeepAliveInterval(KEEP_ALIVE);
//不建立持久会话
connectOptions.setCleanSession(true);
//用户名
connectOptions.setUserName(USER_NAME);
//密码
connectOptions.setPassword(String.valueOf(PASSWORLD).toCharArray());
try {
client=new MqttClient(HOST,clientid, new MemoryPersistence());
//MQTT连接
client.connect(connectOptions);
} catch (MqttException e) {
e.printStackTrace();
}
//消息回调
client.setCallback(new MqttCallBackHandle(client));
try {
client.subscribe(TOPIC,QOS_1);
} catch (MqttException e) {
log.warn("MQTT消息订阅异常{}",e);
e.printStackTrace();
}
}
}
环境测试
1、MQTT客户端pubClient
向服务器推送消息
2、MQTT客户端subClient
从服务器订阅消息
3、踢除客户端,会自动重联,因为设置了MQTT断开自动重联
五、MQTT常见问题 MQTT消息持久化如果 cleanSession 设为true,一旦掉线客户端不会存储任何内容,并清除以前任何持久会话中的所有信息
如果 cleanSession 设为false,重连后可以接收之前订阅主题的消息,还有离线时期未接收的消息
MQTT订阅恢复机制MQTT掉线设置自动重联之后,无法再进行订阅。MqttCallbackExtended接口有一个connectComplete方法用于重新订阅主题
MQTT和消息队列的区别-
消息队列可以存储消息,直到被消费为止
-
消息队列只能被消费处理一次,不像MQTT订阅的人都可以收到消息
-
消息队列需要先创建队列,MQTT可以使用时候创建
-
MQTT是一种通信协议,MQ是消息通道
-
MQTT面向海量设备连接、MQ是面向海量数据