如何在 JMeter 中使用 MQTT 插件
JMeter 内置 HTTP/HTTPS、TCP 等支持多种协议,还具备插件扩展机制。
MQTT 协议作为物联网界的主流协议,虽然并非 JMeter 自带的协议类型,但在物联网测试场景中极为普遍。为了支持 MQTT 协议的规模测试,EMQ 映云科技开发了基于 JMeter 的 MQTT 协议开源测试插件: https://githubcom/xmeter-net/mqtt-jmeter 。
经过几个版本的迭代,目前 JMeter MQTT 插件的最新版本为 202,支持连接、消息发布、消息订阅等多种采样器,并可通过组合构建更复杂的测试场景。
本文我们将具体介绍如何在 JMeter 中使用 MQTT 插件。
MQTT 插件的安装方式与其他 JMeter 第三方插件类似。
连接采样器模拟物联网设备,发起 MQTT 连接。
Server name or IP: 指向被测 MQTT 服务器地址。
Port number: 以 EMQ X 为例,默认 TCP 连接的端口是 1883, SSL 连接则是 8883。具体的端口请参照服务器的具体配置。
MQTT version : 目前支持 MQTT 31及311版本。
Timeout: 连接超时设置,以秒为单位。
Protocols: 支持TCP、SSL、WS 和 WSS 方式连接 MQTT 服务器。当选择 SSL 或 WSS 加密通道连接时,可以选择单向或者双向认证(Dual)。如果希望进行双向认证,还需要指定相应的客户端证书(p12证书),以及对应的文件保护密码(Secret)。
User authentication: 如果 MQTT 服务器配置了用户认证,需要提供相应的用户名( User name )和密码( Password )。
ClientId: 虚拟用户的标识。如果勾选了「Add random suffix for ClientId」,将会在 ClientId 的基础上给每个虚拟用户再添加一个 uuid 串作为后缀,整个作为虚拟用户标识。
Keep alive(s): 心跳信号发送间隔。例如,300 表示客户端每隔 300 秒向服务器发出 ping 请求,以保持连接活跃。
Connect attempt max: 第一次连接过程中,尝试重连的最大次数。超过该次数则认为连接失败。如果希望一直尝试重连,可以设为 -1。
Reconnect attempt max: 后继连接过程中,尝试重连的最大次数。超过该次数则认为连接失败。如果希望一直尝试重连,可以设为 -1。
Clean session : 如果希望在连接之间保留会话状态,可以将该选项设为 false。如果不希望在新的连接中保留会话状态,则将该项设为true。
消息发布采样器复用连接采样器中建立的 MQTT 连接,向目标 MQTT 服务器发布消息。
QoS Level: 服务质量,取值为 0,1,2,分别代表 MQTT 协议规范里的至多一次(AT_MOST_ONCE),至少一次(AT_LEAST_ONCE),精确一次(EXACTLY_ONCE)
Retained messages : 如果希望使用「保留消息」,可将该选项设为 true,MQTT 服务器端将会存储插件发布的保留消息及其 QoS,并在相应 topic 上发生订阅时,直接将最后一条保留消息投递给订阅端,使得订阅端不必等待即可获取发布端的最新状态值。
Topic name: 发布消息所属的主题。
Add timestamp in payload: 如果勾选,发布的消息体开头会附带当前时间戳,配合消息订阅采样器的 Payload includes timestamp 选项,可以在消息接收端计算消息达到的延时。如果不勾选则只发送实际的消息体。
Payloads Message type: 目前支持三种消息类型
消息发布采样器复用连接采样器中建立的 MQTT 连接,从目标 MQTT 服务器上订阅消息。
QoS Level: 服务质量,含义与消息发布采样器相同。
Topic name(s): 订阅消息所属的主题。支持单个消息订阅采样器订阅多个主题,主题之间用逗号分隔。
Payload includes timestamp: 如果勾选,会从消息体开头处解析发送时间戳,配合消息发布采样器的 Add timestamp in payload 选项,可以用于计算消息的接收延时。如果不勾选则只解析实际的消息体。
Sample on : 采样方式,默认为" specified elapsed time(ms) ",即每隔指定的毫秒时间采样一次。也可以选择" number of received messages ",即每接收到指定的消息数采样一次。
Debug response: 如果勾选,消息内容会打印在 JMeter 的响应结果中。该选项主要用于调试目的,正式运行测试不建议勾选,以免影响测试效率。
断开连接采样器中建立的 MQTT 连接。
本文我们介绍了 JMeter MQTT 插件的各测试组件,在下期文章中我们将针对不同的测试场景详细介绍如何用 MQTT 插件来构建测试脚本。
java mqtt是什么,让我们一起了解一下?
MQTT是一个基于客户端-服务器的消息发布或订阅传输协议,MQTT协议是轻量、简单、开放和易于实现的,如果工作在TCP或IP协议上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布或订阅型的消息协议。
MQTT主要应用在哪些场合?
1、它是一种机器之间通讯 machine-to-machine (M2M)、物联网 Internet of Things (IoT)常用的一种轻量级消息传输协议。
2、适用于网络带宽较低的场合。
3、包含发布、订阅模式,通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息。
实战操作:模拟客户端接收消息。 import javautilconcurrentScheduledExecutorService; import orgeclipsepahoclientmqttv3MqttClient; import orgeclipsepahoclientmqttv3MqttConnectOptions; import orgeclipsepahoclientmqttv3MqttTopic; import orgeclipsepahoclientmqttv3persistMemoryPersistence; / 模拟一个客户端接收消息 @author Unclue_liu / public class ClientMQTT { public static final String HOST = "tcp://192168177:1883"; public static final String TOPIC1 = "pos_message_all"; private static final String clientid = "12345678"; private MqttClient client; private MqttConnectOptions options; private String userName = "mqtt"; //非必须 private String passWord = "mqtt"; //非必须 private ScheduledExecutorService scheduler; private void start() { try { // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的连接设置 options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 optionssetCleanSession(false); // 设置连接的用户名 optionssetUserName(userName); // 设置连接的密码 optionssetPassword(passWordtoCharArray()); // 设置超时时间 单位为秒 optionssetConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1520秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 optionssetKeepAliveInterval(20); //设置断开后重新连接 optionssetAutomaticReconnect(true); // 设置回调 clientsetCallback(new PushCallback()); MqttTopic topic = clientgetTopic(TOPIC1); //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 //遗嘱 optionssetWill(topic, "close"getBytes(), 1, true); clientconnect(options); //订阅消息 int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次 String[] topic1 = {TOPIC1}; clientsubscribe(topic1, Qos); } catch (Exception e) { eprintStackTrace(); } } public static void main(String[] args) { ClientMQTT client = new ClientMQTT(); clientstart(); }
1、 什么是MQTT
MQTT(MessageQueueing Telemetry Transport Protocol)的全称是消息队列遥感传输协议的缩写,是由IBM公司推出的一种基于轻量级代理的发布/订阅模式的消息传输协议,运行在TCP协议栈之上,为其提供有序、可靠、双向连接的网络连接保证。由于其开放、简单和易于实现所以能够应用在资源受限的环境中,对于M2M和物联网应用程序来说是一个相当不错的选择。
2、 为什么要用MQTT?
MQTT协议是针对如下情况设计的:
M2M(Machine to Machine) communication,机器端到端通信,比如传感器之间的数据通讯 因为是Machine to Machine,需要考虑: Machine,或者叫设备,比如温度传感器,硬件能力很弱,协议要考虑尽量小的资源消耗,比如计算能力和存储等 M2M可能是无线连接,网络不稳定,带宽也比较小
MQTT的特点:
发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。这一点很类似于1 这里是列表文本XMPP,但是MQTT的信息冗余远小于XMPP
对负载内容屏蔽的消息传输。
使用TCP/IP提供网络连接。主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。
三种消息传输方式QoS:
0代表“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
1代表“至少一次”,确保消息到达,但消息重复可能会发生。
2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1
如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了(客户端登录的时候要指明支持的QoS级别,同时发送消息的时候也要指明这条消息支持的QoS级别),如果需要客户端保证能接收消息,需要指定QoS为1,如果同时需要加入客户端不在线也要能接收到消息,那么客户端登录的时候要指定session的有效性,接收离线消息需要指定服务端要保留客户端的session状态。
mqtt基于订阅者模型架构,客户端如果互相通信,必须在同一订阅主题下,即都订阅了同一个topic,客户端之间是没办法直接通讯的。订阅模型显而易见的好处是群发消息的话只需要发布到topic,所有订阅了这个topic的客户端就可以接收到消息了。
发送消息必须发送到某个topic,重点说明的是不管客户端是否订阅了该topic都可以向topic发送了消息,还有如果客户端订阅了该主题,那么自己发送的消息也会接收到。
小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。这就是为什么在介绍里说它非常适合“在物联网领域,传感器与服务器的通信,信息的收集”,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。
使用Last Will和Testament特性通知有关各方客户端异常中断的机制。Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。Testament:遗嘱机制,功能类似于Last Will 。
1安装MQTT插件mqtt-xmeter-jar-with-dependencies
2添加取样器
这里qos:0、1、2不多说,要说的是
1Sample on 这里要选择number of xxxx,这个就会一直等待直到订阅有收到内容位置(这样放在一个循环里很好用,不会说订阅收不到)
2Debug response 打钩后,在你运行的时候可以在响应数据上看到数据(有场景是你的发布需要去到订阅到的信息,解析后发布的)
这里Response Body就能够看到订阅到的消息
这个结构是我要先获取订阅消息,然后把订阅消息提取出来发布。先运行Jmeter然后到订阅的时候就一直等待,我网站点击后(也可以做个Http接口访问操作),网站发布一条消息到我这,我订阅到了,然后取出订阅里的东西,组装后发布。
如果订阅的消息是加密的?我还不懂如何处理。
客户端有两种接口:
主要区别:1 为同步接口,2 为异步接口。
异步接口,提供非阻塞式的方法,后台处理任务,以连接为例,连接到MQTT server是一个耗时操作,非阻塞方式在后台进行连接的时候,可以通知调用方,连接busy的状态。
非阻塞方式在时间驱动型程序和图形界面程序中较为多用,不会影响UI线程的绘制。
举例:连接,同步方式
连接,异步方式
异步回调中如果需要使用context上下文,可以通过异步方法传入,最终可在回调中,返回给调用方。
同步阻塞方法
与异步client相似, ~~ 表示同上
Android client的主要实现类,extends BroadcastReceiver ,实现 IMqttAsyncClient
通过 Android的service服务于 MQTT服务进行通信。提供了包含 一下方法的简单易用的MQTT 客户端:
主要进行的操作:
创建连接
消息重发机制:在发送消息的时候,如果期间连接中断或client停止,消息会在满足所有以下条件且再次创建连接后,被已设定好的QoS被发送。
关于Topic
方式1:
方式2:
省略了方式1的 MqttMessage 的构造。
注意:
机制:
订阅单一Topic
订阅多个Topic
优势:优化比逐一订阅
取消订阅与订阅是相反的,取消订阅需要在服务端收到取消后,查询是否有match的订阅,然后移除。
取消订阅
取消多个订阅
注意:
断线时候消息发送的机制:
方式1:
方式2:带超时机制
方式3:带回调
在客户端stop的时候,可能还有未发送的message,这种情况下,可以通过 getPendingDeliveryTokens 方法在客户端重启后获取为发送消息(in-flight message)的token,从而追踪这些消息的状态。
替代方法:MqttCallback中的deliveryComplete 也可以获取到消息送达的状态。
注意:
设置 MqttTraceHandler ,和使能在service中trace的功能
MqttTraceCallback 是最简单的 MqttTraceHandler 的实现,直接输出到Android的Logcat
使用 BroadcastReceiver 的方式接收从 MqttService 的返回消息:
具体以sub的消息接收为参考(messageArrivedAction):
从SparseArray的tokenMap中移除token:
注意:
0条评论