可视化网关编程系列教程-----(8)MQTT设备接入与控制
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
这里我们使用轻量级 mosquitto 为例
我一般都是用这个方法安装,如果这个方法不可行,请自行百度尝试其他方案。
使用MQTT 节点就行,一个是订阅,一个是发布。如图所示
这里只做简单介绍,如果需要配置安全或者其他方面的信息,可以查看节点的信息介绍。
这样一个订阅节点就做好了。发布信息的话一样,用的是发布节点。
因为现场花园灯没有将线布置到智能控制箱,所以只能采用MQTT继电器来实现控制。
购买MQTT 继电器(淘宝买sonoff 刷 固件,也可以上闲鱼买我刷好的)
配置设备的无线网,可以看我这篇文章怎么配置 KNX无线继电器使用说明
,并且启用MQTT协议即可。
通过这一系列骚操作,成功将花园灯控制了起来。
可以自行导入测试
1、在这里下载Apollo服务器,下载后解压,然后运行apache-apollo-16\bin\apollocmd,输入create mybroker(名字任意取,这里是根据官网介绍的来取的)创建服务器实例,服务器实例包含了所有的配置,运行时数据等,并且和一个服务器进程关联。
2、create mybroker之后会在bin目录下生成mybroker文件夹,里面包含有很多信息,其中etc\apolloxml文件下是配置服务器信息的文件,etc\usersproperties文件包含连接MQTT服务器时用到的用户名和密码,后面会介绍,可以修改原始的admin=password,可以接着换行添加新的用户名密码。
3、打开cmd,运行…apache-apollo-16\bin\mybroker\bin\apollo-brokercmd run 开启服务器,可以在浏览器中输入http://127001:61680/查看是否安装成功,该界面展示了topic,连接数等很多信息。
经过上面的简单步骤,服务器基本上就已经完成,下一篇将介绍Android客户端的编写和注意事项。
客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的Eclipse Paho,
MQTT只是IBM推出的一个消息协议,基于TCP/IP的。两个App端发送和接收消息需要中间人,这个中间人就是消息服务器(比如ActiveMQ/RabbitMQ),三者通信协议就是MQTT。
wmqttjar是IBM实现的App端收发消息的具体实现,W意思为Webspare,说明消息服务器采用Webspare(WebSphere MQ Integrator Broker)。
MQTT 协议 是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。
常用于 IOT 物联网和一些需要服务端主动通知客户端的场景。
1 导入依赖
2 创建 MqttHelper 辅助类,设置回调监听
3 连接 MQTT
连接成功或失败,以及中途的连接掉线,会触发 OnMqttStatusChangeListener 回调
4 MQTT 连接状态监听
5 MQTT 收发消息监听
onSubMessage 订阅的消息回调,因为存在订阅多个 topic 的情况,所以回调能知道是来自哪个 Topic 的消息;
onPubMessage 发布的消息回调,用于确认发布的消息是否发送成功。
6 MQTT 订阅 Topic
需要在 MQTT 连接成功后才能订阅 topic,否则订阅 Topic 不成功,收不到对应消息
7 MQTT 取消订阅 Topic
8 MQTT 发布消息
9 MQTT 断开连接
10 通知设置
由于 MQTT 启动了一个 Service,而 Android 80 以上对于后台 Service 限制时长 5 秒;所以将 MqttService 绑定到 Notification 上成为了一个前台通知;通知的标题和内容显示可以在 stringsxml 中设置,对应属性如下:
Android 80 及以上开启前台服务绑定到通知,80 以下默认不启用,可将 mqtt_foreground_notification_low_26 设为 true,将 80 以下设备也开启前台通知服务
创建 MQTT 实例时需要传送参数 MqttOptions,下面将介绍下部分参数;
1 Topic
MQTT 是一种发布/订阅的消息协议, 通过设定的主题 Topic,
发布者向 Topic 发送的 payload 负载消息会经过服务器, 转发到所有订阅
该 Topic 的订阅者
通配符 : 假想移动端消息推送场景,有的系统消息是全体用户接收,有的消息是 Android 或 iOS 设备接收, 又或者是某些消息具体推送到用户,当然, 对应的多种类型消息可以通过多订阅几个对应的 Topic 解决,也可以使用通配符;
通配符有两个, " + " 和 " # ", 与正斜杠 " / " 组合使用;加号只能表示一级Topic, 井号可以表示任意层级 Topic; 例如: 订阅 Topic为 " System/+ ", 发布者发布的 Topic 可以是 System、System/Android、System/iOS; 但是不能是 System/iOS/123, 而订阅的 Topic 如果是" System/# " 则可以收到;
注意,只有订阅的 Topic 才可以使用 通配符, 发布和遗嘱的 Topic 不能包含通配符
2 ClientID
发布者和订阅者都是属于客户端, 客户端与服务端建立连接之后,发送的第一个报文消息必须是 Connect 消息,而 Connect 的消息载荷中必须包含 clientID 客户端唯一标识;
如果两个客户端的 clientID 一样, 则服务端记录第一个客户端连接之后再收到第二个客户端连接请求,则会向一个客户端发送 Disconnect 报文断开连接, 并连接第二个客户端, 而如果此时设置了自动重连, 第一个客户端再次连接,服务端又断开与第二个的连接, 连上第一个客户端, 如此将导致两个客户端不断的被挤掉重连
注意: clientID 使用的字符最好是 大小写字母和数字, 长度最好限制在[1, 23] 之间;
3 遗嘱消息
可选参数, 客户端没有主动向服务端发起 disconnect 断开连接消息,然而服务端检测到和客户端之间的连接已断开, 此时服务端将该客户端设置的遗嘱消息发送出去
应用场景: 客户端因网络等情况掉线之后, 可以及时通知到所有订阅该遗嘱 Topic 的客户端;
遗嘱 Topic 中不能存在通配符
4 Session
客户端和服务端之间建立的会话状态, 一般用于消息保存, 如果设置清除 Session,则每次客户端和服务端建立连接会创建一个新的会话,之前连接中的消息不能恢复,
而设置不清除会话, 对应发布者发送的 qos 为 1和2 的消息,还未被订阅者接收确认,则需要保存在会话中, 以便订阅者下次连接可以恢复这些消息;
注意: Session 存储的消息是保存在内容中的, 所以如果不是重要的消息,最好是设置清除 Session, 或者设置 qos = 0;
5 心跳包
标识客户端传输一次控制报文到下一次传输之间允许的空闲时间;在这段时间内,如果客户端没有其他任何报文发送,必须发送一个 PINGREQ 报文到服务器,而如果服务端在 15 倍心跳时间内没有收到客户端消息,则会主动断开客户端的连接,发送其遗嘱消息给所有订阅者。而服务端收到 PINGREQ 报文之后,立即返回 PINGRESP 报文给客户端
心跳时间单位为秒,占用2个字节,最大 2^16 - 1 = 65535秒(18小时12分钟15秒),设置为 0 表示不使用心跳机制; 心跳时间一般设置为几分钟或几十秒即可,时间短点可以更快的发出遗嘱消息通知掉线,但是时间短会增加消息频率,影响服务端并发; 微信长连接为 300 秒,而三大运营商貌似也有个连接时间最小的为 5 分钟。
6 qos
服务质量等级 qos 对应两部分,一是客户端到服务端发送的消息, 一是服务端到客户端订阅的消息; 从发布者到订阅者实际 qos 为两段路中 qos 最小的。
qos 可选值 0(最多交付一次)、1(最少交付一次)、2(正好交付一次);
qos = 0 :接收方不发送响应,发送方不进行重试;发送方只管发一次,不管是否发成功,也不管接收方是否成功接收,适用于不重要的数据传输;
qos = 1 :确保消息至少有一次到达接收方,发送方向接收方发送消息,需要等待接收方返回应答消息,如果发送方在一定时间之内没有收到应答,发送方继续下一次消息发送,直到收到应答消息,删除本地消息缓存,不再发送;所以接收方可能收到1-n次消息;适用于需要收到所有消息,客户端可以处理重复消息。
qos = 2 :确保消息只一次到达接收方,发送方和接收方之间消息处理流程最复杂;
Mqtt Qos 深度解读 和 MQTT协议QoS2 准确一次送达的实现
7 payload 负载消息
字节流类型, 是 MQTT 通信传输的真实数据
8 保留消息
发布消息时设置, 对应参数 retain, 服务端将保留对应 Topic 最新的一条消息记录; 保留消息的作用是每次客户端连接上线都会收到其 Topic 的最后一条保留消息, 所以可能存在网络不稳定,频繁掉线重连,每次重连重复收到保留消息;
可以向对应的 Topic 发送一条 空消息,用于清除保留消息。
MQTT 服务搭建 Apache Apollo 服务器 搭建 MQTT 服务
Github 仓库
mqtt 协议
时间:2018-07-26
Q: 什么是网络连接?
A: 网络连接是传输层定义的概念,在传输层以下只存在网络数据包的相互交换。
所谓连接,其实也不是在网络上有一条真实存在的数据通道。只要通信双方在一段时间内持续保持数据包交换,就可以视为双方建立的连接并没有断开。
连接的建立是依托于TCP协议的三次握手,一旦连接已经建立完毕,通信双方就可以复用这条虚拟通道进行数据交换。如果连接保持长时间工作一直没有被中断,那么这样的TCP连接就俗称为长连接。
Message Queue Telemetry Transport ,中文直译: 消息队列遥测传输协议 。
在MQTT协议被设计出来的年代,还没有物联网这么时髦的词汇,当年叫做 遥测设备 。
MQTT协议真正开始声名鹊起的原因,是其正好恰恰踩中移动互联网发展的节拍,为消息推送场景提供了一个既简便又具有良好扩展性的现成解决方案。
http://docsoasis-openorg/mqtt/mqtt/v311/os/mqtt-v311-oshtml
可以看出,MQTT对消息头的规定十分精简, 固定头部占用空间大小仅为1个字节 ,一个最小的报文占用的空间也 只有两个字节 (带一字节的长度标识位)。
这也是MQTT协议针对不稳定及带宽低下的网络环境做出的特定设计 - - - - 尽可能地节省一切不必要的网络开销 。
Q:为什么MQTT协议需要心跳报文(PINGREQ, PINGRESP)来维护连接状态,只监控该TCP的连接状态是否可以实现目的?
A: TCP数据传输默认的超时时间过长,不符合应用层上细粒度的要求。
TCP数据传输超时的情况可分成三种: 服务端断开 、 客户端断开 、 中间网络断开 。
在前两种场景下,若断开操作是一方主动发起的,即表示为TCP连接正常结束,双方走四次挥手流程;若程序异常结束,则会触发被动断开事件,通信另一方也能立刻感知到本次连接所打开的 Socket 出现中断异常。
唯独中间网络的状态是通信双方不能掌握的。 在Linux系统下 ,TCP的连接超时由内核参数来控制,如果通信中的一方没有得到及时回复,默认会主动再尝试 6次 。如果还没有得到及时回应,那么其才会认定本次数据超时。
连带首次发包与六次重试,Linux系统下这7次发包的超时时间分别为 2的0次方 至 2的6次方 ,即1秒、2秒、4秒、8秒、16秒、32秒、64秒,一共127秒。MQTT协议认为如此长的超时时间对应用层而言粒度太大,因此其在应用层上还单独设计属于自身的心跳响应控制。常见的MQTT连接超时多被设定为 60秒 。
扩展知识 - TCP的KeepAlive机制: http://hengyunabcgithubio/why-we-need-heartbeat/
由通信中的 报文标识符 ( Packet Identifier )传达。
Q:仅Publish与Pubrec能保证消息只被投递一次吗?
A: 业务上可以实现,但MQTT协议并没有如此设计,原因如下:
每个消息都会拥有属于自己的报文标识符,但如果需要两次数据交换就实现消息仅只收到一次,就需要通信双方记录下每次使用的报文标识符,并且在处理每一条消息时都需要去重处理,以防消息被重复消费。
但MQTT协议最初被设计的工作对象是轻量级物联设备,为此在协议的设计中报文标识符被约定为 可重用 ,以减少对设备性能的消耗,换回的代价不得不使用四次网络数据交换,才能确保消息正好被消费一次。
Q:两个不同客户端在发布与订阅同一Topic下的消息时,都可以提出通信Qos要求,此时以哪项为基准?
A: 伪命题,故意在分享时埋下坑,等人来踩。
两个不同客户端的通信是需要 Broker 进行中转,而不是直连。因此,通信中存在两个不同的会话,双方的Qos要求仅仅作用于它们与 Broker 之间的会话,最终的Qos基准只会向最低要求方看齐。
例:遗嘱消息的正确使用方式可参考此篇文章: https://wwwhivemqcom/blog/mqtt-essentials-part-9-last-will-and-testament
虽然可以借助 Retain Message 实现绑定一条消息至某个Topic,以达到消息的暂时保留目的。
但首先 Retain Message 并不是为存储场景而设计的,再次MQTT协议并没有对消息的持久化作出规定,也就是说Broker重启后,现有保留消息也将丢失。
Q:两种特殊消息的使用场景?
A: 遗嘱消息,多用于客户端间获取互相之间异常断线的消息通知;
保留消息,可保存 最近一条 广播通知,多用于公告栏信息的发布。
Eclipse Mosquitto :MQTT协议的最小集实现
有 EMQ , HiveMQ , RabbitMQ MQTT Adapter 等。
Qos=2 消息保障的网络I/O次数过多,如果不是必需,尽少在程序里使用此类消息。
毕竟当初其设计的目的是为了减少设备的性能占用,但若应用场景并不是物联网,而是用于手机、电脑或浏览器端等现在已不缺性能的设备上,最好在报文体中,使用UUID生成全局唯一的消息ID,然后自行在业务解析中判断此报文是否被消费过。
或者,业务方在处理消息时保证其被消费的幂等性,也可消除重复消息对系统带来的影响。
正如MQTT协议并没有依赖TCP连接状态,自己在应用层协议上实现心跳报文来控制连接状态,业务方作为MQTT协议的使用者,也不要完全依赖协议的工作状态,而是依托MQTT协议建立属于业务本身的信息汇报机制,以加强系统的稳健性。
Retain Message 可视为客户端主动拉取的行为。如果业务系统采用 HTTP+MQTT 双协议描述业务过程,主动拉取的操作也可使用 HTTP 请求替代。
作为一个长连接型的应用,上线前需要根据业务量级,评估对操作系统 端口数 与 文件描述符 的占用要求,以防服务器资源被打满。
在服务端的配置文件和客户端的连接参数中,都拥有 max_inflight_messages 此项配置,来维护 Qos=1 or 2 消息是否被成功消费的状态。
MQTT 最初被设计为物联网级的通信协议,因此此参数的默认配额较小(大多数情况下被限制到10至20)。
但如果将MQTT协议应用至手机、PC或Web端的推送场景时,硬件性能已不在是瓶颈,在实际使用中推荐把此参数调大。
Mosquitto提供Bridge功能,需要我们自己配置。
Bridge 意为桥接,当我们把两台Broker桥接在一起时,只需要修改一台Broker的配置,填上另一台Broker的运行地址。前一台Broker将作为客户端发布与订阅后一台Broker的所有Topic,实现消息互通的目的。
桥接带来的问题有以下几点:
我的建议:
Websockets协议被设计的目的是为浏览器提供一个全双工的通信协议,方便实现消息推送功能。
在Websockets协议被设计出来前,受限于HTTP协议的一问一答模型,消息的推送只能靠轮询来实现,在资源消耗与时效性保障上,均难以达到令人满意的效果。
Websockets协议复用了HTTP协议的头部信息,告知浏览器接下来的操作将触发协议升级,然后通信双方继续复用HTTP的Header,但报文内容已转变为双方均接受的新协议的格式。
Websockets协议改进了网页浏览中的消息推送的方式,因此被广泛应用在聊天、支付通知等实时性要求比较高的场合下。
MQTT协议重点在于 消息队列的实现,其对消息投递的方式作出约定,并提供一些额外的通信保障 。
MQTT可采取原生的TCP实现,也有基于Websockets的实现版本。当然后者在网络字节的利用率上,不如前者那么精简。但浏览器端无法直接使用TCP协议,所以就只能基于Websockets协议开发。
不过基于Websockets的应用也有方便之处:一是证书不需要额外配置,直接与网站共用一套基础设施;二是可使用 Nginx 等工具管理流量,与普通HTTP流量可共用一套配置方法。
MQTT非常适合入门,原因如下:
实际的应用场景远比理想中的复杂,无法一招走遍天下,必须做好取舍。
MQTT协议在这方面做得很优秀,以后工作中可以作为参考,设计好自己负责的业务系统。
1、打开群晖Docker,注册表搜索:mqtt;
2、点击下载好的镜像文件创建容器,输入容器名字,点击高级设置
3、勾选启用自动重新启动
4、点击网络—-勾选与docker host相同的网络,点击应用,一直下一步直到创建完成
0条评论