当前位置:网站首页>activemq 消息持久化

activemq 消息持久化

2022-08-09 10:43:00 啄木鸟伍迪

queue消息持久化

 queue默认是持久化的;当启动生产者,生产1000条消息;此时关闭mq服务,再开启mq服务,此时后台还是显示1000条消息待处理;

queue设置为非持久化;代码如下;当启动生产者,生产1条消息;此时关闭mq服务,再开启mq服务,此时后台显示没有消息待处理;如下图所示:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.JMSException;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.ObjectMessage;
 8 import javax.jms.Queue;
 9 import javax.jms.Session;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class TestActiveMqProducerCanDelete {
14     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15     private static final String QUEUE_NAME = "queue_01";
16     private static final String QUEUE_NAME_2 = "queue_02";
17 
18     public static void main(String[] args) throws JMSException {
19         // 创建连接工厂,按照给定的URL,采用默认用户名密码
20         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
21         // 通过连接工厂 获取connection 并启动访问
22         Connection conn = activeMQConnectionFactory.createConnection();
23         conn.start();
24         // 创建session会话
25         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
26         // 创建目的地 (具体是队列还是主题topic)
27         Queue queue = session.createQueue(QUEUE_NAME);
28         // 创建消息的生产者
29         MessageProducer messageProducer = session.createProducer(queue);
30         messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
31         // Byte类型的数据
32         ObjectMessage message = session.createObjectMessage();
33         User user = new User();
34         user.setAddress("嘉兴");
35         user.setName("Joy");
36         message.setObject(user);
37         message.setStringProperty("StringProperty", "我是 属性xxxxxxx");
38         messageProducer.send(message);
39 
40         messageProducer.close();
41         session.close();
42         conn.close();
43         System.out.println("发送消息成功");
44     }
45 
46 }
View Code

 

 

 

 

topic消息持久化:

topic 默认是不持久化的。或者说topic的持久化需要修改代码; 在hello world中的实现方式,消息是不会被持久化的;
  • topic消息持久化,就好比关注了微信公众号,之后如果退出了微信,仍然可以接收到公众号的在客户端退出之后的一些推送;
  • topic消息非持久化:就好比关注了微信公众号,之后如果退出了微信,无法接收到公众号在退出这段时间内的推送;如果此时设备又上线了,那么离线这段时间内的推送就没有了;此时如果又在线了,只能接收到在线这个时间内的推送;

消费者代码:创建一个持久化的订阅者,clientId为 xx;监听消息;

 1 package com.mock.utils;
 2 
 3 import java.io.IOException;
 4 
 5 import javax.jms.Connection;
 6 import javax.jms.JMSException;
 7 import javax.jms.Message;
 8 import javax.jms.MessageListener;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 import javax.jms.Topic;
12 import javax.jms.TopicSubscriber;
13 
14 import org.apache.activemq.ActiveMQConnectionFactory;
15 
16 public class TestActiveMqTopicConsumer {
17     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
18     private static final String TOPIC_NAME = "TOPIC_NAME_1";
19 
20     public static void main(String[] args) throws JMSException, IOException {
21         System.out.println("****我是1号消费者******");
22         // 创建连接工厂,按照给定的URL,采用默认用户名密码
23         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
24         // 通过连接工厂 获取connection 并启动访问
25         Connection conn = activeMQConnectionFactory.createConnection();
26         conn.setClientID("xx");
27         // 创建session会话
28         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
29         // 创建目的地 (具体是队列还是主题topic)
30         Topic topic = session.createTopic(TOPIC_NAME);
31         TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "xx_xx");
32         conn.start();
33         topicSubscriber.setMessageListener(new MessageListener() {
34             @Override
35             public void onMessage(Message message) {
36                 if (message != null && message instanceof TextMessage) {
37                     TextMessage textMessage = (TextMessage) message;
38                     try {
39                         System.out.println("收到消息:" + textMessage.getText());
40                     } catch (JMSException e) {
41                         // TODO Auto-generated catch block
42                         e.printStackTrace();
43                     }
44                 }
45             }
46 
47         });
48         System.in.read();
49 
50         topicSubscriber.close();
51         session.close();
52         conn.close();
53 
54     }
55 
56 }
View Code

生产者代码:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.JMSException;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 import javax.jms.Topic;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class TestActiveMqTopicProducer {
14     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15     private static final String TOPIC_NAME = "TOPIC_NAME_1";
16 
17     public static void main(String[] args) throws JMSException {
18         // 创建连接工厂,按照给定的URL,采用默认用户名密码
19         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20         // 通过连接工厂 获取connection 并启动访问
21         Connection conn = activeMQConnectionFactory.createConnection();
22 
23         // 创建session会话
24         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
25         // 创建目的地 (具体是队列还是主题topic)
26         Topic topic = session.createTopic(TOPIC_NAME);
27 
28         // 创建消息的生产者
29         MessageProducer messageProducer = session.createProducer(topic);
30         messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
31         conn.start();
32         for (int i = 0; i < 3; i++) {
33             // 创建消息;可以理解为学生按照要求写好问题
34             TextMessage textMessage = session.createTextMessage("mession-------" + i);
35             // 通过messageProducer 发送给mq
36             messageProducer.send(textMessage);
37         }
38         messageProducer.close();
39         session.close();
40         conn.close();
41         System.out.println("发送消息成功");
42     }
43 
44 }
View Code

由于JMSDeliveryMode默认是持久化的,所以,生产者代码也可以这么写:

 1 public static void main(String[] args) throws JMSException {
 2         // 创建连接工厂,按照给定的URL,采用默认用户名密码
 3         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
 4         // 通过连接工厂 获取connection 并启动访问
 5         Connection conn = activeMQConnectionFactory.createConnection();
 6         conn.start();
 7         // 创建session会话
 8         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 9         // 创建目的地 (具体是队列还是主题topic)
10         Topic topic = session.createTopic(TOPIC_NAME);
11 
12         // 创建消息的生产者
13         MessageProducer messageProducer = session.createProducer(topic);
14 
15         for (int i = 0; i < 3; i++) {
16             // 创建消息;可以理解为学生按照要求写好问题
17             TextMessage textMessage = session.createTextMessage("mession-------" + i);
18             // 通过messageProducer 发送给mq
19             messageProducer.send(textMessage);
20         }
21         messageProducer.close();
22         session.close();
23         conn.close();
24         System.out.println("发送消息成功");
25     }
View Code

 

activemq 管理后台显示如下图,启动消费者代码,显示如下图;

启动消费者,显示如下图,即在线的拥有TOPIC 订阅者;

如果此时关闭消费者,那么后台显示该同永久Topic订阅者离线;

在永久Topic订阅者离线情况下,生产者生产了3条消息,那么后台管理显示 永久Topic订阅者有3条待处理的消息;

 

原网站

版权声明
本文为[啄木鸟伍迪]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/lixiuming521125/p/16564976.html