RabbitMQ详细介绍哦

tech2022-08-11  109

RabbitMQ

一、走进RabbitMQ

1.消息中间件的简介

​ 消息中间件(消息队列)是分布式系统中重要的组件,主要解决了应用耦合、异步消息、流量削峰等问题实现高性能、高可用、可伸缩和最终一致性,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ等。

​ 以下介绍消息队列在实际应用中常用的使用场景:异步处理、应用解耦、流量削峰和消息通讯四个场景。

2.什么是RabbitMQ

​ RabbitMQ是一个由Erlang语言开发的AMQP的开源实现。

​ AMQP: Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品和开发语言等条件的限制。

​ RabbitMQ最初起源于金融系统,用在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

可靠性

使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

灵活的路由

在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现,针对复杂的路由功能,可以将多个Exchage绑定在一起,也通过插件机制实现自己的Exchange。

消息集群

多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。

高可用

队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。

多种协议

RabbitMQ迟滞多种消息队列协议,比如STOMP、MQTT等。

多语言客户端

RabbitMQ几乎迟滞所有常用语言,比如java、net、ruby

管理界面

RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。

3.架构图与主要概念

RabbitMQ Server:也叫Broker server,它是一种传输服务,它的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

Producer:消息的生产者。如图中的ABC,都是数据的发送方。消息的生产者连接RabbitMQ服务器,然后将消息投递到Exchange。

Consumer:消息的消费者,如图123,数据的接收方。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。

Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息,RabbitMq中的Exchange有direct、fanout、topic、headers四种类型,每种类型对应不同的路由规则。

Queue:队列是RabbitMQ的内存对象,用于存储消息。消息消费者就是通过订阅队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息最终投递到Queu中,消费者可以从Queue中获取消息并消费,多个消费者可以订阅同一个Queu,这时Queu中的消息会被平均分摊给多个消费这进行处理,而不是每个消费者都收到所有的消息并处理。

RoutingKey:生产者将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下,我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。

Connection:连接,Producer和Consumer都是通过tcp连接到RabbitMQ Server的,胰脏我们可以看到,程序的起初处就是建立这个tcp连接。

Channells:信道,它建立在上述的tcp连接中,数据流动都是在channel中进行的,也就是说,一般情况时程序起始建立tcp,第二步就是建立这个channel。

VirtualHost:权限控制的基本单位,一个VirtualHost中有如干个Exchange和MessageQueue,以及指定被哪些user使用。

4.安装与启动

4.1 下载并安装Eralng

4.2 下载并安装rabbitmq

4.3 安装管理界面(插件)

进入rabbitMQ安装目录的sbin目录,输入命令

rabbitmq -plugins enable rabbitmq_management

4.4 重新启动服务

4.5 打开浏览器,地址栏输入http://127.0.0.1:15672,即可看到管理界面的登录页

默认账户和密码都是guest guest

二、RabbitMQ发送与接收消息

1.直接模式 Direct

1.1 什么是Direct模式

我们需要将消息发给唯一一个节点使用时使用这种模式,这时最简单的一种形式。图片如下:

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。

一般情况下可以使用rabbitMQ中的Exchange。这种模式下可以不需要将Exchange进行任何绑定操作消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字如果vhost中不存在RouteKey中指定的对列明,则该消息会被抛弃。

1.2 创建队列

​ 直接在管理器中添加队列即可(这里添加一个quest.test,为了和下边的例子相同)

1.3 代码实现消息生产者

//我们需要先进行测试,先导入相关jar,然后导入rabbitMQ配置文件,这里的配置文件是配置生产者的 ApplicationContext contxxt = new ClassPathXmlApplicationContext("applicationContext-rabbit-produer.xml"); rabbitTemplate.converAndSend("","quest.test","直接模式测试"); ((ClassPathXmlApplicationContext)context).close();

1.4 代码实现 消息消费者

//我们还是需要导入一个配置文件,这里的配置文件是用来配置消费者的 //我们需要创建一个监听类,用来接收消息 public class MessageConsumer implements MessageListener{ public void onMessage(Message message){ System.out.println(message.getBody()); } } ApplicationContext contxxt = new ClassPathXmlApplicationContext("applicationContext-rabbit-consumer.xml");

2.分列模式 Fanout

2.1 什么是分列模式

任何发送到FanoutExchange的消息都会转发到与该Exchange绑定到所有的Queue上。

这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queu可以同多个Exchange进行绑定。这种模式下不要要RouteKey。如果接收到消息的Exchange没有与任何的Queue绑定,那么消息就会被抛弃。

2.2 交换器绑定队列

创建队列quest.test1 quest.test2

创建交换器exchage.fanout_test

2.3 代码实现 消息生产者

ApplicationContext contxxt = new ClassPathXmlApplicationContext("applicationContext-rabbit-produer.xml"); rabbitTemplate.converAndSend("exchage.fanout_test","q","分列模式测试"); ((ClassPathXmlApplicationContext)context).close();

2.4 代码实现 消息消费者

//我们需要创建一个监听类,用来接收消息 public class MessageConsumer implements MessageListener{ public void onMessage(Message message){ System.out.println(message.getBody()); } } ApplicationContext contxxt = new ClassPathXmlApplicationContext("applicationContext-rabbit-consumer.xml");

2. 5 创建队列与交换器

我们这里都可以写在配置文件里边。

三、RabbitMQ常见面试题

1.为什么使用MQ?MQ的优点?

异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。日志处理 - 解决大量日志传输。消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

2.多个消费者监听一个队列时,消息如何分发?

一般使用轮询的方式,这时默认的策略,消费者轮流、平均第接收信息,公平分发,根据消费者的能力来分发消息,给空闲的消费者发送更多消息。

3.无法被路由的消息去了哪里?

没有设置的情况下,无法路由(routing key错误)的消息会被直接丢弃。 解决方案:将manddatory设置为ture,并配合ReturnListener,实现消息的回发。

4.消息在什么时候会变成死信?

消息拒绝并且没有设置重新入队。消息过期。消息堆积,并且队列达到最大长度。

5.RabbitMQ如何实现延时队列?

利用装配的消息存货时间或者消息存货时间,加上死信交换机。

// 设置属性,消息10秒钟过期 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") // TTL // 指定队列的死信交换机 Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");

6.如何保证消息的可靠性投递?

发送方确认模式 如果设置为这个模式,则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后,信号会发送一个确认给生产者(生产者中包含唯一消息ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条未确认的消息。发送方确认模式时异步的,生产者应用程序在等待确认的同时,可以继续发送消息,当确认消息到达生产者应用车徐,生产者应用程序的回调方法就会触发来处理确认消息。 接收方确认机制 消费者接收每一条消息后都必须进行确认(我们要知道的是,消息接收和消息确认是两个不同的操作)。只有消费者确认了消息,RabbitMQ才能安全的从队列这删除,这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的一致性。 下面是几种特殊的情况: 如果消费者接收到消息,在确认连接之前断开了连接或者取消订阅,RabbitMQ会认为消息没有被分发,然后重新发给下一个订阅的消费者。如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,不会给该消费者分发更多的消息。

7.消息幂等性

生产者方面:可以对每条信息生成一个msgID,以控制消息重复投递。 消费者方面:消息体中必须携带一个业务ID,如银行流水号,消费者可以根据业务ID去重,避免重复消费。

8.消息如何被优先消费?

我们可以设置优先级

//生产者 Map<String, Object> argss = new HashMap<String, Object>(); argss.put("x-max-priority",10); //消费者 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用

9.如何保证消息的顺序性?

一个队列只有一个消费者的情况下才能保证顺序如果不是只有一个消费者的话,那么只能通过全局ID来实现,(每条消息都一个msgId,关联的消息拥有一个parentMsgId。可以在消费端实现前一条消息未消费,不处理下一条消息;也可以在生产端实现前一条消息未处理完毕,不发布下一条消息)

10 如何自动删除长时间没有消费的消息

// 通过队列属性设置消息过期时间 Map<String, Object> argss = new HashMap<String, Object>(); argss.put("x-message-ttl",6000); // 对每条消息设置过期时间 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") // TTL

11.消息是基于什么传输?

RabbitMQ使用信道的方式来传输数据。 信道是建立在真实的TCP连接内的消除连接,且每条TCP连接的信道数量没有限制。

12 如何确保消息不丢失?

消息的持久化,当然前提是队列必须持久化。 RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。

最新回复(0)