消息中间件-ACTIVEMQ-4:ActiveMQ的Broker

tech2025-09-05  19

目录

1:Broker是什么

2:根据不同的conf启动不同的activemq

​3:java嵌入式mq

3.1:pom.xml

3.2:启动自己java代码写的内嵌式的mq

3.3:利用自己内嵌式的mq当做队列,进行测试


1:Broker是什么

2:根据不同的conf启动不同的activemq

3:java嵌入式mq

3.1:pom.xml

<dependencies> <!--activeMq需要的jar包--> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.0</version> </dependency> <!--跟spring整合的包--> <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.17</version> </dependency> <!--其他的基础包--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <!--json装换的包--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies>

3.2:启动自己java代码写的内嵌式的mq

package com.wkl.broker; import org.apache.activemq.broker.BrokerService; /** * Description:自己java内嵌式的mq * Date: 2020/9/4 - 上午 11:31 * author: wangkanglu * version: V1.0 */ public class MyBroker { public static void main(String[] args) throws Exception { //Activemq也支持在jvm中内嵌的broker BrokerService brokerService = new BrokerService(); brokerService.setUseJmx(true); brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }

3.3:利用自己内嵌式的mq当做队列,进行测试

生产者:

package com.wkl.queuq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:第一次连接ActvieMq的服务 * Date: 2020/9/3 - 下午 1:51 * author: wangkanglu * version: V1.0 */ public class Produce { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://localhost:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息生产者 MessageProducer producer = session.createProducer(queue); //6:通过消息生产者producer产生3条消息发到mq的队列中 for (int i = 1; i <=5 ; i++) { //7:创建消息 TextMessage textMessage = session.createTextMessage("brokermsg--" + i); //8:通过perducer发送给mq producer.send(textMessage); } //9:关闭资源;顺着申请,倒着关闭 producer.close(); session.close(); connection.close(); System.out.println("---end---"); } }

消费者:

package com.wkl.queuq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:消息的消费者就是处理者的helloword * Date: 2020/9/3 - 下午 2:38 * author: wangkanglu * version: V1.0 */ public class Consumer { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://localhost:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException { System.out.println("2-----"); //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息消费者 MessageConsumer consumer = session.createConsumer(queue); //消息的消费者循环读取读取消息 while(true){ //6:consumer读取消息,当初发送消息是testMessage,接受时也是这个类型 //消费者只等待4秒;4秒后没消息就走了 TextMessage textMessage = (TextMessage) consumer.receive(4000L); if(null!=textMessage){ System.out.println("接收到的消息broker:"+textMessage.getText()); // textMessage.acknowledge(); }else { break; } } //7:关闭资源;顺着申请,倒着关闭 consumer.close(); session.close(); connection.close(); System.out.println("----end----"); } }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

最新回复(0)