当前位置:网站首页>基于inotify实现落盘文件的跨进程实时读写交互
基于inotify实现落盘文件的跨进程实时读写交互
2022-08-10 14:25:00 【土豆西瓜大芝麻】
本文是在基于kafka原理单机高性能微秒级别队列改造_Sweet_Oranges的博客-CSDN博客基础上进行部分完善的。因为该文提供的代码不完整,无法使用。
场景:
线上业务需要一款拥有超低延迟(us),支持多消费者,并且能够处理海量的消息积压的消息队列。
调研:
- kafka是我们日常生活中比较常见的消息队列,非常适合做消息的离线处理。但是在一些实时性要求比较高的场景下,消息自带的延迟是不可忍受的,测试发现一条消息转发大概需要200ms的耗时,实际情况可能有所出入,但肯定是毫秒级别的。kafka从设计上就是倾向于面向大众,满足大部分需求。当然满足这些要求的成本就是通过牺牲了性能。所以说kafka适合做离线处理。而不是做一些非常实时的应用。
- zeromq就是针对实时应用的一款消息队列,提供了各个拓扑结构的链接方式,性能不错,但不足的是当消息积压有可能会写满内存。
- 市面上目前的消息队列都与我们的设计目标不符。
设计:
要想拥有微秒级别的延迟,
- 不能走网络,数据必须放本地
- 用磁盘以及顺序io来保证写入读取性能。生产者将数据以换行符(\n)append的形式写入文件(顺序写),消费者getline一直到文件尾(顺序读)
- 使用inotify等待新数据的产生
- 不适用get_line, fgets等库函数,减少数据拷贝,自己实现拆包逻辑
- 读取的buffer_size不能设置的过小,由于下游消费通常存在一定的耗时,我们尽量一次多读取一些,否则系统调用read的成本很高
- 当消息产生积压的情况,我们采用water_mark机制来自动调节每次数据读取的最大字节
实现:
#include <sys/inotify.h>
#include <sys/stat.h>
#include <unistd.h>
#include <iostream>
#include <stdio.h>
#include <fcntl.h>
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;
void cb_(char * line)//消费者的数据处理函数,回调函数
{
printf("%s",line);
}
int main()
{
int wartermark = 1024 * 1024;//用于控制一次读取数据的大小
int max_buffer_size = wartermark * 6;
std::string path_ = "/tmp/writter.txt";//生产者产生的数据的落盘文件
char *line_ = new char[10240]; // 每行缓冲区
char *buffer_ = new char[max_buffer_size]; // 文件缓冲区
char *buffer2_ = new char[BUF_LEN]; // inotify事件缓冲区
int read_ = -1;
// 注册监听文件变化
int ifd_ = inotify_init();
if (ifd_ < 0)
{
return;
}
inotify_add_watch(ifd_, path_.c_str(), IN_MODIFY | IN_CREATE | IN_DELETE);
// 打开文件
int fd_ = open(path_.c_str(), O_RDONLY);
if (fd_ < 0)
{
return;
}
// 读取文件
register long int buffer_size = wartermark;
register char *cs;
register int i = 0;
cs = line_;
bool ok_ = true;
while (ok_)
{
while ((read_ = read(fd_, buffer_, buffer_size)) != 0)
{
for (; i < read_; i++)
{
if ((*cs++ = buffer_[i]) != '\n')
{
continue;
}
//remove last \n
*(--cs) = '\0';
cb_(line_);
if (!ok_)
break;
cs = line_;
}
if (!ok_)
break;
if (read_ == wartermark)
{
buffer_size = max_buffer_size;
}
else
{
buffer_size = wartermark;
}
i = 0;
}
read(ifd_, buffer2_, BUF_LEN);//当没有事件时,程序阻塞在此处,直到有事件到来。
}
return 0;
}
消费者只需要将自己业务入口注册为cb_,就可以实现消费。
总结:
实际测试发现从消息生产到cb_入口消息延迟大概在100个us以内。以上代码虽然实现起来很简单,但是正是由于其简单才保证了超高的性能。
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/105428875
边栏推荐
- 池化技术有多牛?来,告诉你阿里的Druid为啥如此牛逼!
- Flask框架——基于Celery的后台任务
- How does IT Xiaobai learn PHP systematically
- 等保2.0一个中心三重防护指的是什么?如何理解?
- MySQL - storage engine for databases
- Send a post request at the front desk can't get the data
- Summary of tensorflow installation stepping on the pit
- 微信小程序,自定义输入框与导航胶囊对其
- 快速了解大端模式和小端模式
- 高数_证明_弧微分公式
猜你喜欢
How does IT Xiaobai learn PHP systematically
第三方软件测评有什么作用?权威软件检测机构推荐
PyTorch multi-machine multi-card training: DDP combat and skills
AWS Security Fundamentals
Flask框架——MongoEngine使用MongoDB数据库
面试面到了一个腾讯30k出来的,有见识到何为精通MySQL调优
基于ArcGIS水文分析、HEC-RAS模拟技术在洪水危险性及风险评估
使用mysq语句操作数据库
AWS 安全基础知识
王学岗—————————哔哩哔哩直播-手写哔哩哔哩硬编码录屏推流(硬编)(26节课)
随机推荐
安装mysql报错处理
“国资云”和“国家云”能给市场带来怎样的变革?
How is the monthly salary table stored in the database?Ask for a design idea
字节终面:CPU 是如何读写内存的?
tensorflow安装踩坑总结
正则表达式(包含各种括号,echo,正则三剑客以及各种正则工具)
vivado闪退或者message无显示
1W字详解线程本地存储 ThreadLocal
1004 (tree array + offline operation + discretization)
d为何用模板参数
awk的简单使用
【219】慕课三千多的那个go工程师的培训课笔记 02 go语言的编程思想
统信 UOS V20 专业版(1050update2)发布:文件共享、全局搜索等优化
ES5和SE6来实现一个Promise效果
【剑指offer】---数组中的重复数字
文件系统设计
2011年下半年 系统架构设计师 下午试卷 II
Circle 创始人回应美财政部禁止 Tornado :隐私与安全之间关系紧张
一种能让大型数据聚类快2000倍的方法,真不戳
数据产品经理那点事儿 二