消息中间件之rocketMQ

tech2025-11-12  7

RocketMQ消息中间件

应用场景

异步解耦 削峰填谷 //请求都先来到消息中间件,然后服务器根据内需拉取请求数量 分布式缓存同步/消息分发

异步解耦 : 作为淘宝/天猫主站最核心的交易系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、阿里妈妈、流计算分析等等,整体业务系统庞大而且复杂,架构设计稍有不合理,将直接影响主站业务的连续性; 削峰填谷 : 诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,削峰填谷是解决该问题的最佳方式; 顺序消息 : 细数日常中需要保证顺序的应用场景非常多,比如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与FIFO原理类似,MQ提供的顺序消息即保证消息的先进先出; 分布式事物消息 : 阿里巴巴的交易系统、支付红包等场景需要确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。 大数据分析 数据在"流动"中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列(MQ)与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。 分布式模缓存同步 天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过 MQ 构建分布式缓存,实时通知商品数据的变化

常见消息中间件

ActiveMQ //淘汰了 KafKa //运用于大数据 日志分析 如 RabbitMQ RocketMQ

消息中间件的核心概念

消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息, Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。 ConsumerGroup 由多个Consumer 实例构成。

生产者 :

一般由业务系统生产消息 , 消息会来到 broker服务器,同步和异步方式均需要Broker返回确认信息

消费者 :

一般是后台系统负责异步消费。消息消费者会从broker服务器中拉取消息,并将其提供给应用程序,从消费形式来看分为:拉取消费,推送消费

名字服务 nameaerver :

名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表

代理服务器 (BrokerServer) :

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Message :

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

Topic :

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

标签 Tag :

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

消息队列MessageQueue

对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取


核心基础的使用

导入依赖

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>

原始api发送消息

public class Producer { public static void main(String[] args) throws Exception{ //1 创建一个生产者对象, 并且指定一个生产者组 DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer"); //2 设置名字服务的地址 producer.setNamesrvAddr("127.0.0.1:9876"); //3 启动生产者 producer.start(); //4 创建一个消息 Message message = new Message("01-hello", "tagA", "hello,rocketmq".getBytes("utf-8")); //5 发送消息 producer.send(message); //6 关闭连接 producer.shutdown(); } }

消费消息

public class Consumer { public static void main(String[] args) throws Exception{ //创建一个拉取消息的消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode-consumer"); //设置名字地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //绑定消息的主题 consumer.subscribe("01-hello", "*"); //消费者监听处理消息方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消费线程:"+Thread.currentThread().getName()+",消息ID:"+msg.getMsgId()+",消息内容:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }

rocketMQ提供三种发送三种消息的方式

同步发送

必须要等到消息持久化到磁盘后,rocketMQ会给生产者返回一个信息,持久化完成

public class SyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer"); // 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes("utf-8") /* Message body */ ); //发送同步消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.println(JSON.toJSONString(sendResult)); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }

异步发送

不需要的等消息持久化到磁盘中,就可以执行到后面的业务逻辑

public class ASyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer"); // 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动Producer实例 producer.start(); CountDownLatch count = new CountDownLatch(100); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes("utf-8") /* Message body */ ); //发送同步消息到一个Broker producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { count.countDown(); System.out.println("消息发送成功"); System.out.println(JSON.toJSONString(sendResult)); } @Override public void onException(Throwable e) { count.countDown(); System.out.println("消息发送失败"+e.getMessage()); System.out.println("处理失败消息"); } }); } count.await(); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }

一次性发送

把消息发送到消息中间件中,不关心返回结果

public class OneWayProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer"); // 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes("utf-8") /* Message body */ ); //发送同步消息到一个Broker producer.sendOneway(msg); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }

消费模式

集群模式 : MessageModel.CLUSTERING

​ 多个消费者分担一个消费者的压力, 一个消息只会给一个消费者消费

广播模式 : MessageModel.BROADCASTING

​ 需要对同一个消息进行不同处理的时候, 比如对同一个消息, 需要同时发送短信和发送邮件, 一个消息会发送给所有的消费者

消费方式

推送消费 : DefaultMQPushConsumer 对象

拉取消费 : DefaultMQPullConsumer 对象

延时消息

message.setDelayTimeLevel(3);

消息过滤

Tag标签过滤

selectorType = SelectorType.TAG, selectorExpression = "TagA || TagC

SQL92过滤

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。 - 数值比较,比如:**>>=<<=,BETWEEN,=** - 字符比较,比如:**=<>,IN;** - **IS NULL** 或者 **IS NOT NULL;** - 逻辑符号 **AND,OR,NOT;** 常量支持类型为: - 数值,比如:**1233.1415** - 字符,比如:**'abc',必须用单引号包裹起来;** - **NULL**,特殊的常量 - 布尔值,**TRUE****FALSE** 只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下: public void subscribe(finalString topic, final MessageSelector messageSelector) 注意: 在使用SQL过滤的时候, 需要配置参数在conf/broker.conf / enablePropertyFilter=true

SpringBoot集成

导入依赖

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>

生产者配置信息

rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group rocketmq.producer.send-message-timeout=5000 server.port=8089

代码实现

@RestController public class HelloController { @Autowired private RocketMQTemplate rocketMQTemplate; @RequestMapping("01-hello") public String sendMsg(String message,String age) throws Exception{ SendResult sendResult = rocketMQTemplate.syncSend("01-boot:", message); System.out.println(sendResult.getMsgId()); System.out.println(sendResult.getSendStatus()); return "success"; } }

消费者

配置信息

rocketmq.name-server=127.0.0.1:9876 server.port=8088

代码实现

@Component @RocketMQMessageListener( topic = "01-boot-hello", consumerGroup = "wolfcode-consumer" ) public class HelloConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("消费消息"+messageExt); } }

其他常见问题

生产消费类型

同步消息

rocketMQTemplate.syncSend("01-boot-hello", message);

异步消息

rocketMQTemplate.asyncSend("01-boot-hello", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息消费成功"); } @Override public void onException(Throwable e) { System.out.println("消息消费失败"); } });

一次性消息

rocketMQTemplate.sendOneWay("01-boot-hello", message);

消费模式

@RocketMQMessageListener注解的配置项配置 messageModel = MessageModel.CLUSTERING, messageModel = MessageModel.BROADCASTING

延时消息

rocketMQTemplate.syncSend("01-boot-hello", MessageBuilder.withPayload(message).build(), 3000, 3);

设置消息标签

在发送的消息Topic:Tag 中间使用冒号隔开 rocketMQTemplate.convertAndSend("01-boot-hello:TagB",message,map); 在消费者中@RocketMQMessageListener注解的配置项配置 selectorType = SelectorType.TAG, selectorExpression = "TagA || TagC"

自定义属性设置

Map<String,Object> map=new HashMap<>(); //用户自定义属性 map.put("age", age); map.put("name", "hesj"); //也可以设置系统属性 map.put(MessageConst.PROPERTY_KEYS,age); rocketMQTemplate.convertAndSend("01-boot-hello:TagB",message,map); 过滤设置: 主要, 需要开启broker的支持用户属性配置 enablePropertyFilter=true 在消费者中@RocketMQMessageListener注解的配置项配置 消息过滤 selectorType = SelectorType.SQL92, selectorExpression = "age >18

agA || TagC"

### 自定义属性设置 ```java Map<String,Object> map=new HashMap<>(); //用户自定义属性 map.put("age", age); map.put("name", "hesj"); //也可以设置系统属性 map.put(MessageConst.PROPERTY_KEYS,age); rocketMQTemplate.convertAndSend("01-boot-hello:TagB",message,map); 过滤设置: 主要, 需要开启broker的支持用户属性配置 enablePropertyFilter=true 在消费者中@RocketMQMessageListener注解的配置项配置 消息过滤 selectorType = SelectorType.SQL92, selectorExpression = "age >18
最新回复(0)