消息中间件(消息队列)是分布式系统中重要的组件,主要解决了应用耦合、异步消息、流量削峰等问题实现高性能、高可用、可伸缩和最终一致性,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ等。
以下介绍消息队列在实际应用中常用的使用场景:异步处理、应用解耦、流量削峰和消息通讯四个场景。
RabbitMQ是一个由Erlang语言开发的AMQP的开源实现。
AMQP: Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品和开发语言等条件的限制。
RabbitMQ最初起源于金融系统,用在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
可靠性
使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由
在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现,针对复杂的路由功能,可以将多个Exchage绑定在一起,也通过插件机制实现自己的Exchange。
消息集群
多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
高可用
队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。
多种协议
RabbitMQ迟滞多种消息队列协议,比如STOMP、MQTT等。
多语言客户端
RabbitMQ几乎迟滞所有常用语言,比如java、net、ruby
管理界面
RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
RabbitMQ Server:也叫Broker server,它是一种传输服务,它的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。
Producer:消息的生产者。如图中的ABC,都是数据的发送方。消息的生产者连接RabbitMQ服务器,然后将消息投递到Exchange。
Consumer:消息的消费者,如图123,数据的接收方。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。
Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息,RabbitMq中的Exchange有direct、fanout、topic、headers四种类型,每种类型对应不同的路由规则。
Queue:队列是RabbitMQ的内存对象,用于存储消息。消息消费者就是通过订阅队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息最终投递到Queu中,消费者可以从Queue中获取消息并消费,多个消费者可以订阅同一个Queu,这时Queu中的消息会被平均分摊给多个消费这进行处理,而不是每个消费者都收到所有的消息并处理。
RoutingKey:生产者将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下,我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。
Connection:连接,Producer和Consumer都是通过tcp连接到RabbitMQ Server的,胰脏我们可以看到,程序的起初处就是建立这个tcp连接。
Channells:信道,它建立在上述的tcp连接中,数据流动都是在channel中进行的,也就是说,一般情况时程序起始建立tcp,第二步就是建立这个channel。
VirtualHost:权限控制的基本单位,一个VirtualHost中有如干个Exchange和MessageQueue,以及指定被哪些user使用。
4.1 下载并安装Eralng
4.2 下载并安装rabbitmq
4.3 安装管理界面(插件)
进入rabbitMQ安装目录的sbin目录,输入命令
rabbitmq -plugins enable rabbitmq_management4.4 重新启动服务
4.5 打开浏览器,地址栏输入http://127.0.0.1:15672,即可看到管理界面的登录页
默认账户和密码都是guest guest我们需要将消息发给唯一一个节点使用时使用这种模式,这时最简单的一种形式。图片如下:
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
一般情况下可以使用rabbitMQ中的Exchange。这种模式下可以不需要将Exchange进行任何绑定操作消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字如果vhost中不存在RouteKey中指定的对列明,则该消息会被抛弃。 直接在管理器中添加队列即可(这里添加一个quest.test,为了和下边的例子相同)
任何发送到FanoutExchange的消息都会转发到与该Exchange绑定到所有的Queue上。
这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queu可以同多个Exchange进行绑定。这种模式下不要要RouteKey。如果接收到消息的Exchange没有与任何的Queue绑定,那么消息就会被抛弃。创建队列quest.test1 quest.test2
创建交换器exchage.fanout_test
我们这里都可以写在配置文件里边。
一般使用轮询的方式,这时默认的策略,消费者轮流、平均第接收信息,公平分发,根据消费者的能力来分发消息,给空闲的消费者发送更多消息。
没有设置的情况下,无法路由(routing key错误)的消息会被直接丢弃。 解决方案:将manddatory设置为ture,并配合ReturnListener,实现消息的回发。
利用装配的消息存货时间或者消息存货时间,加上死信交换机。
// 设置属性,消息10秒钟过期 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") // TTL // 指定队列的死信交换机 Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");生产者方面:可以对每条信息生成一个msgID,以控制消息重复投递。 消费者方面:消息体中必须携带一个业务ID,如银行流水号,消费者可以根据业务ID去重,避免重复消费。
我们可以设置优先级
//生产者 Map<String, Object> argss = new HashMap<String, Object>(); argss.put("x-max-priority",10); //消费者 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用RabbitMQ使用信道的方式来传输数据。 信道是建立在真实的TCP连接内的消除连接,且每条TCP连接的信道数量没有限制。
消息的持久化,当然前提是队列必须持久化。 RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。