ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MQTT3.1.1协议阅读笔记1

2021-08-05 19:31:06  阅读:299  来源: 互联网

标签:MQTT3.1 协议 报文 笔记 报头 MQTT client new import


本文主要是记录阅读 MQTT3.1.1协议中文版 时的心得感悟。

环境信息

  1. 使用Docker运行emqx,作为MQTT的服务端
  2. 使用mqtt-spy.jar作为MQTT的客户端
  3. 使用Paho写一个简单的Java-MQTT客户端
  4. 使用WireShark进行协议抓包

MQTT 简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于 TCP/IP 协议之上,由IBM在1999年发布。。

一个 MQTT 控制报文包含三个部分:

组成部分 长度
固定报头 2-5个字节 存在于所有MQTT控制包
可变报头 存在于某些MQTT控制包
载荷 存在于某些MQTT控制包
  • 我们借助 MQTT 协议发送的消息内容保存在载荷中

1.固定报头

固定报头由两部分组成:控制包类型和剩余长度

  • 控制包类型目前有 14 种;
  • 剩余长度表示的是“可变报头+载荷”的总长度

如上图所示,这是一条控制包类型为 CONNECT 的 MQTT 报文,固定报头的中 剩余长度用16进制表示为 0x1e,用10进制表示为 30。

从 1e 的后一个字节 00 到末尾刚好是 30 个字节。

1.1 剩余长度与控制包最大长度256M

剩余长度使用了一种可变长度的结构来编码,这种结构使用单一字节表示0-127的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。剩余长度最多可以用四个字节来表示。

用n个字节表示剩余长度 剩余长度范围起始值 剩余长度范围结束值
1 0 (0x00) 127 (0x7F)
2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

这将允许应用发送可变报头和载荷总长度为255M大小的控制包。这个数字用16进制表示为:0xFF,0xFF,0xFF,0x7F。

换句话说,这将允许应用发送最多256M大小的控制包。

2.可变报头

以 CONNECT 报文的可变报头为例,主要包含协议名称(MQTT)和协议版本号(v3.1.1对应4);

2.1 MSB 和 LSB

至于 Length MSB(Most Significant Bit,最高有效位) 和 Length LSB (Last/Least Significant Bit,译作最低有效位),

把 MSB 和 LSB 用大端字节/网络字节序来读取,读取的值可以表示协议名称的长度。

以下是 Java 写成的 Demo:

import java.io.*;
import java.util.Arrays;

public class Utf8Characters {

    public static void main(String[] args) throws IOException {
        // 模拟写入
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(baos);

        dataOut.writeUTF("MQTT");

        byte[] bytes = baos.toByteArray();
        System.out.println(Arrays.toString(bytes)); // 打印 [0, 4, 77, 81, 84, 84]

        // 模拟读取
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        DataInputStream dataIn = new DataInputStream(bais);

        int len = dataIn.readUnsignedShort(); // 2 bytes
        byte[] decodedString = new byte[len]; // 4 bytes
        dataIn.read(decodedString);
        String target = new String(decodedString, "UTF-8");
        System.out.println(target); // 打印 MQTT

        // 重置一下,重新读取
        dataIn.reset();
        // 等同于
        String result = dataIn.readUTF();
        System.out.println(result); // 打印 MQTT

    }
}

3. MQTT的特别之处

我们在学习TCP/IP协议的时候,就知道 ACK 这个概念,其实许多构建在TCP/IP协议之上的应用层协议也都会使用 XXXACK 包来表示已经成功接收 XXX 信息。
MQTT也不能“免俗”:

  • 连接报文 CONNECT 对应连接确认报文 CONNACK;
  • 订阅报文 SUBSCRIBE 对应订阅确认报文 SUBACK;
  • 取消订阅报文 UNSUBSCRIBE 对应取消订阅确认报文 UNSUBACK;
  • 发布报文 PUBLISH 对应发布确认报文 PUBACK。

还有就是名字中没有使用 ACK,但是实际上也是“一问一答”式的 PINGREQ 和 PINGRESP。但是 MQTT 的控制类型中还是有两处“怪异之处”

  • 唯独 DISCONNECT 没有对应的确认报文;
  • PUBLISH 除了有 PUBACK 之外,还有 PUBREC,PUBREL,PUBCOMP

3.1 遗言/遗嘱Will

对于一般IoT设备而言,就是一个大循环while不断接收消息,不存在正常退出的逻辑,一般都是断电断网导致的异常退出。DISCONNECT 并不常用,也不用确认。

但是,如果客户端正常发出了 DISCONNECT 报文,那么服务端收到 DISCONNECT 后必须丢弃所有和当前连接有关的Will Message,不发布。

我们通常都有判断IoT设备是否在线的需求,使用遗言机制就很好实现。

  • 遗言/遗嘱是CONNECT类型报文中,伴随客户端连接服务端的请求一并发出的;
  • 可变报头中包含 Will Flag,Will QoS,Will Retain;其中 QoS 和 Retain 效果同 publish 报文中的 QoS 和 Retain;
  • 如果可变报头连接标识位 Will Flag 等于1,那么载荷中将包含 Will Topic 和 Will Message 字段;

模拟遗嘱发送和接收

由于 mqtt-spy.jar 的无论是点击x关闭,还是杀死进程,都是正常的Disconnect退出,所以只好写一个 Java 客户端来模拟异常退出的场景。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

public class Main {

    public static void main(String[] args) throws MqttException {
        String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);

        MqttConnectOptions options = new MqttConnectOptions();
        // 2 表示 EXACTLY_ONCE
        options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
                2, false);
        client.connect(options);
    }
}

运行这个程序,然后再用 mqtt-spy.jar 模拟一个WEB服务器上的MQTT客户端:

Connections -> New Connection 打开如下图所示的页面,输入Client ID为web,其他都默认,然后 点击Open Connection

在 Subscriptions and received messages 这一栏点击 New,弹出如下图所示对话框,输入主题DeviceStatus,然后点击 Subscribe

然后,我们就可以去关闭 Java 的 MQTT客户端了。接着,就收到了遗言:

然后,我还找到了 WireShark 抓取的Java的MQTT客户端发出的Connect报文:

3.1.1 WILL MESSAGE长度限制65535个字节

从理论上来说,MQTT 中的字符串符合以下形式:

用两个字节表示内容长度,因此内容长度可以是0到65535个字节。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws MqttException, IOException {
        String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);

        MqttConnectOptions options = new MqttConnectOptions();
        StringBuilder sb = new StringBuilder();
        IntStream.range(0, 65536).forEach(i -> {
                int result = i % 10;
                sb.append(result);
        });
        // 2 表示 EXACTLY_ONCE
        byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
        options.setWill("DeviceStatus", payload,
                2, false);
        client.connect(options);
    }
}

如上面这段代码模拟了 Will Message 为 65536 个字节,服务器直接断开了客户端的连接:

从图中可以看出,Will Message 超长了,导致长度为0。具体可以看这段代码:org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect#getPayload

if (willMessage != null) {
  encodeUTF8(dos, willDestination);
  dos.writeShort(willMessage.getPayload().length); // 这段代码再跟进去看,(v >>> 8) & 0xFF 计算等于 0,(v >>> 0) & 0xFF 计算也等于 0
  dos.write(willMessage.getPayload());
}

3.1.2 Will Retain只保持最新一条

RETAIN(保持)
1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

※ 实验如下:
修改 3.1 中 Main 的代码:

// retain 由 false 改为 true
options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
                2, true);

然后,在 Run Configuration 中拷贝三份 Main,并分别命名为 iot_1,iot_2,iot_3,并且 Program arguments 也分别为 iot_1,iot_2,iot_3:

分别运行 iot_1,iot_2,iot_3,然后再依次结束他们。

然后,再启动 mqtt-spy.jar,并订阅主题 DeviceStatus:

如图所示,我们观察到新订阅者只获取到主题中的最新一条消息!

3.2 QoS

这个又有很多内容,还是另开一篇。

参考文档

MQTT协议笔记之头部信息 阅读

这篇文章主要解答了我对 Length MSB 和 Length LSB 的疑惑

Java MQTT 客户端之 Paho 阅读

如果你对 Java 实现 MQTT 客户端感兴趣,可以读一下这篇

标签:MQTT3.1,协议,报文,笔记,报头,MQTT,client,new,import
来源: https://www.cnblogs.com/kendoziyu/p/15100021.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有