当前位置:网站首页>社区版阿里MQ普通消息发送订阅Demo
社区版阿里MQ普通消息发送订阅Demo
2022-04-23 06:15:00 【玩哈哈527】
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
前言
提示:以下是本篇文章正文内容,下面案例可供参考
一、发送普通消息
阿里云消息队列RocketMQ版提供三种方式来发送普通消息:同步发送、异步发送和单向(Oneway)发送。本文仅介绍同步发送,此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。。
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
/** * 替换为您阿里云账号的AccessKey ID和AccessKey Secret。 */
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
}
public static void main(String[] args) throws MQClientException {
/** *创建Producer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。 *如果不想开启消息轨迹,可以按照如下方式创建: *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); */
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
/** *设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。 */
producer.setAccessChannel(AccessChannel.CLOUD);
/** *设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。 */
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
//消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
//在应用退出前,销毁Producer对象。
//注意:如果不销毁也没有问题。
producer.shutdown();
}
}
二、订阅普通消息
订阅普通消息的方式仅此一种
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
/** * 替换为您阿里云账号的AccessKey ID和AccessKey Secret。 */
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
}
public static void main(String[] args) throws MQClientException {
/** * 创建Consumer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。 * 如果不想开启消息轨迹,可以按照如下方式创建: * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely()); */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
//设置为阿里云消息队列RocketMQ版实例的接入点。
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
//阿里云上消息轨迹需要设置为CLOUD方式,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
consumer.setAccessChannel(AccessChannel.CLOUD);
// 设置为您在阿里云消息队列RocketMQ版控制台上创建的Topic。
consumer.subscribe("YOUR TOPIC", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
版权声明
本文为[玩哈哈527]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_28058509/article/details/124324974
边栏推荐
- Chapter 4 pytoch data processing toolbox
- unhandled system error, NCCL version 2.7.8
- PyTorch 13. 嵌套函数和闭包(狗头)
- PyTorch 22. PyTorch常用代码段合集
- torch.where能否传递梯度
- Error in multi machine and multi card training
- “Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated
- 面试总结之特征工程
- 多机多卡训练时的错误
- 美摄科技推出桌面端专业视频编辑解决方案——美映PC版
猜你喜欢
随机推荐
UEFI学习01-ARM AARCH64编译、ArmPlatformPriPeiCore(SEC)
SPI NAND FLASH小结
美摄科技受邀LVSon2020大会 分享《AI合成虚拟人物的技术框架与挑战》
基于51单片机的三路超声波测距系统(定时器方式测距)
美摄科技推出桌面端专业视频编辑解决方案——美映PC版
armv8m(cortex m33) MPU实战
swin transformer 转 onnx
AUTOSAR从入门到精通100讲(五十)-AUTOSAR 内存管理系列- ECU 抽象层和 MCAL 层
PyTorch 18. torch.backends.cudnn
基于51单片机的体脂检测系统设计(51+oled+hx711+us100)
PyTorch 17. GPU并发
unhandled system error, NCCL version 2.7.8
GIS实战应用案例100篇(五十一)-ArcGIS中根据指定的范围计算nc文件逐时次空间平均值的方法
Systrace parsing
《Multi-modal Visual Tracking:Review and Experimental Comparison》翻译
如何利用qemu搭建SOC protoype:80行代码实现一个Cortex M4 模拟器
CMSIS CM3源码注解
ARMCC/GCC下的stack protector
Detailed explanation of unwind stack backtracking
PyTorch 17. GPU concurrency