Alibaba Cloud RocketMQ with the community (open-source) client: a demo of sending and subscribing to normal messages
2022-04-23 12:56:00 【Play ha ha 527】
1. Sending normal messages
Alibaba Cloud Message Queue for Apache RocketMQ provides three ways to send normal messages: synchronous sending, asynchronous sending, and one-way (Oneway) sending. This article covers only synchronous sending, which suits a wide range of scenarios, such as important notification emails, sign-up SMS notifications, and marketing SMS systems.
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * Replace with the AccessKey ID and AccessKey Secret of your Alibaba Cloud account.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        // Create the producer with message trace enabled. Use the Group ID you created in the
        // Alibaba Cloud Message Queue for Apache RocketMQ console.
        // To create the producer without message trace, use:
        // DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
        // Set the access channel to Alibaba Cloud. This is required when using message trace
        // on the cloud; omit it if message trace is not enabled.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // Set this to the endpoint obtained from the Alibaba Cloud Message Queue for Apache RocketMQ
        // console, in a form similar to "http://MQ_INST_XXXX.aliyuncs.com:80".
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();
        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Synchronous send: blocks until the broker returns a result.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // The send failed and needs retry handling: resend the message,
                // or persist the data for later compensation.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }
        // Destroy the producer before the application exits.
        // Note: it is also fine not to destroy it.
        producer.shutdown();
    }
}
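The catch block in the producer above only logs the failure, while its comment suggests resending the message or persisting the data for compensation. A minimal sketch of that idea follows, using only the Java standard library. `SendRetrySketch`, `sendWithRetry`, and the attempt count and backoff values are hypothetical names and numbers chosen for illustration, not part of the RocketMQ client; the lambda stands in for `producer.send(msg)`.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical helper (not part of the RocketMQ client): bounded retry with a compensation hook. */
final class SendRetrySketch {

    /**
     * Calls send up to maxAttempts times, sleeping backoffMillis between attempts.
     * If every attempt fails, passes the last exception to compensate
     * (for example, to persist the payload) and returns an empty Optional.
     */
    static <T> Optional<T> sendWithRetry(Callable<T> send, int maxAttempts,
                                         long backoffMillis, Consumer<Exception> compensate) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                T value = send.call();
                return Optional.ofNullable(value);
            } catch (Exception e) {
                last = e;
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // restore the flag and stop retrying
                    break;
                }
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    last = ie;
                    break;
                }
            }
        }
        compensate.accept(last);
        return Optional.empty();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for producer.send(msg): fails twice, then succeeds.
        Optional<String> result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "SEND_OK";
        }, 5, 10L, e -> System.out.println("persist for compensation: " + e));
        System.out.println(result.orElse("failed") + " after " + calls[0] + " attempts");
    }
}
```

In the demo, the first argument could wrap the real call, for example `() -> producer.send(msg)`, and the last argument could write the message body to a local store that a later compensation job reads.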
2. Subscribing to normal messages
There is only one way to subscribe to normal messages. The example uses a push consumer (DefaultMQPushConsumer) with a concurrent message listener.
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
     * Replace with the AccessKey ID and AccessKey Secret of your Alibaba Cloud account.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        // Create the consumer with message trace enabled. Use the Group ID you created in the
        // Alibaba Cloud Message Queue for Apache RocketMQ console.
        // To create the consumer without message trace, use:
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // Set this to the endpoint of your Alibaba Cloud Message Queue for Apache RocketMQ instance.
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Set the access channel to CLOUD. This is required when using message trace on
        // Alibaba Cloud; omit it if message trace is not enabled.
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // Set this to the Topic you created in the Alibaba Cloud Message Queue for Apache RocketMQ
        // console. "*" subscribes to all tags of the topic.
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                // Acknowledge the batch as consumed successfully.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
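The second argument of `consumer.subscribe("YOUR TOPIC", "*")` is a tag expression: `*` matches every tag, and several tags can be listed with `||`, for example `"TagA || TagB"`. The following is a toy, local model of that matching rule, assuming exact tag names. `TagExpressionSketch` and `matches` are hypothetical; in a real deployment the filtering is performed by RocketMQ itself, not by code like this.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical local model of tag filtering, for illustration only. */
final class TagExpressionSketch {

    /** Returns true if a message carrying tag would match subExpression. */
    static boolean matches(String subExpression, String tag) {
        String expr = subExpression.trim();
        if ("*".equals(expr)) {
            return true; // "*" subscribes to every tag
        }
        // Split "TagA || TagB" into the set {TagA, TagB}.
        Set<String> wanted = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return tag != null && wanted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));            // true
        System.out.println(matches("TagA || TagB", "TagB")); // true
        System.out.println(matches("TagA || TagB", "TagC")); // false
    }
}
```

With the producer above sending every message under `"YOUR MESSAGE TAG"`, a consumer subscribed with `"*"` or with that exact tag would receive the messages, while one subscribed only to a different tag would not.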
Copyright notice
This article was written by [Play ha ha 527]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204230615230965.html