当前位置:网站首页>共享内存+inotify机制实现多进程低延迟数据共享
共享内存+inotify机制实现多进程低延迟数据共享
2022-08-05 05:56:00 【土豆西瓜大芝麻】
本文是对共享内存实现多进程低延迟队列 10us_Sweet_Oranges的博客-CSDN博客的部分修正。
起因
之前的博客写过通过“inotify +file”的形式来实现多进程队列(跨进程共享)的文章。这种方式在通常情况下表现不错,但是这里存在一个问题就是“当消费者过慢,会产生大量的击穿内核高速缓冲区io,导致消费者卡在读取数据的瓶颈上,无法使用负载均衡等手段来提高处理能力。”
为了解决上述问题,引入了共享内存,众所周知,这是所有ipc中最快的通信方式,从根本上解决这个问题。下面通过实现一个producer 和 consumer 程序,来展示我的设计思路。
生产者
由于物理内存有限,生产者会使用一个环形缓冲区来保证热点数据始终在内存中(类似A/B缓存这个长度为2的最小环形队列一样)。
同时为了保证消费者的接入配置最小化,生产者将配置通过一个固定大小的结构体映射到内存中(类似我们的A/B缓冲区,前面加了一个header,header中放元信息,而A/B缓冲区放树)。消费者首先映射结构体读取配置信息,从结构体中的得知缓冲区大小后执行mremap进行重新调整大小,这样消费者只需要知道共享内存的地址(一个文件名),就可以实现消费(我们的A/B缓冲区不存在重新调整大小的情况,因为已经将数据源单帧数据大小写到配置文件中了)。同时采用了消息计数,来标识消费者是否已经处理所有消息,触发等待。当新数据到达后,唤醒消费者。我们选择了通过向指定文件写入一个字节的内容触发inotify,虽然通过信号量也可以实现,但是使用信号量会导致生产者要多开一个线程实现管理,引入额外的复杂度。
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sys/mman.h>
#include <sys/time.h>
#include <iostream>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_int64(shm_size, 6, "shm_size m");//gflag中的内容
DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_string(shm_key, "", "shm key");
using namespace std;
class Producer
{
public:
Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
{
shm_size_ = FLAGS_shm_size * 1024 * 1024; // 1g; // 1g
}
bool Init(const std::string &key)
{
fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
if (fd_ < 0)
{
printf("1. open inotify path failed\n");
return false;
}
else
{
printf("1. open inotify path successed\n");
}
// 打开共享内存
shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
if (shm_fd_ < 0)
{
printf("2. shm_open failed\n");
return false;
}
else
{
printf("2. open shm path successed\n");
}
uint64_t size = shm_size_ + sizeof(SHM_Data);
printf("size = %ld\n", size);
if (ftruncate(shm_fd_, size) == -1)
{
printf("3. ftruncate failed\n");
return false;
}
else
{
printf("3. ftruncate successed\n");
}
shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_data_ == MAP_FAILED)
{
printf("4. mmap failed\n");
return false;
}
else
{
printf("4. mmap successed\n");
}
shm_data_->total = 0;
shm_data_->size = shm_size_;
memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
memcpy(shm_data_->key, key.c_str(), key.size());
return true;
}
void Write(const char *line)
{
for (int i = 0; line[i] != '\0'; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = line[i];
}
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = '\0';
shm_data_->total++;
struct timeval tv;
struct timezone tz;
write(fd_, "8", 1);
gettimeofday(&tv, &tz);
//std::cout << "second : \t" << tv.tv_sec << std::endl; //秒
std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
}
private:
struct SHM_Data
{
uint64_t total; // 记录消息总数
char inotify_name[512]; // inotify 文件名
char key[64]; // 当前数据标识
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // 共享内存
int fd_;
int shm_fd_;
uint64_t shm_size_ = 0;
uint64_t buffer_size_ = 0;
uint64_t total_read = 0;
uint64_t current_offset_ = 0;
int count = 0;//tmp
std::string inotify_path_;
std::string shm_path_;
};
int main(int argc, char *argv[])
{
char line[10] = "123456789";
gflags::ParseCommandLineFlags(&argc, &argv, true);
Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
producer.Init(FLAGS_shm_key);//FLAGS开头的这些指的是用户命令行输入的,如果不输入,默认就是空字符串
for(int i =0;i<200;i++)//测试两百次
{
producer.Write(line);
sleep(1);
}
// producer.Write(line);
std::cout << "write finished" << std::endl;
}消费者
消费者实现就相对简单一些,读取配置结构体(header),执行mremap调整大小。 如果机器性能足够,可以选择不等待inotify,类似自旋锁的方式。这种方式测试发现新消息能在10us左右被消费者感知,使用inoitfy新消息感知需要40us左右。
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <iostream>
#include <sys/mman.h>
#include <sys/time.h>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_bool(shm_nowait, false, "shm no wait mode");
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;
class Consumer
{
public:
Consumer(int tag, const std::string &shm_path)
: tag_(tag), shm_path_(shm_path)
{
line_size_ = 1024;
inotify_buffer_ = new char[BUF_LEN];
line_ = new char[line_size_];
}
~Consumer()
{
delete[] line_;
delete[] inotify_buffer_;
}
bool Init(const std::string &key)
{
shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
if (shm_fd_ < 0)
{
printf("1. shm_open failed\n");
return false;
}
else
{
printf("1. shm_open successed\n");
}
SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_info_ == MAP_FAILED)
{
printf("2. mmap info failed\n");
return false;
}
else
{
printf("2. mmap info successed\n");
}
printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
if (strcasecmp(shm_info_->key, key.c_str()) != 0)
{
printf("3. key not match \n");
return false;
}
else
{
printf("3. key matched \n");
}
// 开始监听文件变化
inotify_fd_ = inotify_init();
if (inotify_fd_ < 0)
{
printf("4. inotify_init failed\n");
return false;
}
else
{
printf("4. inotify_init successed\n");
}
shm_size_ = shm_info_->size;
uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
printf("realsize = %ld\n", real_size);
inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE);
shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
if (shm_data_ == MAP_FAILED)
{
printf("5. mmap data failed\n");
return false;
}
else
{
printf("5. mmap data successed\n");
}
return true;
}
void Loop()
{
while (true)
{
while (total_read < shm_data_->total)
{
// printf("5---------------\n");
for (int i = 0; i < line_size_; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
line_[i] = shm_data_->buffer[current_offset_++];
if (line_[i] == '\0')
{
break;
}
}
total_read++;
printf("current_offset=%d, total=%d, read=%d, %s\n", current_offset_, shm_data_->total, total_read, line_);
}
// printf("6.-------\n");
if (!FLAGS_shm_nowait)
{
//printf("7.-------\n");
read(inotify_fd_, inotify_buffer_, BUF_LEN);
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
// std::cout<< "second : \t" << tv.tv_sec << std::endl; //秒
std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
}
}
}
private:
struct SHM_Data
{
uint64_t total; // 记录消息总数
char inotify_name[512]; // inotify 文件名
char key[64]; // 当前数据标识
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // 共享对象指针
uint64_t shm_size_ = 0; // 共享内存大小
uint64_t line_size_ = 0; // 每条数据最大值
uint64_t total_read = 0; // 当前读取总记录数
uint64_t current_offset_ = 0; // 当前读取的偏移量
int count = 0;//tmp
std::string shm_path_;
int inotify_fd_;
int shm_fd_;
int tag_;
char *line_;
char *inotify_buffer_;
};
DEFINE_string(shm_key, "", "shm key");
int main(int argc, char *argv[])
{
gflags::ParseCommandLineFlags(&argc, &argv, true);
Consumer consumer(2, FLAGS_shm_file);
if (!consumer.Init(FLAGS_shm_key))
{
return 1;
}
consumer.Loop();
}Loop()这部分要改成回调机制。
编译
因为这里使用了gflag,所以需要先编译安装gflag。过程参考linux下编译、安装和使用gflags_I_belong_to_jesus的博客-CSDN博客_gflags编译安装
对于本文的两个程序,使用 g++ producer.cpp -o pro -I /usr/local/include -L /usr/local/lib/ -lgflags -lrt 即可编译。
结果如下:

可以发现,这种机制下,生产者生产完数据后,消费者能在100us内感知到,这个效率还是非常高的。

使用top也看不出来这两个进程对CPU的明显消耗。
本文的消费者目前采用的是while循环来持续监听inotify,这个机制还是要改造成事件机制比较好。
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/107082050
边栏推荐
猜你喜欢

多用户商城多商户B2B2C拼团砍价秒杀支持小程序H5+APP全开源

更改小程序原生radio的颜色及大小

2022杭电多校六 1007-Shinobu loves trip(同余方程)

Q 2020, the latest senior interview Laya soul, do you know?

Writing OpenCV in VSCode

图像处理、分析与机器视觉一书纠错笔记

LabVIEW中如何实现任意形状的不规则按键

After docker is deployed, mysql cannot connect

Cocos Creator Mini Game Case "Stick Soldier"

System basics - study notes (some command records)
随机推荐
LaTeX uses frame to make PPT pictures without labels
【5】Docker中部署MySQL
Error correction notes for the book Image Processing, Analysis and Machine Vision
【FAQ】什么是 Canon CCAPI
Pytorch distributed parallel processing
邮件管理 过滤邮件
Some basic method records of commonly used languages in LeetCode
Matplotlib绘图笔记
(2022杭电多校六)1010-Planar graph(最小生成树)
滚动条问题,未解决
日本卫生设备行业协会:日本温水喷淋马桶座出货量达1亿套
NB-IOT智能云家具项目系列实站
AH8669-AC380/VAC220V转降5V12V24V500MA内电源芯片IC方案
人人AI(吴恩达系列)
## 简讲protobuf-从原理到使用
深入分析若依数据权限@datascope (注解+AOP+动态sql拼接) 【循序渐进,附分析过程】
DevOps process demo (practical record)
Unable to import torchvision. IO. Read_image
Tips for formatting code indentation
scikit-image image processing notes