Java 封装阿里云 RocketMQ

tech2026-03-11  2


前言:

     本文对阿里云 RocketMQ 的消息收发进行了封装。鉴于官方案例不够灵活,直接用于项目开发时耦合度太高,故封装了一套便于嵌入的组件,希望能够有所帮助。

     本文不做过多展开,主要介绍消息的收发;如需了解更多功能,可以参考官方文档。

      以下内容只列举了部分代码,完整代码实现请参考:

      https://gitee.com/DHing/ali-rocket-mq/tree/master/rocket-mq

1.导入最新client:

// The 'compile' configuration is deprecated and was removed in Gradle 7; use 'implementation'.
// NOTE(review): if this module is a library whose public API exposes ONS types, use 'api' instead.
implementation 'com.aliyun.openservices:ons-client:1.8.7.1.Final'

 基础配置(参考官方文档demo) 

/**
 * Connection settings for the Alibaba Cloud RocketMQ client, bound from the
 * configuration keys under the {@code rocketmq} prefix.
 *
 * <p>The bean is only registered when {@code rocketmq.config} is set to {@code true}.
 * Getters and setters are generated by Lombok's {@code @Data}.
 *
 * <p>{@code @Configuration} is itself a component stereotype, so no additional
 * {@code @Component} annotation is needed.
 */
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
@Data
@ConditionalOnProperty(prefix = "rocketmq", name = "config", havingValue = "true")
public class MQProperties {

    /** Access key, passed to the client as {@code PropertyKeyConst.AccessKey}. */
    private String accessKey;

    /** Secret key, passed to the client as {@code PropertyKeyConst.SecretKey}. */
    private String secretKey;

    /** Name server address, passed to the client as {@code PropertyKeyConst.NAMESRV_ADDR}. */
    private String nameSrvAddr;

    /** Group id, passed to the consumer as {@code PropertyKeyConst.GROUP_ID}. */
    private String groupId;
}

-----------
bootstrap.yml

rocketmq:
  config: true
  accessKey: *******
  secretKey: *******
  nameSrvAddr: ******
  groupId: ******

2.定义Producer:  

/** * 阿里MQ生产者 * */ @Slf4j public class AliMQProducerProcess extends AbstractMessageProducerProcess { private Producer producer; private static volatile AliMQProducerProcess INSTANCE; private AliMQProducerProcess() { } public static AliMQProducerProcess getInstance() { if (INSTANCE == null) { synchronized (AliMQProducerProcess.class) { if (INSTANCE == null) { INSTANCE = new AliMQProducerProcess(); } } } return INSTANCE; } /** * 初始化AliMQ配置信息 */ @Override public void initProducer(MQProperties mqProperties) { Properties properties = new Properties(); properties.put(PropertyKeyConst.AccessKey, mqProperties.getAccessKey()); properties.put(PropertyKeyConst.SecretKey, mqProperties.getSecretKey()); properties.put(PropertyKeyConst.NAMESRV_ADDR, mqProperties.getNameSrvAddr()); properties.put(PropertyKeyConst.SendMsgTimeoutMillis, 10000); producer = ONSFactory.createProducer(properties); producer.start(); } @Override public void sendMessageFromQueue() { Message message = null; try { message = pool.take(); producer.send(message); log.info("send message to quene topic is: {}, body is: {}",message.getTopic(), new String(message.getBody())); } catch (InterruptedException e) { log.error("[{}] is stopping, due to following error:", AliMQProducerProcess.class); log.error(e.getMessage(), e); Thread.interrupted(); } catch (Exception e) { log.error(e.getMessage(), e); } } @Override public void pushMessageToQueue(MessageEntity msg) { try { String json = JSON.toJSONString(msg); if (null == json || json.trim().isEmpty()) { return; } String key = msg.getKey(); if (null == key || key.trim().isEmpty()) { key = ""; } Message message = new Message(msg.getTopic(), msg.getTag(), key, json.getBytes()); //设置消息投送时间,最大为7天 if (msg.getStartDeliverTime() != null && msg.getStartDeliverTime() > 0l) { msg.setStartDeliverTime(msg.getStartDeliverTime()); } pool.offer(message); } catch (Exception e) { log.error(e.getMessage(), e); } } }

   3.消息消费(自动获取topic下内容):  

/** * 阿里MQ消费者 * */ @Slf4j public class AliMQConsumerProcess extends AbstractMessageConsumerProcess { private static Consumer consumer; private static volatile AliMQConsumerProcess INSTANCE; private AliMQConsumerProcess() {} public static AliMQConsumerProcess getInstance() { if (INSTANCE == null) { synchronized (AliMQConsumerProcess.class) { if (INSTANCE == null) { INSTANCE = new AliMQConsumerProcess(); } } } return INSTANCE; } /** * 初始化AliMQ配置信息 */ @Override public void initConsumer(MQProperties mqProperties) { Properties properties = new Properties(); properties.put(PropertyKeyConst.AccessKey, mqProperties.getAccessKey()); properties.put(PropertyKeyConst.SecretKey, mqProperties.getSecretKey()); properties.put(PropertyKeyConst.NAMESRV_ADDR, mqProperties.getNameSrvAddr()); properties.put(PropertyKeyConst.GROUP_ID, mqProperties.getGroupId()); consumer = ONSFactory.createConsumer(properties); } @Override public void handlerMessage(MessageConsumer listener) { consumer.subscribe(listener.getTopic(), listener.getTag(), new MessageListener() { public Action consume(Message message, ConsumeContext consumeContext) { String tag = message.getTag(); if (null == tag || tag.trim().isEmpty()) { log.info("Wrong message, 'tag' is null or empty, message: {}", message); return Action.CommitMessage; } try { String msg = new String(message.getBody()); if (msg != null && !msg.trim().isEmpty()) { listener.onMessage(msg); } return Action.CommitMessage; } catch (Exception e) { log.error(e.getMessage(), e); return Action.ReconsumeLater; } } }); } @Override public void start() { consumer.start(); } }

4.使用方式:

/**
 * Entry point for application code that wants to publish a message.
 */
@Component
public class MessageProducer {

    private final AbstractMessageProducerProcess messageProducerProcess;

    /**
     * Obtains the shared producer process and initializes it with the given settings.
     *
     * @param properties MQ connection settings
     */
    @Autowired
    public MessageProducer(MQProperties properties) {
        AbstractMessageProducerProcess process = AliMQProducerProcess.getInstance();
        process.init(properties);
        this.messageProducerProcess = process;
    }

    /**
     * Hands the entity to the producer process for sending.
     *
     * @param me the message entity to publish
     */
    public void send(MessageEntity me) {
        messageProducerProcess.pushMessageToQueue(me);
    }
}

/**
 * Callback contract for application code that wants to receive messages.
 *
 * <p>Per the original note, implementations are loaded automatically; the loading
 * mechanism is not part of this snippet.
 */
public interface MessageConsumer {

    /**
     * Receives one message.
     *
     * @param msg the message body as text
     */
    public void onMessage(String msg);

    /**
     * Returns the topic to listen on.
     *
     * @return the topic name
     */
    public String getTopic();

    /**
     * Returns the tag to listen for within the topic. Per the original note, an empty
     * value means all messages under the topic are listened to by default.
     *
     * @return the tag expression
     */
    public String getTag();
}

 


 

最新回复(0)