3. MQTT简要介绍
——
[1MQTT项目工程](https://githubcom/LiamBindle/MQTT-C)
[2MQTT API说明文档](https://liambindleca/MQTT-C/group__apihtml)
[3MQTT协议中文版](https://mcxiaokegitbooksio/mqtt-cn/content/mqtt/01-Introductionhtml)
MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。
MQTT协议通过交换预定义的MQTT控制报文来通信。
报文格式: 固定包头+可变包头+payload。
固定包头: 由两个字节组成
byte1 高四位表示控制报文的类型,低四位表示控制报文类型的标志位。
byte2表示剩余长度,包括可变报头和负载的数据。剩余长度不包括用于编码剩余长度字段本身的字节数。
可变包头:
某些MQTT控制报文包含一个可变报头部分。它在固定报头和负载之间。可变报头的内容根据报文类型的不同而不同。可变报头的报文标(Packet Identifier)字段存在于在多个类型的报文里。
有效载荷:
对于PUBLISH来说有效载荷就是应用消息。
——1 网络建立连接后,客户端发送的第一个报文必须是CONNECT报文
——2 客户端只能发送一次CONNECT 报文,发送两次会当作协议违规处理,并断开连接。
——3 有效载荷包括:客户端唯一标识符,will主题,will消息,用户名和密码。
——4可变包头:协议名(protocol name)+协议级别(protocol level)+连接标志(connect flags)+保持连接(keep alive)。
——5 协议名: MQTT的UTF-8编码的字符串。(MSB+LSB +MQTT 六个字节)
——6 协议级别:一个字节
——7 连接标志:一个字节,包含一些用于指定MQTT连接行为的参数。同时还指出有效载荷中的字段是否存在。服务端必须验证CONNECT控制报文的保留标志位(第0位)是否为0,如果不为0必须断开客户端连接。
——8 清理会话:byte8 的bit1位标志。
这个二进制位指定了会话状态的处理方式。客户端和服务端可以保存会话状态,以支持跨网络连接的可靠消息传输。这个标志位用于控
制会话状态的生存时间。
标志设置为0:服务端必须基于当前会话(使用客户端标识符识别)的状态恢复与客户端的通信。
标志设置为1:客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。会话仅持续和网络连接同样长的时间。
——9 遗嘱标志 WILL FLAG: byte8的bit2位标志。
遗嘱标志(Will Flag)被设置为1,表示如果连接请求被接受了,遗嘱(Will Message)消息必须被存储在服务端并且与这个网络连接关联。之后网络连接关闭时,服务端必须发布这个遗嘱消息,除非服务端收到DISCONNECT报文时删除了这个遗嘱消息。
服务端发送CONNACK报文响应从客户端收到的CONNECT报文。服务端发送给客户端的第一个报文必须是CONNACK。
如果客户端在合理的时间内没有收到服务端的CONNACK报文,客户端应该关闭网络连接。合理 的时间取决于应用的类型和通信基础设施。
CONNACK报文没有有效载荷。
PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
固定包头:
注意byte1的bit3为重发标志DUP。
如果DUP标志被设置为0,表示这是客户端或服务端第一次请求发送这个PUBLISH报文。如果DUP标志被设置为1,表示这可能是一个早前报文请求的重发。
服务端发送PUBLISH报文给订阅者时,收到(入站)的PUBLISH报文的DUP标志的值不会被传播。发送(出站)的PUBLISH报文与收到(入站)的PUBLISH报文中的DUP标志是独立设置的,它的值必须单独的根据发送(出站)的PUBLISH报文是否是一个重发来确定。
可变包头:
主题名 :topic name
报文标识符 :packet identitfier。
有效载荷 :有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的PUBLISH报文是合法的。
响应:PUBLISH报文的接收者必须按照根据PUBLISH报文中的QoS等级发送响应。
PUBACK报文是对QoS 1等级的PUBLISH报文的响应。
PUBACK报文没有有效载荷。
PUBREC报文是对QoS等级2的PUBLISH报文的响应。它是QoS 2等级协议交换的第二个报文。
PUBREC报文没有有效载荷。
PUBREL报文是对PUBREC报文的响应。它是QoS 2等级协议交换的第三个报文。
PUBREL报文没有有效载荷。
PUBCOMP报文是对PUBREL报文的响应。它是QoS 2等级协议交换的第四个也是最后一个报文。
PUBCOMP报文没有有效载荷。
客户端向服务端发送 SUBSCRIBE 报文用于创建一个或多个订阅。每个订阅注册客户端关心的一个或多个主题。为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端。
有效载荷:
SUBSCRIBE报文的有效载荷包含了一个主题过滤器列表,它们表示客户端想要订阅的主题。SUBSCRIBE报文的有效载荷必须包含至少一对主题过滤器 和 QoS等级字段组合。没有有效载荷的SUBSCRIBE报文是违反协议的。
响应:
服务端收到客户端发送的一个SUBSCRIBE报文时,必须使用SUBACK报文响应,SUBACK报文必须和等待确认的SUBSCRIBE报文有相同的报文标识符。
服务端发送SUBACK报文给客户端,用于确认它已收到并且正在处理SUBSCRIBE报文。SUBACK报文包含一个返回码清单,它们指定了SUBSCRIBE请求的每个订阅被授予的最大QoS等级。
有效载荷:
有效载荷包含一个返回码清单。每个返回码对应等待确认的SUBSCRIBE报文中的一个主题过滤器。返回码的顺序必须和SUBSCRIBE报文中主题过滤器的顺序相同。
客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题。
有效载荷 :
UNSUBSCRIBE报文的有效载荷包含客户端想要取消订阅的主题过滤器列表。
UNSUBSCRIBE报文中的主题过滤器必须是连续打包的、按照定义的UTF-8编码字符串
UNSUBSCRIBE报文的有效载荷必须至少包含一个消息过滤器。没有有效载荷的UNSUBSCRIBE报文是违反协议的。
响应:
UNSUBSCRIBE报文提供的主题过滤器(无论是否包含通配符)必须与服务端持有的这个客 户端的当前主题过滤器集合逐个字符比较。如果有任何过滤器完全匹配,那么它(服务端)自己的订阅将被删除,否则不会有进一步的处理。
如果服务端删除了一个订阅:
——它必须停止分发任何新消息给这个客户端 []。
——它必须完成分发任何已经开始往客户端发送的QoS 1和QoS 2的消息 []。
——它可以继续发送任何现存的准备分发给客户端的缓存消息。
服务端必须发送UNSUBACK报文响应客户端的UNSUBSCRIBE请求。UNSUBACK报文必须包含和UNSUBSCRIBE报文相同的报文标识符 。即使没有删除任何主题订阅,服务端也必须发送一个UNSUBACK响应 。
如果服务端收到包含多个主题过滤器的UNSUBSCRIBE报文,它必须如同收到了一系列的多个UNSUBSCRIBE报文一样处理那个报文,除了将它们的响应合并到一个单独的UNSUBACK报文外。
服务端发送UNSUBACK报文给客户端用于确认收到UNSUBSCRIBE报文。
UNSUBACK报文没有有效载荷。
客户端发送PINGREQ报文给服务端的。用于:
1 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着。
2 请求服务端发送 响应确认它还活着。
3 使用网络以确认网络连接没有断开。
保持连接(Keep Alive)处理中用到这个报文。
——PINGREQ报文没有可变报头。
——PINGREQ报文没有有效载荷。
响应:
服务端必须发送 PINGRESP报文响应客户端的PINGREQ报文。
服务端发送PINGRESP报文响应客户端的PINGREQ报文。表示服务端还活着。
保持连接(Keep Alive)处理中用到这个报文。
PINGRESP报文没有可变报头。
PINGRESP报文没有有效载荷。
DISCONNECT报文是客户端发给服务端的最后一个控制报文。表示客户端正常断开连接。
DISCONNECT报文没有可变报头。
DISCONNECT报文没有有效载荷。
响应:
客户端发送DISCONNECT报文之后:
—— 必须关闭网络连接 [MQTT-3144-1]。
——不能通过那个网络连接再发送任何控制报文。
服务端在收到DISCONNECT报文时:
——必须丢弃任何与当前连接关联的未发布的遗嘱消息。
——应该关闭网络连接,如果客户端 还没有这么做。
为了提供服务质量保证,客户端和服务端有必要存储会话状态。在整个会话期间,客户端和服务端都必须存储会话状态 。会话必须至少持续和它的活跃网络连接同样长的时间。服务端的保留消息不是会话状态的组成部分。服务端应该保留那种消息直到客户端删除它。
MQTT协议要求基础传输层能够提供有序的、可靠的、双向传输(从客户端到服务端 和从服务端到客户端)的字节流。
无连接的网络传输协议如UDP是不支持的,因为他们可能会丢失数据包或对数据包重排序。
MQTT按照这里定义的服务质量 (QoS) 等级分发应用消息。分发协议是对称的,在下面的描述中,客户端和服务端既可以是发送者也可以是接收者。分发协议关注的是从单个发送者到单个接收者的应用消息。服务端分发应用消息给多个客户端时,每个客户端独立处理。分发给客户端的出站应用消息和入站应用消息的QoS等级可能是不同的。
qos0:最多分发一次
qos1:至少分发一次
qos2:仅分发一次
客户端设置清理会话(CleanSession)标志为0重连时,客户端和服务端必须使用原始的报文标识符重发任何未确认的PUBLISH报文(如果QoS>0)和PUBREL报文 [MQTT-440-1]。这是唯一要求客户端或服务端重发消息的情况。
服务端接管入站应用消息的所有权时,它必须将消息添加到订阅匹配的客户端的会话状态。正常情况下,客户端收到发送给它的订阅的消息。客户端也可能收到不是与它的订阅精确匹配的消息。如果服务端自动给客户端分配了一个订阅,可能发生这种情况。正在处理UBSUBSCRIBE请求时也可能收到消息。客户端必须按照可用的服务质量(QoS)规则确认它收到的任何PUBLISH报文,不管它选择是否处理报文包含的应用消息 。
实现本章定义的协议流程时,客户端必须遵循下列规则:
重发任何之前的PUBLISH报文时,必须按原始PUBLISH报文的发送顺序重发(适用于QoS 1和QoS 2消息)[MQTT-460-1]。
——必须按照对应的PUBLISH报文的顺序发送PUBACK报文(QoS 1消息)。
——必须按照对应的PUBLISH报文的顺序发送PUBREC报文(QoS 2消息。
——必须按照对应的PUBREC报文的顺序发送PUBREL报文(QoS 2消息)。
服务端必须默认认为每个主题都是有序的。它可以提供一个管理功能或其它机制,以允许将一个或多个主题当作是无序的 。
服务端处理发送给有序主题的消息时,必须按照上面的规则将消息分发给每个订阅者。此外,它必须按照从客户端收到的顺序发送PUBLISH报文给消费者(对相同的主题和QoS)。
斜杠(‘/’ U+002F)用于分割主题的每个层级,为主题名提供一个分层结构
数字标志(‘#’ U+0023)是用于匹配主题中任意层级的通配符。
加号 (‘+’ U+002B) 是只能用于单个主题层级匹配的通配符。
服务端不能将 $ 字符开头的主题名匹配通配符 (#或+) 开头的主题过滤器
$SYS/ 被广泛用作包含服务器特定信息或控制接口的主题的前缀。
应用不能使用 $ 字符开头的主题。
订阅 “#” 的客户端不会收到任何发布到以 “$” 开头主题的消息。
订阅 “+/monitor/Clients” 的客户端不会收到任何发布到 “$SYS/monitor/Clients” 的消息。
订阅 “$SYS/#” 的客户端会收到发布到以 “$SYS/” 开头主题的消息。
订阅 “$SYS/monitor/+” 的客户端会收到发布到 “$SYS/monitor/Clients” 主题的消息。
如果客户端想同时接受以 “$SYS/” 开头主题的消息和不以 $ 开头主题的消息,它需要同
时订阅 “#” 和 ““$SYS/#”。
除非另有说明,如果服务端或客户端遇到了协议违规的行为,它必须关闭传输这个协议违规控制报文的网络连接
MQTT方案通常部署在不安全的通信环境中。在这种情况下,协议实现通常需要提供这些机制:
——用户和设备身份认证
——服务端资源访问授权
——MQTT控制报文和内嵌应用数据的完整性校验
——MQTT控制报文和内嵌应用数据的隐私控制
作为传输层协议,MQTT仅关注消息传输,提供合适的安全功能是实现者的责任。使用TLS[RFC5246] 是比较普遍的选择。
广泛采用高级加密标准 [AES] 数据加密标准 [DES] 。
推荐使用为受限的低端设备特别优化过的轻量级加密国际标准 ISO 29192 [ISO29192] 。
如果MQTT在WebSocket [RFC6455] 连接上传输,必须满足下面的条件:
——MQTT控制报文必须使用WebSocket二进制数据帧发送。如果收到任何其它类型的数据帧,接收者必须关闭网络连接 。
——单个WebSocket数据帧可以包含多个或者部分MQTT报文。接收者不能假设MQTT控制报文按WebSocket帧边界对齐 。
——客户端必须将字符串 mqtt 包含在它提供的WebSocket子协议列表里 。
——服务端选择和返回的WebSocket子协议名必须是 。
——用于连接客户端和服务器的WebSocket URI对MQTT协议没有任何影响。
MQTT规范定义了MQTT客户端实现和MQTT服务端实现的一致性要求
MQTT实现可以同时是MQTT客户端和MQTT服务端。接受入站连接和建立到其它服务端的出站连接的服务端必须同时符合MQTT客户端和MQTT服务端的要求 。
为了与任何其它的一致性实现交互操作,一致性实现不能要求使用在本规范之外定义的任何扩展 。
附录:
控制报文类型
byte1:标志位flags:
在使用 MQTT 协议连接 MQTT 服务器时,可能会遇到连接失败的情况。这通常是由于以下几个原因之一:
服务器配置错误:确保您的 MQTT 服务器已经正确配置,并且已经启动。您可以检查 MQTT 服务器的配置文件,确保 MQTT 服务器的端口号、认证方式、消息格式等参数都正确。
客户端配置错误:确保您的客户端已经正确配置,并且已经启动。您可以检查 MQTT 客户端的配置文件,确保 MQTT 客户端的 URL、端口号、认证方式、消息格式等参数都正确。
网络问题:确保您的网络连接正常,并且可以访问 MQTT 服务器。如果您的网络连接不稳定或者无法访问 MQTT 服务器,可能会导致连接失败。
配置文件错误:确保您的配置文件没有错误。如果您的配置文件中有错误或者不正确的参数,可能会导致连接失败。
如果您已经确保了以上几个方面都没有问题,但是仍然无法连接到 MQTT 服务器,您可以尝试以下几个步骤:
检查 MQTT 服务器的配置文件:确保您的 MQTT 服务器的配置文件没有错误。如果您的配置文件中有错误或者不正确的参数,可能会导致连接失败。
检查 MQTT 客户端的配置文件:确保您的 MQTT 客户端的配置文件没有错误。如果您的配置文件中有错误或者不正确的参数,可能会导致连接失败。
检查网络连接:确保您的网络连接正常,并且可以访问 MQTT 服务器。如果您的网络连接不稳定或者无法访问 MQTT 服务器,可能会导致连接失败。
检查 MQTT 服务器的状态:确保您的 MQTT 服务器已经正确启动,并且可以正常工作。如果您的 MQTT 服务器出现了故障,可能会导致连接失败。
如果以上步骤都没有解决问题,您可以尝试查看 MQTT 服务器的日志,以了解更多有关连接失败的信息。同时,您也可以尝试使用其他的 MQTT 客户端或者其他的 MQTT 服务器来连接,以确保您的连接是正常的。
1Socket是对TCP/IP协议的封装,Socket本身并不是协议,而是一个调用接口(API),通过Socket,我们才能使用TCP/IP协议。
2MQTT协议是应用层协议不依赖长连接,适合弱网络。通过topic缓存信息。符合物联网设备的使用场景。因为通过topic缓存信息,因此可以实现通过topic与多个端的一对多连接,而不是设备与设备的多对多连接,节省了能耗及带宽。
MQTT的心跳,及非信息的报文,较Websocket更少,更节省带宽及能耗。更适用于物理网的多种网络协议。
3WebSocket和Http一样在应用层,提供使用一个TCP连接进行双向通讯的机制,包括网络协议和API,以取代网页和服务器采用HTTP轮询进行双向通讯的机制。 本质上来说,WebSocket是不限于HTTP协议的,但是由于现存大量的HTTP基础设施,代理,过滤,身份认证等等,WebSocket借用HTTP和HTTPS的端口。由于使用HTTP的端口,因此TCP连接建立后的握手消息是基于HTTP的,由服务器判断这是一个HTTP协议,还是WebSocket协议。 WebSocket连接除了建立和关闭时的握手,数据传输和HTTP没丁点关系了。
Socket 连接,至少需要一对套接字,分为 clientSocket,serverSocket 连接分为3个步骤:
(1) 服务器监听:服务器并不定位具体客户端的套接字,而是时刻处于监听状态;
(2) 客户端请求:客户端的套接字要描述它要连接的服务器的套接字,提供地址和端口号,然后向服务器套接字提出连接请求;
(3) 连接确认:当服务器套接字收到客户端套接字发来的请求后,就响应客户端套接字的请求,并建立一个新的线程,把服务器端的套接字的描述发给客户端。一旦客户端确认了此描述,就正式建立连接。而服务器套接字继续处于监听状态,继续接收其他客户端套接字的连接请求
Socket为长连接:通常情况下Socket 连接就是 TCP 连接,因此 Socket 连接一旦建立,通讯双方开始互发数据内容,直到双方断开连接。在实际应用中,由于网络节点过多,在传输过程中,会被节点断开连接,因此要通过轮询高速网络,该节点处于活跃状态。
很多情况下,都是需要服务器端向客户端主动推送数据,保持客户端与服务端的实时同步。
若双方是 Socket 连接,可以由服务器直接向客户端发送数据。
若双方是 HTTP 连接,则服务器需要等客户端发送请求后,才能将数据回传给客户端。
因此,客户端定时向服务器端发送请求,不仅可以保持在线,同时也询问服务器是否有新数据,如果有就将数据传给客户端。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。
例如:
①网络代价昂贵,带宽低、不可靠。
②在嵌入设备中运行,处理器和内存资源有限。
该协议的特点有:
①使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。 ②对负载内容屏蔽的消息传输。
③使用 TCP/IP 提供网络连接。
④有三种消息发布服务质量:
⑤"至多一次",消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
⑥"至少一次",确保消息到达,但消息重复可能会发生。
⑦"只有一次",确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
⑧小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
⑨使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。qos=0
“至少一次”,确保消息到达,但消息重复可能会发生。qos=1
“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。qos=2
Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:
固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。
WebSocket则提供使用一个TCP连接进行双向通讯的机制,包括网络协议和API,以取代网页和服务器采用HTTP轮询进行双向通讯的机制。 本质上来说,WebSocket是不限于HTTP协议的,但是由于现存大量的HTTP基础设施,代理,过滤,身份认证等等,WebSocket借用HTTP和HTTPS的端口。由于使用HTTP的端口,因此TCP连接建立后的握手消息是基于HTTP的,由服务器判断这是一个HTTP协议,还是WebSocket协议。 WebSocket连接除了建立和关闭时的握手,数据传输和HTTP没丁点关系了。 由此可知两者的应用场景不一样: MQTT是为了物联网场景设计的基于TCP的Pub/Sub协议,有许多为物联网优化的特性,比如适应不同网络的QoS、层级主题、遗言等等。 WebSocket是为了HTML5应用方便与服务器双向通讯而设计的协议,HTTP握手然后转TCP协议,用于取代之前的Server Push、Comet、长轮询等老旧实现。 两者之所有有交集,是因为一个应用场景:如何通过HTML5应用来作为MQTT的客户端,以便接受设备消息或者向设备发送信息,那么MQTT over WebSocket自然成了最合理的途径了。
本文将介绍使用 Node-RED 连接到 MQTT 服务器,并对 MQTT 数据进行过滤和处理后再将其发送至 MQTT 服务器的完整操作流程。读者可以快速了解如何使用 Node-RED 对 MQTT 数据进行简单的流处理。
Node-RED 无论是在你本地的电脑上,还是树莓派等设备,亦或是云端服务器,都可以快速安装和使用,下面将使用两种比较常见的安装方式:
使用 npm 进行全局安装:
使用 Docker 进行安装:
如果使用的是 npm 进行的全局安装,那么在提示安装成功后,只需要在全局运行 node-red 命令就可以立即启动 Node-RED。
无论是使用 Docker 还是 npm 在启动成功后,我们只需要打开浏览器,输入当前地址加 1880 端口号,即可打开 Node-RED 的浏览器编辑器页面,例如在本地运行的话,打开浏览器,输入 http://127001:1880,当看到如下图所示页面后,说明 Node-RED 已经成功启动:
本文将使用 EMQ 提供的 免费公共 MQTT 服务器,该服务基于 MQTT 物联网云平台 - EMQX Cloud 创建,服务器接入信息如下:
在下面的功能演示中,我们将提供一个使用 Node-RED 来处理接收到的包含温湿度信息的 JSON 数据,然后对温度值进行规则判断,当温度发生改变的时候,就将当前发生改变的温度值通过 MQTT 再次发送出去的简单使用案例。
我们首先在左侧菜单栏中,拖拽一个 MQTT in 的节点到页面中,双击节点后,右侧出现一个编辑 MQTT 节点的配置页面,我们根据内容提示,新建一个连接信息后,再填入 MQTT 的其它连接信息后,点击 Done 按钮后,即可保存该节点信息。
接入数据:我们拖拽一个 JSON 节点到页面中,可以在 JSON 节点的配置页面中,配置一个 Action,我们设置为 Always convert to JavasScript Object ,因为我们无法确定发送过来的数据是一个 JSON 格式的数据还是一个 JSON 字符串,因此第一步都将接收到的消息进行一个 JSON 转换。配置完成后,我们将该节点与 MQTT in 节点进行连接。
过滤数据
我们配置完成格式化发送过来的消息数据后,我们就可以拖拽一个 filter 节点到页面中,同样双击节点后,在配置页面中配置规则,我们先选择一个 Mode,我们设置为 blcok unless value changes ,过滤规则为需要当前接收到数据的值发生改变,因为目前数据为 JSON 格式,我们判断的是 JSON 数据内的某一个值,因此我们需要在 Property 这里设置值为 msgpayloadtemperature 配置完成后我们点击 Done 按钮来保存数据过滤节点的配置,最后将该节点连接到上一步配置完成后的 JSON 节点。
使用模版
当过滤完数据后,同样拖拽一个 template 节点到页面中,双击节点后来配置模版内容,使过滤完成后的数据,能通过模版将数据进行输出。当然也可以不需要这个步骤,直接将过滤后的数据进行输出。
完成以上对数据的处理和过滤后,最后我们再来将处理完成后的数据使用 MQTT 将其发送出去,拖拽一个 MQTT out 的节点到页面中,填入和 MQTT in 节点相同的连接信息,配置一个用户接收数据的 Topic,最后保存完成后,再将其和 template 节点进行连接,点击右上角的 Deploy 按钮,即可对当前规则应用进行在线部署。
在完成整个流数据处理的功能编排以后,我们使用 MQTT 50 客户端工具 - MQTT X 来测试和验证该功能的可用性。我们新建一个连接,连接到刚才在 Node-RED 中配置的 MQTT 云服务地址,然后输入 MQTT in 节点内的 Topic 来发送一条消息,使 Node-RED 能够接收到我们发送的 MQTT 数据。
然后我们再在 MQTT X 中订阅一个在 MQTT out 节点内配置的 Topic,用于接收处理过的消息数据。当发送一条包含了温湿度的消息数据后,我们可以接收到一条根据我们设定的消息模版发送过来的消息,再次发送就无法接收到。
因为此时温度值没有发生变化,当我们再次修改温度值后,就会发现我们又接收到了一条包含提醒温度值发生变化的消息。
至此,我们完成了安装并使用 Node-RED 连接到 MQTT 云服务,以及对 MQTT 消息数据进行过滤和处理,最后再将处理完成后的数据消息发送至 MQTT 服务器的全部流程。
Node-RED 的交互和使用方式,即用 UI 方式描述通用业务逻辑,可以降低非专业开发人员的上手门槛,使用一个可视化工具快速地创建需要的复杂执行任务,可以通过简单 Node 即节点连接构建出复杂的任务,特别是针对一些物联网的应用场景,都很有帮助。
Eclipse Paho MQTT工具是一个基于Java的Eclipse桌面客户端程序,其底层的和MQTT服务器进行的交互的java类库就是Eclipse Paho java库。假设我们在本机(127001)已经启动了一个mosquitto MQTT服务器,其端口为1883。如何使用 Eclipse Paho MQTT工具?
(1) 下载Eclipse Paho MQTT
https://repoeclipseorg/content/repositories/paho-releases/org/eclipse/paho/orgeclipsepahouiapp/102/
(2) 解压缩后,双击pahoexe,打开后
(3) 点击图中的 十字图标,就能新建一个MQTT的客户端的连接,输入正确的MQTT服务端的连接地址,
比如,本例中的连接地址是tcp://localhost:1883,然后点击“Connect”按钮,这个时候,如果MQTT服务端没有设置密码(默认情况是没有密码的)的话,这个时候,我们就能看到连接得到状态(status)是“Connected”。
(4) 这个时候我们就能订阅消息了。选择“Subscription”下方的绿色十字图标,就可以输入订阅的主题(topic)的名字,比如我们设置主题名称为“test”,并点击 “Subscribe”按钮
(5) 往MQTT服务发送一条消息主题为“test”,内容为“大家好,这是我一条消息。”的MQTT消息。然后点击“Publish”按钮,这个时候,我们就能看到消息已经发送成功,且在步骤(4)订阅的同一主题也收到了消息。
0条评论