当前位置:网站首页>golang源代码阅读,sync系列-Cond
golang源代码阅读,sync系列-Cond
2022-08-09 10:51:00 【ase2014】
总结
- Cond创建的时候需要传入Locker,一般是Mutex或者RWMutex,调用Wait的时候,需要显示调用Cond的L进行加锁和解锁。
- 调用Wait的goroutine会被挂起,当其它goroutine调用Signal会释放一个被挂起的goroutine。
- 调用Broadcast,会将所有挂起的goroutine释放。
- 不能被复制,否则会panic,Cond的copyChecker就是做这件事情的。go vet能检查出来warning,跟sync.WaitGroup一样,有noCopy
- 使用链表实现,链表里面存储sudog,调用Wait,会将当前的goroutine挂起,放到链表的最后一个,直到有goroutine调用Signal或者Broadcast
使用例子
Cond使用的到的场景并不是很多,比较少见(本人使用较少,可能是见识少),在golang的源码里有Server部分有使用。
Signal可以使用channel实现,Broadcast可以使用通过close channel的方式。
mu := new(sync.Mutex)
cond := sync.NewCond(mu)
go func() {
cond.L.Lock()
defer cond.L.Unlock()
fmt.Println("enter first")
cond.Wait()
fmt.Println("first, ", time.Now().String())
}()
go func() {
cond.L.Lock()
defer cond.L.Unlock()
fmt.Println("enter second")
cond.Wait()
fmt.Println("second, ", time.Now().String())
}()
go func() {
cond.L.Lock()
defer cond.L.Unlock()
fmt.Println("enter third")
cond.Wait()
fmt.Println("third, ", time.Now().String())
}()
go func() {
time.Sleep(time.Second)
fmt.Println("signal one, ", time.Now().String())
// first second third 会随机有个执行
cond.Signal()
}()
go func() {
time.Sleep(2 * time.Second)
fmt.Println("signal all")
// 剩下两个随机打印
cond.Broadcast()
}()
time.Sleep(3 * time.Second)
关键字
Mutex、链表、goroutine park、goroutine unpark
源码分析
结构体
type Cond struct {
noCopy noCopy // go vet检查使用,检查是否被copy了
// L is held while observing or changing the condition
L Locker // 锁,一般在Wait调用前使用
notify notifyList
checker copyChecker // 运行时检查是否被拷贝了,如果拷贝了,直接panic
}
type notifyList struct {
wait uint32 // 当调用Wait的编码数
notify uint32 // 当前通知Wait的编码书
lock uintptr // key field of the mutex 内部锁
head unsafe.Pointer // 链表的头,存储sudog
tail unsafe.Pointer // 链表的尾,存储sudog,wait加入,如果不是第一个,直接加入tail的next
}
创建Cond函数
locker一般是Mutex或者RWMutex
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
Wait函数
func (c *Cond) Wait() {
// 执行Wait前必须加锁
c.checker.check() // 检查是否被copy,如果copy了,直接panic
// 将notify的wait加一,并返回wait加一前的数
// notifyListAdd的具体实现在runtime/sema.go里的notifyListAdd函数
t := runtime_notifyListAdd(&c.notify)
// 解锁,让其它Wait函数能进入
c.L.Unlock()
//notifyListWait的具体实现在runtime/sema.go里的notifyListWait函数
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
checker.check函数
- 首次进入,在第二个条件判断时,将c指针地址存入了c值,返回true,导致条件不满足,返回
- 之后进入,第一个条件就不满足了,直接返回
- 如果Cond被赋值给另一个变量,因为*c存储的是上一个cond的指针值,从而导致第一个判断返回true,第二个条件cas也会返回false,因为不等于0,第三个同第一个,从满足条件,触发panic,从而保证了Cond不能被复制(前提是前一个Cond变量已经执行了Wait、Broadcast、Signal中任意一个,否则不会panic)
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) && // *c存储的值转化为uintptr是否与c指针地址相等,初始是不相等的,
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
// cas操作,如果c存储的值为0,则将c指针地址存入c,第一次执行这个操作,是成功的,然后返回,下回进入,因为*c存的值和c的地址是同一个,所以第一个检查就会返回
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
// 同第一个
panic("sync.Cond is copied")
}
}
按照进入wait的顺序,将goroutine加入链表里面,到时通知也是顺序进行通知的
func notifyListWait(l *notifyList, t uint32) {
加锁
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
// t小于l的notify,直接解锁返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// Enqueue itself.
// 下面的即把当前goroutine放入链表的尾部,并挂起当前goroutine,sudog后续会花一个章节单独讲
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
Signal函数
func (c *Cond) Signal() {
c.checker.check() // 检查是否被copy
runtime_notifyListNotifyOne(&c.notify) // 通知第c.notify的notify个goroutine,进行释放,并且将notify加一
}
func notifyListNotifyOne(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock at all.
// wait跟notify相同,没有什么可以通知的,直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 加锁
lockWithRank(&l.lock, lockRankNotifyList)
// Re-check under the lock if we need to do anything.
// double check
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// Update the next notify ticket number.
// 将notify加一,原始notify已经存储在t
atomic.Store(&l.notify, t+1)
// Try to find the g that needs to be notified.
// If it hasn't made it to the list yet we won't find it,
// but it won't park itself once it sees the new notify number.
//
// This scan looks linear but essentially always stops quickly.
// Because g's queue separately from taking numbers,
// there may be minor reorderings in the list, but we
// expect the g we're looking for to be near the front.
// The g has others in front of it on the list only to the
// extent that it lost the race, so the iteration will not
// be too long. This applies even when the g is missing:
// it hasn't yet gotten to sleep and has lost the race to
// the (few) other g's that we find on the list.
// 将第t个sudog释放
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
Broadcast函数
释放所有掉用了Wait的goroutine
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
循环链表,一次释放,释放goroutine实现在readyWithTime(后续将会有一篇详细讲解释放goroutine),并将notify设置为wait
func notifyListNotifyAll(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock.
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// Pull the list out into a local variable, waiters will be readied
// outside the lock.
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
// Update the next ticket to be notified. We can set it to the current
// value of wait because any previous waiters are already in the list
// or will notice that they have already been notified when trying to
// add themselves to the list.
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// Go through the local list and ready all waiters.
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
边栏推荐
- PoseNet: A Convolutional Network for Real-Time 6-DOF Camera Relocalization论文阅读
- 可能95%的人还在犯的PyTorch错误
- Solve 1. tensorflow runs using CPU but not GPU 2. GPU version number in tensorflow environment 3. Correspondence between tensorflow and cuda and cudnn versions 4. Check cuda and cudnn versions
- electron 应用开发优秀实践
- 笔记本电脑使用常见问题,持续更新
- numpy库中的函数 bincount() where() diag() all()
- 如何在gazebo进行 joint的转动控制
- golang runtime Caller、Callers、CallersFrames、FuncForPC、Stack作用
- ThreadLocal及其内存泄露分析
- linux mysql操作的相关命令
猜你喜欢
人物 | 从程序员到架构师,我是如何快速成长的?
shell脚本实战(第2版)/人民邮电出版社 脚本2 验证输入:仅限字母和数字
prometheus接入mysqld_exporter
性能测试(05)-表达式和业务关联-json关联
美的数字化平台 iBUILDING 背后的技术选型
linux mysql操作的相关命令
Solve the ali cloud oss - the original 】 【 exe double-click response can't open, to provide a solution
MNIST机器学习入门
PoseNet: A Convolutional Network for Real-Time 6-DOF Camera Relocalization论文阅读
在webgis中显示矢量化后的风险防控信息
随机推荐
Mysql多表查询
shap库源码和代码实现
力扣(LeetCode)220. 存在重复元素 III(2022.08.08)
性能测试(01)-jmeter元件-线程组、调试取样器
unix环境编程 第十五章 15.10 POSIX信号量
numpy的ndarray取数操作
多商户商城系统功能拆解26讲-平台端分销设置
Netscope:神经网络结构在线可视化工具
图片查看器viewer
乘积量化(PQ)
pip common commands and changing source files
faster-rcnn学习
基于STM32设计的环境检测设备
备份mongodb数据库(认证)
jvm-类加载系统
electron 应用开发优秀实践
性能测试(06)-逻辑控制器
Product Quantization (PQ)
依赖注入(Dependency Injection)框架是如何实现的
关于anaconda中conda下载包或者pip下载包很慢的原因,加速下载包的方法(无视anaconda版本和环境)