RocketMQ介绍
一.消息中间件
1.应用场景
1.异步解耦
例
:注册 发短信 发邮件的操作
,我们以前要先注册
,再等着发短信
,最后等着发邮件
,
而使用消息件就直接注册将其封装成一个对象放到消息中间件中
,然后由消息中间件去监控发短信与发邮件
,
然后接着去干其他事情
,不用等着
2.削峰填谷
3.分布式缓存同步
/消息分发
未使用消息中间件
使用消息中间件(异步解耦)
削峰填谷
2.消息中间件
1.ActiveMQ
2.KafKa
3.RabbitMQ
4.RocketMQ
消息中间件对比
二.RocketMQ的核心概念
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分
1.名字服务NameServer
主要负责对于源数据的管理,包括了对于Topic和路由信息的管理
1.生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。
2.多个Namesrv实例组成集群,但相互独立,没有信息交换。
注意
:Broker向NameServer发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),
会导致一次心跳中,Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败
2.代理服务器Broker Server
消息中转角色,负责存储消息、转发消息
1.Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,
并会定时将Topic信息注册到NameServer
2.代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备
3.生产者Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息
1.一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
2.RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
3.同步和异步方式均需要Broker返回确认信息,单向发送不需要。
4.消费者Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费
Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制
5.消息内容Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题
1.RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
2.系统提供了通过Message ID和Key查询消息的功能。
6.消息主题Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题
是RocketMQ进行消息订阅的基本单位
7.标签Tag
为消息设置的标志,用于同一主题下区分不同类型的消息
1.标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统
2.消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
8.消息队列MessageQueue
主题被划分为一个或多个子主题,即消息队列
对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取
三.发送消息方式
1.同步发送(默认同步)
必须要等消息持久化到磁盘中以后
,rocketmq给生产者返回一个消息
,持久化完成
2.异步发送
不需要等消息持久化到磁盘中
,就可以执行后面的业务逻辑
3.单向发送
一次性发送
,把消息发送到消息中间件中
,不需要确认返回结果
四.消费模式
1.集群模式(默认模式)
MessageModel
.CLUSTERING 多个消费者分担一个消费者的压力
, 一个消息只会给一个消费者消费
(订单只会支付一次
)
consumer
.setMessageModel(MessageModel
.CLUSTERING
);
2.广播模式
MessageModel
.BROADCASTING 需要对同一个消息进行不同处理的时候
, 比如对同一个消息
, 需要同时发送短信和发送邮件
,
一个消息会发送给所有的消费者
(同时发送短信
,邮箱
)
consumer
.setMessageModel(MessageModel
.BROADCASTING
);
五.消费方式
1.推送消费(push)
2.拉取消费(pul)
关键类
六.延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,
1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
message
.setDelayTimeLevel(2);
延时消息的使用限制
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从
1s到
2h分别对应着等级
1到
18
private String messageDelayLevel
= "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
七.消息过滤
1.Tag标签过滤(默认这个)
生产者
Message message
=
new Message("06-filter", "tagC", "hello,rocketmq".getBytes("utf-8"));
消费者
consumer
.subscribe("06-filter", "tagA||tagC");
2.SQL92过滤
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic
, final MessageSelector messageSelector
)
步骤
:
1.在生产者中设置条件 message
.putUserProperty("age", "20");
2.在消费者中设置匹配条件
consumer
.subscribe("06-filter", MessageSelector
.bySql("age>18"));
八.使用api实现
生产者
public class Producer {
public static void main(String
[] args
) throws Exception
{
DefaultMQProducer producer
= new DefaultMQProducer("wolfcode-producer");
producer
.setNamesrvAddr("127.0.0.1:9876");
producer
.start();
Message message
= new Message("01-hello", "tagA", "hello,rocketmq".getBytes("utf-8"));
producer
.send(message
);
producer
.shutdown();
}
}
消费者
public class Consumer {
public static void main(String
[] args
) throws Exception
{
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("wolfcode-consumer");
consumer
.setNamesrvAddr("127.0.0.1:9876");
consumer
.subscribe("01-hello", "*");
consumer
.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus
consumeMessage(List
<MessageExt> msgs
, ConsumeConcurrentlyContext context
) {
for (MessageExt msg
: msgs
) {
System
.out
.println("消费线程:"+Thread
.currentThread().getName()+",消息ID:"+msg
.getMsgId()+",消息内容:"+new String(msg
.getBody()));
}
return ConsumeConcurrentlyStatus
.CONSUME_SUCCESS
;
}
});
consumer
.start();
}
}
九.SpringBoot集成RocketMQ
1>同步/异步/一次性消息
1.同步消息(生产者)
RestController
public class SendMsgController {
@Autowired
private RocketMQTemplate template
;
@RequestMapping("/sendMsg")
public String
sendMsg(String msg
) {
template
.syncSend("01-boot-hello", msg
);
return "发送成功";
}
}
2.异步消息(生产者)
@RequestMapping("/asyncMsg")
public String
asyncMsg(String msg
) {
template
.asyncSend("01-boot-hello", msg
, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult
) {
System
.out
.println("发送成功");
}
@Override
public void onException(Throwable throwable
) {
System
.out
.println("发送失败");
}
});
return "发送成功";
}
3.一次性消息(生产者)
@RequestMapping("/sendOneWay")
public String
sendOneWay(String msg
)
template
.sendOneWay("01-boot-hello", msg
);
return "发送成功";
}
同步/异步/一次性(消费者代码不变)
@Component
@RocketMQMessageListener(
consumerGroup
= "boot-consumer-demo",
topic
="01-boot-hello"
)
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg
) {
System
.out
.println("消息的内容" + msg
);
}
}
2>集群模式/广播模式
@RestController
public class SendMsgController {
@Autowired
private RocketMQTemplate template
;
@RequestMapping("/sendMsg")
public String
sendMsg(String msg
)
template
.syncSend("02-consumer-model", msg
);
return "发送成功";
}
}
@Component
@RocketMQMessageListener(
consumerGroup
= "boot-consumer-demo",
topic
="02-consumer-model",
messageModel
= MessageModel
.CLUSTERING
)
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String msg
) {
System
.out
.println("消费者一,消息的内容" + msg
);
}
}
@Component
@RocketMQMessageListener(
consumerGroup
= "boot-consumer-demo",
topic
="02-consumer-model",
messageModel
= MessageModel
.BROADCASTING
)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String msg
) {
System
.out
.println("消费者二,消息的内容" + msg
);
}
}
3>延时消息
@RestController
public class SendMsgController {
@Autowired
private RocketMQTemplate template
;
@RequestMapping("/sendDelayMsg")
public String
sendDelayMsg(String msg
) {
MessageBuilder
<String> builder
= MessageBuilder
.withPayload(msg
);
template
.syncSend("03-boot-delay", builder
.build(),5000,2);
return "发送成功";
}
}
@Component
@RocketMQMessageListener(
consumerGroup
= "boot-consumer-demo1",
topic
="03-boot-delay"
)
public class Consumer3 implements RocketMQListener<String> {
@Override
public void onMessage(String msg
) {
System
.out
.println("消息的内容" + msg
);
}
}
4>tag过滤
@RestController
public class SendMsgController {
@RequestMapping("/filter")
public String
filter(String msg
,String tag
) {
template
.syncSend("04-boot-filter:"+tag
, msg
);
return "发送成功";
}
}
@Component
@RocketMQMessageListener(
consumerGroup
= "boot-consumer-demo",
topic
= "04-boot-filter",
selectorType
= SelectorType
.TAG
,
selectorExpression
= "TagA||TagC"
)
public class Consumer4 implements RocketMQListener<String> {
@Override
public void onMessage(String msg
) {
System
.out
.println("消息的内容" + msg
);
}
}
5>SQL92过滤
@RestController
public class SendMsgController {
@RequestMapping("/filter")
public String
filter(String msg
,String age
) {
HashMap map
= new HashMap();
map
.put("age",age
);
template
.convertAndSend("04-boot-filter:", msg
,map
);
return "发送成功";
}
}
@Component
@RocketMQMessageListener(
consumerGroup
= "boot-consumer-demo",
topic
= "04-boot-filter",
selectorType
= SelectorType
.SQL92
,
selectorExpression
= "age>20"
)
public class Consumer4 implements RocketMQListener<String> {
@Override
public void onMessage(String msg
) {
System
.out
.println("消息的内容" + msg
);
}
}
pom.xml依赖
<dependency>
<groupId>org.apache.rocketmq
</groupId>
<artifactId>rocketmq-spring-boot-starter
</artifactId>
<version>2.0.3
</version>
</dependency>
application.properties
rocketmq
.name
-server
=127.0.0.1:9876
rocketmq
.producer
.group
=my
-group
rocketmq
.producer
.send
-message
-timeout
=10000
server
.port
=8089
rocketmq
.name
-server
=127.0.0.1:9876
server
.port
=8088
十.小结
1.RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件
2.RcoketMQ使用场景有异步解耦 削峰填谷 分布式缓存同步
/消息分发等
3.生产者提供三种发送方式
:1.同步发送
2.异步发送
3.单向发送
4.消费者支持PUSH和PULL两种消费模式,支持集群消费和广播消息
5.消费者的集群消费和广播消息只需要修改消费者端的代码即可
6.2种消息过滤都只要更改消费端注解上的类型和过滤方式即可