当前位置 : 主页 > 编程语言 > java >

消息队列遥测传输协议

来源:互联网 收集:自由互联 发布时间:2023-12-28
1. 物联网消息协议MQTT 1.1. MQTT简介 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。 客户端 服务端 一个使


1. 物联网消息协议MQTT

1.1. MQTT简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。

消息队列遥测传输协议_客户端

客户端

服务端

一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

也称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

- (1)发布其他客户端可能会订阅的信息;

- (1)接受来自客户的网络连接;

- (2)订阅其它客户端发布的消息;

- (2)接受客户发布的应用信息;

- (3)退订或删除应用程序的消息;

- (3)处理来自客户端的订阅和退订请求;

- (4)断开与服务器连接。

- (4)向订阅的客户转发应用程序消息。

设计原则:

①精简,不添加可有可无的功能;

②发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先知道对方的存在(ip/port),不必同时运行;

③允许用户动态创建主题(不需要预先创建主题),零运维成本;

④把传输量降到最低以提高传输效率;

⑤把低带宽、高延迟、不稳定的网络等因素考虑在内;

⑥支持连续的会话保持和控制(心跳);

⑦理解客户端计算能力可能很低;

⑧提供服务质量( quality of service level:QoS)管理

⑨不强求传输数据的类型与格式,保持灵活性(指的是应用层业务数据)。

MQTT应用领域:

①物联网M2M通信,物联网大数据采集

②Android消息推送,WEB消息推送

③移动即时消息,例如Facebook Messenger

②智能硬件、智能家居、智能电器

⑤车联网通信,电动车站桩采集

⑥智慧城市、远程医疗、远程教育

⑦电力、石油与能源等行业市场

1.2 MQTT协议相关概念

MQTT协议构成

消息队列遥测传输协议_服务器_02

MQTT协议中的方法

- (1)CONNECT:客户端连接到服务器

- (9)SUBACK:订阅确认

- (2)CONNACK:连接确认

- (10)UNSUBSCRIBE:取消订阅

- (3)PUBLISH:发布消息

- (11)UNSUBACK:取消订阅确认

- (4)PUBACK:发布确认

- (12)PINGREQ:客户端发送心跳

- (5)PUBREC:发布的消息已接收

- (13)PINGRESP:服务端心跳响应

- (6)PUBREL:发布的消息已释放

- (14)DISCONNECT:断开连接

- (7)PUBCOMP:发布完成

- (15)AUTH:认证

- (8)SUBSCRIBE:订阅请求


1.3. 消息服务质量QOS

MQTT 协议中规定了消息服务质量(Quality of Service),它保证了在不同的网络环境下消息传递的可靠性,QoS 的设计是 MQTT 协议里的重点。

MQTT 设计了 3 个 QoS 等级。

- QoS 0:消息最多传递一次。

- QoS 1:消息传递至少 1 次。

- QoS 2:消息仅传送一次。

QoS0:"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失。一个消息不会被接收端应答,也不会被发送者存储并再发送。这个也被叫做“即发即弃”

消息队列遥测传输协议_服务器_03

QoS1:"至少一次",确保消息到达,但消息重复可能会发生。发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。

消息队列遥测传输协议_持久化_04

QoS2:"只有一次",确保消息到达一次。如果接收端接收到了一个QoS为2的PUBLISH消息,它将相应地处理PUBLISH消息,并通过PUBREC消息向发送方确认。

PUBLISH:发布消息 PUBREC:发布收到 PUBREL:发布释放 PUBCOMP:发布完成

消息队列遥测传输协议_服务器_05

发送订阅不同QOS情况下是如何生效的

消息队列遥测传输协议_客户端_06

1.4. topic通配符匹配规则

消息队列遥测传输协议_持久化_07

消息队列遥测传输协议_服务器_08

消息队列遥测传输协议_客户端_09

消息队列遥测传输协议_持久化_10

2. 物联网级消息中间件EMQ

2.1. EMQX 简介

EMQ X Broker 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。

EMQ官网:https://www.emqx.cn/

为什么选择EMQ X ?从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。

消息队列遥测传输协议_持久化_11

EMQX的特点

  • EMQ X 目前为开源社区中最流行的 MQTT 消息中间件;
  • EMQ X 是开源社区中第一个支持 5.0协议规范的消息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。
  • 除了 MQTT 协议之外,EMQ X 还支持MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等物联网协议
  • 单机支持百万连接,集群支持千万级连接;毫秒级消息转发。
  • 易于安装和使用;
  • 中国本地的技术支持服务;
  • 扩展模块和插件,EMQ X 提供了灵活的扩展机制,支持企业的一些定制场景;
  • 桥接
  • 共享订阅

2.2. 环境搭建与配置

Docker 运行与安装

拉取emqx镜像

[root@docker emqx]# docker pull emqx/emqx:v4.1.0

创建emqx容器

[root@docker emqx]# docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083  emqx/emqx:v4.1.0

EMQ X 目录结构

2.3. EMQDashboard

EMQ X 提供了 Dashboard 以方便用户管理设备与监控相关指标。通过 Dashboard可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。

访问地址 http://192.168.150.102:18083 来查看Dashboard,**默认用户名是 admin,密码是 public*

消息队列遥测传输协议_客户端_12

消息队列遥测传输协议_客户端_13

消息队列遥测传输协议_持久化_14

2.4. 客户端调试工具MQTTX

MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端,它支持 macOS, Linux, Windows。

MQTT X 的 UI 采用了聊天界面形式,简化了页面操作逻辑,用户可以快速创建连接,允许保存多个客户端,方便用户快速测试 MQTT/MQTTS 连接,及 MQTT 消息的订阅和发布。

消息队列遥测传输协议_服务器_15

2.5. 延迟消息

EMQ X 的延迟发布功能可以实现按照用户配置的时间间隔延迟发布 PUBLISH 报文的功能。模块开启emqx_mod_delayed

延迟发布主题的具体格式如下:

$delayed/{DelayInterval}/{TopicName}

- 使用delayed 作为主题前缀的消息都将被视为需要延迟发布的消息。

- {DelayInterval}: 指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒。

- {TopicName}: MQTT 消息的主题名称。

1.在Websocket上订阅主题:t2/a

2.在Websocket上发布消息主题:topic:  $delayed/10/t2/a

观察运行效果

消息队列遥测传输协议_持久化_16

2.6. 共享订阅

不带群组的共享订阅

格式:

$queue/{TopicName}

消息队列遥测传输协议_服务器_17

消息队列遥测传输协议_客户端_18

带群组的共享订阅

格式:

$share/<group-name>/{TopicName}

消息队列遥测传输协议_服务器_19

3.Eclipse Paho

3.1. Eclipse Paho是什么

Eclipse paho 是EMQx官方推荐的实现了mqtt协议java客户端。

其关系类似于Mysql于JDBC,我们的项目代码要连接数据库需要用到JDBC,而我们的项目需要连接EMQX需要用到Eclipse Paho  ,并且它提供了基础的消息收发。

3.2. Eclipse Paho技术调研

集成Eclipse paho

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

发布消息到EMQ

(1)搭建基本的springBoot程序。

(2)编写controller,新增发布消息方法。

@GetMapping("/publish")
public void publish() throws MqttException {

    MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
    MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);

    //连接选项中定义用户名密码和其它配置
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
    options.setAutomaticReconnect(true);//是否自动重连
    options.setConnectionTimeout(30);//连接超时时间  秒
    options.setKeepAliveInterval(10);//连接保持检查周期  秒
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
    
    client.connect(options);//连接
    client.publish("topic", "发送内容".getBytes(), 2, false);

}

订阅消息

在controller新增方法,订阅消息

@GetMapping("/subscribe")
public void subscribe() throws MqttException {

    MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
    MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);

    //连接选项中定义用户名密码和其它配置
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
    options.setAutomaticReconnect(true);//是否自动重连
    options.setConnectionTimeout(30);//连接超时时间  秒
    options.setKeepAliveInterval(10);//连接保持检查周期  秒
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本

    client.setCallback(new MqttCallbackExtended() {
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("连接丢失!");
        }

        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            System.out.println( "接收到消息  topic:" +s+"  id:"+mqttMessage.getId() +" message:"+ mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }

        @Override
        public void connectComplete(boolean b, String s) {
            System.out.println("连接成功!");
        }
    });
    client.connect(options);//连接
    client.subscribe("test");  //订阅主题

}


 


【文章转自中东服务器 http://www.558idc.com/dibai.html欢迎留下您的宝贵建议】
上一篇:WebMvcConfigurerAdapter
下一篇:没有了
网友评论