RocketMQ是什么?
RocketMQ是Apache旗下的一款开源的高性能、高吞吐量的分布式消息中间件。
特性
是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。Producer、Consumer、队列都可以分布式。Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。能够保证严格的消息顺序提供丰富的消息拉取模式高效的订阅者水平扩展能力实时的消息订阅机制亿级消息堆积能力较少的依赖
开发文档
点击跳转
环境及工具
RocketMQ 版本:4.7.1 开发工具:IntelliJ IDEA 2020.2.1 、 JDK1.8
获取安装包
下载地址:点击下载
解压安装包
准备启动
mqnamesrv启动
mqnamesrv.cmd 启动出错
配置环境变量
再次运行mqnamesrv.cmd
broker启动
broker启动时可能会提示内存不够,配置文件中配置的要求是比较高的,基本都是2G的内存,也许你本机没有那么多空闲的内存,这个时需要修改下配置文件, 编辑runbroker.cmd中的java配置,可根据自己的机器修改
set "JAVA_OPT=%JAVA_OPT% -server -Xms1024m -Xmx1024m -Xmn1024m"
运行mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true autoCreateTopicEnable 表示会自动创建Topic 启动成功。
代码测试
本遍提供的是java测试代码,你也可以用其它的脚本做测试,官网也有不同的语言请自行选择
提供者
DefaultMQProducer producer
= new DefaultMQProducer("MQ_PRODUCER_GROUP");
producer
.setNamesrvAddr("127.0.1.1:9876");
try {
producer
.start();
for (int i
= 0; i
< 5; i
++) {
Message msg
= new Message("MQTopic" ,
"TagA" ,
("Hello RocketMQ " + i
).getBytes(RemotingHelper
.DEFAULT_CHARSET
)
);
SendResult sendResult
= producer
.send(msg
);
System
.out
.printf("%s%n", sendResult
);
}
}catch (Exception e
){
}finally {
producer
.shutdown();
}
}
消费者
public static void main(String
[] args
) {
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("MQ_PUSH_CONSUMER");
consumer
.setNamesrvAddr("127.0.1.1:9876");
consumer
.setVipChannelEnabled(false);
try {
consumer
.setConsumeFromWhere(ConsumeFromWhere
.CONSUME_FROM_FIRST_OFFSET
);
consumer
.subscribe("MQTopic", "*");
consumer
.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus
consumeMessage(
List
<MessageExt> msgs
, ConsumeConcurrentlyContext context
) {
System
.out
.printf("%s Receive New Messages: %s %n", Thread
.currentThread().getName(), msgs
);
for (MessageExt msg
: msgs
) {
System
.out
.println(new String(msg
.getBody()));
}
return ConsumeConcurrentlyStatus
.CONSUME_SUCCESS
;
}
});
consumer
.start();
System
.out
.println("Consumer Started.");
} catch (MQClientException e
) {
e
.printStackTrace();
}
}
总结
这是个简单的RocketMQ安装与测试API的功能 ,更复杂的功能扔需要探索,更需要用到实际的项目中去才会真真发现很多的更高级的功能。 P安装RocketMQ时遇到的问题, 1、内存不够,需要配置文件 2、本机安装启动的时候直接用127.0.0.1启动broker 但是如果用的是服务器需要给固定IP客户端才可以连接 如有问题,欢迎有问题及时交流。谢谢!