厉害!一文了解消息中间件-RabbitMQ

厉害!一文了解消息中间件-RabbitMQ,第1张

RabbitMQ是2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。

1消息 就是数据,增删改查的数据。例如在员工管理系统中增删改查的数据

2队列 指的是一端进数据一端出数据,例如C#中(Queue数据结构)

1消息队列指:一端进消息,一端出消息

2RabbitMQ就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。

1在常见的单体架构中,主要流程是用户UI操作发起Http请求>服务器处理>然后由服务器直接和数据库交互,最后同步反馈用户结果

2在微服务架构中,UI与微服务通信,主要是通过Http或者gRPC同步通信

问题分析

在上述2种情况下,我们发现在UI请求时都是同步操作 ,第2种架构虽然将整体服务按业务拆分成不同的微服务并且对应各自的数据库,但是在用户与微服务通信时,存在的问题依然没有解决,例如数据库的承载能力只能处理10w个请求,如果遇到高并发情况下,UI发起50w请求,那数据库是远远承载不了的,从而导致如下问题。

1高并发请求导致系统性能下降响应慢,同时数据库承载风险加大

2扩展性不强UI操作的交互对业务的依赖较大,导致用户体验下降

3瞬时流量涌入巨大的话,服务器可能直接挂了

解决方案

RabbitMQ的优势

RabbitMQ的不足

1ConnectionFactory 为Connection的制造工厂。

2Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

3Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

4Exchange(交换机) 我们通常认为生产者将消息投递到Queue中,实际上实际的情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者丢弃),而在RabbitMQ中的Exchange一共有4种策略,分别为:fanout(扇形)、direct(直连)、topic(主题)、headers(头部)

1下载RabbitMQ

2运行环境erlang

3安装完成之后,加载RabbitMQ管理插件

4安装成功访问RabbitMQ管理后台http://localhost:15672

1分别创建考勤服务,请假服务,计算薪酬服务,邮件服务,短信服务消费者角色

2创建员工管理网站用于模拟前端调用,主要充当生产者角色

3在员工管理网站和每一个模拟微服务中通过nuget引入RabbitMQClient

4在员工管理网站中创建模拟添加考勤的控制器并加入生产者代码

5在考勤微服务中创建接口,并在接口中加入消费者代码

fanout类型的Exchange路由规则非常简单,工作方式类似于多播一对多,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

业务实例

当我们有员工需要请假,在员工管理系统提交请假,但是由于公司规定普通员工请假,需要发送短信到他的主管领导,针对此业务场景我们需要调用请假服务的同时去发送短信,这时需要两个消费者(请假服务,短信服务)来消费同一条消息,其实本质就是往RabbitMQ写入一个能被多个消费者接收的消息,所以可以使用 扇形交换机,一个生产者,多个消费者

生产者模拟使用调用控制器来实现

消费者实现IHostedService 接口创建一个监听主机

直接交换器,工作方式类似于单播一对一,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,缺陷是无法实现多生产者对一个消费者

当我们员工管理系统需要计算薪资并将结果以发送短信的方式告诉员工,这个时候我们就不太适合用“扇形交换机”了,因为换做是你,你也不想你的工资全公司都知道吧?这个时候就需要定制了一对一的场景了,那就在生产消息时使用直连交换机根据routingKey发送指定的消费者

生产者模拟使用调用控制器来实现

消费者实现IHostedService 接口创建一个监听主机

Exchange绑定队列需要制定Key; Key 可以有自己的规则;Key可以有占位符; 或者# , 匹配一个单词、#匹配多个单词,在Direct基础上加上模糊匹配;多生产者一个消费者,可以多对对,也可以多对1, 真实项目当中,使用主题交换机。可以满足所有场景

1生产者定义Exchange,然后不同的routingKey绑定

3消费者routingKey的模糊匹配,生产者发送消息时routingKey定义以sms开头, 号只能匹配的routingKey为一级,例如(smsA)或(smsB)的发送的消息,# 能够匹配的routingKey为一级及多级以上 ,例如 (smsA)或者(smsAQWEIOP)

在月底的时候我们需要把员工存在异常考勤信息,薪资结算信息,请假信息分别以邮件的形式发送给我们的员工查阅,我们知道这是一个典型的多个生产者,一个消费者场景,异常考勤信息,薪资结算信息,请假信息分别需要生产消息发送到RabbitMQ,然后供我们员工消费

分别模拟3个生产者:异常考勤信息,薪资结算信息,请假信息

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。

1不需要依赖Key

2更多的时候,像这种Key Value 的键值,可能会存储在数据库中,那么我们就可以定义一个动态规则来拼装这个Key value ,从而达到消息灵活转发到不同的队列中去

我们根据上面的业务和代码简单实现了由生产者到消费者的一个业务流程,我们可以总结出知道,整个消息的收发过程包含有三个角色,生产者(员工管理网站)、RabbitMQ(Broker)、消费者(微服务),在理想状态下,按照这样实现,整个流程以及系统的稳定性,可能不会发生太大的问题,但是真正在实际应用中我们要去思考可能存在的问题,主要从三个大的方面去分析,然后发散。

1生产端

2存储端

3消费端

我们在给RabbitMQ发送消息时,如何去保证消息一定到达呢,我们可以使用RabbitMQ提供了2种生产端的消息确认机制

我们生产端给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失,如何解决消息丢失问题,针对RabbitMQ消息丢失,我们可以在生产者中使用

1持久化消息

2集群

当生产者写入消息到RabbitMQ后,消费服务接收消息期间,服务器宕机,导致消息丢失了,这个时候我们就应该使用RabbitMQ的消费端消息确认机制

1自动确认

2手动确认

消费者收到消息。消费者发送确认消息给rabbitmq期间。执行业务逻辑失败了,但是消息已经确认被消费了,我们应该在我们的消费者接收消息回调执行业务逻辑后面,执行使用手动确认消息机制,保证消息不被丢失

原文链接:https://wwwcnblogscom/yuxl01/p/15978229html

1、安装

在Mac下安装RabbitMQ是非常简单的,一般默认RabbitMQ服务器依赖的Erlang已经安装,只需要用下面两个命令就可以完成RabbitMQ的安装(前提是homebrew已经被安装):

brew update

brew install rabbitmq

安装完成后需要将/usr/local/sbin添加到$PATH,可以将下面这两行加到~/bash_profile或者~/profile:

# RabbitMQ Config

export PATH=$PATH:/usr/local/sbin

在Windows下安装稍微麻烦些,需要先安装ErLang,然后下载RabbitMQ可执行文件安装

2、启动RabbitMQ服务

上面配置完成后,需要关闭终端窗口,重新打开,然后输入下面命令即可启动RabbitMQ服务:

rabbitmq-server!

可以在后面加-detatched选项参数表示以守护进程方式启动

各组件解释如下:

AMQP 消息的路由中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"dog",则只转发 routing key 标记为"dog"的消息,不会转发"dogpuppy",也不会转发"dogguard"等等。它是完全匹配、单播的模式。

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号"#"和符号" "。

"#"匹配0个或多个单词," "匹配不多不少一个单词。

RabbbitMQ 的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在 load 加重,那么只需要创建更多的 Consumer 来进行任务处理。

在实际应用中,可能会发生消费者收到 Queue 中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给 RabbitMQ,RabbitMQ 收到消息回执(Message acknowledgment)后才将该消息从 Queue 中移除;如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在 timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的 RabbitMQ 连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给 RabbitMQ,这将会导致严重的 bug——Queue 中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。

另外 pub message 是没有 ack 的。

如果我们希望即使在 RabbitMQ 服务重启的情况下,也不会丢失消息,我们可以将 Queue 与 Message 都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的 RabbitMQ 消息不会丢失。但依然解决不了小概率丢失事件的发生(比如 RabbitMQ 服务器已经接收到生产者的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为 RabbitMQ 的简单介绍,所以这里将不讲解 RabbitMQ 相关的事务。

要持久化队列 queue 的持久化需要在声明时指定 durable=True;

这里要注意,队列的名字一定要是 Broker 中不存在的,不然不能改变此队列的任何属性

队列和交换机有一个创建时候指定的标志 durable,durable 的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复。

消息持久化包括 3 部分

如果 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的,如果 exchange 和 queue 两者之间有一个持久化,一个非持久化,则不允许建立绑定

注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个 non-durable 的队列,然后想把它改变成 durable 的,唯一的办法就是删除这个队列然后重现创建。

你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ 将第 n 个 Message 分发给第 n 个 Consumer。n 是取余后的,它不管 Consumer 是否还有 unacked Message,只是按照这个默认的机制进行分发

那么如果有个 Consumer 工作比较重,那么就会导致有的 Consumer 基本没事可做,有的 Consumer 却毫无休息的机会,那么,Rabbit 是如何处理这种问题呢?

RabbitMQ 使用 ProtoBuf 序列化消息,它可作为 RabbitMQ 的 Message 的数据格式进行传输,由于是结构化的数据,这样就极大的方便了 Consumer 的数据高效处理,当然也可以使用 XML,与 XML 相比,ProtoBuf 有以下优势:

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的 中间件 设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

那么,基于 RabbitMQ(消息队列)实现数据异步入库有什么好处呢?

RabbitMQ客户端模块

参考:

RabbitMQ基础概念详细介绍

消息队列

RabbitMQ是一个分布式系统

一、使用rabbitmq时的系统架构图

通过路由键将交换机和队列进行绑定,从而实现消息的发送和接收。

二、rabbitmq基本概念

rabbitmq是AMQP协议的一个开源实现,所以其内部实际上也是AMQP中的基本概念,如下图所示:

1、Message(消息)

消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(传输模式,指出该消息可能需要持久化存储)等。

2、Publisher

消息生产者,也是一个向交换器发布消息的客户端应用程序,就是投递消息的程序。

3、Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。消息交换机,它指定消息按什么规则,路由到哪个队列。

4、Routing Key

路由关键字,exchange根据这个关键字进行消息投递。

5、Binding(绑定)

用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

它的作用就是把exchange和queue按照路由规则绑定起来。

绑定其实就是关联了exchange和queue,或者这么说:queue对exchange的内容感兴趣,exchange要把它的Message deliver到queue。

6、Queue(消息队列)

消息的载体,每个消息都会被投到一个或多个队列,等待消费者连接到这个队列将其取走。它是消息的容器,也是消息的终点。

7、Connection

网络连接,例如一个TCP连接。

8、Channel(信道,通道)

消息通道,在客户端的每个连接里,可建立多个channel。

多路复用连接中的一条独立双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念以达到复用一条TCP连接的目的。

9、Consumer

消息消费者,表示一个从消息队列中取得消息的客户端应用程序,就是接受消息的程序。

10、Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。一个broker里可以有多个vhost,用作不同用户的权限分离。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的rabbitmq服务器,拥有自己的队列、交换器、绑定和权限机制。

vhost是AMQP概念的基础,必须在连接时指定,rabbitmq默认的vhost是 / 。

11、Broker

表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

三、AMQP中的消息路由

生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到那个队列。如下图所示:

四、Exchange类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共有四种类型:direct、fanout、topic、headers。headers匹配AMQP消息的header而不是路由键,此外headers交换器和direct交换器完全一致,但性能差很多,目前几乎用不到。且看direct、fanout、topic这三种类型。

1、direct类型

消息中的路由键routing key如果和Binding中的binding key一致,交换器就将消息发到对应的队列中去。路由键与队列名完全匹配,如果一个队列绑定到交换器要求路由键为“dog”,则只转发routing key标记为“dog”的消息,不会转发“dogpuppy”等等。 它是完全匹配、单传播的模式。

Driect exchange的路由算法非常简单:通过bindingkey的完全匹配,可以用下图来说明:

Exchange和两个队列绑定在一起,Q1的bindingkey是orange,Q2的binding key是black和green。

当Producer publish key是orange时,exchange会把它放到Q1上,如果是black或green就会到Q2上,其余的Message被丢弃。

2、fanout类型

每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。类似于子网广播,每台子网内的主机都获得了一份复制的消息。 fanout类型转发消息是最快的。  如下图所示:

3、topic类型

topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:#和,#匹配0个或多个单词,只能匹配一个单词。

对于Message的routing_key是有限制的,不能是任意的。格式是以点号“”分割的字符表。比如:”stockusdnyse”,“nysevmw”, “quickorangerabbit”。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。对于routing_key,有两个特殊字符#和,#匹配0个或多个单词,只能匹配一个单词。如下图所示:

Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号,第一个key描述了celerity(灵巧),第二个是color(色彩),第三个是物种。

在这里我们创建了两个绑定: Q1 的binding key 是”orange“; Q2 是 “rabbit” 和 “lazy#”:Q1感兴趣所有orange颜色的动物;Q2感兴趣所有rabbits和所有的lazy的。

例如:rounting_key 为 “quickorangerabbit”将会发送到Q1和Q2中。rounting_key 为”lazyorangerabbithujjddd”会被投递到Q2中,#匹配0个或多个单词。

五、ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。

1、Connection

Connection是Rabbitmq的socket连接,它封装了socket协议相关部分逻辑。

2、ConnectionFactory

ConnectionFactory是connection的制造工厂。

3、Channel

Channel是我们与rabbitmq打交道的最重要的一个接口,大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

六、任务分发机制

1、Round-robin dispathching 循环分发

RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理。

2、Message acknowledgment 消息确认

为了保证数据不被丢失,RabbitMQ支持消息确认机制,为了保证数据能被正确处理而不仅仅是被Consumer收到,这就需要在处理完数据之后发送一个确认ack。

在处理完数据之后发送ack,就是告诉RabbitMQ数据已经被接收并且处理完成,RabbitMQ可以将消息从队列中移除了。如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出情况下数据也不会丢失。

RabbitMQ没有用到超时机制,它仅仅通过Consumer的连接中断来确认该Message并没有被正确处理,一个消费者处理消息的时间再长也不会导致该消息被发送给其他消费者,即RabbitMQ给了Consumer足够长的时间来做数据处理。如果忘记ack,那么当Consumer退出时,Mesage会被重新分发,从而导致队列中的累积的消息越来越多,然后RabbitMQ会占用越来越多的内存。

3、Message durability 消息持久化

如果我们希望即使在rabbitmq服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置成可持久化的(durable),这样就可以保证绝大部分情况下我们的rabbitmq消息不会丢失。但依然解决不了小概率丢失事件的发生(例如rabbitmq服务器已经接收到了生产者的消息,但还没来得及持久化该消息时rabbitmq服务器就断电了)。如果也要将这种小概率事件管理起来就需要使用到事务了。要持久化队列需要在声明时指定durable=True;这里要注意,队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性。队列和交换机有一个创建时候指定的标志durable,durable的唯一含义就是让具有这个标志的队列和交换机会在重启之后重新建立。

消息持久化包括3部分

(1)exchange持久化,在声明时指定durable => true

channelExchangeDeclare(ExchangeName,"direct", durable:true, autoDelete:false, arguments:null);//声明消息队列,且为可持久的

(2)queue持久化,在声明时指定durable => true

channelQueueDeclare(QueueName, durable:true, exclusive:false, autoDelete:false, arguments:null);//声明消息队列,且为可持久的

(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)。

channelbasicPublish("", queueName, MessagePropertiesPERSISTENT_TEXT_PLAIN, msggetBytes());

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的;如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定。

注意:一旦创建了队列和交换机,就不能修改其标志了。例如创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重新创建。

4、Fair dispath 公平分发

你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它不管Consumer是否还有unacked Message,只是按照这个默认的机制进行分发。那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却毫无休息的机会,那么Rabbit是如何处理这种问题呢

通过basicqos方法设置prefetch_count=1,如下设置

channelbasic_qos(prefetch_count=1)

这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它。但是这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

5、分发到多个Consumer

Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息。

Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key。

Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:或#,RoutingKey命名采用英文句点来分隔多个词,只有消息将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息。

Headers Exchange:消息头订阅,消息发布前为消息定义一个或多个键值对的消息头,然后消费者接收消息,同时需要定义类似的键值对请求头(如

x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey。

默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amqdirect”这个exchange。我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去。如下:

channelBasicPublish("","TaskQueue", properties, bytes);

因为在第一个参数选择了默认的exchange,而我们声明的队列叫TaskQueue,所以默认的,它要新建一个也叫TaskQueue的routingKey,并绑定在默认的exchange上,导致了我们可以在第二个参数routingKey中写TaskQueue,这样它就会找到定义的同名的queue并把消息放进去。

如果有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。

如果有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,即每个程序都会收到这个消息的副本。行为相当于fanout类型的exchange。

多个queue绑定同一个key也是可以的,对于下图的例子,Q1和Q2都绑定了black,对于routing key是black的Message,会被deliver到Q1和Q2,其余的Message都会被丢弃。

七、RPC远程过程调用

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

RabbitMQ中实现RPC的机制如下图所示:

客户端发送请求(消息)时,在消息的属性(MessageProperties ,在AMQP 协议中定义了14种properties ,这些属性会随着消息一起发送)中设置两个值replyTo (一个Queue 名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue 中)和correlationId (此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行或执行失败)。

服务器端收到消息并处理,服务器端处理完消息后,将生成一条应答消息到replyTo 指定的Queue中 ,同时带上correlationId 属性,客户端之前已订阅replyTo 指定的Queue ,从中收到服务器的应答消息后,根据其中的correlationId 属性分析哪条请求被执行了,然后根据执行结果进行后续业务处理。

转发:https://wwwcnblogscom/jasonboren/p/13280745html

rabbitMQ 架构为  rabbitMQ + keepalived  镜像模式。

rabbitmq01  1921681101

rabbitmq01  1921681102

vip 1921681110  在rabbitmq02 上

现在 rabbitmq01 故障 ,起不来,队列数据同步rabbbit02 有问题。

解决方法是将rabbitmq01 这个故障节点从集群中剔除,然后重新加入。

在rabbitmq02  1921681102服务器上

剔除rabbitmq01节点

# rabbitmqctl cluster_status

# rabbitmqctl  forget_cluster_node  rabbit@192-168-1-101

# rabbitmqctl cluster_status

 

在rabbitmq01  1921681101服务器上

停止rabbitmq相关进程

# systemctl  stop rabbitmq-server

# ps aux | grep rabbit | grep -v grep | awk ‘{print $2}’| xargs kill -9

移除rabbitmq相关数据文件

# mkdir /kingdee/rabbitmqBackup

# mv  /var/lib/rabbitmq/  /kingdee/rabbitmqBackup/

重新启动rabbitmq

# systemctl  start  rabbitmq-server

# ps aux | grep rabbit

创建rabbitMQ用户

# rabbitmqctl  add_user   mquser    rabbitMQ@123

注:此密码为安装时设置的rabbitMQ密码

# rabbitmqctl  list_users

# rabbitmqctl  set_user_tags   mquser    administrator

# rabbitmqctl  set_permissions   -p  /   mquser    ''  ''  ''

# rabbitmqctl  stop_app

# rabbitmqctl  join_cluster   rabbit@192-168-1-102

# rabbitmqctl  cluster_status

# rabbitmqctl  start_app

# systemctl  start  keepalived

在rabbitmq02  1921681102服务器上

添加策略同步策略

# rabbitmqctl   set_policy ha-all  "^"   '{"ha-mode":"all","ha-sync-mode":"automatic"}'

RabbitMQ Cluster群集安装配置

https://wwwcnblogscom/elvi/p/7736661html

Network partition detected

Mnesia reports that this RabbitMQ cluster has experienced a network partition 

There is a risk of losing data Please read RabbitMQ documentation about network partitions and the possible solutions

https://wwwrabbitmqcom/partitionshtml

更多里。根据DMS帐号密码规范,rabbitmq密码复杂度在更多里设置。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。

本文为官方文档翻译版本 rabbitmq375版本,原地址: https://githubcom/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmqconfexample 。以#开头的行为配置,key和等号以及value之间尽量保持有一个空格。以下的"默认"指的为在没有添加配置文件或者该key没有配置。

rabbitmq是使用基于tcp的amqp协议通信(如果需要ssl,可参考 这里 ),所以这里都是监听的tcp的端口。rabbitmq支持监听多端口,并支持指定网卡的ipv4和ipv6。格式为listenerstcp${name} = ${value},name可以是任意不重复的值,如:defaul、local、local_v6等。value的格式有:

(1)包括了(2)和(3),(2)包括了(4)和(6),(3)包括了(5)和(7)。下面对应的为其中情况的配置,按需求进行配置,不需要都配,大部分情况只配置(1)。默认的配置为listenerstcpdefault = 5672

例:

接受TCP侦听器连接的Erlang进程数。一旦打开了一个使用tcp连接的套接字,它就始终保持打开状态,直至任何一方关闭它或因为一个错误而终止。在建立一个连接时,一般为每一次请求产生一个新进程,num_acceptors就是控制产生新进程的个数。假设有一个监听进程,其任务是等待传入的tcp请求。只要一个请求到达,响应该连接请求的进程就变成了接收进程。默认的配置为num_acceptorstcp = 10。

例:

AMQP 0-9-1握手(socket连接和TLS握手之后)的最大时间,以毫秒为单位。

默认的配置为handshake_timeout = 10000。

例:

设置为'true'以在接受一个连接时执行反DNS反查询。 在rabbitmqctl中和web管理中将显示主机名称而不是IP地址。默认的配置为reverse_dns_lookups = true。

例:

开启后的效果

仅允许通过本地(即localhost)连接到代理的用户列表。如果您希望允许guest用户远程连接,则需要将其更改为 loopback_users = none。

要将其他用户限制为仅限localhost的连接,请像这样执行(monitoring是用户的名称): loopback_usersmonitoring = true。默认的配置为loopback_usersguest = true。推荐设置loopback_usersguest = false。

例:

关于ssl的配置,内容比较多,参考 官网 。默认不配置。

选择要使用的认证/授权后端。可以配置ldap相关的设置。具体可以参考 access-control 。internal为由rabbitmq内部处理,默认的配置为 auth_backends1 = internal。

例:

RabbitMQ具有对各种SASL认证机制的可插拔支持。服务器内置了三种这样的机制:PLAIN,AMQPLAIN和RABBIT-CR-DEMO,以及EXTERNAL 可作为 插件使用 。您还可以通过 在插件中实现rabbit_auth_mechanism行为来实现自己的身份验证机制。有关常规插件开发的更多信息,请参阅 插件开发指南 。默认的配置为PLAIN和AMQPLAIN。

内置的机制:

例:

有关rabbitmq-auth-mechanism-ssl插件的配置,查看: https://githubcom/rabbitmq/rabbitmq-auth-mechanism-ssl

SSL handshake超时时间,毫秒为单位。默认的配置为ssl_handshake_timeout = 5000

例:

rabbitmq的用户的密码加密算法。修改该值只会影响新创建的用户,对应老用户需要重置密码进行更新。一般情况下不更改。默认的配置为password_hashing_module = rabbit_password_hashing_sha256。要使用SHA-512,请设置为rabbit_password_hashing_sha512。

例:

和web端的Import definitions、Export definitions有关。好像没啥用==。

默认的用户及其权限和vhost。如果一个connect没有配置以下的配置,则使用默认值进行连接。

默认用户的tag。默认的配置default_user_tagsadministrator = true。一般不需要改。

例:

heartbeat通常用来检测通信的对端是否存活(未正常关闭socket连接而异常crash)。其基本原理是检测对应的socket连接上数据的收发是否正常,如果一段时间内没有收发数据,则向对端发送一个心跳检测包,如果一段时间内没有回应则认为心跳超时,即认为对端可能异常crash了。

rabbitmq也不例外,heatbeat在客户端和服务端之间用于检测对端是否正常,即客户端与服务端之间的tcp链接是否正常。

heartbeat检测时间间隔的设置:

这里要注意的是:如果时间间隔配置为0,则表示不启用heartbeat检测。

例:

设置amqp协议最大允许的字节数。默认的配置为frame_max = 131072(单位为字节,也就是128k),注意该值不要设置过大,如果一条消息比较大(如传输文件),可以通过Publish Confirm和Consumer Acknowledgement机制,如设置过大,那么broker内存会容易被占完。也不要设置过小,保持在128k-1m之间。 引用:使用RabbitMQ传输大文件,保证其完整性

例:

初始化时的最大字节,不知道哪里使用的。原文:Set the max frame size the server will accept before connection tuning occurs。

例:

设置每个连接的最大允许通道数量。 0表示“没有限制”。默认的配置为channel_max = 128。

例:

tcp连接相关的配置。尽量不要改。以下为默认的配置

例:

设置rabbitmq使用内存的阈值。有相对和绝对两种阈值。默认为vm_memory_high_watermarkrelative = 04。

队列开始将消息导出到光盘来释放内存的高水位限制的值。

例如,当vm_memory_high_watermark被设置为04并且该值被设置为05时,

可以在节点使用总可用RAM的20%时开始分页。大于10的值可能很危险,应谨慎使用。

一种替代方法是使用持久队列并发布消息,作为持久性。 有了这个组合队列将消息更快地移动到磁盘。

另一种方法是配置队列来分页所有消息(都是持久和瞬态)到磁盘。

尽可能参阅 http://rabbitmqcom/lazy-queueshtml 。

例:

内存使用情况报告策略。可以是以下之一,默认的配置为rss:

allocated:使用Erlang内存分配器统计信息

rss:使用操作系统RSS内存报告。这使用特定于操作系统的手段,并可能启动短暂的子进程。

legacy:使用legacy内存报告(运行时考虑使用多少内存)。这个策略相当不准确。

erlang:与legacy相同,为了向后兼容而保留

例:

根据 watermarks检查内存级别。没发现具体作用。

例:

可用内存总量,不使用特定于操作系统的方式从环境中推断内存。 只有当节点可用的实际最大RAM数量与节点将要推断的值不匹配时,才应使用这种方法。 该值可以设置为整数个字节,或者可以以信息单位(例如“8GB”)设置。 例如,当该值设置为4 GB时,该节点会认为它在具有4 GB RAM的计算机上运行。默认不设置该值。

例:

和vm_memory_high_watermark类似,disk_free_limit是控制硬盘的使用阈值。RabbitMQ正在存储数据的分区的磁盘可用空间限制。当可用磁盘空间低于此限制时,将触发流量控制。该值可以相对于RAM的总量或以字节或以信息单位表示的绝对值(例如"50MB"或"5GB"或"5KB")来设置,或者,我们可以设置相对于可用RAM总量的限制。低于10的值可能很危险,应谨慎使用。默认为disk_free_limitabsolute = 50MB。

例:

网络分裂。一种在系统的任何两个组之间的所有网络连接同时发生故障后所出现的情况。发生这种情况时,分裂的系统双方都会从对方一侧重新启动应用程序,进而导致重复服务或裂脑。由网络分裂造成的最为严重的问题是它会影响共享磁盘上的数据。默认为ignore模式。如何处理网络分裂?详细的文档可以参考 官网文档

可用的模式是:

在消息中镜像同步批量大小。增加这将加快同步,但批量总大小(以字节为单位)不得超过2 GiB。该设置可用于RabbitMQ 360或更高版本。默认的配置为 mirroring_sync_batch_size = 4096(4k)。

例:

集群相关的配置,为了形成一个集群,新的(“空白”)节点需要能够发现他们的同伴。这可以使用各种机制(后端)来完成。有些机制假定所有集群成员都提前知道(例如,在配置文件中列出),其他机制是动态的(节点可以动态增删)。

内置的发现机制如下:

cluster_formationnode_type:节点类型。默认为disc。

cluster_keepalive_interval:像集群里的其他子节点发送存活消息的间隔(毫秒)。默认为cluster_keepalive_interval = 10000

统计相关,与web管理插件显示有关。可配置的值如下:

例:

设置为true,以便使用HiPE预编译RabbitMQ的部分,这是Erlang的即时编译器。 这会以增加启动时间为代价来提高服务器吞吐量。

您可能会看到启动时延迟几分钟的成本提高20-50%。 这些数据非常依赖于工作负载和硬件。

HiPE支持可能不会编译到您的Erlang安装中。 如果不是,启用此选项只会导致显示一条警告消息,启动将按正常进行。 例如,Debian / Ubuntu用户需要安装erlang-base-hipe软件包。

HiPE在某些平台上完全不可用,特别包括Windows。

HiPE在175之前的Erlang / OTP版本中存在已知问题。 HiPE强烈建议使用最新的Erlang / OTP版本。默认的配置为hipe_compile = false。

等待集群中的Mnesia tables变得可用时使用的超时。默认的配置mnesia_table_loading_retry_timeout = 30000。

在等待集群中的Mnesia tables可用时,需要重试的次数。默认的配置mnesia_table_loading_retry_limit = 10。

在消息的字节数中,消息将被直接嵌入到队列索引中。详情请看 persister tuning 。默认的配置queue_index_embed_msgs_below = 4096。

是否启用后台定期强制GC为“等待”状态运行节点上的所有Erlang进程。

禁用后台GC可以减少客户端操作的延迟,保持启用状态可以减少二进制堆的RAM使用量(请参阅 https://wwwerlang-solutionscom/blog/erlang-garbage-collectorhtml )。

在尝试此选项之前,请查看内存( http://wwwrabbitmqcom/memory-usehtml )。

默认的配置background_gc_enabled = false,当配置为true时,可以设置gc的间隔,默认的配置为background_gc_target_interval = 60000(毫秒)。

设置是否启用代理,启用后不能直连到broker。默认的配置proxy_protocol = false。

未知

有关web管理后台的配置。

查看 http://wwwrabbitmqcom/stomphtml 。

http://wwwrabbitmqcom/mqtthtml

查看 https://githubcom/rabbitmq/rabbitmq-amqp10

查看 http://rabbitmqcom/ldaphtml 。

DABAN RP主题是一个优秀的主题,极致后台体验,无插件,集成会员系统
网站模板库 » 厉害!一文了解消息中间件-RabbitMQ

0条评论

发表评论

提供最优质的资源集合

立即查看 了解详情