消息中间件-ACTIVEMQ-2:Java编码简单实现ActiveMq的通讯

tech2023-05-08  108

目录

1:idea新建maven工程,pom.xml导包

2:JMS(java消息服务)的总体架构

3:在点对点的消息传递中;目的地被称为Queue(队列)

3.1:消息生产者编码

3.2:消息消费者编码---同步阻塞方式(超时之前一直阻塞)

1:consumer.receive();---消费者死等

2:consumer.receive(超时时间单位毫秒);---等待时间结束没人的话就自己关闭连接了

3.3:消息消费者编码---异步非阻塞(消费者监听消息)

3.4:多个消费者同时启动了;再来消息如何处理

3.5:点对点的传递域小总结

4:在发布订阅的消息传递中;目的地被称为Topic(主题)

4.1:发布订阅消息传递的特点;

4.2:Topic的生产者代码

4.3:Topic的消费者代码

4.4:消费者先订阅;然后生产者再发布消息

4.5:生产者先生产,消费者后消费,那么这些就是废消息

5:队列和主题的简单比较


1:idea新建maven工程,pom.xml导包

<dependencies> <!--activeMq需要的jar包--> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.0</version> </dependency> <!--跟spring整合的包--> <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.17</version> </dependency> <!--其他的基础包--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies>

2:JMS(java消息服务)的总体架构

点对点:发送到Queue(队列)--发短信

点对多:发送到Topic(主题)--订阅公众号

3:在点对点的消息传递中;目的地被称为Queue(队列)

3.1:消息生产者编码

package com.wkl; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import javax.jms.*; /** * Description:第一次连接ActvieMq的服务 * Date: 2020/9/3 - 下午 1:51 * author: wangkanglu * version: V1.0 */ public class helloword { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息生产者 MessageProducer producer = session.createProducer(queue); //6:通过消息生产者producer产生3条消息发到mq的队列中 for (int i = 1; i <=3 ; i++) { //7:创建消息 TextMessage textMessage = session.createTextMessage("msg--" + i); //8:通过perducer发送给mq producer.send(textMessage); } //9:关闭资源;顺着申请,倒着关闭 producer.close(); session.close(); connection.close(); System.out.println("---end---"); } }

 

 

3.2:消息消费者编码---同步阻塞方式(超时之前一直阻塞)

1:consumer.receive();---消费者死等

package com.wkl; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:消息的消费者就是处理者的helloword * Date: 2020/9/3 - 下午 2:38 * author: wangkanglu * version: V1.0 */ public class Consumer { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息消费者 MessageConsumer consumer = session.createConsumer(queue); //消息的消费者循环读取读取消息 while(true){ System.out.println("-------------"); //consumer读取消息,当初发送消息是testMessage,接受时也是这个类型 TextMessage textMessage = (TextMessage) consumer.receive(); if(null!=textMessage){ System.out.println("接收到的消息:"+textMessage.getText()); }else { break; } } } }

2:consumer.receive(超时时间单位毫秒);---等待时间结束没人的话就自己关闭连接了

开始:

package com.wkl; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:消息的消费者就是处理者的helloword * Date: 2020/9/3 - 下午 2:38 * author: wangkanglu * version: V1.0 */ public class Consumer { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息消费者 MessageConsumer consumer = session.createConsumer(queue); //消息的消费者循环读取读取消息 while(true){ //6:consumer读取消息,当初发送消息是testMessage,接受时也是这个类型 //消费者只等待4秒;4秒后没消息就走了 TextMessage textMessage = (TextMessage) consumer.receive(4000L); if(null!=textMessage){ System.out.println("接收到的消息:"+textMessage.getText()); }else { break; } } //7:关闭资源;顺着申请,倒着关闭 consumer.close(); session.close(); connection.close(); System.out.println("----end----"); } }

3.3:消息消费者编码---异步非阻塞(消费者监听消息)

package com.wkl; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * Description:消息的消费者就是处理者的helloword,利用监听器的方式 * Date: 2020/9/3 - 下午 2:38 * author: wangkanglu * version: V1.0 */ public class ConsumerLisner { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息消费者 MessageConsumer consumer = session.createConsumer(queue); //6:为消费者设置监听器,监听队列 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null!=message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("接受消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); //7:保持系统不灭 System.in.read(); //8:关闭资源;顺着申请,倒着关闭 consumer.close(); session.close(); connection.close(); System.out.println("----end----"); } }

3.4:多个消费者同时启动了;再来消息如何处理

总结;队列中的消息只能被取一次;多个消费者监听的时候那就是平均分配;相当于按照消费者进来的顺序,一替一个的分配;

3.5:点对点的传递域小总结

 

4:在发布订阅的消息传递中;目的地被称为Topic(主题)

4.1:发布订阅消息传递的特点;

4.2:Topic的生产者代码

package com.wkl.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:发布订阅模型的发布者 * Date: 2020/9/3 - 下午 3:45 * author: wangkanglu * version: V1.0 */ public class TopicProduce { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Topic topic = session.createTopic(TOPIC_NAME); //5:创建消息生产者 MessageProducer producer = session.createProducer(topic); //6:通过消息生产者producer产生3条消息发到mq的主题中 for (int i = 1; i <= 5; i++) { //7:创建消息 TextMessage textMessage = session.createTextMessage("TOPICmsg--" + i); //8:通过perducer发送给mq producer.send(textMessage); } //9:关闭资源;顺着申请,倒着关闭 producer.close(); session.close(); connection.close(); System.out.println("---end---"); } }

4.3:Topic的消费者代码

package com.wkl.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * Description: * Date: 2020/9/3 - 下午 4:35 * author: wangkanglu * version: V1.0 */ public class TopicConsumerLisner { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws JMSException, IOException { System.out.println("1---"); //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Topic topic = session.createTopic(TOPIC_NAME); //5:创建消息消费者 MessageConsumer consumer = session.createConsumer(topic); //6:为消费者设置监听器,监听队列 consumer.setMessageListener((message -> { if(null!=message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("接受消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } })); //7:保持系统不灭 System.in.read(); //8:关闭资源;顺着申请,倒着关闭 consumer.close(); session.close(); connection.close(); System.out.println("----end----"); } }

4.4:消费者先订阅;然后生产者再发布消息

生产者发布消息后:每个消费者都读取到了全部的信息

4.5:生产者先生产,消费者后消费,那么这些就是废消息

5:队列和主题的简单比较

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

最新回复(0)