目录
1:JMS是什么
2:JMS的四大元素
3:MS message:JMS消息;包含消息头,消息属性和消息体;
3.1:消息头,所有类型的这部分格式都是一样的
3.2:消息属性;按类型可以分为应用设置的属性,标准属性和消息中间件定义的属性
3.3:消息体,指我们具体需要消息传输的内容。共有5种格式;
4:消息的可靠性
4.1:持久性---宕机后还能不能再取到消息
1:队列:队列的默认是持久化的;
2:订阅持久化
4.2:事务---事务偏生产者
1:生产者事务:
2:消费者消费的时候的事务
4.3:签收---偏消费者
1:非事务状态下
2:事务状态下
4.4:签收和事务的关系
5:点对点模型总结
6:发布订阅总结
6.1:非持久化订阅总结
6.2:持久化订阅总结
JMS是JAVAEE框架中的一个模块,是java消息服务,相当于一套规范;
2.1:JMS provider :实现JMS接口和规范的消息中间件;也就是我们的mq服务器
2.2:JMS producer:消息生产者,创建和发送JMS消息的客户端应用;
2.3:JMS consumer:消息消费者,接受和处理JMS消息的客户端应用;
2.4:JMS message:JMS消息;包含消息头,消息属性和消息体;
1
JMSDestination
消息发送的目的地,是一个Topic或Queue send2
JMSDeliveryMode
消息的发送模式,分为NON_PERSISTENT和PERSISTENT,即持久化的和非持久化的,持久和非持久模式,一条持久的消息应该被传送“一次仅仅一次”,这就意味着JMS提供者出现故障该信息不会丢失,他会在服务器恢复之后再次传递;一条非持久的消息,最多只传送一次;这意味出现故障,他就会丢失; send3
JMSMessageID
消息ID,需要以ID:开头 send4
JMSTimestamp
消息发送时的时间,也可以理解为调用send()方法时的时间,而不是该消息发送完成的时间 send5
JMSCorrelationID
关联的消息ID,这个通常用在需要回传消息的时候 client6
JMSReplyTo
消息回复的目的地,其值为一个Topic或Queue, 这个由发送者设置,但是接收者可以决定是否响应 client7
JMSRedelivered
消息是否重复发送过,如果该消息之前发送过,那么这个属性的值需要被设置为true, 客户端可以根据这个属性的值来
确认这个消息是否重复发送过,以避免重复处理。
Provider8
JMSType
由消息发送者设置的个消息类型,代表消息的结构,有的消息中间件可能会用到这个,但这个并不是是批消息的种类,比如
TextMessage之类的
client9
JMSExpiration
消息的过期时间,以毫秒为单位,根据定义,它应该是timeToLive的值再加上发送时的GMT时间,也就是说这个指的是过期时间,而不是有效期,默认永不过期;
send10
JMSPriority
消息的优先级,0-4为普通的优化级,而5-9为高优先级,通常情况下,高优化级的消息需要优先发送 send如果需要除消息头以外的值来帮我们识别,去重,重点标注等,那么消息属性非常有用;
如:
textMessage.setStringProperty("vip","yes");1:主题发布者代码
package com.wkl.PersistenceTopic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:持久化的topic生产 * Date: 2020/9/3 - 下午 8:20 * author: wangkanglu * version: V1.0 */ public class Produce { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Topic topic = session.createTopic(TOPIC_NAME); //5:创建消息生产者 MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); //先表明持久化才启动 connection.start(); //6:通过消息生产者producer产生3条消息发到mq的主题中 for (int i = 1; i <= 3; i++) { //7:创建消息 TextMessage textMessage = session.createTextMessage("TOPICmsg--" + i); //8:通过perducer发送给mq producer.send(textMessage); } //9:关闭资源;顺着申请,倒着关闭 producer.close(); session.close(); connection.close(); System.out.println("---end---"); } }2:主题订阅者代码
package com.wkl.PersistenceTopic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * Description:持久化的消费者改造 * Date: 2020/9/3 - 下午 8:29 * author: wangkanglu * version: V1.0 */ public class Consumer { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws JMSException, IOException { System.out.println("zhangsan"); //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); //设定是谁订阅了我 connection.setClientID("zhansan"); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Topic topic = session.createTopic(TOPIC_NAME); //创建主题订阅者 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"备注。。。"); //5:启动 connection.start(); Message message = topicSubscriber.receive(); while(null!=message){ TextMessage textMessage = (TextMessage) message; System.out.println("收到消息:" + textMessage.getText()); message = topicSubscriber.receive(5000L); } session.close(); connection.close(); System.out.println("----end----"); } }3:先启动订阅
4:receive(5000L)超时时间过后,订阅者自动取消了
5:持久化表现
当zhangsan离线后,生产者发送消息;张三不会收到;但是当张三再次上线后,会受到历史消息;
总结:
一定要先运行一次消费者,等于向mq注册;类似于我订阅了这个主题然后运行生产者发送消息,此时无论消费者是否在线,都会收到,不在线的话,下次链接的时候,会把没有收到的消息都接受下来;false:只要执行send就会就如队列中;
true:先执行send,再执行session.commit,消息才被真正提交到队列中;
false:消费者直接从队列中取出并消费
true:消费者从队列中取出数据,并且进行session.conmit后,队列中的消息才认定是被消费了;
自动签收(默认):AUTO_ACKNOWLEDGE
手动签收:CLIENT_ACKNOWLEDGE,需要消费者客户端调用.acknowledge方法进行手动确认签收
允许重复消息:DUPS_OK_ACKNOWLEDGE
消费者事务开启后;只有commit()才会将全部的消息变成已消费;
通俗来讲,事务的级别比签收大;
1:在事务性会话中,当一个事务被成功提交(commit)则消息被自动签收,如果事务回滚,则消息会被再次传送,就是没消费
2:在非事务会话中,消息何时被确认消费取决于创建会话时的应答模式,(Message.acknowledge();)