消息中间件-ACTIVEMQ-3:简单介绍JMS的规范和产品

tech2023-12-16  35

目录

1:JMS是什么

2:JMS的四大元素

3:MS message:JMS消息;包含消息头,消息属性和消息体;

3.1:消息头,所有类型的这部分格式都是一样的

3.2:消息属性;按类型可以分为应用设置的属性,标准属性和消息中间件定义的属性

3.3:消息体,指我们具体需要消息传输的内容。共有5种格式;

4:消息的可靠性

4.1:持久性---宕机后还能不能再取到消息

1:队列:队列的默认是持久化的;

2:订阅持久化

4.2:事务---事务偏生产者

1:生产者事务:

2:消费者消费的时候的事务

4.3:签收---偏消费者

1:非事务状态下

2:事务状态下

4.4:签收和事务的关系

5:点对点模型总结

6:发布订阅总结

6.1:非持久化订阅总结

6.2:持久化订阅总结


1:JMS是什么

JMS是JAVAEE框架中的一个模块,是java消息服务,相当于一套规范;

2:JMS的四大元素

2.1:JMS provider :实现JMS接口和规范的消息中间件;也就是我们的mq服务器

2.2:JMS producer:消息生产者,创建和发送JMS消息的客户端应用;

2.3:JMS consumer:消息消费者,接受和处理JMS消息的客户端应用;

2.4:JMS message:JMS消息;包含消息头,消息属性和消息体;

3:MS message:JMS消息;包含消息头,消息属性和消息体;

3.1:消息头,所有类型的这部分格式都是一样的

序号属性名称说明设置者

1

JMSDestination

消息发送的目的地,是一个Topic或Queue send

2

JMSDeliveryMode

消息的发送模式,分为NON_PERSISTENT和PERSISTENT,即持久化的和非持久化的,持久和非持久模式,一条持久的消息应该被传送“一次仅仅一次”,这就意味着JMS提供者出现故障该信息不会丢失,他会在服务器恢复之后再次传递;一条非持久的消息,最多只传送一次;这意味出现故障,他就会丢失; send

3

JMSMessageID

消息ID,需要以ID:开头    send

4

JMSTimestamp 

 消息发送时的时间,也可以理解为调用send()方法时的时间,而不是该消息发送完成的时间 send

5

JMSCorrelationID

 关联的消息ID,这个通常用在需要回传消息的时候     client

6

JMSReplyTo

 消息回复的目的地,其值为一个Topic或Queue, 这个由发送者设置,但是接收者可以决定是否响应 client

7

JMSRedelivered 

 消息是否重复发送过,如果该消息之前发送过,那么这个属性的值需要被设置为true, 客户端可以根据这个属性的值来

确认这个消息是否重复发送过,以避免重复处理。

 Provider

8

JMSType

 由消息发送者设置的个消息类型,代表消息的结构,有的消息中间件可能会用到这个,但这个并不是是批消息的种类,比如

TextMessage之类的

 client

9

JMSExpiration 

 消息的过期时间,以毫秒为单位,根据定义,它应该是timeToLive的值再加上发送时的GMT时间,也就是说这个指的是过期时间,而不是有效期,默认永不过期;

send 

10

JMSPriority

 消息的优先级,0-4为普通的优化级,而5-9为高优先级,通常情况下,高优化级的消息需要优先发送 send

3.2:消息属性;按类型可以分为应用设置的属性,标准属性和消息中间件定义的属性

如果需要除消息头以外的值来帮我们识别,去重,重点标注等,那么消息属性非常有用;

如:

textMessage.setStringProperty("vip","yes");

3.3:消息体,指我们具体需要消息传输的内容。共有5种格式;

StreamMessage   一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。MapMessage       一种主体中包含一组键--值对的消息。没有定义条目顺序。TextMessage       一种主体中包含Java字符串的消息(例如,XML消息)。ObjectMessage    一种主体中包含序列化Java对象的消息。BytesMessage     一种主体中包含连续字节流的消息。

4:消息的可靠性

4.1:持久性---宕机后还能不能再取到消息

1:队列:队列的默认是持久化的;

2:订阅持久化

1:主题发布者代码

package com.wkl.PersistenceTopic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description:持久化的topic生产 * Date: 2020/9/3 - 下午 8:20 * author: wangkanglu * version: V1.0 */ public class Produce { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws JMSException { //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Topic topic = session.createTopic(TOPIC_NAME); //5:创建消息生产者 MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); //先表明持久化才启动 connection.start(); //6:通过消息生产者producer产生3条消息发到mq的主题中 for (int i = 1; i <= 3; i++) { //7:创建消息 TextMessage textMessage = session.createTextMessage("TOPICmsg--" + i); //8:通过perducer发送给mq producer.send(textMessage); } //9:关闭资源;顺着申请,倒着关闭 producer.close(); session.close(); connection.close(); System.out.println("---end---"); } }

2:主题订阅者代码

package com.wkl.PersistenceTopic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * Description:持久化的消费者改造 * Date: 2020/9/3 - 下午 8:29 * author: wangkanglu * version: V1.0 */ public class Consumer { /*账号密码如果都是默认的admin,可以不用穿;直传url*/ private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; /*这个url以tcp协议开头,java程序访问的是61616端口*/ private static final String ACTIVE_URL = "tcp://192.168.43.122:61616"; private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws JMSException, IOException { System.out.println("zhangsan"); //1:创建连接工厂,按照给定的账号,密码,url地址来链接; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL); //2:通过链接工厂获得connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); //设定是谁订阅了我 connection.setClientID("zhansan"); //3:通过connection创建会话,参数分别是事务和签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4:创建目的地(队列还是主题) Topic topic = session.createTopic(TOPIC_NAME); //创建主题订阅者 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"备注。。。"); //5:启动 connection.start(); Message message = topicSubscriber.receive(); while(null!=message){ TextMessage textMessage = (TextMessage) message; System.out.println("收到消息:" + textMessage.getText()); message = topicSubscriber.receive(5000L); } session.close(); connection.close(); System.out.println("----end----"); } }

3:先启动订阅

 

4:receive(5000L)超时时间过后,订阅者自动取消了

5:持久化表现

当zhangsan离线后,生产者发送消息;张三不会收到;但是当张三再次上线后,会受到历史消息;

总结:

一定要先运行一次消费者,等于向mq注册;类似于我订阅了这个主题然后运行生产者发送消息,此时无论消费者是否在线,都会收到,不在线的话,下次链接的时候,会把没有收到的消息都接受下来; 

4.2:事务---事务偏生产者

1:生产者事务:

false:只要执行send就会就如队列中;

true:先执行send,再执行session.commit,消息才被真正提交到队列中;

2:消费者消费的时候的事务

false:消费者直接从队列中取出并消费

true:消费者从队列中取出数据,并且进行session.conmit后,队列中的消息才认定是被消费了; 

4.3:签收---偏消费者

1:非事务状态下

自动签收(默认):AUTO_ACKNOWLEDGE

手动签收:CLIENT_ACKNOWLEDGE,需要消费者客户端调用.acknowledge方法进行手动确认签收

允许重复消息:DUPS_OK_ACKNOWLEDGE

2:事务状态下

消费者事务开启后;只有commit()才会将全部的消息变成已消费;

通俗来讲,事务的级别比签收大;

4.4:签收和事务的关系

1:在事务性会话中,当一个事务被成功提交(commit)则消息被自动签收,如果事务回滚,则消息会被再次传送,就是没消费

2:在非事务会话中,消息何时被确认消费取决于创建会话时的应答模式,(Message.acknowledge();)

5:点对点模型总结

6:发布订阅总结

6.1:非持久化订阅总结

6.2:持久化订阅总结

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

最新回复(0)