如何采用MQTT协议实现android消息推送
MQTT协议实现android消息推送,我想每个Android开发人员对它应该都是比较熟悉的。 MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:网络代价昂贵,带宽低、不可靠。在嵌入设备中运行,处理器和内存资源有限。该协议的特点有:使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。对负载内容屏蔽的消息传输。 使用 TCP/IP 提供网络连接。有三种消息发布服务质量:“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。“至少一次”,确保消息到达,但消息重复可能会发生。“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制MQTT最简单的使用包括两种,一种是发消息,一种是订阅消息。 发消息就是向一个固定IP地址的某个主题发送消息(publish)订阅消息是向服务器端订阅某些主题,当其他客户端向服务器的这个主题广播消息时,那么所有订阅这个主题的客户端就都能收到了MQTT是一项消息传递技术,由IBM再2001年发布。总结一下,机制就是使用一个代理服务器message broker,客户端client连接上这个服务器,然后告诉服务器说,我可以接收哪些类型的消息,同时,client也可以发布自己的消息,这些消息根据协议的内容,可以被其他client获取。只要手机客户端,连上服务器,然后就可以接收和发布消息了,不用自己写socket什么了,低带宽,低耗电量,代码量也少,很简单吧。 package compigtestmqtt;002003 import comibmmqttMqttClient;004 import comibmmqttMqttException;005 import comibmmqttMqttSimpleCallback;006007 public class SubscribeClient {008 private final static String CONNECTION_STRING = "tcp://192168160:1883";009 private final static boolean CLEAN_START = true;010 private final static short KEEP_ALIVE = 30;//低耗网络,但是又需要及时获取数据,心跳30s011 private final static String CLIENT_ID = "client1";012 private final static String[] TOPICS = {013 "Test/TestTopics/Topic1",014 "Test/TestTopics/Topic2",015 "Test/TestTopics/Topic3",016 "tokudu/client1"017 };018 private final static int[] QOS_VALUES = {0, 0, 2, 0};019020 //////////////////021 private MqttClient mqttClient = null;022023 public SubscribeClient(String i){024 try {025 mqttClient = new MqttClient(CONNECTION_STRING);026 SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();027 mqttClientregisterSimpleHandler(simpleCallbackHandler);//注册接收消息方法028 mqttClientconnect(CLIENT_ID+i, CLEAN_START, KEEP_ALIVE);029 mqttClientsubscribe(TOPICS, QOS_VALUES);//订阅接主题030031 /032 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息033 /034035 mqttClientpublish(PUBLISH_TOPICS, "keepalive"getBytes(), QOS_VALUES[0], true);036037 } catch (MqttException e) {038 // TODO Auto-generated catch block039 eprintStackTrace();040 }041 }042043 /044 简单回调函数,处理client接收到的主题消息045 @author pig046 047 /048 class SimpleCallbackHandler implements MqttSimpleCallback{ 049050 /051 当客户机和broker意外断开时触发052 可以再此处理重新订阅053 /054 @Override055 public void connectionLost() throws Exception {056 // TODO Auto-generated method stub057 Systemoutprintln("客户机和broker已经断开");058 }059060 /061 客户端订阅消息后,该方法负责回调接收处理消息062 /063 @Override064 public void publishArrived(String topicName, byte[] payload, int Qos, booleanretained) throws Exception {065 // TODO Auto-generated method stub066 Systemoutprintln("订阅主题: " + topicName);067 Systemoutprintln("消息数据: " + new String(payload));068 Systemoutprintln("消息级别(0,1,2): " + Qos);069 Systemoutprintln("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained);070 }071072 }073074 /075 高级回调076 @author pig077 078 /079 class AdvancedCallbackHandler implements MqttSimpleCallback{080081 @Override082 public void connectionLost() throws Exception {083 // TODO Auto-generated method stub084085 }086087 @Override088 public void publishArrived(String arg0, byte[] arg1, int arg2,089 boolean arg3) throws Exception {090 // TODO Auto-generated method stub091092 }093094 }095096 /097 @param args098 /099 public static void main(String[] args) {100 // TODO Auto-generated method stub101 new SubscribeClient("" + i);102103 }104105 }
什么是微服务?
微服务(Microservices Architecture)是一种架构风格,一个大型复杂软件应用由一个或多个微服务组成。系统中的各个微服务可被独立部署,各个微服务之间是松耦合的。每个微服务仅关注于完成一件任务并很好地完成该任务。在所有情况下,每个任务代表着一个小的业务能力。
微服务的概念源于2014年3月Martin Fowler所写的文章“Microservices” martinfowlercom/articles/mi…
单体架构(Monolithic Architecture )
企业级的应用一般都会面临各种各样的业务需求,而常见的方式是把大量功能堆积到同一个单体架构中去。比如:常见的ERP、CRM等系统都以单体架构的方式运行,同时由于提供了大量的业务功能,随着功能的升级,整个研发、发布、定位问题,扩展,升级这样一个“怪物”系统会变得越来越困难。
这种架构模式就是把应用整体打包部署,具体的样式依赖本身应用采用的语言,如果采用java语言,自然你会打包成war包,部署在Tomcat或者Jetty这样的应用服务器上,如果你使用spring boot还可以打包成jar包部署。其他还有Rails和Nodejs应用以目录层次的形式打包
上图:单体架构
大部分企业通过SOA来解决上述问题,SOA的思路是把应用中相近的功能聚合到一起,以服务的形式提供出去。因此基于SOA架构的应用可以理解为一批服务的组合。SOA带来的问题是,引入了大量的服务、消息格式定义和规范。
多数情况下,SOA的服务直接相互独立,但是部署在同一个运行环境中(类似于一个Tomcat实例下,运行了很多web应用)。和单体架构类似,随着业务功能的增多SOA的服务会变得越来越复杂,本质上看没有因为使用SOA而变的更好。图1,是一个包含多种服务的在线零售网站,所有的服务部署在一个运行环境中,是一个典型的单体架构。
单体架构的应用一般有以下特点:
微服务架构(Microservices Architecture)
微服务架构的核心思想是,一个应用是由多个小的、相互独立的、微服务组成,这些服务运行在自己的进程中,开发和发布都没有依赖。不同服务通过一些轻量级交互机制来通信,例如 RPC、HTTP 等,服务可独立扩展伸缩,每个服务定义了明确的边界,不同的服务甚至可以采用不同的编程语言来实现,由独立的团队来维护。简单的来说,一个系统的不同模块转变成不同的服务!而且服务可以使用不同的技术加以实现!
上图:微服务架构
微服务设计
那我们在微服务中应该怎样设计呢。以下是微服务的设计指南:
微服务消息
在单体架构中,不同功能之间通信通过方法调用,或者跨语言通信。SOA降低了这种语言直接的耦合度,采用基于SOAP协议的web服务。这种web服务的功能和消息体定义都十分复杂,微服务需要更轻量的机制。
同步消息 REST
同步消息就是客户端需要保持等待,直到服务器返回应答。REST是微服务中默认的同步消息方式,它提供了基于HTTP协议和资源API风格的简单消息格式,多数微服务都采用这种方式(每个功能代表了一个资源和对应的操作)
异步消息 – AMQP, STOMP, MQTT
异步消息就是客户端不需要一直等待服务应答,有应到后会得到通知。某些微服务需要用到异步消息,一般采用AMQP, STOMP, MQTT 这三种通讯协议
消息格式 – JSON, XML, Thrift, ProtoBuf, Avro
消息格式是微服务中另外一个很重要的因素。SOA的web服务一般采用文本消息,基于复杂的消息格式(SOAP)和消息定义(xsd)。微服务采用简单的文本协议JSON和XML,基于HTTP的资源API风格。如果需要二进制,通过用到Thrift, ProtoBuf, Avro。
服务约定 – 定义接口 – Swagger, RAML, Thrift IDL
如果把功能实现为服务,并发布,需要定义一套约定。单体架构中,SOA采用WSDL,WSDL过于复杂并且和SOAP紧耦合,不适合微服务。
REST设计的微服务,通常采用Swagger和RAML定义约定。
对于不是基于REST设计的微服务,比如Thrift,通常采用IDL(Interface Definition Languages),比如Thrift IDL。
微服务集成 (服务间通信)
大部分微服务基于RPC、HTTP、JSON这样的标准协议,集成不同标准和格式变的不再重要。另外一个选择是采用轻量级的消息总线或者网关,有路由功能,没有复杂的业务逻辑。下面就介绍几种常见的架构方式。
点对点方式
点对点方式中,服务之间直接用。每个微服务都开放REST API,并且调用其它微服务的接口。
上图:通过点对点方式通信
很明显,在比较简单的微服务应用场景下,这种方式还可行,随着应用复杂度的提升,会变得越来越不可维护。这点有些类似SOA的ESB,尽量不采用点对点的集成方式。
API-网关方式
API网关方式的核心要点是,所有的客户端和消费端都通过统一的网关接入微服务,在网关层处理所有的非业务功能个。通常,网关也是提供REST/HTTP的访问API。服务端通过API-GW注册和管理服务。
上图:通过API-网关暴露微服务
所有的业务接口通过API网关暴露,是所有客户端接口的唯一入口。微服务之间的通信也通过API网关。\
采用网关方式有如下优势:
目前,API网关方式应该是微服务架构中应用最广泛的设计模式。
消息代理方式
微服务也可以集成在异步的场景下,通过队列和订阅主题,实现消息的发布和订阅。一个微服务可以是消息的发布者,把消息通过异步的方式发送到队列或者订阅主题下。作为消费者的微服务可以从队列或者主题共获取消息。通过消息中间件把服务之间的直接调用解耦。
上图:异步通信方式
通常异步的生产者/消费者模式,通过AMQP, STOMP, MQTT 等异步消息通讯协议规范。
数据的去中心化
单体架构中,不同功能的服务模块都把数据存储在某个中心数据库中。
每个微服务有自己私有的数据库,其它微服务不能直接访问。单体架构,用一个数据库存储所有数据
微服务方式,多个服务之间的设计相互独立,数据也应该相互独立(比如,某个微服务的数据库结构定义方式改变,可能会中断其它服务)。因此,每个微服务都应该有自己的数据库。
每个微服务有自己私有的数据库,其它微服务不能直接访问。每个微服务有自己私有的数据库,其它微服务不能直接访问。
数据去中心话的核心要点:
数据的去中心化,进一步降低了微服务之间的耦合度,不同服务可以采用不同的数据库技术(SQL、NoSQL等)。在复杂的业务场景下,如果包含多个微服务,通常在客户端或者中间层(网关)处理。
微服务架构的优点:
微服务架构的缺点:
微服务的一些想法在实践上是好的,但当整体实现时也会呈现出其复杂性。
关于微服务架构的取舍
首先需要搭建MQTT服务器,然后搭建MySQL数据库,然后使用海为组态写段程序,最后配置客户端验证即可。具体可以参考内容 Haiwell(海为)HMI/CBOX/IPC MQTT 配置应用教程 网页链接
时间:2018-07-26
Q: 什么是网络连接?
A: 网络连接是传输层定义的概念,在传输层以下只存在网络数据包的相互交换。
所谓连接,其实也不是在网络上有一条真实存在的数据通道。只要通信双方在一段时间内持续保持数据包交换,就可以视为双方建立的连接并没有断开。
连接的建立是依托于TCP协议的三次握手,一旦连接已经建立完毕,通信双方就可以复用这条虚拟通道进行数据交换。如果连接保持长时间工作一直没有被中断,那么这样的TCP连接就俗称为长连接。
Message Queue Telemetry Transport ,中文直译: 消息队列遥测传输协议 。
在MQTT协议被设计出来的年代,还没有物联网这么时髦的词汇,当年叫做 遥测设备 。
MQTT协议真正开始声名鹊起的原因,是其正好恰恰踩中移动互联网发展的节拍,为消息推送场景提供了一个既简便又具有良好扩展性的现成解决方案。
http://docsoasis-openorg/mqtt/mqtt/v311/os/mqtt-v311-oshtml
可以看出,MQTT对消息头的规定十分精简, 固定头部占用空间大小仅为1个字节 ,一个最小的报文占用的空间也 只有两个字节 (带一字节的长度标识位)。
这也是MQTT协议针对不稳定及带宽低下的网络环境做出的特定设计 - - - - 尽可能地节省一切不必要的网络开销 。
Q:为什么MQTT协议需要心跳报文(PINGREQ, PINGRESP)来维护连接状态,只监控该TCP的连接状态是否可以实现目的?
A: TCP数据传输默认的超时时间过长,不符合应用层上细粒度的要求。
TCP数据传输超时的情况可分成三种: 服务端断开 、 客户端断开 、 中间网络断开 。
在前两种场景下,若断开操作是一方主动发起的,即表示为TCP连接正常结束,双方走四次挥手流程;若程序异常结束,则会触发被动断开事件,通信另一方也能立刻感知到本次连接所打开的 Socket 出现中断异常。
唯独中间网络的状态是通信双方不能掌握的。 在Linux系统下 ,TCP的连接超时由内核参数来控制,如果通信中的一方没有得到及时回复,默认会主动再尝试 6次 。如果还没有得到及时回应,那么其才会认定本次数据超时。
连带首次发包与六次重试,Linux系统下这7次发包的超时时间分别为 2的0次方 至 2的6次方 ,即1秒、2秒、4秒、8秒、16秒、32秒、64秒,一共127秒。MQTT协议认为如此长的超时时间对应用层而言粒度太大,因此其在应用层上还单独设计属于自身的心跳响应控制。常见的MQTT连接超时多被设定为 60秒 。
扩展知识 - TCP的KeepAlive机制: http://hengyunabcgithubio/why-we-need-heartbeat/
由通信中的 报文标识符 ( Packet Identifier )传达。
Q:仅Publish与Pubrec能保证消息只被投递一次吗?
A: 业务上可以实现,但MQTT协议并没有如此设计,原因如下:
每个消息都会拥有属于自己的报文标识符,但如果需要两次数据交换就实现消息仅只收到一次,就需要通信双方记录下每次使用的报文标识符,并且在处理每一条消息时都需要去重处理,以防消息被重复消费。
但MQTT协议最初被设计的工作对象是轻量级物联设备,为此在协议的设计中报文标识符被约定为 可重用 ,以减少对设备性能的消耗,换回的代价不得不使用四次网络数据交换,才能确保消息正好被消费一次。
Q:两个不同客户端在发布与订阅同一Topic下的消息时,都可以提出通信Qos要求,此时以哪项为基准?
A: 伪命题,故意在分享时埋下坑,等人来踩。
两个不同客户端的通信是需要 Broker 进行中转,而不是直连。因此,通信中存在两个不同的会话,双方的Qos要求仅仅作用于它们与 Broker 之间的会话,最终的Qos基准只会向最低要求方看齐。
例:遗嘱消息的正确使用方式可参考此篇文章: https://wwwhivemqcom/blog/mqtt-essentials-part-9-last-will-and-testament
虽然可以借助 Retain Message 实现绑定一条消息至某个Topic,以达到消息的暂时保留目的。
但首先 Retain Message 并不是为存储场景而设计的,再次MQTT协议并没有对消息的持久化作出规定,也就是说Broker重启后,现有保留消息也将丢失。
Q:两种特殊消息的使用场景?
A: 遗嘱消息,多用于客户端间获取互相之间异常断线的消息通知;
保留消息,可保存 最近一条 广播通知,多用于公告栏信息的发布。
Eclipse Mosquitto :MQTT协议的最小集实现
有 EMQ , HiveMQ , RabbitMQ MQTT Adapter 等。
Qos=2 消息保障的网络I/O次数过多,如果不是必需,尽少在程序里使用此类消息。
毕竟当初其设计的目的是为了减少设备的性能占用,但若应用场景并不是物联网,而是用于手机、电脑或浏览器端等现在已不缺性能的设备上,最好在报文体中,使用UUID生成全局唯一的消息ID,然后自行在业务解析中判断此报文是否被消费过。
或者,业务方在处理消息时保证其被消费的幂等性,也可消除重复消息对系统带来的影响。
正如MQTT协议并没有依赖TCP连接状态,自己在应用层协议上实现心跳报文来控制连接状态,业务方作为MQTT协议的使用者,也不要完全依赖协议的工作状态,而是依托MQTT协议建立属于业务本身的信息汇报机制,以加强系统的稳健性。
Retain Message 可视为客户端主动拉取的行为。如果业务系统采用 HTTP+MQTT 双协议描述业务过程,主动拉取的操作也可使用 HTTP 请求替代。
作为一个长连接型的应用,上线前需要根据业务量级,评估对操作系统 端口数 与 文件描述符 的占用要求,以防服务器资源被打满。
在服务端的配置文件和客户端的连接参数中,都拥有 max_inflight_messages 此项配置,来维护 Qos=1 or 2 消息是否被成功消费的状态。
MQTT 最初被设计为物联网级的通信协议,因此此参数的默认配额较小(大多数情况下被限制到10至20)。
但如果将MQTT协议应用至手机、PC或Web端的推送场景时,硬件性能已不在是瓶颈,在实际使用中推荐把此参数调大。
Mosquitto提供Bridge功能,需要我们自己配置。
Bridge 意为桥接,当我们把两台Broker桥接在一起时,只需要修改一台Broker的配置,填上另一台Broker的运行地址。前一台Broker将作为客户端发布与订阅后一台Broker的所有Topic,实现消息互通的目的。
桥接带来的问题有以下几点:
我的建议:
Websockets协议被设计的目的是为浏览器提供一个全双工的通信协议,方便实现消息推送功能。
在Websockets协议被设计出来前,受限于HTTP协议的一问一答模型,消息的推送只能靠轮询来实现,在资源消耗与时效性保障上,均难以达到令人满意的效果。
Websockets协议复用了HTTP协议的头部信息,告知浏览器接下来的操作将触发协议升级,然后通信双方继续复用HTTP的Header,但报文内容已转变为双方均接受的新协议的格式。
Websockets协议改进了网页浏览中的消息推送的方式,因此被广泛应用在聊天、支付通知等实时性要求比较高的场合下。
MQTT协议重点在于 消息队列的实现,其对消息投递的方式作出约定,并提供一些额外的通信保障 。
MQTT可采取原生的TCP实现,也有基于Websockets的实现版本。当然后者在网络字节的利用率上,不如前者那么精简。但浏览器端无法直接使用TCP协议,所以就只能基于Websockets协议开发。
不过基于Websockets的应用也有方便之处:一是证书不需要额外配置,直接与网站共用一套基础设施;二是可使用 Nginx 等工具管理流量,与普通HTTP流量可共用一套配置方法。
MQTT非常适合入门,原因如下:
实际的应用场景远比理想中的复杂,无法一招走遍天下,必须做好取舍。
MQTT协议在这方面做得很优秀,以后工作中可以作为参考,设计好自己负责的业务系统。
0条评论