跪求:Linux系统下基于MQ通讯配置SSL的操作步骤

跪求:Linux系统下基于MQ通讯配置SSL的操作步骤,第1张

安装的MQ软件包为WMQv600Trial-x86_linux_2targz 将WMQv600Trial-x86_linux_2targz解压至某一目录。

(1)执行接受许可脚本: /mqlicensesh –accept 这个脚本是要安装WebSphere MQ 软件包的MQ许可程序 至关重要,这个脚本没有正确配置的话将导致MQ安装失败–accept是不启动图形直接接受许可。

(2) 安装 WebSphere MQ for Linux 服务器:

[root@localhost mq]# rpm-ivh MQSeriesRuntime-600-0i386rpm

[root@localhost mq]# rpm-ivh MQSeriesSDK-600-0i386rpm

[root@localhost mq]# rpm-ivh MQSeriesServer-600-0i386rpm

注:安装完成后,相关文件会被自动安装在/opt/mqm下,在安装

MQSeriesRuntime-600-0i386rpm时候,安装程序为系统自动创建了一个mqm用户和mqm组,安装完毕后,需要使用该用户来进行MQ的配置。

(3) 安装 WebSphere MQ for Linux 客户端:

[root@localhost mq]# rpm-ivh MQSeriesClient-600-0i386rpm

(4) 安装 WebSphere MQ 样本程序(其中包括amqsput、amqsget、amqsgbr和amqsbcg等)

[root@localhost mq]# rpm-ivh MQSeriesSamples-600-0i386rpm 样本程序安装在/opt/mqm/samp/bin 中。

(5)安装MQ其他软件包

[root@localhost mq]# rpm-ivh MQSeriesMan-600-0i386rpm

[root@localhost mq]# rpm-ivh MQSeriesJava-600-0i386rpm

[root@localhost mq]# rpm-ivh IBMJava2-SDK-142-00i386rpm

上面最后一步安装的是JDK运行环境,如果已经有相同或更高版本的JDK,不需要再安装。 (6) 安装过程创建了一个名为mqm 的用户和一个同样名为mqm 的组,此时,新用户是被锁定的,必须设置一个密码来解锁,这样才能正常使用该用户。用passwd 命令:

[root@localhost mq]# passwd mqm 以上操作均在root用户下操作,至此MQ60安装结束。MQ的配置相关命令操作均在mqm用户下。

注意:

如果执行crtmqm命令时提示

-bash-32$ crtmqm

-bash: crtmqm: command not found

则需要配置mqm用户的环境变量,编辑如下文件,并添加下面的内容,如下:

第一种方法: 相对第二种较安全 仅对 mqm用户有效

1)-bash-32$ vi /var/mqm/bash_profile

PATH=$PATH:/opt/mqm/samp/bin:/opt/mqm/bin:bin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/X11R6/bin

2)执行“”命令,使这个文件生效

-bash-32$ bash_profile

3)再次尝试实行crtmqm或是dspmqm命令,即可发现已经生效。

第二种方法:

1、su root

2、vim /etc/profile

3、在最后面加上:PATH=$PATH:/opt/mqm/samp/bin:/opt/mqm/bin:bin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/X11R6/bin

4、关闭远程终端重新打开,无需重启服务器

5、ok了!!

RocketMQ架构上主要分为四部分构成:

消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟

RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。

消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理

RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是统一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。消费者组使得在消息消费方法,实现负载均衡(讲一个Topic中不同的Queue平均分配给同一个Consumer Group的不同Consumer,并不是负载均衡)和容错(一个Consumer挂了,该Consumer Group中的其他Consumer可以接着消费元Consumer消费的Queue)的目标变得非常容易

消费者组中Consumer的数量应小于等于Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

不过一个Topic类型的消息可以被多个消费者组同时消费。

NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。

RocketMQ的思想来自于Kafuka,而Kafka是以来了Zookeeper的。所以,在RocketMQ的早期版本也依赖Zookeeper。从30开始去掉了Zookeeper的依赖,使用了自己的NameServer。

NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点之间是无差异的,各个节点相互不进行信息通讯。那各个节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护者一个Broker列表,用来动态存储Broker信息

Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。

由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除

NameServer中有一个定时任务,每隔10秒就会扫描一次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broekr失效,然后将其从Broker列表中剔除。

RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取最新的路由。默认每30秒拉取一次最新的路由

客户端再配置时必须要写上NameServer集群的地址,那么客户端道理连接在哪个NameServer节点呢客户端首先会生产一个随机数,然后再与NameServer节点数取模,此时得到的就是要连接的节点索引,然后就会进行连接。如果连接失败,则会采用round-robin策略,逐个尝试去连接其他节点。

首先采用的是 随机策略 进行选择,失败后采用的是轮询策略。

Broker充当着消息中转角色,负责存储消息、转发消息。Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组、消费进度偏移offset、主题、队列等

Remoting Module :整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。

Client Manager :客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。

Store Service :存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。

HA Service :高可用服务,提供Master Broker和Slave Broker之间的数据同步功能。

Index Service :索引服务。根据特定的Message Key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能

为了增强Broker性能与吞吐量,Broker一般都是以集群形式出现的。各集群节点中可能存放着相同Topic的不同Queue。

如果某Broker节点宕机,如何保证数据不丢失呢?

其解决方案是,将每个Broekr集群节点进行横向扩展,即将Broker节点再建为一个HA集群,解决单点问题。

Broker节点集群是一个主从集群,即有Master和Slave两种角色。Master负责处理读写操作请求,Slave负责对Master中的数据进行备份。当Master挂掉了,Slaver会自动切换为Master去工作。所以这个Broker集群式主备集群。Master与Slave的对应关系是通过指定相同的BrokerName、不同的BrokerId来确定的。BrokerId为0表示Master,非0表示Slave。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

①启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接

②启动Broker时,Broker会与所有的NameServer保持长连接,每30秒向NameServer定时发送心跳包

③发送消息前,可以先创建Topic ,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。也可以在发送消息时自动创建Topic。

④Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送Topic的Queue与Broker地址的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发送消息。

⑤Consumer与Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其要消费的Queue,然后与Broker建立长连接,消费其中的消息。Consumer会向Broker发送心跳,以确保Broker的存活状态

手动创建Topic时,有两种模式:

自动创建Topic时,默认采用的是Broker模式,会为每个Broker默认创建四个Queue

从物理上讲,读/写队列是同一个队列。所以,不存在读/写队列数据同步问题。读/写队列是逻辑上进行区分的概念 。一般来说,读/写队列数量是相同的。

读/写队列数量不同是有问题的。

但这样可以方便缩容

perm用于设置对当前创建Topic的操作权限:2表示只写,4表示只读,6表示读写

一、消息中间件相关知识

1、概述

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

2、消息中间件的组成

21 Broker

消息服务器,作为server提供消息核心服务

22 Producer

消息生产者,业务的发起方,负责生产消息传输给broker,

23 Consumer

消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

24 Topic

25 Queue

26 Message

消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

3 消息中间件模式分类

31 点对点

PTP点对点:使用queue作为通信载体

说明:

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

说明:

queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。

4 消息中间件的优势

41 系统解耦

交互系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。

42 提高系统响应时间

例如原来的一套逻辑,完成支付可能涉及先修改订单状态、计算会员积分、通知物流配送几个逻辑才能完成;通过MQ架构设计,就可将紧急重要(需要立刻响应)的业务放到该调用方法中,响应要求不高的使用消息队列,放到MQ队列中,供消费者处理。

43 为大数据处理架构提供服务

通过消息作为整合,大数据的背景下,消息队列还与实时处理架构整合,为数据处理提供性能支持。

44 Java消息服务——JMS

Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

5 消息中间件应用场景

51 异步通信

有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

52 解耦

降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

53 冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

54 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。

55 过载保护

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

56 可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

57 顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

58 缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。

59 数据流处理

分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

6 消息中间件常用协议

61 AMQP协议

AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

优点:可靠、通用

62 MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

63 STOMP协议

STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

优点:命令模式(非topic\queue模式)

64 XMPP协议

XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。

优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大

65 其他基于TCP/IP自定义的协议

有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。

7 常见消息中间件MQ介绍

71 RocketMQ

阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,30版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq的架构,目前主要多用于订单交易系统。

具有以下特点:

官方提供了一些不同于kafka的对比差异:

https://rocketmqapacheorg/docs/motivation/

72 RabbitMQ

使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。

73 ActiveMQ

Apache下的一个子项目。使用Java完全支持JMS11和J2EE 14规范的 JMS Provider实现,少量代码就可以高效地实现高级应用场景。可插拔的传输协议支持,比如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、Net,、Python、 Php、 Ruby等。

74 Redis

使用C语言开发的一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

75 Kafka

Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统,具有以下特性:

76 ZeroMQ

号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,开发成本高。因此ZeroMQ具有一个独特的非中间件的模式,更像一个socket library,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序本身就是使用ZeroMQ API完成逻辑服务的角色。但是ZeroMQ仅提供非持久性的队列,如果down机,数据将会丢失。如:Twitter的Storm中使用ZeroMQ作为数据流的传输。

ZeroMQ套接字是与传输层无关的:ZeroMQ套接字对所有传输层协议定义了统一的API接口。默认支持 进程内(inproc) ,进程间(IPC) ,多播,TCP协议,在不同的协议之间切换只要简单的改变连接字符串的前缀。可以在任何时候以最小的代价从进程间的本地通信切换到分布式下的TCP通信。ZeroMQ在背后处理连接建立,断开和重连逻辑。

特性:

二、主要消息中间件的比较

配置ActiveMQ的集群需要修改conf目录下的activemqxml

具体信息

配置方式:Master-Slave方式

对于ActiveMQ有两种运行方式,一种是放入lib,和web应用一同启动,另外一种是作为单独的mq服务器运行,因为涉及了cluster,我们采用了单独运行的配置。

下载并解压后

我们进入activeMq目录。

进入%ActiveMQ%/bin

选择启动activemqbat

如果启动过程中没有出现错误,我们就可以进行其他的配置。

配置文件的位置在%ActiveMQ%/conf中

编写测试程序:一个用来发送Message,一个用来接收Message

配置文件

修改conf目录下的activemqxml文件

name="host61616" uri="static://(tcp://19216818069:61616,tcp://19216818069:11616)" />

name="host11616" uri="static://(tcp://19216818069:61616,tcp://19216818069:11616)" />

具体看http://embed21iccom/了解

1,为什么要用mq?

2,引入mq会多哪些问题?

3,如何解决这些问题?

---

一:传统模式有哪些痛点

(1)有些复杂的业务系统,一次用户请求可能会同步调用N个系统的接口,需要等待所有的接口都返回了,才能真正的获取执行结果。这种同步接口调用的方式总耗时比较长,非常影响用户的体验,特别是在网络不稳定的情况下,极容易出现接口超时问题。

(2)系统之间耦合性太高,如果调用的任何一个子系统出现异常,整个请求都会异常,对系统的稳定性非常不利。

(3)对于类似于秒杀场景的峰值爆炸的场景,系统的稳定性堪忧;

二:为什么要用mq?

(1)异步:同步接口调用导致响应时间长的问题,使用mq之后,将同步调用改成异步,能够显著减少系统响应时间。避免耗时时间长,影响用户体验的事情发生

(2)解耦:子系统间耦合性太大的问题,使用mq之后,我们只需要依赖于mq,避免了各个子系统间的强依赖问题。这样就把之前复杂的业务子系统的依赖关系,转换为只依赖于mq的简单依赖,从而显著的降低了系统间的耦合度。

(3)削峰:由于突然出现的请求峰值,导致系统不稳定的问题。使用mq后,能够起到消峰的作用。

订单系统接收到用户请求之后,将请求直接发送到mq,然后订单消费者从mq中消费消息,做写库操作。如果出现请求峰值的情况,由于消费者的消费能力有限,会按照自己的节奏来消费消息,多的请求不处理,保留在mq的队列中,不会对系统的稳定性造成影响。

三:引入mq会出现哪些问题:

1,重复消息问题

重复消费问题可以说是mq中普遍存在的问题,不管你用哪种mq都无法避免。

有哪些场景会出现重复的消息呢?

消息生产者产生了重复的消息

kafka和rocketmq的offset被回调了

消息消费者确认失败

消息消费者确认时超时了

业务系统主动发起重试

如果重复消息不做正确的处理,会对业务造成很大的影响,产生重复的数据,或者导致数据异常;

(2)数据一致性问题

如果mq的消费者业务处理异常的话,就会出现数据一致性问题。

比如:一个完整的业务流程是,下单成功之后,送100个积分。下单写库了,但是消息消费者在送积分的时候失败了,就会造成数据不一致的情况,即该业务流程的部分数据写库了,另外一部分没有写库。

如果下单和送积分在同一个事务中,要么同时成功,要么同时失败,是不会出现数据一致性问题的。

但由于跨系统调用,为了性能考虑,一般不会使用强一致性的方案,而改成达成最终一致性即可。

(3)消息丢失问题:

哪些场景会出现消息丢失问题呢?

消息生产者发生消息时,由于网络原因,发生到mq失败了。

mq服务器持久化时,磁盘出现异常

kafka和rocketmq的offset被回调时,略过了很多消息。

消息消费者刚读取消息,已经ack确认了,但业务还没处理完,服务就被重启了。

导致消息丢失问题的原因挺多的,生产者、mq服务器、消费者 都有可能产生问题,我在这里就不一一列举了。最终的结果会导致消费者无法正确的处理消息,而导致数据不一致的情况。

(4)消息顺序问题

有些业务数据是有状态的,比如订单有:下单、支付、完成、退货等状态,如果订单数据作为消息体,就会涉及顺序问题了。如果消费者收到同一个订单的两条消息,第一条消息的状态是下单,第二条消息的状态是支付,这是没问题的。但如果第一条消息的状态是支付,第二条消息的状态是下单就会有问题了,没有下单就先支付了?

消息顺序问题是一个非常棘手的问题,比如:

kafka同一个partition中能保证顺序,但是不同的partition无法保证顺序。

rabbitmq的同一个queue能够保证顺序,但是如果多个消费者同一个queue也会有顺序问题。

如果消费者使用多线程消费消息,也无法保证顺序。

如果消费消息时同一个订单的多条消息中,中间的一条消息出现异常情况,顺序将会被打乱。

还有如果生产者发送到mq中的路由规则,跟消费者不一样,也无法保证顺序。

(5)消息堆积问题

如果消息消费者读取消息的速度,能够跟上消息生产者的节奏,那么整套mq机制就能发挥最大作用。但是很多时候,由于某些批处理,或者其他原因,导致消息消费的速度小于生产的速度。这样会直接导致消息堆积问题,从而影响业务功能。

这里以下单开通会员为例,如果消息出现堆积,会导致用户下单之后,很久之后才能变成会员,这种情况肯定会引起大量用户投诉。

(6)系统复杂度提升

这里说的系统复杂度和系统耦合性是不一样的,比如以前只有:系统A、系统B和系统C 这三个系统,现在引入mq之后,你除了需要关注前面三个系统之外,还需要关注mq服务,需要关注的点越多,系统的复杂度越高。

mq的机制需要:生产者、mq服务器、消费者。

有一定的学习成本,需要额外部署mq服务器,而且有些mq比如:rocketmq,功能非常强大,用法有点复杂,如果使用不好,会出现很多问题。有些问题,不像接口调用那么容易排查,从而导致系统的复杂度提升了。

如何解决?

1,重复消费问题的解决:

不管是由于生产者产生的重复消息,还是由于消费者导致的重复消息,我们都可以在消费者中这个问题。

这就要求消费者在做业务处理时,要做幂等设计;

在这里我推荐增加一张消费消息表,来解决mq的这类问题。消费消息表中,使用messageId做唯一索引,在处理业务逻辑之前,先根据messageId查询一下该消息有没有处理过,如果已经处理过了则直接返回成功,如果没有处理过,则继续做业务处理。

2,数据一致性的解决:

数据一致性分为:

强一致性

弱一致性

最终一致性

而mq为了性能考虑使用的是最终一致性,那么必定会出现数据不一致的问题。这类问题大概率是因为消费者读取消息后,业务逻辑处理失败导致的,这时候可以增加重试机制。

重试分为:同步重试 和 异步重试。

有些消息量比较小的业务场景,可以采用同步重试,在消费消息时如果处理失败,立刻重试3-5次,如何还是失败,则写入到记录表中。但如果消息量比较大,则不建议使用这种方式,因为如果出现网络异常,可能会导致大量的消息不断重试,影响消息读取速度,造成消息堆积。

而消息量比较大的业务场景,建议采用异步重试,在消费者处理失败之后,立刻写入重试表,有个job专门定时重试。

还有一种做法是,如果消费失败,自己给同一个topic发一条消息,在后面的某个时间点,自己又会消费到那条消息,起到了重试的效果。如果对消息顺序要求不高的场景,可以使用这种方式。

3,消息丢失问题

不管你是否承认有时候消息真的会丢,即使这种概率非常小,也会对业务有影响。生产者、mq服务器、消费者都有可能会导致消息丢失的问题。

为了解决这个问题,我们可以增加一张消息发送表,当生产者发完消息之后,会往该表中写入一条数据,状态status标记为待确认。如果消费者读取消息之后,调用生产者的api更新该消息的status为已确认。有个job,每隔一段时间检查一次消息发送表,如果5分钟(这个时间可以根据实际情况来定)后还有状态是待确认的消息,则认为该消息已经丢失了,重新发条消息。

4,消息顺序问题

消息顺序问题是我们非常常见的问题,我们以kafka消费订单消息为例。订单有:下单、支付、完成、退货等状态,这些状态是有先后顺序的,如果顺序错了会导致业务异常。

订单号路由到不同的partition,同一个订单号的消息,每次到发到同一个partition。

5,消息堆积

如果消费者消费消息的速度小于生产者生产消息的速度,将会出现消息堆积问题。其实这类问题产生的原因很多,如果你想进一步了解,可以看看我的另一篇文章《 我用kafka两年踩过的一些非比寻常的坑 》。

那么消息堆积问题该如何解决呢?

这个要看消息是否需要保证顺序。

如果不需要保证顺序,可以读取消息之后用多线程处理业务逻辑。

这样就能增加业务逻辑处理速度,解决消息堆积问题。但是线程池的核心线程数和最大线程数需要合理配置,不然可能会浪费系统资源。

如果需要保证顺序,可以读取消息之后,将消息按照一定的规则分发到多个队列中,然后在队列中用单线程处理。

RabbitMQ Server安装

################################################

1安装Erlang

sudo yum install erlang

2安装RabbitMQ Server

需要先导入key

sudo rpm --import http://wwwrabbitmqcom/rabbitmq-signing-key-publicasc

可先将内容保存至文本文件,如,rabbitmq-signing-key-publicasctxt

sudo rpm --import rabbitmq-signing-key-publicasctxt

3安装rabbitmq-server-341-1noarchrpm

sudo yum install rabbitmq-server-341-1noarchrpm

4启动RabbitMQ Server

注册为系统服务

sudo chkconfig rabbitmq-server on

启动RabbitMQ Server

sudo /etc/initd rabbitmq-server stop/start/etc

sudo service rabbitmq-server stop/start/etc

5若启动失败检测端口是否被占用

4369 (epmd), 25672 (Erlang distribution)

5672, 5671 (AMQP 0-9-1 without and with TLS)

15672 (if management plugin is enabled)

61613, 61614 (if STOMP is enabled)

1883, 8883 (if MQTT is enabled)

各组件解释如下:

AMQP 消息的路由中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"dog",则只转发 routing key 标记为"dog"的消息,不会转发"dogpuppy",也不会转发"dogguard"等等。它是完全匹配、单播的模式。

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号"#"和符号" "。

"#"匹配0个或多个单词," "匹配不多不少一个单词。

RabbbitMQ 的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在 load 加重,那么只需要创建更多的 Consumer 来进行任务处理。

在实际应用中,可能会发生消费者收到 Queue 中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给 RabbitMQ,RabbitMQ 收到消息回执(Message acknowledgment)后才将该消息从 Queue 中移除;如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在 timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的 RabbitMQ 连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给 RabbitMQ,这将会导致严重的 bug——Queue 中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。

另外 pub message 是没有 ack 的。

如果我们希望即使在 RabbitMQ 服务重启的情况下,也不会丢失消息,我们可以将 Queue 与 Message 都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的 RabbitMQ 消息不会丢失。但依然解决不了小概率丢失事件的发生(比如 RabbitMQ 服务器已经接收到生产者的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为 RabbitMQ 的简单介绍,所以这里将不讲解 RabbitMQ 相关的事务。

要持久化队列 queue 的持久化需要在声明时指定 durable=True;

这里要注意,队列的名字一定要是 Broker 中不存在的,不然不能改变此队列的任何属性

队列和交换机有一个创建时候指定的标志 durable,durable 的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复。

消息持久化包括 3 部分

如果 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的,如果 exchange 和 queue 两者之间有一个持久化,一个非持久化,则不允许建立绑定

注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个 non-durable 的队列,然后想把它改变成 durable 的,唯一的办法就是删除这个队列然后重现创建。

你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ 将第 n 个 Message 分发给第 n 个 Consumer。n 是取余后的,它不管 Consumer 是否还有 unacked Message,只是按照这个默认的机制进行分发

那么如果有个 Consumer 工作比较重,那么就会导致有的 Consumer 基本没事可做,有的 Consumer 却毫无休息的机会,那么,Rabbit 是如何处理这种问题呢?

RabbitMQ 使用 ProtoBuf 序列化消息,它可作为 RabbitMQ 的 Message 的数据格式进行传输,由于是结构化的数据,这样就极大的方便了 Consumer 的数据高效处理,当然也可以使用 XML,与 XML 相比,ProtoBuf 有以下优势:

DABAN RP主题是一个优秀的主题,极致后台体验,无插件,集成会员系统
网站模板库 » 跪求:Linux系统下基于MQ通讯配置SSL的操作步骤

0条评论

发表评论

提供最优质的资源集合

立即查看 了解详情