当前位置:网站首页>基于inotify实现落盘文件的跨进程实时读写交互
基于inotify实现落盘文件的跨进程实时读写交互
2022-08-10 14:25:00 【土豆西瓜大芝麻】
本文是在基于kafka原理单机高性能微秒级别队列改造_Sweet_Oranges的博客-CSDN博客基础上进行部分完善的。因为该文提供的代码不完整,无法使用。
场景:
线上业务需要一款拥有超低延迟(us),支持多消费者,并且能够处理海量的消息积压的消息队列。
调研:
- kafka是我们日常生活中比较常见的消息队列,非常适合做消息的离线处理。但是在一些实时性要求比较高的场景下,消息自带的延迟是不可忍受的,测试发现一条消息转发大概需要200ms的耗时,实际情况可能有所出入,但肯定是毫秒级别的。kafka从设计上就是倾向于面向大众,满足大部分需求。当然满足这些要求的成本就是通过牺牲了性能。所以说kafka适合做离线处理。而不是做一些非常实时的应用。
- zeromq就是针对实时应用的一款消息队列,提供了各个拓扑结构的链接方式,性能不错,但不足的是当消息积压有可能会写满内存。
- 市面上目前的消息队列都与我们的设计目标不符。
设计:
要想拥有微秒级别的延迟,
- 不能走网络,数据必须放本地
- 用磁盘以及顺序io来保证写入读取性能。生产者将数据以换行符(\n)append的形式写入文件(顺序写),消费者getline一直到文件尾(顺序读)
- 使用inotify等待新数据的产生
- 不适用get_line, fgets等库函数,减少数据拷贝,自己实现拆包逻辑
- 读取的buffer_size不能设置的过小,由于下游消费通常存在一定的耗时,我们尽量一次多读取一些,否则系统调用read的成本很高
- 当消息产生积压的情况,我们采用water_mark机制来自动调节每次数据读取的最大字节
实现:
#include <sys/inotify.h>
#include <sys/stat.h>
#include <unistd.h>
#include <iostream>
#include <stdio.h>
#include <fcntl.h>
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;
void cb_(char * line)//消费者的数据处理函数,回调函数
{
printf("%s",line);
}
int main()
{
int wartermark = 1024 * 1024;//用于控制一次读取数据的大小
int max_buffer_size = wartermark * 6;
std::string path_ = "/tmp/writter.txt";//生产者产生的数据的落盘文件
char *line_ = new char[10240]; // 每行缓冲区
char *buffer_ = new char[max_buffer_size]; // 文件缓冲区
char *buffer2_ = new char[BUF_LEN]; // inotify事件缓冲区
int read_ = -1;
// 注册监听文件变化
int ifd_ = inotify_init();
if (ifd_ < 0)
{
return;
}
inotify_add_watch(ifd_, path_.c_str(), IN_MODIFY | IN_CREATE | IN_DELETE);
// 打开文件
int fd_ = open(path_.c_str(), O_RDONLY);
if (fd_ < 0)
{
return;
}
// 读取文件
register long int buffer_size = wartermark;
register char *cs;
register int i = 0;
cs = line_;
bool ok_ = true;
while (ok_)
{
while ((read_ = read(fd_, buffer_, buffer_size)) != 0)
{
for (; i < read_; i++)
{
if ((*cs++ = buffer_[i]) != '\n')
{
continue;
}
//remove last \n
*(--cs) = '\0';
cb_(line_);
if (!ok_)
break;
cs = line_;
}
if (!ok_)
break;
if (read_ == wartermark)
{
buffer_size = max_buffer_size;
}
else
{
buffer_size = wartermark;
}
i = 0;
}
read(ifd_, buffer2_, BUF_LEN);//当没有事件时,程序阻塞在此处,直到有事件到来。
}
return 0;
}
消费者只需要将自己业务入口注册为cb_,就可以实现消费。
总结:
实际测试发现从消息生产到cb_入口消息延迟大概在100个us以内。以上代码虽然实现起来很简单,但是正是由于其简单才保证了超高的性能。
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/105428875
边栏推荐
- 2022年网络安全培训火了,缺口达95%,揭开网络安全岗位神秘面纱
- 强意识 压责任 安全培训筑牢生产屏障
- A method that can make large data clustering 2000 times faster
- In the second half of 2012 system architecture designers afternoon paper II
- 安装mysql报错处理
- Makefile missing separator. Stop.怎么解决「建议收藏」
- 静态变量存储在哪个区
- 缺少比较器,运放来救场!(运放当做比较器电路记录)
- 领域驱动实践总结(基本理论总结与分析V+架构分析与代码设计+具体应用设计分析)
- d为何用模板参数
猜你喜欢
Alibaba的秒杀系统—千亿级并发设计手册上线了
中学数学建模书籍及相关的视频等(2022.08.09)
Existing in the rain of PFAS chemical poses a threat to the safety of drinking water
使用mysq语句操作数据库
C#实现访问OPC UA服务器
Error: Rule can only have one resource source (provided resource and test + include + exclude)
面试面到了一个腾讯30k出来的,有见识到何为精通MySQL调优
第三方软件测评有什么作用?权威软件检测机构推荐
AWS 安全基础知识
win2012安装Oraclerac失败
随机推荐
win2012安装Oraclerac失败
八大排序总是忘?快来这里~
Open source SPL wipes out tens of thousands of database intermediate tables
奢侈品鉴定机构小程序开发制作功能介绍
file system design
PyTorch multi-machine multi-card training: DDP combat and skills
How to code like a pro in 2022 and avoid If-Else
易观分析联合中小银行联盟发布海南数字经济指数,敬请期待!
Error: Rule can only have one resource source (provided resource and test + include + exclude)
PEST 分析法
ICML 2022 | 基于随机注意力机制的可解释可泛化图学习
High-paid programmers & interview questions series 135 How do you understand distributed?Do you know CAP theory?
2022年网络安全培训火了,缺口达95%,揭开网络安全岗位神秘面纱
记录几道整型提升的题目
BCG库简介
[219] The training course notes of the go engineer with more than 3,000 MOOCs 02 Programming ideas in the go language
Classifying irises using decision trees
leetcode 739. Daily Temperatures Daily Temperatures (Moderate)
[JS Advanced] Creating sub-objects and replacing this_10 in ES5 standard specification
FPN详解