RocketMQ消息中间件
应用场景
异步解耦
削峰填谷
分布式缓存同步
/消息分发
异步解耦
:
作为淘宝
/天猫主站最核心的交易系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、阿里妈妈、流计算分析等等,整体业务系统庞大而且复杂,架构设计稍有不合理,将直接影响主站业务的连续性;
削峰填谷
:
诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,削峰填谷是解决该问题的最佳方式;
顺序消息
:
细数日常中需要保证顺序的应用场景非常多,比如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与FIFO原理类似,MQ提供的顺序消息即保证消息的先进先出;
分布式事物消息
:
阿里巴巴的交易系统、支付红包等场景需要确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
大数据分析
数据在
"流动"中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列(MQ)与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。
分布式模缓存同步
天猫双
11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过 MQ 构建分布式缓存,实时通知商品数据的变化
常见消息中间件
ActiveMQ
KafKa
RabbitMQ
RocketMQ
消息中间件的核心概念
消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,
Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个
Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message
Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。
ConsumerGroup 由多个Consumer 实例构成。
生产者 :
一般由业务系统生产消息
, 消息会来到 broker服务器
,同步和异步方式均需要Broker返回确认信息
消费者 :
一般是后台系统负责异步消费。消息消费者会从broker服务器中拉取消息
,并将其提供给应用程序
,从消费形式来看分为
:拉取消费
,推送消费
名字服务 nameaerver :
名称服务充当路由消息的提供者
,生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表
代理服务器 (BrokerServer) :
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Message :
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
Topic :
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
标签 Tag :
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
消息队列MessageQueue
对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取
核心基础的使用
导入依赖
<dependency>
<groupId>org
.apache
.rocketmq
</groupId
>
<artifactId>rocketmq
-client
</artifactId
>
<version>4.4.0</version
>
</dependency
>
原始api发送消息
public class Producer {
public static void main(String
[] args
) throws Exception
{
DefaultMQProducer producer
= new DefaultMQProducer("wolfcode-producer");
producer
.setNamesrvAddr("127.0.0.1:9876");
producer
.start();
Message message
= new Message("01-hello", "tagA", "hello,rocketmq".getBytes("utf-8"));
producer
.send(message
);
producer
.shutdown();
}
}
消费消息
public class Consumer {
public static void main(String
[] args
) throws Exception
{
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("wolfcode-consumer");
consumer
.setNamesrvAddr("127.0.0.1:9876");
consumer
.subscribe("01-hello", "*");
consumer
.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List
<MessageExt> msgs
, ConsumeConcurrentlyContext context
) {
for (MessageExt msg
: msgs
) {
System
.out
.println("消费线程:"+Thread
.currentThread().getName()+",消息ID:"+msg
.getMsgId()+",消息内容:"+new String(msg
.getBody()));
}
return ConsumeConcurrentlyStatus
.CONSUME_SUCCESS
;
}
});
consumer
.start();
}
}
rocketMQ提供三种发送三种消息的方式
同步发送
必须要等到消息持久化到磁盘后,rocketMQ会给生产者返回一个信息,持久化完成
public class SyncProducer {
public static void main(String
[] args
) throws Exception
{
DefaultMQProducer producer
= new DefaultMQProducer("wolfcode-producer");
producer
.setNamesrvAddr("127.0.0.1:9876");
producer
.start();
for (int i
= 0; i
< 100; i
++) {
Message msg
= new Message("04-producer-type" ,
"TagA" ,
("Hello RocketMQ " + i
).getBytes("utf-8")
);
SendResult sendResult
= producer
.send(msg
);
System
.out
.println(JSON
.toJSONString(sendResult
));
}
producer
.shutdown();
}
}
异步发送
不需要的等消息持久化到磁盘中,就可以执行到后面的业务逻辑
public class ASyncProducer {
public static void main(String
[] args
) throws Exception
{
DefaultMQProducer producer
= new DefaultMQProducer("wolfcode-producer");
producer
.setNamesrvAddr("127.0.0.1:9876");
producer
.start();
CountDownLatch count
= new CountDownLatch(100);
for (int i
= 0; i
< 100; i
++) {
Message msg
= new Message("04-producer-type" ,
"TagA" ,
("Hello RocketMQ " + i
).getBytes("utf-8")
);
producer
.send(msg
, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult
) {
count
.countDown();
System
.out
.println("消息发送成功");
System
.out
.println(JSON
.toJSONString(sendResult
));
}
@Override
public void onException(Throwable e
) {
count
.countDown();
System
.out
.println("消息发送失败"+e
.getMessage());
System
.out
.println("处理失败消息");
}
});
}
count
.await();
producer
.shutdown();
}
}
一次性发送
把消息发送到消息中间件中,不关心返回结果
public class OneWayProducer {
public static void main(String
[] args
) throws Exception
{
DefaultMQProducer producer
= new DefaultMQProducer("wolfcode-producer");
producer
.setNamesrvAddr("127.0.0.1:9876");
producer
.start();
for (int i
= 0; i
< 100; i
++) {
Message msg
= new Message("04-producer-type" ,
"TagA" ,
("Hello RocketMQ " + i
).getBytes("utf-8")
);
producer
.sendOneway(msg
);
}
producer
.shutdown();
}
}
消费模式
集群模式 : MessageModel.CLUSTERING
多个消费者分担一个消费者的压力, 一个消息只会给一个消费者消费
广播模式 : MessageModel.BROADCASTING
需要对同一个消息进行不同处理的时候, 比如对同一个消息, 需要同时发送短信和发送邮件, 一个消息会发送给所有的消费者
消费方式
推送消费 : DefaultMQPushConsumer 对象
拉取消费 : DefaultMQPullConsumer 对象
延时消息
message.setDelayTimeLevel(3);
消息过滤
Tag标签过滤
selectorType = SelectorType.TAG,
selectorExpression = "TagA || TagC
SQL92过滤
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:
**>,
>=,
<,
<=,BETWEEN,
=;
**
- 字符比较,比如:
**=,
<>,IN;
**
- **IS NULL
** 或者
**IS NOT NULL;
**
- 逻辑符号
**AND,OR,NOT;
**
常量支持类型为:
- 数值,比如:
**123,
3.1415;
**
- 字符,比如:
**'abc',必须用单引号包裹起来;
**
- **NULL
**,特殊的常量
- 布尔值,
**TRUE
** 或
**FALSE
**
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic
, final MessageSelector messageSelector
)
注意
: 在使用SQL过滤的时候
, 需要配置参数在conf
/broker
.conf
/ enablePropertyFilter
=true
SpringBoot集成
导入依赖
<dependency>
<groupId>org
.apache
.rocketmq
</groupId
>
<artifactId>rocketmq
-spring
-boot
-starter
</artifactId
>
<version>2.0.3</version
>
</dependency
>
生产者配置信息
rocketmq
.name
-server
=127.0.0.1:9876
rocketmq
.producer
.group
=my
-group
rocketmq
.producer
.send
-message
-timeout
=5000
server
.port
=8089
代码实现
@RestController
public class HelloController {
@Autowired
private RocketMQTemplate rocketMQTemplate
;
@RequestMapping("01-hello")
public String
sendMsg(String message
,String age
) throws Exception
{
SendResult sendResult
= rocketMQTemplate
.syncSend("01-boot:", message
);
System
.out
.println(sendResult
.getMsgId());
System
.out
.println(sendResult
.getSendStatus());
return "success";
}
}
消费者
配置信息
rocketmq
.name
-server
=127.0.0.1:9876
server
.port
=8088
代码实现
@Component
@RocketMQMessageListener(
topic
= "01-boot-hello",
consumerGroup
= "wolfcode-consumer"
)
public class HelloConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt
) {
System
.out
.println("消费消息"+messageExt
);
}
}
其他常见问题
生产消费类型
同步消息
rocketMQTemplate
.syncSend("01-boot-hello", message
);
异步消息
rocketMQTemplate
.asyncSend("01-boot-hello", message
, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult
) {
System
.out
.println("消息消费成功");
}
@Override
public void onException(Throwable e
) {
System
.out
.println("消息消费失败");
}
});
一次性消息
rocketMQTemplate
.sendOneWay("01-boot-hello", message
);
消费模式
@RocketMQMessageListener注解的配置项配置
messageModel
= MessageModel
.CLUSTERING
,
messageModel
= MessageModel
.BROADCASTING
延时消息
rocketMQTemplate
.syncSend("01-boot-hello", MessageBuilder
.withPayload(message
).build(), 3000, 3);
设置消息标签
在发送的消息Topic
:Tag 中间使用冒号隔开
rocketMQTemplate
.convertAndSend("01-boot-hello:TagB",message
,map
);
在消费者中
@RocketMQMessageListener注解的配置项配置
selectorType
= SelectorType
.TAG
,
selectorExpression
= "TagA || TagC"
自定义属性设置
Map
<String,Object> map
=new HashMap<>();
map
.put("age", age
);
map
.put("name", "hesj");
map
.put(MessageConst
.PROPERTY_KEYS
,age
);
rocketMQTemplate
.convertAndSend("01-boot-hello:TagB",message
,map
);
过滤设置
:
主要
, 需要开启broker的支持用户属性配置
enablePropertyFilter
=true
在消费者中
@RocketMQMessageListener注解的配置项配置 消息过滤
selectorType
= SelectorType
.SQL92
,
selectorExpression
= "age
>18
agA || TagC"
### 自定义属性设置
```java
Map<String,Object> map=new HashMap<>();
//用户自定义属性
map.put("age", age);
map.put("name", "hesj");
//也可以设置系统属性
map.put(MessageConst.PROPERTY_KEYS,age);
rocketMQTemplate.convertAndSend("01-boot-hello:TagB",message,map);
过滤设置:
主要, 需要开启broker的支持用户属性配置
enablePropertyFilter=true
在消费者中@RocketMQMessageListener注解的配置项配置 消息过滤
selectorType = SelectorType.SQL92,
selectorExpression = "age >18