RocketMQ CenterOS 下的安装(最新版)

tech2023-12-21  76

获取安装包

下载地址:点击下载

解压安装包

由于RocketMQ 官方没有提供tar文件,所以需要下载unzip的工具包用于在CenterOS 下解压文件

安装unzip的安装包

测试unzip工具

解压安装包

运行RocketMQ-mqnamesrv服务

进入bin目录

sh bin/mqnamesrv &

查看运行日志 tail -f ~/logs/rocketmqlogs/namesrv.log

运行RocketMQ-broker服务

在当时目录下执行sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true & 查看运行日志tail -f ~/logs/rocketmqlogs/broker.log 查看是否运行正常sh mqadmin clusterList -n localhost:9876

脚本测试

export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 收到的内容sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Java代码测试

提供者

// 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("test_sync_group"); producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); // 设置NameServer的地址 producer.setNamesrvAddr("10.168.1.248:9876"); // 启动Producer实例 try { producer.start(); for (int i = 0; i < 5; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("MQTopic" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); } }catch (Exception e){ }finally { // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }

消费者

public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("10.168.1.248:9876"); consumer.setVipChannelEnabled(false); try { consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤 consumer.subscribe("MQTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } catch (MQClientException e) { e.printStackTrace(); } }

总结

No route info of this topicsendDefaultImpl call timeout
最新回复(0)