当前位置:网站首页>Redis - 时间序列数据类型的保存方案和消息队列实现
Redis - 时间序列数据类型的保存方案和消息队列实现
2022-08-09 02:21:00 【Zong_0915】
Redis - 时间序列数据类型的保存方案和消息队列实现
一. 用 Redis 保存时间序列类型数据方案
我们日常开发中,有很多这种类似的场景,记录某一个时刻下,某个目标的相关属性或者状态。 那么常规的,我们可以用时间戳作为Key
,而这个目标的相关属性我们可以将其转化为JSON
串或者通过字符串拼接的方式来保存为String
类型,然后作为Value
。
但是,这样的存储是针对于一个时间戳而言的,而实际环境中,往往需要记录非常多的这样的数据,甚至可能需要对这种数据进行统计、聚合、范围查找等操作。那么可想而知,Redis
中的String
类型,虽然可以提供存储功能,但是却难以提供统计、聚合、范围查找等这些复杂操作。 除此之外,String
类型我在String内存开销问题以及基本/扩展数据类型的使用这篇文章有提到,如果存储的数据量太大,那么内存的占用是非常庞大的。耗内存。
同时用时间戳作为Key
的数据,往往也有以下特点:
- 数据的插入比较频繁。
- 读操作的查询模式的种类比较多(统计、聚合…)。
1.1 内存和范围查找的支持性问题
场景:按照一定的时间间隔记录某一个设备集群中每台机器的温度。
针对内存占用问题,我们可以选择Hash
类型去替代String
。
# 传统的String类型存储
set 1659947381000 31
set 1659947392000 35
set 1659947413000 28
# 改为Hash存储
set temperature 1659947381000 31 1659947392000 35 1659947413000 28
虽然Hash
结构弥补了String
类型在内存开销上的短板。但是仅仅这样是无法满足数据的一些范围查询或者操作的。
那么针对范围查找问题:因此我们可以再使用Sorted Set
去保存相同的一份数据。这里做个解释,为何要同时用两个数据结构来存储相同的数据:
Hash
结构:用来提供单值查询。Sorted Set
:提供范围查询。虽然将范围的前后指定为一个相同的值,看起来像是单个值查询,但是本质上依旧是范围查询,效率不如人家Hash
来的快。
那么此时就可以用这样的命令来进行查找:
zrangebyscore temperature 1659947381000 1659947413000
此外,我们既然使用了两种数据结构来保存相同的数据,就应该保证数据一致性。我们应该保证两个数据结构中的数据是完全一样的,不能出现哪个结构中的数据有少或者不一致的情况。因此我们可以在进行数据插入的时候,保障两个操作的原子性。即使用简单的事务操作:
multi
:事务开始。之后的操作将会放入一个队列中,而不会真正的去执行。exec
:事务结束,开始执行队列中的一系列命令。
例如:
multi
hset temperature 1659947381000 25
zadd temperature 1659947381000 25
exec
那么Java
对应的操作就是:
Transaction multi = jedis.multi();
multi.hset("temperature", "1659947381000", "25");
System.out.println("事务执行中:hash:" + multi.hget("temperature", "1659947381000"));
multi.zadd("temperatureZSet", 1659947381000L, "25");
System.out.println("事务执行中:sorted set:" + multi.zrangeByScore("temperatureZSet", 1659947381000L, 1659947381000L));
multi.exec();
System.out.println("**************事务执行完毕******************");
System.out.println("事务执行中:hash:" + jedis.hget("temperature", "1659947381000"));
System.out.println("事务执行中:sorted set:" + jedis.zrangeByScore("temperatureZSet", 1659947341000L, 1659947392000L));
注意:
hash
和sorted set
两个集合使用的key
不能是同一个。- 并且事务中不能使用普通的
jedis
对象。multi
对象拥有和jedis
对象同样的API
操作。因为Redis
中事务开启后,执行的操作是放到队列中的,并不是马上执行的,因此需要做区分。
结果如下:
到这里,内存问题和范围查找的问题已经解决了。虽然我们用了两个数据类型来保存相同的一份数据,但是整体的内存消耗,是比全部用String
类型存储要节省的。那么接下来要解决的,就是聚合操作问题。
备注:
- 到这里为止,如果业务上只涉及到时序的范围查找,是可以同时用
Hash
和Sorted Set
去替代传统的String
的。如果仅仅限于此,我个人建议1.2节可以不看。 Redis
中对于事务的使用,在文章中提到的原子性问题也是有一定缺陷的。因为Redis
中的事务并不像Mysql
那样,倘若在一个事务中,先后执行了A和B操作,但是在执行C操作的时候发生了错误,A和B的操作是不会回滚的。Redis
主要还是拿来做缓存比较多,这种专门的时序数据处理最好交给专门的时序数据库处理,例如influxDB
。- 1.2节内容仅供参考,并且实用性和实际操作起来是否简单这个问题上,有待商榷,因为并不容易实现。(至少我写这篇文章的时候,关于
RedisTimeSeries
的JavaAPI
操作没有找到)
1.2 聚合操作的支持性问题(仅供参考)
首先,我们当然可以在客户端将相关的数据全部读取过来,然后再客户端自行完成聚合操作。但是倘若有这么几个点:
- 数据量很大。
- 聚合操作的频率很高。
那么这种情况下,就会有很多请求(包含了大量数据)在Redis
和客户端之间来回穿梭,就会造成资源的竞争,降低Redis
的性能。
那么针对聚合操作问题:我们可以使用RedisTimeSeries
,它支持在Redis
实例上对时间维度进行聚合计算。
但是使用这个,却比较麻烦,需要了解这么几个点:
RedisTimeSeries
是Redis
的扩展模块,原生Redis
并不支持。- 使用的时候需要将
Redis
源码单独编译成动态链接库redistimeseries.so
,再使用loadmodule
命令进行加载。
loadmodule redistimeseries.so
那么针对上述的聚合场景,使用RedisTimeSeries
的大致流程如下:
# 创建时间序列数据集合,创建一个key为temperature ,数据的有效期为800s。(过后会自动删除),同时这个集合的标签属性uid为1
ts.create temperature retention 800000 labels uid 1
# 插入数据
ts.add temperature 1659947381000 25
# 最新数据的获取 ts.get只能返回最新的数据
ts.get temperature
# 按照标签过滤查询
ts.mget FILTER uid = xxx
# 聚合计算 在[1659947371000 ,1659947381000]范围内,按照每180s的时间间隔,对这个时间窗口内的数据做均值计算
ts.range temperature 1659947371000 1659947381000 AGGREGATION avg 180000
二. 用 Redis 实现消息队列
前言:Redis
是可以做消息队列的,但是对于一些不允许出现消息丢失的情形下,例如金融支付操作。不要用Redis
作为中间件,请使用专门的中间件去做存储。例如Kafka、ActiveMQ、RabbitMQ
等。具体原因下面分析。
首先消息队列需要解决三个问题:
- 消息保序。
- 消息的重复消费问题。
- 消息的可靠性保证。
2.1 消息保序的实现
那么如何用Redis
作为消息队列呢?利用Redis
中的List
数据结构。
List
这个数据结构本身就是FIFO
,先进先出的顺序对数据进行存储的。- 实际操作上,生产者通过
lpush
命令将数据写入List
中。消费者端则通过rpop
命令将其弹出。
这是一般的操作。但是光凭这样的操作并不满足一个合格的消息中间件具备的条件。因为在生产者向Redis
中写入数据的时候,Redis
并不会主动地通知消费者有新消息写入了。此时消费者只能通过这样的伪代码来实现轮询:
while(true){
String json = jedis.rpop('key');
process(json);
}
问题:这样的无限循环,会导致CPU
一直消耗在这里执行rpop
命令。造成性能损失。
解决:建议使用brpop
命令,即阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。
2.2 重复消费问题解决
对于消息的重复消费问题,我们只需要提供一个唯一标识,然后消费的时候做判断即可。
- 生产者端:发送消息的时候,给消息里面塞一个唯一标识。
- 消费者端:将消费完成的消息的唯一标识记录下来。在后续消费的时候,都要反查一遍先。
# 唯一标识:主题:内容
lpush key 1000001:title:helloworld
2.3 消息可靠性保证
背景:当消费者程序从Redis
中读取一条消息并做处理,但是还没处理完成的时候就发生了宕机,那么Redis
中这条数据已经被剔除,但这个数据并没有被真正的消费掉。怎么办?
解决:生产者在推消息给Redis
的时候,使用 BRPOPLPUSH
命令,其作用如下:
- 在生产者推消息的时候,
Redis
会把这个消息再插入到另一个List
留存。 - 这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份
List
中重新读取消息并处理。
综上所述,常规情况下:
- 生产者端使用
BRPOPLPUSH
命令往Redis
中推数据,同时塞入唯一标识。 - 消费者端使用
brpop
命令。防止无限循环调用rpop()
命令。将消费过的消息的唯一标识做数据存储。 - 消费者倘若消费某个消息成功,由于生产者端往两个
List
都插入了数据,此时最好将备份队列中的消息删除,避免备份队列中存储过多过期数据,造成内存浪费。
2.4 Redis 做中间件的优劣势
先来说下Redis
做中间件的优势:
- 用
Redis
作为消息队列,由于Redis
的特性,在内存上操作,因此性能高。 API
操作起来非常方便,没有复杂的操作,部署轻量。Kafka
的操作相比之下就会复杂许多。维护成本也要更高点。
Redis
做中间件的劣势:可能出现数据丢失。 有这么个几个场景:
AOF
策略为每秒写盘。该过程为异步,若Redis
发生宕机,会丢失1秒的数据。若改为同步写盘,则会导致性能下降。- 在主从集群下,倘若写操作的频率非常大,那么主从的数据同步就会存在延迟,那么在进行主从切换的时候,也可能存在数据丢失问题。详细可以看Redis - Redis主从数据一致性和哨兵机制。
- 无法保证数据的完整性,而像Kafka这样的专业中间件,副本等机制保证了数据的可靠性。哪怕集群的某个节点挂掉了,也不会丢失数据。详细可以参考Kafka复习计划 - Kafka基础知识以及集群参方案和参数。
边栏推荐
- [C language brush questions] Application of fast and slow pointers in linked lists
- OpenMLDB + Jupyter Notebook:快速搭建机器学习应用
- 2020.10.13 Development log
- JDBC技术(一)——一个简单的JDBC测试
- Likou Brush Question Record--Common Functions
- Latex示例参考
- Z-Game on grid
- HCIP-R&S By Wakin自用笔记(3)OSPF之各类LSA及LSA更新规则
- 如何保护智能家居避免黑客攻击
- 力扣刷题记录5.1-----59. 螺旋矩阵 II
猜你喜欢
How js implements array deduplication (7 kinds)
mysql连接超过八小时报错
Group DETR:分组一对多匹配是加速DETR收敛的关键
Data recovery software EasyRecovery supports recovery of all types of files
HCIP-R&S By Wakin自用笔记(3)OSPF之各类LSA及LSA更新规则
MT4 / MQ4L entry to the master of EA tutorial lesson two (2) - - MQL language commonly used function account information commonly used functions
[C language brush questions] Application of fast and slow pointers in linked lists
企业从云服务的承诺支出中获得最大收益的四种方法
JDBC技术(二)——设置通用的sql和配置文件
The 7 taboos of time management summarized by the postgraduate students, how many have you won?
随机推荐
Latex示例参考
嵌入式设备驱动开发
Likou Brush Question Record--Common Functions
JDBC技术(一)——一个简单的JDBC测试
Which is the best increased whole life insurance?Is it really safe?
显著性检验--学习笔记
力扣刷题记录7.1-----707. 设计链表
SQLite切换日志模式优化
18.flink Table/Sql API之 catlog
【AspNetCore】实现JWT(使用Microsoft.AspNetCore.Authentication.JwtBearer)
JDBC technology (1) - a simple JDBC test
2022 Eye Care Products Exhibition, Beijing Eye Health Exhibition, Ophthalmology Exhibition, Myopia Correction Equipment Exhibition
数据库设计的总结
The last exam before the NPDP revision!caution
The most fierce "employee" in history, madly complaining about the billionaire boss Xiao Zha: So rich, he always wears the same clothes!
Duplicate class com.google.common.util.concurrent.ListenableFuture found in modules
MAYA发动机建模
JDBC technology (3) - use Druid database connection pool test
MT4/MQL4入门到精通EA教程第一课-MQL语言常用函数(一)OrderSend()函数
How to play knowledge graph in recommender system