RocketMQ windows下的安装(最新版)

tech2023-08-18  91

RocketMQ是什么?

RocketMQ是Apache旗下的一款开源的高性能、高吞吐量的分布式消息中间件。

特性

是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。Producer、Consumer、队列都可以分布式。Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。能够保证严格的消息顺序提供丰富的消息拉取模式高效的订阅者水平扩展能力实时的消息订阅机制亿级消息堆积能力较少的依赖

开发文档

点击跳转

环境及工具

RocketMQ 版本:4.7.1 开发工具:IntelliJ IDEA 2020.2.1 、 JDK1.8

获取安装包

下载地址:点击下载

解压安装包

准备启动

mqnamesrv启动

mqnamesrv.cmd 启动出错

配置环境变量

再次运行mqnamesrv.cmd

broker启动

broker启动时可能会提示内存不够,配置文件中配置的要求是比较高的,基本都是2G的内存,也许你本机没有那么多空闲的内存,这个时需要修改下配置文件, 编辑runbroker.cmd中的java配置,可根据自己的机器修改

set "JAVA_OPT=%JAVA_OPT% -server -Xms1024m -Xmx1024m -Xmn1024m"

运行mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true autoCreateTopicEnable 表示会自动创建Topic 启动成功。

代码测试

本遍提供的是java测试代码,你也可以用其它的脚本做测试,官网也有不同的语言请自行选择

提供者

// 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("MQ_PRODUCER_GROUP"); //producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); 最新版本不需要设置。3.4版本需要 // 设置NameServer的地址 producer.setNamesrvAddr("127.0.1.1:9876"); // 启动Producer实例 try { producer.start(); for (int i = 0; i < 5; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("MQTopic" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); } }catch (Exception e){ }finally { // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }

消费者

public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MQ_PUSH_CONSUMER"); consumer.setNamesrvAddr("127.0.1.1:9876"); consumer.setVipChannelEnabled(false); try { consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤 consumer.subscribe("MQTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } catch (MQClientException e) { e.printStackTrace(); } }

总结

这是个简单的RocketMQ安装与测试API的功能 ,更复杂的功能扔需要探索,更需要用到实际的项目中去才会真真发现很多的更高级的功能。 P安装RocketMQ时遇到的问题, 1、内存不够,需要配置文件 2、本机安装启动的时候直接用127.0.0.1启动broker 但是如果用的是服务器需要给固定IP客户端才可以连接 如有问题,欢迎有问题及时交流。谢谢!

最新回复(0)