1.RabbitMQ后台管理页面
2.RabbitMQ 核心(自我理解)
3.RabbitMQ6种工作模式介绍
4. RabbitMQ的消息可靠性
5.RabbitMQ普通MAVEN项目使用
6.SpringBoot整合RabbitMQ的入门
7.延时队列&死信队列&死信&死信交换机
8.RabbitMQ的应用场景
1.RabbitMQ后台管理页面
是rabbitmq的后台管理页面http://localhost:15672
默认的用户 guest 密码 guest
1.1 创建用户
#进入rabbitmq的安装目录下的sbin目录下有提供的命令
rabbitmqctl add_user <用户名> <密码>
1.2 用户等级
用户等级名称登录控制台查看所有信息制定策略rabbitmq进行管理
administrator√√√√monitoring√√policymaker√√managment√
#进入rabbitmq的安装目录下的sbin目录下有提供的命令
rabbitmqctl set_user_tags <用户名> <用户等级名称>
1.3 RabbitMQ管理后台的操作
后台可以对RabbitMQ进行任何的操作
用户,虚拟机,队列,交换机,绑定的管理操作
2.RabbitMQ 核心(自我理解)
2.1 虚拟机 (可以理解为数据库,一个虚拟机包含多个队列和交换机)
2.2 交换机 (可以理解为分发者,由它决定消息推送到哪个队列)
2.3 队列 (可以理解为表)
2.4 绑定 (交换机和队列之间的关系)
3.RabbitMQ 6种工作模式介绍
3.1 简单模式
一个生产者 一个 消费者,生产者把消息 放入队列,消费者监听队列一有消息就消费消息
3.2 工作模式
一个生产者,多个消费者,和简单模式相比就可以有多个消费者,当生产者发送消息到队列,两个消费者就轮询分发消息(就是一人一个),也可以设置成公平分发消息(就是能者多劳,先处理完的先获得)。
3.3 订阅模式
一个生成者,一个交换机,多个队列。相比较上面两种工作模式,订阅模式是把消息发送到交换机,由交换机去发送到绑定到该交换机的所有队列。
3.4 路由模式
和订阅模式差不多,不一样地方就是路由模式,队列与交换机以一个键值绑定关系,如上图,orange这个键就是Q1与交换机之间绑定关系,一个队列可以有多个键,当生产者发送消息给交换机并指派交换机发送给绑定在该交换机指定的键的队列。
3.5 主题模式
和路由模式差不多,不一样的是当生产者发送消息给交换机并指派交换机发送给绑定在该交换机符合指定的规则键的队列。(类似上图,规则 * .orangee.*与Q1是绑定关系,当生产者指派x.oragne.y是符合该规则,则由交换机发送消息给Q1队列 )
4.RabbitMQ的消息可靠性
使用RabbitMQ,我们会去考虑消息可靠性,而消息可靠性就是消息是否会丢失,以下就是(消息丢失的场景)
1)生产者发送消息给交换机,队列,而交换机,队列有没有收到。
2) 消费者收到消息,如何确保消息是否被消费
3)当机器宕机或rabbitmq服务重启,消息是否会丢失
4.1 处理消息可靠性方法
1、开启事务(不推荐,因为rabbitmq事务很费性能)
当 通道channel 开启 事务模式 ,当抛出异常捕获异常 可以做 事务回滚txRollbac() 将发送的消息拿回来,只有与事务提交才可以将消息发送
try {
channel
.txSelect();
channel
.basicPublish("", QUEUE_NAME
, null
, message
.getBytes());
int result
= 1 / 0;
channel
.txCommit();
} catch (Exception e
) {
e
.printStackTrace();
channel
.txRollback();
}
2、开启confirm(推荐)
2.1 简单模式
publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传,当发送多条信息,如果有一条返回false,就将这批消息全部重新发送但是因为不是异步的,如果不返回结果,waitForConfirms就会等待造成阻塞
channel
.confirmSelect();
channel
.basicPublish("", QUEUE_NAME
, null
, message
.getBytes());
boolean b
= channel
.waitForConfirms();
System
.out
.println(b
?"消息被接受":"消息未被接收");
channel
.confirmSelect();
for(int i
=0;i
<batchCount
;i
++){
channel
.basicPublish(ConfirmConfig
.exchangeName
, ConfirmConfig
.routingKey
, MessageProperties
.PERSISTENT_TEXT_PLAIN
, ConfirmConfig
.msg_10B
.getBytes());
}
if(!channel
.waitForConfirms()){
System
.out
.println("send message failed.");
}
2.2 异步模式
与简单模式相比,它是异步,开启一个监听器线程,不会造成线程阻塞addConfirmListener 发送消息,交换机是否接收的监听器addReturnListener 发送消息,队列是否接收的监听器
– 普通的MAVEN项目
channel
.basicPublish("", QUEUE_NAME
, null
, message
.getBytes());
channel
.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l
, boolean b
) throws IOException
{
System
.out
.println("消息唯一标识 deliveryTag " + l
);
System
.out
.println("是否是多条消息确认 multiple " + b
);
System
.out
.println("消息发送已接收");
}
@Override
public void handleNack(long l
, boolean b
) throws IOException
{
System
.out
.println("消息唯一标识 deliveryTag " + l
);
System
.out
.println("是否是多条消息确认 multiple " + b
);
System
.out
.println("消息发送未被接收");
}
});
channel
.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i
, String s
, String s1
, String s2
, > >AMQP
.BasicProperties basicProperties
, byte[] bytes
) throws
IOException
{
System
.out
.println("返回标识" + i
);
System
.out
.println("未接受原因" + s
);
System
.out
.println("发送消息交换机" + s1
);
System
.out
.println("接受消息的队列" + s2
);
System
.out
.println("消息内容" + new String(bytes
));
System
.out
.println("消息的基础参数"+ basicProperties
);
System
.out
.println("消息发送队列已接收");
}
});
– SpringBoot项目
配置文件开启配置项,创建类注入spring容器,通过注入rabbitmqtemplate模板对象中的returncallback,confirmcallback属性,通过实现ReturnCallback ,ConfirmCallback接口重写returnedMessage方法(监听队列是否接收到消息),重写confirm方法(监控交换机消息是否接收)
spring:
rabbitmq:
host: localhost
port: 5672
username: lzj
password: 123456
virtual-host: /vhost_lzj
publisher-returns: true
publisher-confirm-type: correlated
@Component
public class MQReturn implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate
;
@PostConstruct
public void init(){
this.rabbitTemplate
.setReturnCallback(this);
this.rabbitTemplate
.setConfirmCallback(this);
}
@Override
public void returnedMessage(Message message
, int i
, String s
, String s1
, String s2
) {
System
.out
.println("返回标识" + i
);
System
.out
.println("未接受原因" + s
);
System
.out
.println("发送消息交换机" + s1
);
System
.out
.println("接受消息的队列" + s2
);
System
.out
.println("消息基础内容" + new String(message
.getBody()));
System
.out
.println("消息的基础参数对象 : " + message
.getMessageProperties());
System
.out
.println("队列接受消息失败");
}
@Override
public void confirm(CorrelationData correlationData
, boolean b
, String s
) {
System
.out
.println(correlationData
.getId());
if(b
){
System
.out
.println("交换机接收消息成功");
}else{
System
.out
.println("交换机接收消息失败,原因:"+ s
);
}
}
}
3、开启RabbitMQ持久化(交换机、队列、消息)
– 三个持久化可以通过rabbitmq后台创建,也可以通过代码方式创建(我后面也有介绍)
交换机持久化队列持久化消息持久化 (消息是在队列里,所以消息持久化要建立在队列持久化的基础上)
4、消息确认机制(message acknowledgment),关闭RabbitMQ自动ack(改成手动)
当关闭自动应答ack , RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。如果自动应答ack是开启的,RabbitMQ会在队列中消息被消费后立即删除它
普通MAVEN项目
Consumer consumer
= new DefaultConsumer(channel
){
@Override
public void handleDelivery(String consumerTag
, Envelope envelope
, AMQP
.BasicProperties properties
, byte[] body
) throws IOException
{
String msg
= new String(body
);
System
.out
.println("接收到消息" + msg
);
System
.out
.println("处理逻辑");
channel
.basicAck(envelope
.getDeliveryTag(),false);
}
};
channel
.basicConsume(QUEUE_NAME
,AUTO_ACK
,Consumer
);
SpringBoot项目
配置文件开启对应的配置项,在消费处使用 channel.basicAck 进行消费应答
spring:
rabbitmq:
host: localhost
port: 5672
username: lzj
password: 123456
virtual-host: /vhost_lzj
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
@RabbitHandler
public void getMsgString(String msg
, Channel channel
, Message message
) throws IOException
{
System
.out
.println("正在消费一条消息 : " + msg
);
System
.out
.println(message
.getMessageProperties().getDeliveryTag());
Scanner scanner
= new Scanner(System
.in
);
System
.out
.println("请输入任意字符 消费该消息");
String s
= scanner
.nextLine();
channel
.basicAck(message
.getMessageProperties().getDeliveryTag(),false);
}
5.RabbitMQ普通MAVEN项目使用
5.1 导入依赖
<dependency>
<groupId>com.rabbitmq
</groupId>
<artifactId>amqp-client
</artifactId>
<version>5.6.0
</version>
</dependency>
5.2 API介绍
封装rabbitmq连接工具
public class MQUtil {
public static Connection
getConnection() throws IOException
, TimeoutException
{
ConnectionFactory factory
= new ConnectionFactory();
factory
.setHost("127.0.0.1");
factory
.setPort(5672);
factory
.setVirtualHost("/vhost_lzj");
factory
.setUsername("lzj");
factory
.setPassword("123456");
return factory
.newConnection();
}
}
使用封装rabbitmq连接工具的流程【1】 获得连接【2】 获得通道【3】rabbitmq的操作【4】关闭连接【5】关闭通道
Connection connection
= MQUtil
.getConnection();
Channel channel
= connection
.createChannel();
channel
.close();
connection
.close();
常用API介绍
API作用
queueDeclare创建声明队列exchangeDeclare创建声明交换机queueBind队列绑定basicPublish消息推送basicConsume/basicGet消息消费basicAck消息应答basicCancel取消消费者订阅basicReject消息拒绝basicRecover恢复消息到队列basicQos设置服务端每次发送给消费者的消息数量txSelect事务开启txCommit事务提交txRollback事务回滚confirmSelectconfirm模式开启addConfirmListener添加confirm监听器,监听交换机是否接收到消息addReturnListener添加return监听器,监听队列是否接收到消息
1) 创建声明队列
chnnel.queueDeclare(String, boolean, boolean, boolean,Map<String, Object>);
String – 队列的名字boolean – 消息持久化是否开启 (队列持久化就是就是即时重启rabbitmq服务,队列也存在,因为rabbitmq把队列存储在硬盘,非持久化就是存储在内存,重启宕机队列就会丢失)boolean – 是否排外 (当前队列是否为当前连接私有)boolean – 是否自动删除 (当最后一个消费者断开连接之后,就自动删除队列,不管队列里有没有消息)Map<String, Object> – 属性参数设置 案例
参数作用
Message TTL(x-message-ttl)设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));Auto Expire(x-expires)当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=ExpMax Length(x-max-length)限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=LimMaxLength Bytes(x-max-length-bytes)限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim BDead letter exchange(x-dead-letter-exchange)当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLXDead letter routing key(x-dead-letter-routing-key)将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLKMaximum priority(x-max-priority)优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,Lazy mode(x-queue-mode=lazy)Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中Master locator(x-queue-master-locator)
2) 创建声明交换机
channel.exchangeDeclare(String, String, boolean, boolean, boolean, Map<String, Object>);
String – 交换机名称String – 交换机类型 【“fanout”-订阅模式,“direct”-路由模式,“topic”-主题模式】boolean – 交换机是否持久化(持久化就是即时重启宕机,交换机依然存在,因为不是存在内存而是存在硬盘中)boolean – 是否自删除 (当最后一个队列与该交换机解绑,该交换机自动删除)boolean – 是否是内置的交换器,如果是,生产者客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。Map<String, Object> – 属性参数设置 (不常用不做说明,一般为NULL)
3) 队列绑定
channel.queueBind(String, String , String)
String – 队列名字String – 交换机名字String - 路由键值(routing key)
4) 消息推送
- 4.1 channel.basicPublish(String, String, BasicProperties, byte [])
String – 交换机名字 可以是空字符串String – 队列名或路由键值(如果交换机名字为空字符串,这个就是队列名,如果不为空字符串,就是路由键值)BasicProperties – 基础配置参数 (如果业务涉及要求消息携带以下参数,不要把参数写在消息内容,要设置在这,可以理解为JWT中的参数) 通过创建 BasicProperties 构造方法 ,将配置参数放入构造参数中
属性作用
contentType消息类型如(text/plain)contentEncoding消息内容编码headers消息头部(可以填写任何K-V)deliveryMode消息的投递方式(非持久【1】,持久【2】,就是消息存在硬盘还是内存中)priority优先级(优先级越高消息,越先被消费)expiration消息过期时间timestamp发送消息的时间戳replyTo发送该消息到的队列名称correlationId相关的标识messageId消息的标识userId用户标识appId应用标识type消息类型clusterId集群标识
byte [] – 消息内容
- 4.1 channel.basicPublish(String, String, boolean mandatory, BasicProperties, byte [])
方法重载 比上面多了一个 boolean形参
boolean mandatory – 当交换器无法根据自动的类型和路由键找到一个符合条件的队列,如果mandatory = true ,则返回消息给生产者,mandatory = false,消息直接丢弃
- 4.3 channel.basicPublish(String, String, boolean mandatory, boolean immediate, BasicProperties, byte [])
boolean immediate – true,如果交换器在消息路由到队列时发现没有任何消费者,那么这个消息将不会存和队列。rabbit3.0被丢弃了这个参数
5) 消息消费
- 4.1 channel.basicConsume
消费队列的消息,只要队列还有消息就一直取
String queue – 队列名字boolean autoAck – 自动应答,true,他就会自动ack回应。false,必须手动basicAck回应(只有生产者接受到ack才会从队列移除消息完成消费)String consumerTag – 消费者标签,用来区分多个消费者boolean onLocal – 设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者boolean exclusive – 是否排他Map<String,Object> arguments – 消费者的参数DeliverCallback deliverCallback – 当一个消息发送过来后的回调接口(函数式接口)CancelCallback cancelCallback – 除了调用basicCancel的其他原因导致消息被取消时调用该接口。(函数式接口)ConsumerShutdownSignalCallback shutdownSignalCallback – 当Channel与Conenction关闭的时候会调用(函数式接口)Consumer consumer – 消费的回调对象 通过重写 DefaultConsumer 对象的方法 去实现回调
方法名作用
handleDelivery消息接收时被调用handleConsumeOk任意basicComsume调用导致消费者被注册时调用handleRecoverOkbasicRecover调用并被接收时调用handleCancel除了调用basicCancel的其他原因导致消息被取消时调用。handleCancelOkbasicCancel调用导致的订阅取消时被调用handleShutdownSignal当Channel与Conenction关闭的时候会调用
- 4.2 channel.basicGet(String, boolean)
消费队列中的第一条消息
String – 队列名字boolean autoAck – 是否自动应答
6) 消息应答
channel.basicAck(long, boolean)
消息消费(basicConsume/basicGet)如果关闭了自动应答,则要使用该API来手动应答,如果关闭了自动应答,而在消费消息时不手动应答,当消费者消费消息时,队列会移除该消息,但是没有接收到消费者的应答前,该消息真正意义上是没被消费的,但消费者channel或connection关闭时,队列还没接收到应答就会返还该消息
long – 服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息 * 的唯一标识,是一个递增的正整数boolean – true表示确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false只确认 * * deliveryTag指定的消息
7) 取消消费者订阅
basicCancel(String consumerTag)
取消消费者对队列的订阅关系,此消费者不再消费该队列消息
String consumerTag – 服务器端生成的消费者标识
8) 消息拒绝
- 8.1 basicReject(long deliveryTag, boolean)
一次拒绝一个消息
long – 服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息 * 的唯一标识,是一个递增的正整数boolean – true则重新入队列,否则丢弃或者进入死信队列。
- 8.2 basicAck(long, boolean, boolean)
批量拒绝消息
long – 服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息 * 的唯一标识,是一个递增的正整数boolean – true表示确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false只确认 * * deliveryTag指定的消息boolean – 表示拒绝的消息,true是消息重新入队,false是消息丢弃
9) 恢复消息到队列
- 9.1 basicRecover()
将未确认的消息重新恢复到队列,投递给其他消费者,而不是自己
- 9.2 basicRecover(boolean)
将未确认的消息重新恢复到队列 boolean – true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
10)设置服务端每次发送给消费者的消息数量
因为basicConsume是监听队列,一有消息就消费,但是就会出现没有消费消息没有应答,队列会一直给你消息,造成消费者吃不消现象,所以有了Qos(Quality of Service)保证服务质量,设置服务端每次发送给消费者的消息数量的阈值,当消费者达到该阈值,就会停止发送,直达消费者消费完消息后返回应答,才会继续发送。
- 10.1 basicQos(int prefetchCount)
int prefetchCount 服务端每次发送给消费者的消息数量
- 10.2 basicQos(int prefetchCount, boolean)
boolean – 如果为true,则当前设置将会应用于整个Channel(频道)
- 10.3 basicQos(int prefetchSize, intprefetchCount, boolean)
prefetchSize – 服务器传送最大内容量(以八位字节计算)
6.SpringBoot整合RabbitMQ的入门
6.1 依赖
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-amqp
</artifactId>
</dependency>
6.2 配置文件 基本配置 。 详细的配置说明 看这 传送门
spring:
rabbitmq:
host: localhost
port: 5672
username: lzj
password: 123456'
virtual-host: /vhost_lzj
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
6.3 通过 AmpqTemplate 或者 RabbitTemplate 对象去调用对 RabbitMQ 的操作
@Autowired
private AmpqTemplate ampqTemplate
;
@Autowired
private RabbitTemplate rabbitTemplate
;
6.4 常用API介绍
API作用
一队列,交换机的创建,以及交换机和队列绑定convertAndSend / send消息的发送三消息接收四confirm监听器,监听交换机是否接收到消息五return监听器,监听队列是否接收到消息六消费者应答七消费者的质量保证
1) 队列,交换机的创建,以及交换机和队列绑定
通过配置类,在项目启动的时候创建 队列,交换机,和交换机和队列绑定
@Configuration
public class MQConfiguration {
@Bean
public FanoutExchange
lzjFanoutExchange() {
FanoutExchange fanoutExchange
= new FanoutExchange("lzj_fanout");
return fanoutExchange
;
}
@Bean
public DirectExchange
lzjDirectExchange() {
DirectExchange directExchange
= new DirectExchange("lzj_direct");
return directExchange
;
}
@Bean
public TopicExchange
lzjTopicExchange(){
TopicExchange topicExchange
= new TopicExchange("lzj_topic");
return topicExchange
;
}
@Bean
public Queue
lzjQueue() {
Queue queue
= new Queue("lzj_queue_1");
return queue
;
}
@Bean
public Binding
lzjFanoutQueueBinding(FanoutExchange lzjFanoutExchange
, Queue lzjQueue
){
Binding to
= BindingBuilder
.bind(lzjQueue
).to(lzjFanoutExchange
);
return to
;
}
}
2) 消息的发送
- 2.1 rabbitTemplate.send(Message message)
Message — 消息对象(里面包含消息内容字节类型,和消息参数)
rabbitTemplate
.setDefaultReceiveQueue(QUEUE_NAME
);
rabbitTemplate
.send(new Message(msg
.getBytes(),new MessageProperties()));
rabbitTemplate
.setExchange("ex_lzj_fanout");
rabbitTemplate
.send(new Message(msg
.getBytes(),new MessageProperties()));
rabbitTemplate
.setExchange("ex_lzj_direct");
rabbitTemplate
.setRoutingKey("a");
rabbitTemplate
.send(new Message(msg
.getBytes(),new MessageProperties()));
- 2.2 rabbitTemplate.send(String routingKey, Message message)
routingKey — 当没有设置setExchange设置发送指定交换机,routingKey 就是队列名,当设置了发送指定交换机,routingKey 就为路由键Message — 消息对象(里面包含消息内容字节类型,和消息参数[MessageProperties]: headers, properties)
rabbitTemplate
.send(QUEUE_NAME
,new Message(msg
.getBytes(),new MessageProperties()));
rabbitTemplate
.setExchange("ex_lzj_fanout");
rabbitTemplate
.send("",new Message(msg
.getBytes(),new MessageProperties()));
rabbitTemplate
.setExchange("lzj_direct");
rabbitTemplate
.send("a",new Message(msg
.getBytes(),new MessageProperties()));
- 2.3 rabbitTemplate.send(String exchange, String routingKey, Message message)
exchange — 交换机名字routingKey — 路由键,可以为空字符串Message — 消息对象(里面包含消息内容字节类型,和消息参数[MessageProperties]: headers, properties)
- 2.4 rabbitTemplate.send(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData)
待研究
- 2.5 rabbitTemplate.convertAndSend(Object object)
发送消息,与send()方法用法相同,不同地方是他接收的参数是一个Object对象,该对象要实现Serializable接口。
Object — 接收对象,Object 转换再传输
rabbitTemplate
.setDefaultReceiveQueue(QUEUE_NAME
);
rabbitTemplate
.convertAndSend(msg
);
rabbitTemplate
.setExchange("ex_lzj_fanout");
rabbitTemplate
.send(msg
);
rabbitTemplate
.setExchange("ex_lzj_direct");
rabbitTemplate
.setRoutingKey("a");
rabbitTemplate
.send(msg
);
- 2.6 rabbitTemplate.convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
Object — 接收对象,Object 转换再传输MessagePostProcessor — 在信息发送之前设置参数的接口对象,通过重写该接口的postProcessMessage去设置消息的餐数据【headers, properties】
MessagePostProcessor() {
@Override
public Message
postProcessMessage(Message message
) throws AmqpException
{
message
.getMessageProperties().setHeader("K","V");
message
.getMessageProperties().setMessageId("1");
return message
;
}
};
rabbitTemplate
.convertAndSend((Object
) msg
,messagePostProcessor
); ```
- 2.7 rabbitTemplate.convertAndSend(String routingKey, Object object)
routingKey — 当没有设置setExchange设置发送指定交换机,routingKey 就是队列名,当设置了发送指定交换机,routingKey 就为路由键Object — 接收对象,Object 转换再传输
rabbitTemplate
.convertAndSend(QUEUE_NAME
,msg
);
rabbitTemplate
.setExchange("ex_lzj_fanout");
rabbitTemplate
.convertAndSend("",msg
);
rabbitTemplate
.setExchange("lzj_direct");
rabbitTemplate
.convertAndSend("a",msg
);
- 2.8 rabbitTemplate.convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
routingKey — 当没有设置setExchange设置发送指定交换机,routingKey 就是队列名,当设置了发送指定交换机,routingKey 就为路由键Object — 接收对象,Object 转换再传输MessagePostProcessor — 在信息发送之前设置参数的接口对象,通过重写该接口的postProcessMessage去设置消息的餐数据【headers, properties】
- 2.9 rabbitTemplate.convertAndSend(String exchange, String routingKey, Object object)
exchange — 交换机名字routingKey — 路由键,可以为空字符串Object — 接收对象,Object 转换再传输
- 2.10 rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
exchange — 交换机名字routingKey — 路由键,可以为空字符串Object — 接收对象,Object 转换再传输MessagePostProcessor — 在信息发送之前设置参数的接口对象,通过重写该接口的postProcessMessage去设置消息的餐数据【headers, properties】
3) 消息的接收
创建一个类用于监听队列接受消息,标注@Component把此对象交给spring容器管理,标注@RabbitListener用于监听队列,再通过标注@RabbitHandler进行一个接收方法的声明。
注意:生产者发送的消息对象要序列化,所以消费者接收消息对象,要与生产者的消息对象一致(包括包名 ),@RabbitHandler标注的方法可以自动注入 Channel , Message对象。
@Component
@RabbitListener(queues
= "fanout_queue1") public class MyService {
@RabbitHandler
public void getMsgString(String msg
, Channel channel
, Message message
) throws IOException
{
System
.out
.println("正在消费一条消息 : " + msg
);
System
.out
.println(message
.getMessageProperties().getDeliveryTag());
Scanner scanner
= new Scanner(System
.in
);
System
.out
.println("请输入任意字符 消费该消息");
String s
= scanner
.nextLine();
channel
.basicAck(message
.getMessageProperties().getDeliveryTag(),false);
}
4) 消费者的质量保证
通过配置文件,配置消息消费质量保证
spring.rabbitmq.listener.simple.prefetch=100
7.延时队列&死信队列&死信&死信交换机
1)延时队列:
过指定的时间再消费消息。因为rabbitmq没有支持此类型队列 ,但是我们可以利用消息的一个参数属性(TTL)去实现延时队列。创建一条队列没有消费者,设定消息的过期时间(TTL),和x-dead-letter-exchange(过期消息后发送到交换机)或x-dead-letter-routing-key(过期消息后发送到路由键),当过期消息到期了,就被发送到交换机 再发送到指定队列,从而达到延时队列的效果
2)死信:
消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
消息过期了
队列达到最大的长度
3)死信队列&死信交换机 :
当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。
8.RabbitMQ的应用场景
1) 消息通信
Netty框架做的聊天系统,可以用rabbitmq来作为会话类别管理
2) 流量削峰
在大量请求的情况下,可以使用rabbitmq进行一个中间件的流量削峰,一个个请求进行处理。
3) 异步处理
例如登录发送邮件短信,邮件短信进行一个异步的处理发送。还有订单下单,
4) 日志处理
sleuth 微服务跟踪和熔断 + zipkin 一个开源项目 + rabbimq 进行一个削峰 filebeat 日志监控 + logtash 日志过滤 + rabbitmq 进行一个削峰 + elasticsearch 搜索引擎