示例demo:
https://download.csdn.net/download/qq_43560721/12805721
理论:
ConnectionFactory用于管理连接的连接工厂JmsTemplate用于发送和接收消息的模板类MessageListerner消息监听器ConnectionFactory:
一个Spring为我们提供的连接池JmsTemplate每次发消息都会重新创建连接,会话和productorSpring提供了SingleConnectionFactory和CachingConnectionFactoryJmsTemplate:
是Spring提供的,只需向Spring容器内注册这个类就可以使用JmsTemplate方便的操作jmsJmsTemplate类是线程安全的,可以在整个应用范围使用。MessageListerner:
实现一个onMessage方法,该方法只接收一个Message参数。
代码演示:
队列模式:
demo结构:
新建一个maven项目
首先是引入相关jar依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.xxs.jms</groupId> <artifactId>jms-spring</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spring.version>4.2.5.RELEASE</spring.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> <exclusions> <exclusion> <artifactId>spring-context</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions> </dependency> </dependencies> </project>创建一个接口ProducerService:提供一个sendMessage方法:
package cn.xxs.jms.producer; /** * @Author xxs * @Date 2020/9/2 17:40 */ public interface ProducerService { void sendMessage(String message); }实现这个接口:
package cn.xxs.jms.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.annotation.Resource; import javax.jms.*; /** * @Author xxs * @Date 2020/9/2 18:06 */ public class ProducerServiceImpl implements ProducerService { @Autowired JmsTemplate jmsTemplate; @Resource(name = "queueDestination") Destination destination; public void sendMessage(final String message) { //使用JmsTemplate发送消息 jmsTemplate.send(destination, new MessageCreator() { //创建一个消息 public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); return textMessage; } }); System.out.println("发送消息:"+message); } }写公共配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 开启注解 --> <context:annotation-config/> <!-- ActiveMQ为我们提供的ConnectionFactory --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.4.123:61616"/> </bean> <!-- spring jms为我们提供的连接池 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"></property> </bean> <!-- 一个队列目的地,点对点的--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 参数 队列名字--> <constructor-arg value="queue"/> </bean> </beans>生产者配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 引入公共配置文件--> <import resource="common.xml"/> <!-- 配置JmsTemplate,用于发送消息--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 引用jms为我们提供的连接池--> <property name="connectionFactory" ref="connectionFactory"/> </bean> <bean class="cn.xxs.jms.producer.ProducerServiceImpl"/> </beans>生产者:
package cn.xxs.jms.producer; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @Author xxs * @Date 2020/9/2 18:15 */ public class AppProducer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml"); ProducerService service = context.getBean(ProducerService.class); for (int i=0;i<10;i++){ service.sendMessage("send test message"+i); } context.close(); } }创建消息监听器:
package cn.xxs.jms.consumer; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听者 * @Author xxs * @Date 2020/9/3 9:24 */ public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }写消费者配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 引入公共配置文件--> <import resource="common.xml"/> <!-- 配置消息监听器--> <bean id="ConsumerMessageListener" class="cn.xxs.jms.consumer.ConsumerMessageListener"/> <!-- 配置消息监听容器 spring提供的--> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <!-- 消息监听目的地 地址--> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="ConsumerMessageListener"/> </bean> </beans>创建两个消息启动器:
代码一样
package cn.xxs.jms.consumer; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Consumer启动器 * @Author xxs * @Date 2020/9/3 9:33 */ public class AppConsumer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml"); } }ok,我们先启动两个消费者
然后启动生产者之前我们来看一下ActiveMQ管理平台:
两个消费者,以及上个测试已经消费的10条消息:
我们启动生产者发送消息后:
到这里,我们的队列模式代码演示完毕。
主题模式:
我们只需改动一些代码就ok:
首先我们需要在公共配置common.xml中建立主题目的地
<!-- 一个主题目的地,发布订阅模式--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic"/> </bean>然后改动接口的实现中的目标地址:
改为主题目的地
另外,将消费者配置中的消息监听目的地 地址改为主题目的地
同样我们可以看一下管理平台,当前主题里面是没有我们新建的那个的
运行两个消费者后再看管理平台,已有当前主题,并且有两个消费者:
接下来我们运行生产者发布消息:
看一下两个消费者监听消息情况(全部接收到):
管理平台也可以看出:
结束啦。。。