一、简介
1 应用场景:物联网行业
2 MQTT协议的特点:
- 轻量级:MQTT协议占用系统资源少、报文数据较少
- 可靠性较强:提供了多种消息的质量等级
- 安全性较强:提供了传输层和套阶层的加密功能
- 双向通讯:客户端即可以发送数据、也可以从MQTT代理软件中获取数据
- 多语言支持
3 MQTT核心概念
MQTT客户端
任何运行MQTT客户端库(MQTT开发工具包SDK)的应用或设备都是MQTT客户端
MQTT Borker
实现了MQTT协议的代理软件
主题
存储在MQTT Borker中,就是普通的字符串,来对消息进行分类
二、MQTT 入门案例
1 EMQX
可以把EMQX当作消息中间件来理解
docker run -d --name emqx \
-p 1883:1883 -p 8083:8083 \
-p 8084:8084 -p 8883:8883 \
-p 18083:18083 \
-v emqx_data:/opt/emqx/data \
-v emqx_log:/opt/emqx/log \
emqx/emqx:5.8.3
访问控制台:ip:18083,默认用户名:admin 密码:public。首次登陆会提示修改密码,然后进入控制台

2 MQTTX
安装完成后,进入MQTTX客户端工具,新建一个客户端连接
连接成功后,在EMQX控制台中,可以查看到客户端的连接信息
MQTTX发布订阅的演示
新建两个连接、名称分别为pub、sub(名称随意)
sub 新建一个主题订阅
pub 给test_topic主题发送一条消息
查看sub收到的消息
docker pull emqx/mqttx-web
docker run -d --name mqttx-web -p 80:80 emqx/mqttx-web
使用方式和Desktop工具方法一致
三、MQTT 控制报文
1 简介
报文是网络中交换与传输的数据最小单元,通俗来讲就是站点一次性要发送的数据块。它包含了将要发送的完整数据信息,其长短不一致,长度不限且可变,MQTT 客户端和服务端通过交换控制报文来完成它们的工作,比如订阅主题和发布消息。
2 常见的控制报文
链接
CONNECT
:客户端向服务端发起连接CONNACK
:作为响应返回接连的结果DISCONNECT
:想要结束通信,或者遇到了一个必须终止连接的错误,发送该报文关闭网络连接AUTH
:是 MQTT 5.0 引入的全新的报文类型,它仅用于增强认证,为客户端和服务端提供更安全的身份验证。PINGREQ
、PINRERESP
:用于连接保活和探活,客户端定期发出 PINGREQ 报文向服务端表示自己仍然活跃,然后根据 PINGRESP 报文是否及时返回判断服务端是否活跃。(可以理解为心跳)
发布
PUBLISH
:发布消息PUBACK
:QoS1 时,服务端向发送端相应控制报文PUBREC
:Qos2 时,服务端向发送端相应报文,且释放packet IDPUBREL
:QoS2 时,发送端向服务器发送释放packet ID的报文PUBCOMP
:QoS2 时,服务端相应发送端释放packet ID的报文,且释放packet ID
订阅
SUBSCRIBE
:发起订阅UNSUBSCRIBE
:取消订阅SUBACK
:返回订阅结果UNSUBACK
:返回取消订阅结果
3 控制报文格式
在 MQTT 中,无论是什么类型的控制报文,它们都由**三个部分组成**。例如用于维持连接的 PINGRE0 报文就只有一个固定报头,用于传递应用消息的 PUBLISH 报文则完整地包含了这三个部分。
3.1 固定报头

报文类型:
占4个bit位,是一个无符号的整数,常见的报文类型:https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
标识位
占4个bit位,不过到 QTT5.0 为止,只有 PUBLISH 报文的这四个比特位被赋予了明确的含义:
1、Bit 3:DUP,表示当前 PUBLISH 报文是否是一个重传的报文。
2、Bit 2,1:QoS,表示当前 PUBLISH 报文使用的服务质量等级。
3、Bit 0:Retain,表示当前 PUBLISH 报文是否是一个保留消息。
其他所有的报文中,这 4位都仍是保留的。
剩余长度
控制报文剩余字节数(可变报头+有效载荷)
3.2 可变报头
可变报头中的

3.3 有效载荷
有效载荷是用于实现对应报文的核心功能。例如:
- 在 PUBLISH 报文中,Payload 用于承载具体的应用消息内容,这也是 PUBLISH 报文最核心的功能。
- 在 SUBSCRIBE 报文中,Payload 包含了想要订阅的主题以及对应的订阅选项,这也是 SUBSCRIBE 报文最主要的工作。

4 QoS
0
:消息最多发送一次,可能会导致消息丢失。,因此对于接收端来说,不会担心接收到重复的消息。
1
:消息最少发送一次,消息可能会重复引用了。
发送端会缓存PUBLISH报文,报文中会带一个packetID
接收端在返回PUBACK时,会将packetID带上返回,此时发送端就会去删除缓存的PUBLISH报文。
如果发送端没有接收到带有此packetID的PUBACK,那么就会将此报文重新发送一次
2
:消息仅发送一次,消息不会丢失、也不会重复- 发送端缓存PUBLISH报文,且发送PUBLISH报文到接收端
- 接收端返回PUBREC报文
- 发送端删除缓存的PUBLISH报文
- 发送端缓存并发送PUBREL报文
- 接收端返回PUBCOMP,且释放packetID(标识packet ID为可用)
- 发送端删除PUBREL缓存报文,且释放packetID(标识packet ID为可用)
。而在QoS=1时,服务端在发送了PUBACK报文后,就会把packet ID释放,所以在Qos=1时,服务端无法识别到消息的唯一性
4.2 Qos常见取值的应用场景
值 | 场景 |
---|---|
0 | 传输一些高频且不那么重要的数据,比如传感器数据 |
1 | 传输相对重要的数据,比如下达关键指令 |
2 | 系统开销较大,通常应用在金融领域 |
四、主题
1 简介
主题本质上是一个UTF-8编码的字符串,是MQTT协议进行路由的基础。MQTT主题类似于URL路径,使用/
进行分成。为了避免歧义,
MQTT主题不需要提前创建,客户端在发布或者订阅时,会
2 主题通配符
2.1 单层通配符+
单个层级匹配的通配符,
+ //有效
tmp/+ //有效
tmp/+/tmp //有效
tmp+ //无效
2.2 多层通配符#
多个层级匹配的通配符,也是必须占据整个层级,且必须是主题中的最后一个字符
tmp/# //有效
tmp/#/tmp //无效
# //有效,匹配所有层级
tmp# //无效
3 系统主题
以 $SYS/
开头的主题为系统主题,系统主题主要用于获取HQTT服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 $SYS/主题标准,但大多数 MQTT 服务器都遵循该标准建议
五、会话
1 MQTT会话
MQTT 会话本质上就是一组需要服务端和客户端额外存储的上下文数据,这些数据可以仅持续与网络连接一样长的时间,也可以跨越多个连续的网络连接存在。当客户端与服务端借助这些会话数据恢复通信时,可以让网络中断就像从未发生过一样
通过会话可以实现客户端和服务器之间的消息传递。MQTT客户端和MQTT服务器之间的连接被称为会话。每个MOTT客户端都可以启动一个或多个会话
2 常见配置参数
2.1 Clean start
用于指示客户端在和服务器建立连接的时候应该尝试恢复之前的会话还是直接创建全新的会话,。
0
:服务端存在一个关联此客户端标识符(ClientI0)的会话,服务端必须基于此会话的状态恢复与客户端的通信(之前的订阅信息会再次绑定,并且会接收到客户端断开时,发布者所发布的消息)。如果不存在任何关联此客户端标识符的会话,服务端必须创建一个新的会话。1
:客户端和服务端必须丢弃任何已存在的会话,并开始一个新的会话。
2.2 Session Expiry Interval
Session Expiry Interval
决定了会话状态数据在服务端的存储时长。
- 没有指定此属性或者设置为0,表示会话将在网络连接断开时立即结束,
- 设置为一个大于0的值,则表示会话将在网络连接断开的多少秒之后过期,
- 置为 0xFFFFFFFF,即 Session Expiry Interval 属性能够设置的最大值时,表示会话数据永不过期
- 服务端使用 Client ID 来唯一地标识每个会话,如果客户端想要在连接时复用之前的会话,那么必须使用与此前一致的 Client ID,
六、消息
1 保留消息
当发布者发送消息到服务器时,如果不存在订阅者时,普通消息将会被,
保留消息的常见使用场景:
- 智能家居设备的状态只有在变更时才会上报,但是控制端需要在上线后就能获取到设备的状态;
- 传感器上报数据的间隔太长,但是订阅者需要在订阅后立即获取到最新的数据;
- 传感器的版本号、序列号等不会经常变更的属性,可在上线后发布一条保留消息告知后续的所有订阅者;
2 消息时间间隔
MQTT 可以通过Session Expiry Interval
为离线客户端缓存尚未发送的消息,然后在客户端恢复连接时发送,但如果客户端离线时间较长,可能有一些寿命较短的消息已经没有必要必须发送给客户端了,继续发送这些过期的消息,只会浪费网络带宽和客户端资源。
消息过期间隔是 MQTT 5.0 引入的一个新特性,它允许发布端为有时效性的消息设置一个过期间隔,,,默认情况下,消息中不会包含消息过期间隔,这表示该消息永远不会过期
注意:如果客户端在发布消息时设置了过期间隔,那么服务端在转发这个消息时也会包含过期间隔,但过期间隔的值会被更新为服务端接收到的值减去该消息在服务端停留的时间。这可以避免消息的时效性在传递的过程中丢失,特别是在桥接到另一个MQTT服务器的时候。
3 遗嘱消息
在MQTT中,,与普通消息类似,我们可以设置遗嘱消息的主题、有效载荷等等。。这些接收者也因此可以及时地采取行动,例如向用户发送通知、切换备用设备等等。作用:借助于遗嘱消息可以感知到客户端是意外断开
Will Delay Interval
遗嘱消息只是多了一个专属属性 Will Delay Interval
,这个属性决定了服务端将在网络连接关闭后延迟多久发布遗嘱消息,并以秒为单位。
默认情况下,服务端总是在网络连接意外关闭时立即发布遗嘱消息、但是很多时候,网络连接的中断是短暂的,所以客户端往往能够重新连接并继续之前的会话,这导致遗嘱消息可能被频繁地且无意义地发送。
如果没有指定 Will Delay Interval 或者将其设置为 0,服务端将仍然在网络连接关闭时立即发布遇嘱消息,但如果将 Will Delay Interval 设置为一个大于8 的值,并且客户端能够在 Will Delay Interval 到期前恢复连接,那么该遗嘱消息将不会被发布。
遗嘱消息与会话 遇嘱消息是会话状态的一部分,当会话结束,遇嘱消息也无法继续单独存在,但是在遇嘱消息延迟发布期间,会话可能过期,也可能因为客户端在新的连接中设置ClearStart 为1所以服务端雲要丢弃之前的会话,为了避免丢失遗嘱,此时服务端必须发布该遗嘱消息,即便 Will Delay Interval 还没有到期,所以服务端最终何时发布遗嘱消息,取决于 Will Delay Interval 到期和会话结束这两种情况谁先发生
4 延迟发布
简介:MQTT服务端接收到消息以后,
延迟发布的使用场景:
1、农业智能化管理:在智能农业中,可能需要在特定时间启动灌溉系统或调节温室环境。通过MQTT延迟发布,可以预先设定好指令发布时间,如在消晨或傍晚自动发送开启灌溉的信号,确保水资源的有效利用且不对作物生长周期造成干扰。
2、能源管理与自动控制:智能家居或智能建筑中的照明、供暖、通风系统可以根据居民生活习惯或节能策略,利用延迟发布在预设时间自动调整,如在居民到家前半小时开启空调或在离开家后一定时间关闭所有非必要电器,以达到节能减排的目的。
3、公共设施维护:城市中的公共照明、广告牌等设施可能聚要在特定时间统一开关,以节省能源或遵守当地法规、通过MQTT延迟发布功能,可以安排在夜间自动发送开关指令,无需人工干预,简化运维流程,
4、医疗健康监护:在远程医疗监护中,设备可能露要在一天中的特定时间收集患者数据或发送提醒,如定时提醒患者服药或在固定时间收集心率、血压等生理参数,以优化患者护理计划。
延迟发布主题格式:$delayed/{DelayInterval}/
$delayed
:作为主题前缀的消息都将被视为需要延迟发布的消息
DelavInteryal
:延迟发布的时间间隔,单位为秒,允许的最大间隔是4294967 秒,如果DelavInterval无法被解析为一个整型数字,EMOX 将丢弃该消息,客户端不会收到任何信息。
TopicName
:主题,例如:test/a;test/b
七、订阅
1 订阅选项
主题过滤器:决定了服务端将向我们转发哪些主题下的消息
订阅选项:是允许我们进一步定制服务端的转发行为
QoS:
是最常用的一个订阅选项,它表示服务端在向订阅端发送消息时可以使用的最大QoS等级。()
No Local:
(一个客户端既可以是发布者),0表示允许,1表示不允许。在多个Borker桥接的场景中,如果没有将No Local订阅选项的值设置为1,那么此时会形成****Retain As Published
服务端在向此订阅转发应用消息时是否需要清除消息中的 Retain 标识。0-清除标识,1-保持不变
Retain Handling
0
(默认值) 表示只要订阅建立,就发送保留消息,1
表示只有建立全新的订阅而不是重复订阅时,才发送保留消息2
表示订阅建立时不要发送保留消息,
2 共享订阅
2.1 带群组的共享订阅
您可以通过在原始主题前添加 $share/<group-nane>
前缀为分组的订阅者启用共享订间,组名可以是任意字符用,EMQX同时将消息转发给不同的组,属于同一组的订阅者可以使用。
2.2 不带群组的共享订阅
以 $queue/
为前缀的是不带群组的共享订阅。它是 $share
订阅的一种特例,可以将其理解为所有订阅者都在一个订阅组中
负载均衡算法配置:
随机
(Random),在共享订阅组内随机选择一个会话发送消息。轮询
(Round Robin),在共享订阅组内按顺序选择一个会话发送消息,循环往复。哈希
(Hash),基于某个字段的哈希结果来分配。粘性
(Sticky),在共享订阅组内随机选择一个会话发送消息,此后保持这一选择,直到该会话结束再重复这一过程。本地优先
(Local),随机选择,但优先选择与消息的发布者处于同一节点的会话,如果不存在这样的会话,则退化为普通的随机策略。
3 排他订阅
排它订阅允许对主题进行互斥订阅,,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。要进行排它订阅,您需要为订阅的主题名称添加$exclusive/
前缀。
4 自动订阅
自动订阅能够给 EMQX,设置多个规则,在设备成功连接后按照规则为其订阅指定主题,不需要额外发起订阅。
八、数据集成
作为一个 MQTT 消息平台,EMQX 通过 MQTT 协议连接物联网设备并实时传递消息。在此基础上,数据集成为 EMQX 引入了与外部数据系统的连接,从而以实现设备与其他业务系统的无缝集成。
数据集成使用 Sink 与 Source 组件与外部数据系统连接,Sink 用于将消息发送到外部数据系统,例如 MySQL、Kafka 或 HTTP 服务等;而 Source 则用于从外部数据系统接收消息,例如 MQTT、Kafka 或 GCP PubSub。
这一过程允许 EMQX 不仅仅局限于物联网设备之间的消息传递,还能够将设备产生的数据有机地融入到整个业务生态系统中,为物联网应用提供了更广泛的应用场景,使得设备与业务系统之间的交互更为丰富和多样化。
https://docs.emqx.com/zh/emqx/latest/data-integration/data-bridges.html
1 工作原理
EMQX 数据集成是一个开箱即用的功能。作为一个 MQTT 消息平台,EMQX 通过 MQTT 协议从物联网设备接收数据。借助内置的规则引擎,接收到的数据会被规则引擎中配置的规则处理。规则将触发一个动作,通过配置的 Sink/Source 将处理后的数据转发到外部数据系统。您可以在 Dashboard 上使用规则或流设计器轻松创建规则、添加动作并创建 Sink/Source,无需任何编码工作。
2 规则引擎
EMQX 配备了一款功能强大的基于 SQL 规则的内置引擎,这是处理和分发数据的核心组件。规则引擎具有广泛的功能,包括条件判断、字符操作、数据类型转换以及压缩/解压功能,能够实现复杂数据的灵活处理。
当客户端触发特定事件,或发布的消息到达 EMQX 时,规则引擎可以根据预定义的规则,对数据进行实时的处理,执行如数据提取、过滤、丰富以及格式转换等操作,然后将处理后的数据发送到指定的 Sink。
3 Sink
Sink 是数据输出组件,被添加到规则的动作中,规则引擎处理完成后的数据将被转发到指定的 Sink,可以在 Sink 中配置数据的操作方式,例如使用 ${var}
或者 ${.var}
语法从数据中提取变量,动态生成 SQL 语句或数据模板,再通过连接器发送到外部数据系统,实现消息存储、数据更新和事件通知等操作。

4 Source
Source 是数据输入组件,作为规则的数据源,通过规则 SQL 进行选择。
Source 从外部 MQTT 或 Kafka 等外部数据系统订阅或消费消息,当新的消息通过连接器到达 EMQX 时,规则引擎将匹配并执行相应规则,对数据进行筛选和处理,处理完成后的数据可以发布到指定 EMQX 主题中,实现云端指令下发等操作。

5 数据集成入门案例(HTTP方式)
5.1 准备一个web服务
此处采用SpringBoot快速构建一个web服务
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@RestController
class MQTTController {
@PostMapping("/mqtt")
public Object mqtt(@RequestBody Map<String, Object> message) {
System.out.println("接收mqtt消息:" + JSONUtil.toJsonPrettyStr(message));
return "success";
}
}
5.2 新建连接器
创建连接器==>选择HTTP服务
填写HTTP服务的URL==>测试链接==>创建
5.3 创建规则
添加动作==>动作类型选择HTTP服务==>选择连接器==>输入URL路==>测试链接==>创建
输入名称==>修改SQL语句==>保存
5.4 测试
给t/a主题发送报文==>查看应用控制台
6 SQL语句
https://docs.emqx.com/zh/emqx/latest/data-integration/rule-sql-syntax.html
九、SpringBoot整合MQTT客户端
需要提前给客服端授权,否则可能会拒绝连接。授权文档
1 引入依赖
<!-- SpringBoot 项目集成消息中间件基础依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- SpringBoot 项目集成MQTT客服端起步依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.3</version>
</dependency>
2 配置文件
application.yml
spring: mqtt: username: mqtt_u password: mqtt_pwd url: tcp://192.168.163.128:1883 subClientId: sub_client_id subTopic: test/01,test/02 pubClientId: pub_client_id
配置文件对应配置类
@Data @ConfigurationProperties(prefix = "spring.mqtt") @Component public class MqttConfigProperties { String username; String password; String url; String subClientId; String subTopic; String pubClientId; }
3 配置连接工厂
@Configuration
public class MqttConnectionFactory {
@Autowired
MqttConfigProperties mqttConfigProperties;
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
// 创建客户端工厂
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// 创建连接参数对象
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(mqttConfigProperties.getUsername());
options.setPassword(mqttConfigProperties.getPassword().toCharArray());
options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
factory.setConnectionOptions(options);
return factory;
}
}
4 订阅主题
@Configuration
public class MqttInboundConfiguration {
@Autowired
MqttConfigProperties mqttConfigProperties;
@Autowired
MqttPahoClientFactory mqttPahoClientFactory;
// 消息入栈通道
@Bean
public MessageChannel messageInboundChannel() {
return new DirectChannel();
}
// 配置入栈适配器
@Bean
public MessageProducer messageProducer() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
// 服务端url
mqttConfigProperties.getUrl(),
// 客服端id
mqttConfigProperties.getSubClientId(),
// 连接工厂
mqttPahoClientFactory,
// 主题
mqttConfigProperties.getSubTopic().split(",")
);
// qos
adapter.setQos(1);
// 消息转换器
adapter.setConverter(new DefaultPahoMessageConverter());
// 输出通道
adapter.setOutputChannel(messageInboundChannel());
return adapter;
}
// 消息入栈处理器
@Bean
@ServiceActivator(inputChannel = "messageInboundChannel") // 指定通道
public MessageHandler messageHandler() {
return message -> System.out.println(JSONUtil.toJsonPrettyStr(message));
}
}
5 启动SpringBoot
给test/01,test/02主题分别发送一条消息==>查看应用控制台

6 发布消息到mqtt
配置出栈消息通道
@Configuration public class MqttOutboundConfiguration { @Autowired MqttConfigProperties mqttConfigProperties; @Autowired MqttPahoClientFactory mqttPahoClientFactory; // 消息出战通道 @Bean public MessageChannel messageOutboundChannel() { return new DirectChannel(); } // 消息出战处理器 @Bean @ServiceActivator(outputChannel = "messageOutboundChannel") public MessageHandler mqttOutboundHandler() { MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( mqttConfigProperties.getUrl(), mqttConfigProperties.getPubClientId(), mqttPahoClientFactory ); mqttPahoMessageHandler.setAsync(true); mqttPahoMessageHandler.setDefaultQos(0); mqttPahoMessageHandler.setDefaultTopic("default"); return mqttPahoMessageHandler; } }
定义消息发送的网关接口
@MessagingGateway(defaultRequestChannel = "messageOutboundChannel") public interface MqttGateway { void sendMessageToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload); void sendMessageToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload); }
测试类
@SpringBootTest public class TestApplication { @Autowired MqttGateway mqttGateway; @Test public void sendMessage() { mqttGateway.sendMessageToMqtt("test/03", JSONUtil.createObj().set("message", "ok").toString()); System.out.println("=================发送成功=================="); } }
订阅者