RocketMQ是什么?

tech2025-09-25  9

RocketMQ介绍

一.消息中间件

1.应用场景
1.异步解耦 例:注册 发短信 发邮件的操作,我们以前要先注册,再等着发短信,最后等着发邮件, 而使用消息件就直接注册将其封装成一个对象放到消息中间件中,然后由消息中间件去监控发短信与发邮件, 然后接着去干其他事情,不用等着 2.削峰填谷 //请求先到消息中间件 然后服务器一点一点的获取中间件中的请求,避免一次性所有请求全部到达服务器 3.分布式缓存同步/消息分发 //微服务中,每个服务都放到消息中间件中,当一个服务需要调用另一个服务时,直接到消息中间件中获取
未使用消息中间件

使用消息中间件(异步解耦)

削峰填谷

2.消息中间件
1.ActiveMQ 2.KafKa 3.RabbitMQ 4.RocketMQ
消息中间件对比

二.RocketMQ的核心概念

RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分
1.名字服务NameServer
主要负责对于源数据的管理,包括了对于Topic和路由信息的管理
1.生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。 2.多个Namesrv实例组成集群,但相互独立,没有信息交换。 注意:Broker向NameServer发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别), 会导致一次心跳中,Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败
2.代理服务器Broker Server
消息中转角色,负责存储消息、转发消息
1.Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳, 并会定时将Topic信息注册到NameServer 2.代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备
3.生产者Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息
1.一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。 2.RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。 3.同步和异步方式均需要Broker返回确认信息,单向发送不需要。
4.消费者Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费
Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制
5.消息内容Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题
1.RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。 2.系统提供了通过Message ID和Key查询消息的功能。
6.消息主题Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题
是RocketMQ进行消息订阅的基本单位
7.标签Tag
为消息设置的标志,用于同一主题下区分不同类型的消息
1.标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统 2.消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
8.消息队列MessageQueue
主题被划分为一个或多个子主题,即消息队列
对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取

三.发送消息方式

1.同步发送(默认同步)
必须要等消息持久化到磁盘中以后,rocketmq给生产者返回一个消息,持久化完成
2.异步发送
不需要等消息持久化到磁盘中,就可以执行后面的业务逻辑
3.单向发送
一次性发送,把消息发送到消息中间件中,不需要确认返回结果

四.消费模式

1.集群模式(默认模式)
MessageModel.CLUSTERING 多个消费者分担一个消费者的压力, 一个消息只会给一个消费者消费(订单只会支付一次) //默认集群模式 //设置集群模式 consumer.setMessageModel(MessageModel.CLUSTERING);//不设置默认也可以
2.广播模式
MessageModel.BROADCASTING 需要对同一个消息进行不同处理的时候, 比如对同一个消息, 需要同时发送短信和发送邮件, 一个消息会发送给所有的消费者(同时发送短信,邮箱) //在消费者端加一个设置广播模式就好 //设置广播模式 consumer.setMessageModel(MessageModel.BROADCASTING);

五.消费方式

1.推送消费(push)
//主动将信息推送给消费者 // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
2.拉取消费(pul)
//消费者主动broker中获取,前提是broker中要存在生产者已经发送过来的信息,否则报错 关键类 //1.DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer"); //2.PullResult pullResult = consumer.pull(new MessageQueue("pull", "broker-a", 0), "*", 0, 1);

六.延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。 //在生产端设置延时等级2, 5秒后发送 message.setDelayTimeLevel(2); //延迟等级 延时消息的使用限制 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级118 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

七.消息过滤

1.Tag标签过滤(默认这个)
生产者 Message message = new Message("06-filter", "tagC", "hello,rocketmq".getBytes("utf-8")); 消费者 consumer.subscribe("06-filter", "tagA||tagC"); //生产者第二个参数tag与消费者第二个参数相对应就都能匹配,如果消费者使用*就都能接受
2.SQL92过滤
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下: public void subscribe(finalString topic, final MessageSelector messageSelector) //注意: 在使用SQL过滤的时候, 需要在conf中broker.conf配置参数enablePropertyFilter=true 步骤: 1.在生产者中设置条件 message.putUserProperty("age", "20"); 2.在消费者中设置匹配条件 consumer.subscribe("06-filter", MessageSelector.bySql("age>18"));

八.使用api实现

生产者
public class Producer { public static void main(String[] args) throws Exception{ //1 创建一个生产者对象, 并且指定一个生产者组 DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer"); //2 设置名字服务的地址 producer.setNamesrvAddr("127.0.0.1:9876"); //3 启动生产者 producer.start(); //4 创建一个消息 Message message = new Message("01-hello", "tagA", "hello,rocketmq".getBytes("utf-8")); //5 使用生产者发送消息 producer.send(message); //6 关闭连接 producer.shutdown(); } }
消费者
public class Consumer { public static void main(String[] args) throws Exception{ //创建一个拉取消息的消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode-consumer"); //设置名字地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //绑定消息的主题 consumer.subscribe("01-hello", "*"); //消费者监听处理消息方法 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消费线程:"+Thread.currentThread().getName()+",消息ID:"+msg.getMsgId()+",消息内容:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }

九.SpringBoot集成RocketMQ

1>同步/异步/一次性消息
1.同步消息(生产者)
RestController public class SendMsgController { @Autowired private RocketMQTemplate template; @RequestMapping("/sendMsg") public String sendMsg(String msg) { //第一个参数目的地(主题) //第二个参数:发送的消息 //同步发送的方式 template.syncSend("01-boot-hello", msg); return "发送成功"; } }
2.异步消息(生产者)
@RequestMapping("/asyncMsg") public String asyncMsg(String msg) { //异步发送的方式 template.asyncSend("01-boot-hello", msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); return "发送成功"; }
3.一次性消息(生产者)
@RequestMapping("/sendOneWay") public String sendOneWay(String msg) //一次性发送的方式 template.sendOneWay("01-boot-hello", msg); return "发送成功"; }
同步/异步/一次性(消费者代码不变)
@Component @RocketMQMessageListener( consumerGroup = "boot-consumer-demo", //组 topic="01-boot-hello" //主题 ) public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("消息的内容" + msg); } }
2>集群模式/广播模式
//生产者 @RestController public class SendMsgController { @Autowired private RocketMQTemplate template; @RequestMapping("/sendMsg") public String sendMsg(String msg) template.syncSend("02-consumer-model", msg); return "发送成功"; } } //消费者 //集群模式 @Component @RocketMQMessageListener( consumerGroup = "boot-consumer-demo", topic="02-consumer-model", messageModel = MessageModel.CLUSTERING //集群模式 ) public class Consumer1 implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("消费者一,消息的内容" + msg); } } //消费者 //广播模式 @Component @RocketMQMessageListener( consumerGroup = "boot-consumer-demo", topic="02-consumer-model", messageModel = MessageModel.BROADCASTING //广播模式 ) public class Consumer2 implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("消费者二,消息的内容" + msg); } }
3>延时消息
//生产者 @RestController public class SendMsgController { @Autowired private RocketMQTemplate template; @RequestMapping("/sendDelayMsg") public String sendDelayMsg(String msg) { //延时发送的方式 MessageBuilder<String> builder = MessageBuilder.withPayload(msg); template.syncSend("03-boot-delay", builder.build(),5000,2); return "发送成功"; } } //消费者 @Component @RocketMQMessageListener( consumerGroup = "boot-consumer-demo1", topic="03-boot-delay" ) public class Consumer3 implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("消息的内容" + msg); } }
4>tag过滤
//生产者 @RestController public class SendMsgController { @RequestMapping("/filter") public String filter(String msg,String tag) { template.syncSend("04-boot-filter:"+tag, msg); return "发送成功"; } } //消费者 @Component @RocketMQMessageListener( consumerGroup = "boot-consumer-demo", topic = "04-boot-filter", selectorType = SelectorType.TAG, //tag过滤方式 selectorExpression = "TagA||TagC" //过滤格式 ) public class Consumer4 implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("消息的内容" + msg); } } //测试:http://localhost:8089/filter?msg=hello&tag=TagA
5>SQL92过滤
//生产者 @RestController public class SendMsgController { @RequestMapping("/filter") public String filter(String msg,String age) { HashMap map = new HashMap(); map.put("age",age ); template.convertAndSend("04-boot-filter:", msg,map); return "发送成功"; } } //消费者 @Component @RocketMQMessageListener( consumerGroup = "boot-consumer-demo", topic = "04-boot-filter", /* selectorType = SelectorType.TAG, selectorExpression = "TagA||TagC"*/ selectorType = SelectorType.SQL92, //SQL92过滤 selectorExpression = "age>20" //过滤格式 ) public class Consumer4 implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("消息的内容" + msg); } } //测试:http://localhost:8089/filter?msg=hello&age=25
pom.xml依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
application.properties
//生产者 rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group //设置发送消息时间,默认3秒,设置10秒,防止网络延迟导致发送失败 rocketmq.producer.send-message-timeout=10000 server.port=8089 //消费者 rocketmq.name-server=127.0.0.1:9876 server.port=8088
十.小结
1.RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件 2.RcoketMQ使用场景有异步解耦 削峰填谷 分布式缓存同步/消息分发等 3.生产者提供三种发送方式:1.同步发送 2.异步发送 3.单向发送 4.消费者支持PUSH和PULL两种消费模式,支持集群消费和广播消息 5.消费者的集群消费和广播消息只需要修改消费者端的代码即可 6.2种消息过滤都只要更改消费端注解上的类型和过滤方式即可
最新回复(0)