Community Edition Alibaba MQ: Demo of Sending and Subscribing to Normal Messages
2022-04-23 12:56:00 【Play ha ha 527】
1. Send normal messages
Alibaba Cloud Message Queue for RocketMQ provides three ways to send normal messages: synchronous sending, asynchronous sending, and one-way (Oneway) sending. This article covers only synchronous sending, which suits a wide range of scenarios, such as important notification emails, registration SMS notifications, and marketing SMS systems.
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    /**
     * Replace with the AccessKey ID and AccessKey Secret of your Alibaba Cloud account.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /*
         * Create the producer with message tracing enabled. Set the group to the Group ID
         * you created in the Alibaba Cloud Message Queue for RocketMQ console.
         * If you do not want message tracing, create the producer like this instead:
         * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
        /*
         * Set the access channel to Alibaba Cloud. This is required when using message tracing
         * on the cloud. If message tracing is not enabled, leave it unset.
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        /*
         * Set this to the endpoint shown in the Alibaba Cloud Message Queue for RocketMQ console,
         * which looks like "http://MQ_INST_XXXX.aliyuncs.com:80".
         */
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Synchronous send: blocks until the broker responds.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // The send failed and needs retry handling: resend the message,
                // or persist the data for later compensation.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before the application exits.
        // Note: not shutting it down does not cause a problem.
        producer.shutdown();
    }
}
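The catch block in the producer notes that a failed send needs retry handling. A minimal sketch of one way to do that at the application level is shown here, using only the Java standard library. The class name `SendRetrySketch`, the method `sendWithRetry`, and the backoff parameters are hypothetical names chosen for illustration and are not part of the RocketMQ client; the lambda in `main` stands in for a call such as `producer.send(msg)`.

```java
import java.util.concurrent.Callable;

public class SendRetrySketch {
    /**
     * Calls sendAction up to maxAttempts times. After each failed attempt it
     * sleeps, doubling the delay every time (simple exponential backoff).
     * Rethrows the last failure if every attempt fails.
     */
    public static <T> T sendWithRetry(Callable<T> sendAction, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sendAction.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread is being interrupted.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for producer.send(msg): fails twice, then succeeds.
        int[] calls = {0};
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "SEND_OK";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the producer, the body of the `try` block could be wrapped as `sendWithRetry(() -> producer.send(msg), 3, 200L)`. If all attempts fail, the exception still reaches the existing catch block, where the message can be persisted for compensation.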
2. Subscribe to normal messages
There is only one way to subscribe to normal messages. The example uses a push consumer.
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
    /**
     * Replace with the AccessKey ID and AccessKey Secret of your Alibaba Cloud account.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /*
         * Create the consumer with message tracing enabled. Set the group to the Group ID
         * you created in the Alibaba Cloud Message Queue for RocketMQ console.
         * If you do not want message tracing, create the consumer like this instead:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // Set this to the endpoint of your Alibaba Cloud Message Queue for RocketMQ instance.
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Message tracing on Alibaba Cloud requires the CLOUD access channel.
        // If message tracing is not enabled, leave this unset.
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // Set this to the Topic you created in the Alibaba Cloud Message Queue for RocketMQ console.
        // The "*" expression subscribes to messages with any tag.
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                // Tell the broker that this batch was consumed successfully.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
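The consumer subscribes with the expression "*", while the producer attaches a tag to every message. As a rough illustration of how such a tag expression selects messages, the sketch below models the matching rule in plain Java: "*" (or an empty expression) accepts every tag, and several tags can be combined with "||". The class `TagFilterSketch` and its `matches` method are hypothetical and only model the semantics; they are not the RocketMQ client's own filtering code, and the tag names are made up.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TagFilterSketch {
    /**
     * Returns true when a message carrying messageTag would be selected by
     * subExpression, under the simplified rules described above.
     */
    public static boolean matches(String subExpression, String messageTag) {
        // A null, empty, or "*" expression subscribes to every message.
        if (subExpression == null || subExpression.isEmpty() || "*".equals(subExpression)) {
            return true;
        }
        // Otherwise the expression is a list of tags separated by "||".
        Set<String> wantedTags = Arrays.stream(subExpression.split("\\|\\|"))
            .map(String::trim)
            .filter(tag -> !tag.isEmpty())
            .collect(Collectors.toSet());
        return messageTag != null && wantedTags.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));            // true
        System.out.println(matches("TagA || TagB", "TagB")); // true
        System.out.println(matches("TagA || TagB", "TagC")); // false
    }
}
```

To receive only some messages, the consumer would pass an expression such as "TagA || TagB" as the second argument of `consumer.subscribe` instead of "*".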
Copyright notice
This article was written by [Play ha ha 527]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204230615230965.html