当前位置:网站首页>基于inotify实现落盘文件的跨进程实时读写交互
基于inotify实现落盘文件的跨进程实时读写交互
2022-08-10 14:25:00 【土豆西瓜大芝麻】
本文是在基于kafka原理单机高性能微秒级别队列改造_Sweet_Oranges的博客-CSDN博客基础上进行部分完善的。因为该文提供的代码不完整,无法使用。
场景:
线上业务需要一款拥有超低延迟(us),支持多消费者,并且能够处理海量的消息积压的消息队列。
调研:
- kafka是我们日常生活中比较常见的消息队列,非常适合做消息的离线处理。但是在一些实时性要求比较高的场景下,消息自带的延迟是不可忍受的,测试发现一条消息转发大概需要200ms的耗时,实际情况可能有所出入,但肯定是毫秒级别的。kafka从设计上就是倾向于面向大众,满足大部分需求。当然满足这些要求的成本就是通过牺牲了性能。所以说kafka适合做离线处理。而不是做一些非常实时的应用。
- zeromq就是针对实时应用的一款消息队列,提供了各个拓扑结构的链接方式,性能不错,但不足的是当消息积压有可能会写满内存。
- 市面上目前的消息队列都与我们的设计目标不符。
设计:
要想拥有微秒级别的延迟,
- 不能走网络,数据必须放本地
- 用磁盘以及顺序io来保证写入读取性能。生产者将数据以换行符(\n)append的形式写入文件(顺序写),消费者getline一直到文件尾(顺序读)
- 使用inotify等待新数据的产生
- 不适用get_line, fgets等库函数,减少数据拷贝,自己实现拆包逻辑
- 读取的buffer_size不能设置的过小,由于下游消费通常存在一定的耗时,我们尽量一次多读取一些,否则系统调用read的成本很高
- 当消息产生积压的情况,我们采用water_mark机制来自动调节每次数据读取的最大字节
实现:
#include <sys/inotify.h>
#include <sys/stat.h>
#include <unistd.h>
#include <iostream>
#include <stdio.h>
#include <fcntl.h>
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;
void cb_(char * line)//消费者的数据处理函数,回调函数
{
printf("%s",line);
}
int main()
{
int wartermark = 1024 * 1024;//用于控制一次读取数据的大小
int max_buffer_size = wartermark * 6;
std::string path_ = "/tmp/writter.txt";//生产者产生的数据的落盘文件
char *line_ = new char[10240]; // 每行缓冲区
char *buffer_ = new char[max_buffer_size]; // 文件缓冲区
char *buffer2_ = new char[BUF_LEN]; // inotify事件缓冲区
int read_ = -1;
// 注册监听文件变化
int ifd_ = inotify_init();
if (ifd_ < 0)
{
return;
}
inotify_add_watch(ifd_, path_.c_str(), IN_MODIFY | IN_CREATE | IN_DELETE);
// 打开文件
int fd_ = open(path_.c_str(), O_RDONLY);
if (fd_ < 0)
{
return;
}
// 读取文件
register long int buffer_size = wartermark;
register char *cs;
register int i = 0;
cs = line_;
bool ok_ = true;
while (ok_)
{
while ((read_ = read(fd_, buffer_, buffer_size)) != 0)
{
for (; i < read_; i++)
{
if ((*cs++ = buffer_[i]) != '\n')
{
continue;
}
//remove last \n
*(--cs) = '\0';
cb_(line_);
if (!ok_)
break;
cs = line_;
}
if (!ok_)
break;
if (read_ == wartermark)
{
buffer_size = max_buffer_size;
}
else
{
buffer_size = wartermark;
}
i = 0;
}
read(ifd_, buffer2_, BUF_LEN);//当没有事件时,程序阻塞在此处,直到有事件到来。
}
return 0;
}
消费者只需要将自己业务入口注册为cb_,就可以实现消费。
总结:
实际测试发现从消息生产到cb_入口消息延迟大概在100个us以内。以上代码虽然实现起来很简单,但是正是由于其简单才保证了超高的性能。
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/105428875
边栏推荐
- The a-modal in the antd component is set to a fixed height, and the content is scrolled and displayed
- MySQL - 数据库的存储引擎
- ES5和SE6来实现一个Promise效果
- 高薪程序员&面试题精讲系列135之你对分布式是怎么理解的?CAP理论你知道吗?
- 2022年网络安全培训火了,缺口达95%,揭开网络安全岗位神秘面纱
- 缺少比较器,运放来救场!(运放当做比较器电路记录)
- Stream通过findFirst()查找满足条件的一条数据
- Send a post request at the front desk can't get the data
- 系统架构系列文章三--解决传统企业核心系统的性能问题
- MySQL interview questions
猜你喜欢
Error: Rule can only have one resource source (provided resource and test + include + exclude)
Existing in the rain of PFAS chemical poses a threat to the safety of drinking water
Using data intelligence, Amazon cloud technology helps companies build endogenous brand growth
产品使用说明书小程序开发制作说明
Do not access Object.prototype method ‘hasOwnProperty‘ from target object....
线上线下课程教学培训小程序开发制作功能介绍
符合信创要求的堡垒机有哪些?支持哪些系统?
Flask框架——基于Celery的后台任务
开源SPL消灭数以万计的数据库中间表
2022年中国软饮料市场洞察
随机推荐
Stream通过findFirst()查找满足条件的一条数据
malloc 函数详解
微信小程序,自定义输入框与导航胶囊对其
串口服务器调试助手使用教程,串口调试助手使用教程【操作方式】
[Gazebo Introductory Tutorial] Lecture 3 Static/Dynamic Programming Modeling of SDF Files
2022-08-10 Daily: Swin Transformer author Cao Yue joins Zhiyuan to carry out research on basic vision models
Pointer (preliminary solution of C language)
学习MySQL 临时表
“国资云”和“国家云”能给市场带来怎样的变革?
Existing in the rain of PFAS chemical poses a threat to the safety of drinking water
Alibaba的秒杀系统—千亿级并发设计手册上线了
MySQL - storage engine for databases
tensorflow安装踩坑总结
2012年下半年 系统架构设计师 下午试卷 II
C#实现访问OPC UA服务器
FPN详解
统信 UOS V20 专业版(1050update2)发布:文件共享、全局搜索等优化
缺少比较器,运放来救场!(运放当做比较器电路记录)
高薪程序员&面试题精讲系列135之你对分布式是怎么理解的?CAP理论你知道吗?
日志@Slf4j介绍使用及配置等级