当前位置:网站首页>activemq message persistence

activemq message persistence

2022-08-09 10:45:00 Woody woodpecker

queue消息持久化

 queue默认是持久化的;When the producer,生产1000条消息;此时关闭mq服务,再开启mq服务,The background or display at this time1000Message to be processed;

queue设置为非持久化;代码如下;When the producer,生产1条消息;此时关闭mq服务,再开启mq服务,Background shows no news to be processed at this time;如下图所示:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.JMSException;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.ObjectMessage;
 8 import javax.jms.Queue;
 9 import javax.jms.Session;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class TestActiveMqProducerCanDelete {
14     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15     private static final String QUEUE_NAME = "queue_01";
16     private static final String QUEUE_NAME_2 = "queue_02";
17 
18     public static void main(String[] args) throws JMSException {
19         // 创建连接工厂,按照给定的URL,The default user name password
20         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
21         // 通过连接工厂 获取connection 并启动访问
22         Connection conn = activeMQConnectionFactory.createConnection();
23         conn.start();
24         // 创建session会话
25         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
26         // 创建目的地 (具体是队列还是主题topic)
27         Queue queue = session.createQueue(QUEUE_NAME);
28         // 创建消息的生产者
29         MessageProducer messageProducer = session.createProducer(queue);
30         messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
31         // Byte类型的数据
32         ObjectMessage message = session.createObjectMessage();
33         User user = new User();
34         user.setAddress("嘉兴");
35         user.setName("Joy");
36         message.setObject(user);
37         message.setStringProperty("StringProperty", "我是 属性xxxxxxx");
38         messageProducer.send(message);
39 
40         messageProducer.close();
41         session.close();
42         conn.close();
43         System.out.println("发送消息成功");
44     }
45 
46 }
View Code

 

 

 

 

topic消息持久化:

topic 默认是不持久化的.或者说topicThe persistence of need to modify the code; 在hello world中的实现方式,The message will not be persisted;
  • topic消息持久化,Like attention WeChat public number,If the exit WeChat after,仍然可以接收To the public on the client side, quit after some push;
  • topic消息非持久化:Like attention WeChat public number,If the exit WeChat after,无法接收Went to the public in the exit this time push;If the device is online again,The offline this time don't push so;At this time if online again,This time can only receive online push;

消费者代码:Create a persistent subscribers,clientId为 xx;监听消息;

 1 package com.mock.utils;
 2 
 3 import java.io.IOException;
 4 
 5 import javax.jms.Connection;
 6 import javax.jms.JMSException;
 7 import javax.jms.Message;
 8 import javax.jms.MessageListener;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 import javax.jms.Topic;
12 import javax.jms.TopicSubscriber;
13 
14 import org.apache.activemq.ActiveMQConnectionFactory;
15 
16 public class TestActiveMqTopicConsumer {
17     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
18     private static final String TOPIC_NAME = "TOPIC_NAME_1";
19 
20     public static void main(String[] args) throws JMSException, IOException {
21         System.out.println("****我是1号消费者******");
22         // 创建连接工厂,按照给定的URL,The default user name password
23         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
24         // 通过连接工厂 获取connection 并启动访问
25         Connection conn = activeMQConnectionFactory.createConnection();
26         conn.setClientID("xx");
27         // 创建session会话
28         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
29         // 创建目的地 (具体是队列还是主题topic)
30         Topic topic = session.createTopic(TOPIC_NAME);
31         TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "xx_xx");
32         conn.start();
33         topicSubscriber.setMessageListener(new MessageListener() {
34             @Override
35             public void onMessage(Message message) {
36                 if (message != null && message instanceof TextMessage) {
37                     TextMessage textMessage = (TextMessage) message;
38                     try {
39                         System.out.println("收到消息:" + textMessage.getText());
40                     } catch (JMSException e) {
41                         // TODO Auto-generated catch block
42                         e.printStackTrace();
43                     }
44                 }
45             }
46 
47         });
48         System.in.read();
49 
50         topicSubscriber.close();
51         session.close();
52         conn.close();
53 
54     }
55 
56 }
View Code

生产者代码:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.JMSException;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 import javax.jms.Topic;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class TestActiveMqTopicProducer {
14     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15     private static final String TOPIC_NAME = "TOPIC_NAME_1";
16 
17     public static void main(String[] args) throws JMSException {
18         // 创建连接工厂,按照给定的URL,The default user name password
19         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20         // 通过连接工厂 获取connection 并启动访问
21         Connection conn = activeMQConnectionFactory.createConnection();
22 
23         // 创建session会话
24         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
25         // 创建目的地 (具体是队列还是主题topic)
26         Topic topic = session.createTopic(TOPIC_NAME);
27 
28         // 创建消息的生产者
29         MessageProducer messageProducer = session.createProducer(topic);
30         messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
31         conn.start();
32         for (int i = 0; i < 3; i++) {
33             // 创建消息;Can understand in accordance with the requirements for the students to write a good question
34             TextMessage textMessage = session.createTextMessage("mession-------" + i);
35             // 通过messageProducer 发送给mq
36             messageProducer.send(textMessage);
37         }
38         messageProducer.close();
39         session.close();
40         conn.close();
41         System.out.println("发送消息成功");
42     }
43 
44 }
View Code

由于JMSDeliveryMode默认是持久化的,所以,Producers code can also be so write:

 1 public static void main(String[] args) throws JMSException {
 2         // 创建连接工厂,按照给定的URL,The default user name password
 3         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
 4         // 通过连接工厂 获取connection 并启动访问
 5         Connection conn = activeMQConnectionFactory.createConnection();
 6         conn.start();
 7         // 创建session会话
 8         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 9         // 创建目的地 (具体是队列还是主题topic)
10         Topic topic = session.createTopic(TOPIC_NAME);
11 
12         // 创建消息的生产者
13         MessageProducer messageProducer = session.createProducer(topic);
14 
15         for (int i = 0; i < 3; i++) {
16             // 创建消息;Can understand in accordance with the requirements for the students to write a good question
17             TextMessage textMessage = session.createTextMessage("mession-------" + i);
18             // 通过messageProducer 发送给mq
19             messageProducer.send(textMessage);
20         }
21         messageProducer.close();
22         session.close();
23         conn.close();
24         System.out.println("发送消息成功");
25     }
View Code

 

activemq Management background according to the diagram below,Start the consumer code,显示如下图;

启动消费者,显示如下图,An onlineTOPIC 订阅者;

If consumers closed this time,So the background shows the same foreverTopicThe subscriber offline;

In the permanentTopicThe subscriber offline cases,生产者生产了3条消息,Then the background management according 永久TopicThe subscriber has3条待处理的消息;

 

原网站

版权声明
本文为[Woody woodpecker]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/221/202208091043257942.html