1. 物联网消息协议MQTT
1.1. MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。
客户端
服务端
一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:
也称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:
- (1)发布其他客户端可能会订阅的信息;
- (1)接受来自客户的网络连接;
- (2)订阅其它客户端发布的消息;
- (2)接受客户发布的应用信息;
- (3)退订或删除应用程序的消息;
- (3)处理来自客户端的订阅和退订请求;
- (4)断开与服务器连接。
- (4)向订阅的客户转发应用程序消息。
设计原则:
①精简,不添加可有可无的功能;
②发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先知道对方的存在(ip/port),不必同时运行;
③允许用户动态创建主题(不需要预先创建主题),零运维成本;
④把传输量降到最低以提高传输效率;
⑤把低带宽、高延迟、不稳定的网络等因素考虑在内;
⑥支持连续的会话保持和控制(心跳);
⑦理解客户端计算能力可能很低;
⑧提供服务质量( quality of service level:QoS)管理
⑨不强求传输数据的类型与格式,保持灵活性(指的是应用层业务数据)。
MQTT应用领域:
①物联网M2M通信,物联网大数据采集
②Android消息推送,WEB消息推送
③移动即时消息,例如Facebook Messenger
②智能硬件、智能家居、智能电器
⑤车联网通信,电动车站桩采集
⑥智慧城市、远程医疗、远程教育
⑦电力、石油与能源等行业市场
1.2 MQTT协议相关概念
MQTT协议构成
MQTT协议中的方法
- (1)CONNECT:客户端连接到服务器
- (9)SUBACK:订阅确认
- (2)CONNACK:连接确认
- (10)UNSUBSCRIBE:取消订阅
- (3)PUBLISH:发布消息
- (11)UNSUBACK:取消订阅确认
- (4)PUBACK:发布确认
- (12)PINGREQ:客户端发送心跳
- (5)PUBREC:发布的消息已接收
- (13)PINGRESP:服务端心跳响应
- (6)PUBREL:发布的消息已释放
- (14)DISCONNECT:断开连接
- (7)PUBCOMP:发布完成
- (15)AUTH:认证
- (8)SUBSCRIBE:订阅请求
1.3. 消息服务质量QOS
MQTT 协议中规定了消息服务质量(Quality of Service),它保证了在不同的网络环境下消息传递的可靠性,QoS 的设计是 MQTT 协议里的重点。
MQTT 设计了 3 个 QoS 等级。
- QoS 0:消息最多传递一次。
- QoS 1:消息传递至少 1 次。
- QoS 2:消息仅传送一次。
QoS0:"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失。一个消息不会被接收端应答,也不会被发送者存储并再发送。这个也被叫做“即发即弃”
QoS1:"至少一次",确保消息到达,但消息重复可能会发生。发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
QoS2:"只有一次",确保消息到达一次。如果接收端接收到了一个QoS为2的PUBLISH消息,它将相应地处理PUBLISH消息,并通过PUBREC消息向发送方确认。
PUBLISH:发布消息 PUBREC:发布收到 PUBREL:发布释放 PUBCOMP:发布完成
发送订阅不同QOS情况下是如何生效的
1.4. topic通配符匹配规则
2. 物联网级消息中间件EMQ
2.1. EMQX 简介
EMQ X Broker 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。
EMQ官网:https://www.emqx.cn/
为什么选择EMQ X ?从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。
EMQX的特点
- EMQ X 目前为开源社区中最流行的 MQTT 消息中间件;
- EMQ X 是开源社区中第一个支持 5.0协议规范的消息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。
- 除了 MQTT 协议之外,EMQ X 还支持MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等物联网协议
- 单机支持百万连接,集群支持千万级连接;毫秒级消息转发。
- 易于安装和使用;
- 中国本地的技术支持服务;
- 扩展模块和插件,EMQ X 提供了灵活的扩展机制,支持企业的一些定制场景;
- 桥接
- 共享订阅
2.2. 环境搭建与配置
Docker 运行与安装
拉取emqx镜像
[root@docker emqx]# docker pull emqx/emqx:v4.1.0
创建emqx容器
[root@docker emqx]# docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0
EMQ X 目录结构
2.3. EMQDashboard
EMQ X 提供了 Dashboard 以方便用户管理设备与监控相关指标。通过 Dashboard可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。
访问地址 http://192.168.150.102:18083 来查看Dashboard,**默认用户名是 admin,密码是 public*
2.4. 客户端调试工具MQTTX
MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端,它支持 macOS, Linux, Windows。
MQTT X 的 UI 采用了聊天界面形式,简化了页面操作逻辑,用户可以快速创建连接,允许保存多个客户端,方便用户快速测试 MQTT/MQTTS 连接,及 MQTT 消息的订阅和发布。
2.5. 延迟消息
EMQ X 的延迟发布功能可以实现按照用户配置的时间间隔延迟发布 PUBLISH 报文的功能。模块开启emqx_mod_delayed
延迟发布主题的具体格式如下:
$delayed/{DelayInterval}/{TopicName}
- 使用delayed 作为主题前缀的消息都将被视为需要延迟发布的消息。
- {DelayInterval}: 指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒。
- {TopicName}: MQTT 消息的主题名称。
1.在Websocket上订阅主题:t2/a
2.在Websocket上发布消息主题:topic: $delayed/10/t2/a
观察运行效果
2.6. 共享订阅
不带群组的共享订阅
格式:
$queue/{TopicName}
带群组的共享订阅
格式:
$share/<group-name>/{TopicName}
3.Eclipse Paho
3.1. Eclipse Paho是什么
Eclipse paho 是EMQx官方推荐的实现了mqtt协议java客户端。
其关系类似于Mysql于JDBC,我们的项目代码要连接数据库需要用到JDBC,而我们的项目需要连接EMQX需要用到Eclipse Paho ,并且它提供了基础的消息收发。
3.2. Eclipse Paho技术调研
集成Eclipse paho
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
发布消息到EMQ
(1)搭建基本的springBoot程序。
(2)编写controller,新增发布消息方法。
@GetMapping("/publish")
public void publish() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
//连接选项中定义用户名密码和其它配置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
options.setAutomaticReconnect(true);//是否自动重连
options.setConnectionTimeout(30);//连接超时时间 秒
options.setKeepAliveInterval(10);//连接保持检查周期 秒
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
client.connect(options);//连接
client.publish("topic", "发送内容".getBytes(), 2, false);
}
订阅消息
在controller新增方法,订阅消息
@GetMapping("/subscribe")
public void subscribe() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
//连接选项中定义用户名密码和其它配置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
options.setAutomaticReconnect(true);//是否自动重连
options.setConnectionTimeout(30);//连接超时时间 秒
options.setKeepAliveInterval(10);//连接保持检查周期 秒
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接丢失!");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println( "接收到消息 topic:" +s+" id:"+mqttMessage.getId() +" message:"+ mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
@Override
public void connectComplete(boolean b, String s) {
System.out.println("连接成功!");
}
});
client.connect(options);//连接
client.subscribe("test"); //订阅主题
}