目录
1:idea新建maven工程,pom.xml导包
2:JMS(java消息服务)的总体架构
3:在点对点的消息传递中;目的地被称为Queue(队列)
3.1:消息生产者编码
3.2:消息消费者编码---同步阻塞方式(超时之前一直阻塞)
1:consumer.receive();---消费者死等
2:consumer.receive(超时时间单位毫秒);---等待时间结束没人的话就自己关闭连接了
3.3:消息消费者编码---异步非阻塞(消费者监听消息)
3.4:多个消费者同时启动了;再来消息如何处理
3.5:点对点的传递域小总结
4:在发布订阅的消息传递中;目的地被称为Topic(主题)
4.1:发布订阅消息传递的特点;
4.2:Topic的生产者代码
4.3:Topic的消费者代码
4.4:消费者先订阅;然后生产者再发布消息
4.5:生产者先生产,消费者后消费,那么这些就是废消息
5:队列和主题的简单比较
1:idea新建maven工程,pom.xml导包
<dependencies>
<!--activeMq需要的jar包-->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.0</version>
</dependency>
<!--跟spring整合的包-->
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.17</version>
</dependency>
<!--其他的基础包-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
2:JMS(java消息服务)的总体架构
点对点:发送到Queue(队列)--发短信
点对多:发送到Topic(主题)--订阅公众号
3:在点对点的消息传递中;目的地被称为Queue(队列)
3.1:消息生产者编码
package com.wkl;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import javax.jms.*;
/**
* Description:第一次连接ActvieMq的服务
* Date: 2020/9/3 - 下午 1:51
* author: wangkanglu
* version: V1.0
*/
public class helloword {
/*账号密码如果都是默认的admin,可以不用穿;直传url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*这个url以tcp协议开头,java程序访问的是61616端口*/
private static final String ACTIVE_URL = "tcp://192.168.43.122:61616";
private static final String Queue_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1:创建连接工厂,按照给定的账号,密码,url地址来链接;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL);
//2:通过链接工厂获得connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通过connection创建会话,参数分别是事务和签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:创建目的地(队列还是主题)
Queue queue = session.createQueue(Queue_NAME);
//5:创建消息生产者
MessageProducer producer = session.createProducer(queue);
//6:通过消息生产者producer产生3条消息发到mq的队列中
for (int i = 1; i <=3 ; i++) {
//7:创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
//8:通过perducer发送给mq
producer.send(textMessage);
}
//9:关闭资源;顺着申请,倒着关闭
producer.close();
session.close();
connection.close();
System.out.println("---end---");
}
}
3.2:消息消费者编码---同步阻塞方式(超时之前一直阻塞)
1:consumer.receive();---消费者死等
package com.wkl;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Description:消息的消费者就是处理者的helloword
* Date: 2020/9/3 - 下午 2:38
* author: wangkanglu
* version: V1.0
*/
public class Consumer {
/*账号密码如果都是默认的admin,可以不用穿;直传url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*这个url以tcp协议开头,java程序访问的是61616端口*/
private static final String ACTIVE_URL = "tcp://192.168.43.122:61616";
private static final String Queue_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1:创建连接工厂,按照给定的账号,密码,url地址来链接;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL);
//2:通过链接工厂获得connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通过connection创建会话,参数分别是事务和签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:创建目的地(队列还是主题)
Queue queue = session.createQueue(Queue_NAME);
//5:创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
//消息的消费者循环读取读取消息
while(true){
System.out.println("-------------");
//consumer读取消息,当初发送消息是testMessage,接受时也是这个类型
TextMessage textMessage = (TextMessage) consumer.receive();
if(null!=textMessage){
System.out.println("接收到的消息:"+textMessage.getText());
}else {
break;
}
}
}
}
2:consumer.receive(超时时间单位毫秒);---等待时间结束没人的话就自己关闭连接了
开始:
package com.wkl;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Description:消息的消费者就是处理者的helloword
* Date: 2020/9/3 - 下午 2:38
* author: wangkanglu
* version: V1.0
*/
public class Consumer {
/*账号密码如果都是默认的admin,可以不用穿;直传url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*这个url以tcp协议开头,java程序访问的是61616端口*/
private static final String ACTIVE_URL = "tcp://192.168.43.122:61616";
private static final String Queue_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1:创建连接工厂,按照给定的账号,密码,url地址来链接;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL);
//2:通过链接工厂获得connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通过connection创建会话,参数分别是事务和签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:创建目的地(队列还是主题)
Queue queue = session.createQueue(Queue_NAME);
//5:创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
//消息的消费者循环读取读取消息
while(true){
//6:consumer读取消息,当初发送消息是testMessage,接受时也是这个类型
//消费者只等待4秒;4秒后没消息就走了
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if(null!=textMessage){
System.out.println("接收到的消息:"+textMessage.getText());
}else {
break;
}
}
//7:关闭资源;顺着申请,倒着关闭
consumer.close();
session.close();
connection.close();
System.out.println("----end----");
}
}
3.3:消息消费者编码---异步非阻塞(消费者监听消息)
package com.wkl;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* Description:消息的消费者就是处理者的helloword,利用监听器的方式
* Date: 2020/9/3 - 下午 2:38
* author: wangkanglu
* version: V1.0
*/
public class ConsumerLisner {
/*账号密码如果都是默认的admin,可以不用穿;直传url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*这个url以tcp协议开头,java程序访问的是61616端口*/
private static final String ACTIVE_URL = "tcp://192.168.43.122:61616";
private static final String Queue_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1:创建连接工厂,按照给定的账号,密码,url地址来链接;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL);
//2:通过链接工厂获得connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通过connection创建会话,参数分别是事务和签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:创建目的地(队列还是主题)
Queue queue = session.createQueue(Queue_NAME);
//5:创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
//6:为消费者设置监听器,监听队列
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null!=message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接受消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//7:保持系统不灭
System.in.read();
//8:关闭资源;顺着申请,倒着关闭
consumer.close();
session.close();
connection.close();
System.out.println("----end----");
}
}
3.4:多个消费者同时启动了;再来消息如何处理
总结;队列中的消息只能被取一次;多个消费者监听的时候那就是平均分配;相当于按照消费者进来的顺序,一替一个的分配;
3.5:点对点的传递域小总结
4:在发布订阅的消息传递中;目的地被称为Topic(主题)
4.1:发布订阅消息传递的特点;
4.2:Topic的生产者代码
package com.wkl.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Description:发布订阅模型的发布者
* Date: 2020/9/3 - 下午 3:45
* author: wangkanglu
* version: V1.0
*/
public class TopicProduce {
/*账号密码如果都是默认的admin,可以不用穿;直传url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*这个url以tcp协议开头,java程序访问的是61616端口*/
private static final String ACTIVE_URL = "tcp://192.168.43.122:61616";
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
//1:创建连接工厂,按照给定的账号,密码,url地址来链接;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVE_URL);
//2:通过链接工厂获得connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通过connection创建会话,参数分别是事务和签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:创建目的地(队列还是主题)
Topic topic = session.createTopic(TOPIC_NAME);
//5:创建消息生产者
MessageProducer producer = session.createProducer(topic);
//6:通过消息生产者producer产生3条消息发到mq的主题中
for (int i = 1; i <= 5; i++) {
//7:创建消息
TextMessage textMessage = session.createTextMessage("TOPICmsg--" + i);
//8:通过perducer发送给mq
producer.send(textMessage);
}
//9:关闭资源;顺着申请,倒着关闭
producer.close();
session.close();
connection.close();
System.out.println("---end---");
}
}
4.3:Topic的消费者代码
package com.wkl.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* Description:
* Date: 2020/9/3 - 下午 4:35
* author: wangkanglu
* version: V1.0
*/
public class TopicConsumerLisner {
/*账号密码如果都是默认的admin,可以不用穿;直传url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*这个url以tcp协议开头,java程序访问的是61616端口*/
private static final String ACTIVE_URL = "tcp://192.168.43.122:61616";
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("1---");
//1:创建连接工厂,按照给定的账号,密码,url地址来链接;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL);
//2:通过链接工厂获得connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通过connection创建会话,参数分别是事务和签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:创建目的地(队列还是主题)
Topic topic = session.createTopic(TOPIC_NAME);
//5:创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
//6:为消费者设置监听器,监听队列
consumer.setMessageListener((message -> {
if(null!=message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接受消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}));
//7:保持系统不灭
System.in.read();
//8:关闭资源;顺着申请,倒着关闭
consumer.close();
session.close();
connection.close();
System.out.println("----end----");
}
}
4.4:消费者先订阅;然后生产者再发布消息
生产者发布消息后:每个消费者都读取到了全部的信息
4.5:生产者先生产,消费者后消费,那么这些就是废消息
5:队列和主题的简单比较