如何用wireshark分析mqtt协议
MQTT是一项消息传递技术,由IBM再2001年发布。
总结一下,机制就是使用一个代理服务器messagebroker,
客户端client连接上这个服务器,然后告诉服务器说,我可以接收哪些类型的消息,
同时,client也可以发布自己的消息,这些消息根据协议的内容,可以被其他client获取。
只要手机客户端,连上服务器,然后就可以接收和发布消息了,不用自己写socket什么了,
低带宽,低耗电量,代码量也少,很简单吧。
package compigtestmqtt;
import comibmmqttMqttClient;
import
comibmmqttMqttException;
import comibmmqttMqttSimpleCallback;
public class SubscribeClient {
private final static String
CONNECTION_STRING = "tcp://192168160:1883";
private final static boolean
CLEAN_START = true;
private final static short KEEP_ALIVE =
30;//低耗网络,但是又需要及时获取数据,心跳30s
private final static String CLIENT_ID =
"client1";
private final static String[] TOPICS =
{
"Test/TestTopics/Topic1",
"Test/TestTopics/Topic2",
"Test/TestTopics/Topic3",
"tokudu/client1"
};
private
final static int[] QOS_VALUES = {0, 0, 2,
0};
//////////////////
private MqttClient mqttClient =
null;
public SubscribeClient(String i){
try {
mqttClient =
new MqttClient(CONNECTION_STRING);
SimpleCallbackHandler
simpleCallbackHandler = new
SimpleCallbackHandler();
mqttClientregisterSimpleHandler(simpleCallbackHandler);//注册接收消息方法
mqttClientconnect(CLIENT_ID+i,
CLEAN_START, KEEP_ALIVE);
mqttClientsubscribe(TOPICS,
QOS_VALUES);//订阅接主题
/
完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
/
mqttClientpublish(PUBLISH_TOPICS, "keepalive"getBytes(), QOS_VALUES[0],
true);
} catch (MqttException e) {
// TODO Auto-generated
catch block
eprintStackTrace();
}
}
/
简单回调函数,处理client接收到的主题消息
@author pig
/
class SimpleCallbackHandler implements MqttSimpleCallback{
/
当客户机和broker意外断开时触发
可以再此处理重新订阅
/
@Override
public void connectionLost() throws Exception {
//
TODO Auto-generated method
stub
Systemoutprintln("客户机和broker已经断开");
}
/
客户端订阅消息后,该方法负责回调接收处理消息
/
@Override
public void
publishArrived(String topicName, byte[] payload, int Qos, boolean retained)
throws Exception {
// TODO Auto-generated method
stub
Systemoutprintln("订阅主题: " +
topicName);
Systemoutprintln("消息数据: " + new
String(payload));
Systemoutprintln("消息级别(0,1,2): " +
Qos);
Systemoutprintln("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " +
retained);
}
}
/
高级回调
@author pig
/
class AdvancedCallbackHandler implements MqttSimpleCallback{
@Override
public void connectionLost() throws Exception {
//
TODO Auto-generated method stub
}
@Override
public void publishArrived(String arg0, byte[] arg1, int
arg2,
boolean arg3) throws Exception {
// TODO Auto-generated
method stub
}
}
/
@param args
/
public static void main(String[] args) {
// TODO Auto-generated
method stub
new SubscribeClient("" + i);
}
}
broker服务器,MQTT的jar包,记得下载啊,没有就消息我咯~
到这里,如果完成IBM的MQTT协议实现push消息的实例的,
都会有个问题,好像没考虑到安全问题,如果客户端连上来作乱怎么办呢?
上面用的broker时rsmb的,mqtt的简单服务器。
IBM已经推出了MQTT V31版本,已经加入了安全验证机制,不要怕啦。
转载仅供参考,版权属于原作者。祝你愉快,哦
定义3个虚拟用户组,分别进行连接,发布,订阅操作。
连接者:虚拟用户数=10, 发起10个到服务器的背景连接,每2秒钟发一次心跳请求,15秒后断连。
发布者:虚拟用户数=5,每隔一秒发送一条32字节的随机串,带时间戳,循环10次结束。
订阅者:虚拟用户数=2,接收相应话题上的消息,每隔一秒输出采样间隔里接收到的消息统计,循环10次结束。
在JMeter中运行该场景,15秒钟左右运行结束。可以在测试结果树中看到每个操作的详细信息。表格视图的"Sample Time"列出了操作花费的时间(毫秒)。由于这次测试在本地,并发用户数很少,所以连接、消息发送、接收时延都很短。您可以指向实际的MQTT服务器,模拟更大的并发用户数。
时间:2018-07-26
Q: 什么是网络连接?
A: 网络连接是传输层定义的概念,在传输层以下只存在网络数据包的相互交换。
所谓连接,其实也不是在网络上有一条真实存在的数据通道。只要通信双方在一段时间内持续保持数据包交换,就可以视为双方建立的连接并没有断开。
连接的建立是依托于TCP协议的三次握手,一旦连接已经建立完毕,通信双方就可以复用这条虚拟通道进行数据交换。如果连接保持长时间工作一直没有被中断,那么这样的TCP连接就俗称为长连接。
Message Queue Telemetry Transport ,中文直译: 消息队列遥测传输协议 。
在MQTT协议被设计出来的年代,还没有物联网这么时髦的词汇,当年叫做 遥测设备 。
MQTT协议真正开始声名鹊起的原因,是其正好恰恰踩中移动互联网发展的节拍,为消息推送场景提供了一个既简便又具有良好扩展性的现成解决方案。
http://docsoasis-openorg/mqtt/mqtt/v311/os/mqtt-v311-oshtml
可以看出,MQTT对消息头的规定十分精简, 固定头部占用空间大小仅为1个字节 ,一个最小的报文占用的空间也 只有两个字节 (带一字节的长度标识位)。
这也是MQTT协议针对不稳定及带宽低下的网络环境做出的特定设计 - - - - 尽可能地节省一切不必要的网络开销 。
Q:为什么MQTT协议需要心跳报文(PINGREQ, PINGRESP)来维护连接状态,只监控该TCP的连接状态是否可以实现目的?
A: TCP数据传输默认的超时时间过长,不符合应用层上细粒度的要求。
TCP数据传输超时的情况可分成三种: 服务端断开 、 客户端断开 、 中间网络断开 。
在前两种场景下,若断开操作是一方主动发起的,即表示为TCP连接正常结束,双方走四次挥手流程;若程序异常结束,则会触发被动断开事件,通信另一方也能立刻感知到本次连接所打开的 Socket 出现中断异常。
唯独中间网络的状态是通信双方不能掌握的。 在Linux系统下 ,TCP的连接超时由内核参数来控制,如果通信中的一方没有得到及时回复,默认会主动再尝试 6次 。如果还没有得到及时回应,那么其才会认定本次数据超时。
连带首次发包与六次重试,Linux系统下这7次发包的超时时间分别为 2的0次方 至 2的6次方 ,即1秒、2秒、4秒、8秒、16秒、32秒、64秒,一共127秒。MQTT协议认为如此长的超时时间对应用层而言粒度太大,因此其在应用层上还单独设计属于自身的心跳响应控制。常见的MQTT连接超时多被设定为 60秒 。
扩展知识 - TCP的KeepAlive机制: http://hengyunabcgithubio/why-we-need-heartbeat/
由通信中的 报文标识符 ( Packet Identifier )传达。
Q:仅Publish与Pubrec能保证消息只被投递一次吗?
A: 业务上可以实现,但MQTT协议并没有如此设计,原因如下:
每个消息都会拥有属于自己的报文标识符,但如果需要两次数据交换就实现消息仅只收到一次,就需要通信双方记录下每次使用的报文标识符,并且在处理每一条消息时都需要去重处理,以防消息被重复消费。
但MQTT协议最初被设计的工作对象是轻量级物联设备,为此在协议的设计中报文标识符被约定为 可重用 ,以减少对设备性能的消耗,换回的代价不得不使用四次网络数据交换,才能确保消息正好被消费一次。
Q:两个不同客户端在发布与订阅同一Topic下的消息时,都可以提出通信Qos要求,此时以哪项为基准?
A: 伪命题,故意在分享时埋下坑,等人来踩。
两个不同客户端的通信是需要 Broker 进行中转,而不是直连。因此,通信中存在两个不同的会话,双方的Qos要求仅仅作用于它们与 Broker 之间的会话,最终的Qos基准只会向最低要求方看齐。
例:遗嘱消息的正确使用方式可参考此篇文章: https://wwwhivemqcom/blog/mqtt-essentials-part-9-last-will-and-testament
虽然可以借助 Retain Message 实现绑定一条消息至某个Topic,以达到消息的暂时保留目的。
但首先 Retain Message 并不是为存储场景而设计的,再次MQTT协议并没有对消息的持久化作出规定,也就是说Broker重启后,现有保留消息也将丢失。
Q:两种特殊消息的使用场景?
A: 遗嘱消息,多用于客户端间获取互相之间异常断线的消息通知;
保留消息,可保存 最近一条 广播通知,多用于公告栏信息的发布。
Eclipse Mosquitto :MQTT协议的最小集实现
有 EMQ , HiveMQ , RabbitMQ MQTT Adapter 等。
Qos=2 消息保障的网络I/O次数过多,如果不是必需,尽少在程序里使用此类消息。
毕竟当初其设计的目的是为了减少设备的性能占用,但若应用场景并不是物联网,而是用于手机、电脑或浏览器端等现在已不缺性能的设备上,最好在报文体中,使用UUID生成全局唯一的消息ID,然后自行在业务解析中判断此报文是否被消费过。
或者,业务方在处理消息时保证其被消费的幂等性,也可消除重复消息对系统带来的影响。
正如MQTT协议并没有依赖TCP连接状态,自己在应用层协议上实现心跳报文来控制连接状态,业务方作为MQTT协议的使用者,也不要完全依赖协议的工作状态,而是依托MQTT协议建立属于业务本身的信息汇报机制,以加强系统的稳健性。
Retain Message 可视为客户端主动拉取的行为。如果业务系统采用 HTTP+MQTT 双协议描述业务过程,主动拉取的操作也可使用 HTTP 请求替代。
作为一个长连接型的应用,上线前需要根据业务量级,评估对操作系统 端口数 与 文件描述符 的占用要求,以防服务器资源被打满。
在服务端的配置文件和客户端的连接参数中,都拥有 max_inflight_messages 此项配置,来维护 Qos=1 or 2 消息是否被成功消费的状态。
MQTT 最初被设计为物联网级的通信协议,因此此参数的默认配额较小(大多数情况下被限制到10至20)。
但如果将MQTT协议应用至手机、PC或Web端的推送场景时,硬件性能已不在是瓶颈,在实际使用中推荐把此参数调大。
Mosquitto提供Bridge功能,需要我们自己配置。
Bridge 意为桥接,当我们把两台Broker桥接在一起时,只需要修改一台Broker的配置,填上另一台Broker的运行地址。前一台Broker将作为客户端发布与订阅后一台Broker的所有Topic,实现消息互通的目的。
桥接带来的问题有以下几点:
我的建议:
Websockets协议被设计的目的是为浏览器提供一个全双工的通信协议,方便实现消息推送功能。
在Websockets协议被设计出来前,受限于HTTP协议的一问一答模型,消息的推送只能靠轮询来实现,在资源消耗与时效性保障上,均难以达到令人满意的效果。
Websockets协议复用了HTTP协议的头部信息,告知浏览器接下来的操作将触发协议升级,然后通信双方继续复用HTTP的Header,但报文内容已转变为双方均接受的新协议的格式。
Websockets协议改进了网页浏览中的消息推送的方式,因此被广泛应用在聊天、支付通知等实时性要求比较高的场合下。
MQTT协议重点在于 消息队列的实现,其对消息投递的方式作出约定,并提供一些额外的通信保障 。
MQTT可采取原生的TCP实现,也有基于Websockets的实现版本。当然后者在网络字节的利用率上,不如前者那么精简。但浏览器端无法直接使用TCP协议,所以就只能基于Websockets协议开发。
不过基于Websockets的应用也有方便之处:一是证书不需要额外配置,直接与网站共用一套基础设施;二是可使用 Nginx 等工具管理流量,与普通HTTP流量可共用一套配置方法。
MQTT非常适合入门,原因如下:
实际的应用场景远比理想中的复杂,无法一招走遍天下,必须做好取舍。
MQTT协议在这方面做得很优秀,以后工作中可以作为参考,设计好自己负责的业务系统。
1、 下载mosquitto安装文件()
2、 找到相应系统的安装文件安装,如果不想做任何设置直接在服务里启动就行。
3、配置文件
如果需要配置一些用户名、密码、用户权限的参数,则需要修改安装目录下的mosquittoconf文件
下面来说说我用到的一些参数吧:
①用户密码: #password_file pwfileexample 后面跟着是用户密码配置文件,需写上绝对路径并且路径不带空格
②创建用户密码:打开doc窗口,进入mosquitto安装目录,运行mosquitto_passwd -c pwfileexample userName 回车,然后输入密码(密码输入两遍后,在该文件里会自动加密密码)
生成的文件内容格式例如:
userName:$6$Ls7JYQTdn9xagJJ2$zngeT758n1Wn1hnVLjFdK2cHb6lcmI5CMrMTNZe2SqkUj0fBgKts62gvlyWYwdY3/WArx/SAtFRKlvKKnHRCUg==
userName2:$6$bymgVcrtj+7wj8mR$nq1atPD3nreRgA6gDbDjfbUGZIlrmenOcWrXMoneBp+zmAxnOybqJvrBZboxX1XXPnz/TKZwz9aKQJ72zJym5A=
③如果想再增加用户,则执行mosquitto_passwd -u pwfileexample userName2即可
④用户权限:#acl_file aclfileexample 后面跟着是用户权限配置文件,需写上绝对路径并且路径
文件内容格式为:
user userName
/etc/ldsoconfd
mosquittoconf
/usr/local/lib/python26/site-packages ( mosquittopy )
/usr/local/bin
vi /etc/sysconfig/iptables
/usr/local/src/mosquitto-113/lib/python
make install
ldconfig
不改configmk里面的东西
需要安装
yum -y install patch make gcc gcc-c++ gcc-g77 flex bison
centos56下 yum -y install gcc automake autoconf libtool make
yum -y install openssl openssl-devel vim-minimal
这里为只读
topic read 主题
user userName2
这里为可读可写
topic 主题
topic #(或+)表示可以读写任何主题
到这里用户密码及权限已配置完成,订阅和发布的时候加上用户名及密码即可验证:
例如:订阅
client = new MqttClient("tcp://127001:1883","java_client0000000000");
// 回调处理类
Myback callback = new Myback();
clientsetCallback(callback);
// 创建连接可选项信息
MqttConnectOptions conOptions = new MqttConnectOptions();
conOptionssetCleanSession(false);
conOptionssetUserName("userName");
conOptionssetPassword("pwd"toCharArray());
// 连接broker
clientconnect(conOptions);
clientsubscribe("主题");
}
发布:
MqttClient client = new MqttClient("tcp://127001:1883","mqttserver-pub");
MqttTopic topic = clientgetTopic("主题");
MqttMessage message = new MqttMessage(topicgetName()getBytes());
messagesetQos(1);
MqttConnectOptions options = new MqttConnectOptions();
optionssetUserName("userName");
optionssetPassword("pwd"toCharArray());
clientconnect(options);
topicpublish(message);
}
即可验证!
windows的平台 由于有32位的 也有64位的 所以 去下一个和自己的电脑 相区配的。
下载后直接双击安装就行 之后 进入安装目录 最好用命令行的方式 启动mosquittoexe 然后就是 编写 android 客户端代码 mqtt 是订阅/发布的方式 eoeandroid 安卓开发社区上有详细的教程,你自己可以找一下。。
MQTT通信协议的基本介绍参考文章 NT35 MQTT通信 ,本篇给出阿里云的基本操作,NT35E通过订阅阿里云的主题&发布信息与阿里云平台相互通信。
登录阿里云 → 工作台 → 物联网平台 → 进入控制台→ 公共实例
阿里云默认通信协议为MQTT,不需要特殊选择,用户按照如下步骤创建自己的产品:
创建产品 → 添加设备
在"查看"标签中,包含了MQTT连接的基本三元组信息,也就是后面设备要填充的基本参数
用户每定义一类产品都会自动生成对应的Topic列表,当然我们也可以"自定义Topic"便于自己测试。
指令解析参考《Lierda NT35E&NT26E-CN AT命令手册》,这里给出使用到的AT指令对应参数说明以便于理解。
AT+LMQTTCFG=cloud,<tcpconnectID>[,<cloud _ type>,<data_type > ]
<tcpconnectID> 。MQTT Socket 标识符。范围:0~4。
<cloud_type>整型。2 alibaba,其他参数指定其他平台
<data_type>整型。阿里云平台 1 json数据
AT+LMQTTCFG="cloud",0,2,1 对应就是对接阿里云平台,发送json格式的数据
AT+LMQTTCFG=aliauth,<tcpconnectID>[,<product_key>,<device_name>,<device_secret>]
填充阿里云平台中设备的三元组信息
AT+LMQTTCFG="aliauth",0,"a1JszCpjS61","NT35E_06011","390358fc595040aa73221e8393aba86c"
这部分是模组进行TCP链路连接(需抓包确认)
AT+LMQTTOPEN=<tcpconnectID>,<host_name>,<port>
host_name对应阿里云 "设备信息"→"MQTT连接参数" 中的 "mqttHostUrl"
AT+LMQTTOPEN=0,"a1JszCpjS61iot-as-mqttcn-shanghaialiyuncscom",1883
模组作为客户端,通过MQTT协议连接到服务器(需抓包确认)
AT+LMQTTCONN=<tcpconnectID>[,<clientID>[,<username>[,<password>]]]
<clientID>字符串型。客户端标识符。用户可以随便定义。 <username>,<password> 不需要填写
AT+LMQTTCONN=0,"NT35E"
AT+LMQTTSUBUNSUB=<tcpconnectID>,<subflag>,<msgID>,<topic1>[,<qos1>[,<topic2>[,<qos2>]d…]]
<subflag>整型。消息类型 0 订阅 1 取消订阅
<msgID>整型。数据包消息标识符。范围:0~65535。
<topic>带双引号的字符串型。客户端订阅或者退订的主题。长度范围:0~256 字节。
<qos>整型。客户端发送订阅消息(SUBSCRIBE)的 QoS 等级,此时为必选参数。2 正好一次,该主题下的消息确保接收端仅接收到一次
AT+LMQTTSUBUNSUB=0,0,1,"/a1JszCpjS61/ NT35E_06011 /user/COMMUTEST",2
这里注意<topic>对应参数的替换,里面的deviceName需要替换。
订阅主题之后,服务器下发的数据模组就可以正常接收了。模组下发位置
发布消息在对应的设备目录下,如果有设备"订阅"对应的消息,平台"发布"相应的数据设备就可以接收到了。
AT+LMQTTPUB=<tcpconnectID>,<msgID>,<qos>,<retain>,<topic>,<msglen>,<msg>
<msgID>整型。 0~65535。任意定义,但<qos>=0 时,该参数值只能为0。
<qos>整型。 0 最多一次 1 至少一次 2 正好一次
<retain>整型 。服务器是否保存该消息。0 不保存 1 保存
<topic>带双引号的字符串型。 客户端发布消息的主题。长度范围:0~256 字节
<msglen>整型 。指定的消息数据长度。范围:0~1460。
<msg>字符串型。 需要发布的消息数据。
AT+LMQTTPUB=0,0,0,1,"/a1JszCpjS61/ NT35E_06011 /user/COMMUTEST",10,"1122334455"
注意刚刚自己创建的主题属性是" 发布和订阅 ",所以模组发送该主题的信息,阿里云也是可以收到的
注意这里模组发送数据的时候,也推送了自己发送的数据,因为刚刚订阅了这个主题,所以模组订阅(收)到了对应的数据
前面我们通过NT35E与平台进行信息交互,那么为什么是这样填写对应的参数呢,每个参数对应的说明在阿里云上是什么样的呢,用户可以查看阿里云的<帮助文档>进行确认。
上面我们使用三元组的方式( 一机一密 )实现NT35E与阿里云平台通信,但实际生产过程中该方式不好实现,比如工厂有1000个设备生产,如果每个设备都复制不同的三元组,很难实现工厂批量化生产,此时可以通过 一型一密 的通信方式解决该问题。
一型一密模组端实现方式后续更新。
0条评论