Getting the installation package
Download link: click to download
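Extracting the installation package
The official RocketMQ release is not provided as a tar file, so the unzip utility has to be installed in order to extract the package on CentOS.
Install the unzip package (on CentOS, for example, yum install -y unzip)
Check that unzip works (for example, unzip -v)
Extract the installation package with unzip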
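Running the RocketMQ mqnamesrv service
Go to the RocketMQ installation directory (the one that contains bin) and start the name server in the background:
sh bin/mqnamesrv &
Check the log:
tail -f ~/logs/rocketmqlogs/namesrv.log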
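Running the RocketMQ broker service
In the same directory, start the broker and point it at the name server:
sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
Check the log:
tail -f ~/logs/rocketmqlogs/broker.log
Check that the cluster is running properly:
sh bin/mqadmin clusterList -n localhost:9876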
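Besides reading the logs, a quick sanity check is to see whether anything accepts TCP connections on the name server port. The following is a minimal sketch using only the Java standard library; the class name NameServerProbe and the method isReachable are hypothetical helpers introduced here, not part of RocketMQ. A successful connection only shows that the port is open, not that the process behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of RocketMQ): checks whether a TCP port accepts connections.
final class NameServerProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: all reported as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the name server port used throughout this post.
        System.out.println("namesrv reachable: " + isReachable("localhost", 9876, 1000));
    }
}
```

The same check can be pointed at the broker's listening port from the machine that will run the Java client, which helps tell a network problem apart from a configuration problem.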
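Testing with the bundled scripts
Send messages with the example producer:
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Receive the messages with the example consumer:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer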
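The NAMESRV_ADDR variable exported for the scripts can also be read from your own Java code, so that the name server address is not hard-coded. This is a minimal sketch under stated assumptions: the class NamesrvAddr and its methods are hypothetical names, the fallback localhost:9876 is the address used in this post, and the semicolon is assumed as the separator when several name servers are listed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of RocketMQ): resolves the name server address to use.
final class NamesrvAddr {

    // Fallback taken from the commands in this post.
    static final String DEFAULT_ADDR = "localhost:9876";

    // Returns the configured value, or the fallback when it is null or blank.
    static String resolve(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_ADDR;
        }
        return configured.trim();
    }

    // Splits "host1:9876;host2:9876" into individual host:port entries.
    static List<String> split(String addrs) {
        List<String> out = new ArrayList<>();
        for (String part : addrs.split(";")) {
            String p = part.trim();
            if (!p.isEmpty()) {
                out.add(p);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String addr = resolve(System.getenv("NAMESRV_ADDR"));
        System.out.println("name servers: " + split(addr));
    }
}
```

The resolved string could then be passed to setNamesrvAddr on a producer or consumer instead of a literal address.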
Testing with Java code
Producer
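import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// The class name is illustrative; the original snippet showed only the method body.
public class Producer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("test_sync_group");
        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        producer.setNamesrvAddr("10.168.1.248:9876");
        try {
            producer.start();
            // Send five messages synchronously and print each send result
            for (int i = 0; i < 5; i++) {
                Message msg = new Message("MQTopic", "TagA",
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } catch (Exception e) {
            // Print the failure instead of silently swallowing it
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}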
Consumer
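import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

// The class name is illustrative; the original snippet showed only the main method.
public class Consumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("10.168.1.248:9876");
        consumer.setVipChannelEnabled(false);
        try {
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Subscribe to every tag of the topic the producer writes to
            consumer.subscribe("MQTopic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n",
                            Thread.currentThread().getName(), msgs);
                    for (MessageExt msg : msgs) {
                        // Decode with UTF-8, the charset the producer used to encode the body
                        System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}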
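A message body travels as raw bytes, so the consumer has to decode it with the same charset the producer used to encode it. The producer here encodes with RemotingHelper.DEFAULT_CHARSET, which is UTF-8. The sketch below uses only the Java standard library to show the round trip; the class BodyCharsetDemo and its methods are hypothetical names for illustration and do not touch RocketMQ itself.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo (not part of RocketMQ): encode and decode a message body with one charset.
final class BodyCharsetDemo {

    // What the producer side does: turn text into bytes using UTF-8.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer side should do: decode with the same charset,
    // instead of relying on the platform default.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u6d88\u606f" is a two-character Chinese word, written as escapes to keep the source ASCII.
        byte[] body = encode("Hello RocketMQ \u6d88\u606f");
        System.out.println(decode(body));
    }
}
```

For ASCII-only payloads such as "Hello RocketMQ 0" the platform default usually gives the same result, but naming the charset explicitly keeps non-ASCII payloads intact across machines.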
Summary
Errors that may come up when running the Java client:
No route info of this topic
sendDefaultImpl call timeout