当前位置:网站首页>Detailed explanation of channel implementation of cocoyaxi Library
Detailed explanation of channel implementation of cocoyaxi Library
2022-04-22 13:52:00 【asp-cc】
Channel Realization
cocoyaxi Library profile
CO It's an elegant 、 efficient C++ Base library , Support Linux, Windows And Mac Such as platform , It implements something like golang Cooperation of 、 Network programming framework based on coprocessing 、 Command line parameters and configuration file parsing library 、 High performance log Library 、 Unit test framework 、JSON Library and a series of high-quality basic components .
Official document description
https://idealvin.github.io/cn/co/coroutine/#channelcochan
co::Chan It's a template class , It is similar to golang Medium channel, Used to transfer data between processes .
template <typename T> class Chan;
-
co::ChanInternal memory copy based implementation , Template parameter T It can be a built-in type 、 Pointer types , perhaps Copy operation is a structure type with simple memory copy semantics . In short ,T The following conditions must be met : about T Two variables or objects of type a And b, a = b Equivalent to memcpy(&a, &b, sizeof(T)). -
image
std::stringor STL Container type in , The copy operation is not a simple memory copy , So it can't be directly in channel In the transfer .
Code example
#include "co/co.h"
void f() {
co::Chan<int> ch;
go([ch]() {
ch << 7; });
int v = 0;
ch >> v;
LOG << "v: " << v;
}
void g() {
co::Chan<int> ch(32, 500);
go([ch]() {
ch << 7;
if (co::timeout()) LOG << "write to channel timeout..";
});
int v = 0;
ch >> v;
if (!co::timeout()) LOG << "v: " << v;
}
DEF_main(argc, argv) {
f();
g();
return 0;
}
In the above code channel The object is on the stack , and CO The shared stack implementation is adopted , The data on a process stack may be overwritten by other processes , Generally, the data communication between processes cannot be directly through the data on the stack , So the lambda Adopted Capture by value The way , take channel Made a copy of , Transfer to the new collaboration .channel The copy operation of just adds the internal reference count 1, Little impact on performance .
Preface
Channel Is a common producer - Consumer model application . Often used in the transmission of information .
cocoyaxi In the library Channel The implementation of can be said to be very characteristic , As you can see from the code examples in the official documentation above ,channel It also provides timeout function , Give developers more flexibility .
thorough Channel
// chan.h
template <typename T>
class Chan {
public:
/** * @param cap max capacity of the queue, 1 by default. * @param ms default timeout in milliseconds, -1 by default. */
explicit Chan(uint32 cap=1, uint32 ms=(uint32)-1)
: _p(cap * sizeof(T), sizeof(T), ms) {
}
~Chan() = default;
Chan(Chan&& c) : _p(std::move(c._p)) {
}
Chan(const Chan& c) : _p(c._p) {
}
void operator=(const Chan&) = delete;
void operator<<(const T& x) const {
_p.write(&x);
}
void operator>>(T& x) const {
_p.read(&x);
}
private:
xx::Pipe _p;
};
co::Chan For the definition of chan.h, It can be seen that ,channel The specific implementation is left to xx::Pipe.
But after all, this is the outermost interface for developers , The code is worth looking at .
Continue with the text , The specific realization Chain is this kind of :
co::Chan → xx::Pipe → xx::PipeImpl
Let's go straight to xx::PipeImpl Code for
class PipeImpl {
public:
PipeImpl(uint32 buf_size, uint32 blk_size, uint32 ms)
: _buf_size(buf_size), _blk_size(blk_size),
_rx(0), _wx(0), _ms(ms), _full(false) {
_buf = (char*) malloc(_buf_size);
}
void read(void* p);
void write(const void* p);
...
private:
::Mutex _m;
std::deque<waitx*> _wq;
char* _buf; // buffer
uint32 _buf_size; // buffer size
uint32 _blk_size; // block size
...
uint32 _ms; // timeout in milliseconds
bool _full; // 0: not full, 1: full
};
First look at the function of member variables from the constructor , A simple analysis of the down-regulation chain
/** * @param cap max capacity of the queue, 1 by default. * @param ms default timeout in milliseconds, -1 by default. */
explicit Chan(uint32 cap=1, uint32 ms=(uint32)-1)
: _p(cap * sizeof(T), sizeof(T), ms) {
}
PipeImpl(uint32 buf_size, uint32 blk_size, uint32 ms) {
...
}
It is not difficult to find the corresponding relationship :
buf_size → cap * sizeof(T)
blk_size → sizeof(T)
ms → ms
sizeof(T), among T It's a template parameter , This is the type of information transmission ,sizeof(T) You can find the size of this type ,blk_size just sizeof(T). We call it block size (block size).
cap Is capacity , That is, the maximum number of messages delivered this time , So the buffer buf_ Allocated just enough to hold cap A data space .
ms It's a timeout , In Milliseconds .
Next, let's look at other contents
class PipeImpl {
...
struct waitx {
co::Coroutine* co;
union {
uint8 state;
void* dummy;
};
void* buf;
...
};
waitx Structure can be understood as waiting context , It's used in two places :
-
When external data needs to be read ( For variables, it's a write operation , Direct operation pointer ), That is, when you need to read the buffer , If the buffer is empty , No data to read , At this time, it is necessary to suspend the current collaboration .
-
When external data needs to be written ( For variables, it's a read operation ), That is, when writing buffer , If the buffer is full , Unable to write data , At this time, it is necessary to suspend the current collaboration .
waitx There are four states , No need to say more.
enum co_state_t : uint8 {
st_init = 0, // initial state
st_wait = 1, // wait for an event
st_ready = 2, // ready to resume
st_timeout = 4, // timeout
};
There is one left void* buf member , We need to synthesize it to know its usefulness .
// buf In fact, it is the pointer of the variable that needs to be read and written
waitx* create_waitx(co::Coroutine* co, void* buf) {
waitx* w;
// Judge buf Whether on the stack
const bool on_stack = gSched->on_stack(buf);
if (on_stack) {
// Switching out the coprocess may affect the value of the variable , See the following analysis
// Variable reads one block size at a time , So just allocate one more _blk_size
w = (waitx*) malloc(sizeof(waitx) + _blk_size);
// w->buf Point to waitx The end of the structure
w->buf = (char*)w + sizeof(waitx);
} else {
// Variables are not defined on the current stack
// The value of the variable will not be affected when the coprocess is switched out
// w->buf Still use existing variable pointers
w = (waitx*) malloc(sizeof(waitx));
w->buf = buf;
}
w->co = co;
w->state = st_init;
return w;
}
Why do we need to check whether the variable is on the co process stack buf Do two different operations ?
If it is a variable created in the process, start from channel Middle reading data ,( The running process uses the shared stack ), After the current collaboration is suspended , The data in the shared stack may be occupied by other processes , The variables created in the coroutine will be transferred to the private stack , Be careful , The process of writing the read value to the variable pointer may not occur in the co process of the variable , In this case, if the variable pointer is modified , In fact, what is modified is the memory value on the shared stack , Instead of variables on the private stack of the suspended coroutine , This will lead to exceptions .
therefore , You need to allocate a new space to temporarily store read and write values . This can be considered as a protection of stack pointer variables .
Take a popular example , The football field is a public place , There are many football teams waiting to play inside . At some point ,A The football team is preparing to play , Zhang San is short of a good pair of sneakers ,A Zhang San, the forward of the team, called a friend , Ask a friend to bring his sneakers , But my friend just went out , It didn't come that fast . Unexpectedly, the team manager came to their team for a meeting , It is said that we should discuss whether to eat fried sea cucumber with scallion or boil sea cucumber soup in the evening . The team members immediately left the stadium in high spirits and went to the meeting place . Not long ago , My friend sent me shoes , Zhang San told him to stand at the gate of the court , Now there is a man standing on the court , But he is not Zhang San . Friends want to entrust shoes to people nearby , Ask them to help give the shoes to Zhang San . But , Who should I look for ? Find someone from another team ? Maybe they're a team from out of town , After a game, you'll never come back , Besides, they don't recognize Zhang San . Then give it to Zhang San who knows him nearby , Old people who often come to training , My friend entrusted it to an old man , And then I went back . Zhang San and his party agreed to eat fried sea cucumber with scallions. They were very happy , Back on the pitch to play . But Zhang San still lacks a pair of good sneakers , The entrusted old man is very righteous , Seeing Zhang San coming, he immediately gave him his sneakers . So Zhang San can happily think of sea cucumber and play football in his mind .
The stadium is equivalent to a shared stack , A team is a team , The team can have its own resting place ( Private stack , It is used to save the process stack during interrupt ). however , If you want to play football ( function ) Must be on the court ( Shared stack ) Go ahead .
A Zhang San of the team is equivalent to a variable on the synergy stack , The team is gone , He will follow .
Zhang San needs a good pair of sneakers , Equivalent to the variable wants to start from Channel Middle reading data . A friend is equivalent to Channel The process of writing data .
A The team's meeting is equivalent to the interruption of the process , At this time, the court may be occupied by other teams .
However, the friends who give shoes don't care whether Zhang San's team plays football on the field or not .
Here he is , Or give the shoes to Zhang San , Or entrust it to someone else .
But Zhang San has left with the team . Although Zhang San told his friends , He waited for him at the gate of the court , But now even if there are people on the court , That's not true, Zhang San , It could be B Li Si of the team , It's impossible to give him the shoes ( After all, it's not his shoes , He probably doesn't recognize Zhang San ).
So friends need to find someone to stay long enough , Someone who knows Zhang San , It can be an old man who often stays on the court , It can also be a child who spends money on temporary entrustment .
All in all , When the team comes back to play again , The client has to give the shoes to Zhang San .
To make a long story short , In fact, the problem is that Zhang San only gave it to his friends Where he was at that time . Where did he go after he left with the team , Friends don't know . Recognize that the problem is , It's easy to understand .
There are still some last member variables to introduce :
::Mutex _m; // The mutex , Manage critical areas
std::deque<waitx*> _wq; // Waiting in line
char* _buf; // buffer buffer
uint32 _buf_size; // buffer size Buffer size
uint32 _blk_size; // block size [ A piece of data ] Size
uint32 _rx; // read pos Read pointer Read offset
uint32 _wx; // write pos The write pointer Write offset
uint32 _ms; // timeout in milliseconds Timeout time
bool _full; // 0: not full, 1: full Is the buffer full
Class introduction is over , The latter method of reading and writing is the main play .
read Method
read It refers to reading data from the buffer , Write to the incoming pointer p in .
void PipeImpl::read(void* p) {
// thread local Global scheduler
auto s = gSched;
CHECK(s) << "must be called in coroutine..";
_m.lock();
// The read and write pointers are not equal , Indicates that the buffer is not empty , It's not full
if (_rx != _wx) {
/* buffer is neither empty nor full */
assert(!_full);
assert(_wq.empty());
// To the pointer p Write data
// _buf Point to the starting address of the buffer ,_buf+_rx Refers to the current position of the read pointer
// This code means
// Start with the read pointer , Read backward _blk_size Length data , Copied to the p Pointer to the address
memcpy(p, _buf + _rx, _blk_size);
// The reading pointer moves backward
_rx += _blk_size;
// If you move to the end , To return to one's place
// The buffer period can be regarded as a ring queue
if (_rx == _buf_size) _rx = 0;
_m.unlock();
} else {
// Buffer is empty , I can't read the data , The collaboration needs to be suspended
if (!_full) {
/* buffer is empty */
// The current schedule
auto co = s->running();
// Create wait context
waitx* w = this->create_waitx(co, p);
// Join the wait queue
_wq.push_back(w);
_m.unlock();
// Collaboration scheduler update
if (co->s != s) co->s = s;
co->waitx = w;
// Start the timeout function
if (_ms != (uint32)-1) s->add_timer(_ms);
// Pending process
s->yield();
// Run here , It indicates that the cooperation process has been restored
// Judge whether it's time-out
if (!s->timeout()) {
// From the foregoing create_waitx It can be found that
// When p When in the process stack w->buf It's not equal to p
// Because a new space has been allocated to replace p
// But it's still someone else's thing
// I want to return it to p
if (w->buf != p) memcpy(p, w->buf, _blk_size);
::free(w);
}
co->waitx = 0;
} else {
/* buffer is full */
// The buffer is full , It shows that the production speed is fast , Consumption is slow
// At this time, can there only be cooperative processes that want to write but can't write in the waiting queue
// There can be no collaborative process you want to read
memcpy(p, _buf + _rx, _blk_size);
_rx += _blk_size;
if (_rx == _buf_size) _rx = 0;
while (!_wq.empty()) {
waitx* w = _wq.front(); // wait for write
_wq.pop_front();
if (atomic_compare_swap(&w->state, st_init, st_ready) == st_init) {
// What is taken out is the write process wait context , Instead of reading the coroutine wait context
// Think about why
// So here is the write operation
memcpy(_buf + _wx, w->buf, _blk_size);
_wx += _blk_size;
if (_wx == _buf_size) _wx = 0;
_m.unlock();
// Scheduling write process
((co::SchedulerImpl*) w->co->s)->add_ready_task(w->co);
return;
} else {
/* timeout */
::free(w);
}
}
_full = false;
_m.unlock();
}
}
}
write Method
write From the pointer p Middle reading data , Write to buffer .
p The pointer is read-only and not written , therefore write Parameters void* p Add a const Modifier .
void PipeImpl::write(const void* p) {
// thread local Global scheduler
auto s = gSched;
CHECK(s) << "must be called in coroutine..";
_m.lock();
if (_rx != _wx) {
/* buffer is neither empty nor full */
assert(!_full);
assert(_wq.empty());
// take p The data of the pointer is copied to the write pointer
memcpy(_buf + _wx, p, _blk_size);
// The write pointer moves to the right
_wx += _blk_size;
// That's the end of it , place
if (_wx == _buf_size) _wx = 0;
// Reading and writing pointers meet , It means things are full
// The buffer period can be regarded as a ring queue
// Reading and writing pointers meet , Indicates that the buffer is either empty , Or full
// Currently writing data to the buffer
// So it can only be full , It can't be empty
if (_rx == _wx) _full = true;
_m.unlock();
} else {
if (!_full) {
/* buffer is empty */
// If the buffer is empty , It shows that the consumption speed is fast , The production speed is slow
// If there is data in the waiting queue , That must be a collaborative process that you want to read but can't read
// There can be no collaborative process you want to write
while (!_wq.empty()) {
// Extract a wait context from the buffer , It includes waiting process
// A collaboration can only be read
waitx* w = _wq.front(); // wait for read
_wq.pop_front();
if (atomic_compare_swap(&w->state, st_init, st_ready) == st_init) {
_m.unlock();
// Write the current pointer data to w->buf
// Yes w->buf That is to say “ read “ Here's the data
memcpy(w->buf, p, _blk_size);
// Xie Cheng got the data , Scheduling protocol
((co::SchedulerImpl*) w->co->s)->add_ready_task(w->co);
// At present, the content has been read , Nothing to read , Exit function
// This does not mean that the waiting queue does not have a waiting process
return;
} else {
/* timeout */
::free(w);
}
}
// Run here , It means that the waiting queue is either empty , Either the wait context has timed out
// To make a long story short , There is no successful reading in the waiting queue
// Now you can read the pointer data and write it to the buffer
memcpy(_buf + _wx, p, _blk_size);
_wx += _blk_size;
if (_wx == _buf_size) _wx = 0;
if (_rx == _wx) _full = true;
_m.unlock();
} else {
/* buffer is full */
// Buffer full , No more writing , Need to suspend the collaboration
auto co = s->running();
waitx* w = this->create_waitx(co, (void*)p);
// Variables are allocated in the stack , therefore w->buf It's not a pointer to the original variable
// It's waiting for the context (waitx) A new space in the back
// Therefore, you need to save the value of the variable again
if (w->buf != p) memcpy(w->buf, p, _blk_size);
_wq.push_back(w);
_m.unlock();
if (co->s != s) co->s = s;
co->waitx = w;
if (_ms != (uint32)-1) s->add_timer(_ms);
// Pending process
s->yield();
// Run here , It indicates that the cooperation process has been restored
// Judge whether it's time-out
if (!s->timeout()) ::free(w);
co->waitx = 0;
}
}
}
版权声明
本文为[asp-cc]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204221349391974.html
边栏推荐
- Database tablespace is damaged. How to deal with the exception of backup table data?
- Sixtool multi-functional multi-in-one generation hanging assistant system source code
- Go down and continue to go down. Xiaohui's fund has lost 640000...
- Is it safe to open an account in Zhujiang futures?
- On QPS, TPS, number of concurrent users and throughput
- 苏小红C语言程序设计第四、五章知识总结
- Full link remodeling! From thinking to practice, digital transformation is the successful path of it operation
- 数据库表空间损坏,备份表数据异常如何处理?
- 快速串讲——JVM内存的区域划分
- Harbor V2. 5 Mise à jour, quelles fonctions ont été ajoutées?
猜你喜欢

Database resource load management (Part 2)
![[zeekr_tech] Introduction to ros/ros 2](/img/96/351ae626951cfb016146aa52ff7f32.png)
[zeekr_tech] Introduction to ros/ros 2

产业园区数字化运营管理之“精准招商”篇

Harbor V2. 5 Mise à jour, quelles fonctions ont été ajoutées?

Redis (VI) - set of common data types of redis

SixTool多功能多合一代挂助手系统源码

C 7.0 use underline to ignore variables used

QT explorer and Use of QRC file

產業園區數字化運營管理之“精准招商”篇

CDF global survey: stagnant software delivery performance
随机推荐
CVPR 2022 Oral | 大连理工提出小样本识别DeepBDC,6项基准性能最好
How to turn on self-monitoring of Apache skywalking?
MySQL DNS解析和主机缓存
BCC-stackcount
那些年我们一起优化的SQL
ICLR2022杰出论文奖出炉,清华、人大获奖,浙大提名
BCC-funccount
字长和数据类型
Super user of oceanbase - system tenant sys
The more "intelligent" the machine is, the easier it is for data taggers to be eliminated? Manfu Technology
This is how redis pipeline was used
#yyds干货盘点# 解决剑指offer:机器人的运动范围
快速串讲——JVM内存的区域划分
LeetCode-3 无重复字符的最长子串
EnvironmentPostProcessor怎么做单元测试?阿里P7解答
[software test series x] stress test scheme
What are the types of blocking queues in the thread pool?
SixTool多功能多合一代挂助手系统源码
Harbor v2.5更新,都增加了哪些功能?
产业园区数字化运营管理之“企业服务”篇