目录
1:Broker是什么
2:根据不同的conf启动不同的activemq
3:java嵌入式mq
3.1:pom.xml
3.2:启动自己java代码写的内嵌式的mq
3.3:利用自己内嵌式的mq当做队列,进行测试
生产者:
package com.wkl.queuq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:第一次连接ActvieMq的服务 * Date: 2020/9/3 - 下午 1:51 * author: wangkanglu * version: V1.0 */ public class Produce { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://localhost:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息生产者 MessageProducer producer = session.createProducer(queue); //6:通过消息生产者producer产生3条消息发到mq的队列中 for (int i = 1; i <=5 ; i++) { //7:创建消息 TextMessage textMessage = session.createTextMessage("brokermsg--" + i); //8:通过perducer发送给mq producer.send(textMessage); } //9:关闭资源;顺着申请,倒着关闭 producer.close(); session.close(); connection.close(); System.out.println("---end---"); } }消费者:
package com.wkl.queuq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:消息的消费者就是处理者的helloword * Date: 2020/9/3 - 下午 2:38 * author: wangkanglu * version: V1.0 */ public class Consumer { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://localhost:61616"; private static final String Queue_NAME = "queue01"; public static void main(String[] args) throws JMSException { System.out.println("2-----"); //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Queue queue = session.createQueue(Queue_NAME); //5:创建消息消费者 MessageConsumer consumer = session.createConsumer(queue); //消息的消费者循环读取读取消息 while(true){ //6:consumer读取消息,当初发送消息是testMessage,接受时也是这个类型 //消费者只等待4秒;4秒后没消息就走了 TextMessage textMessage = (TextMessage) consumer.receive(4000L); if(null!=textMessage){ System.out.println("接收到的消息broker:"+textMessage.getText()); // textMessage.acknowledge(); }else { break; } } //7:关闭资源;顺着申请,倒着关闭 consumer.close(); session.close(); connection.close(); System.out.println("----end----"); } }