当前位置:网站首页>golang源代码阅读,sync系列-Cond

golang源代码阅读,sync系列-Cond

2022-08-09 10:51:00 ase2014

总结

  1. Cond创建的时候需要传入Locker,一般是Mutex或者RWMutex,调用Wait的时候,需要显示调用Cond的L进行加锁和解锁。
  2. 调用Wait的goroutine会被挂起,当其它goroutine调用Signal会释放一个被挂起的goroutine。
  3. 调用Broadcast,会将所有挂起的goroutine释放。
  4. 不能被复制,否则会panic,Cond的copyChecker就是做这件事情的。go vet能检查出来warning,跟sync.WaitGroup一样,有noCopy
  5. 使用链表实现,链表里面存储sudog,调用Wait,会将当前的goroutine挂起,放到链表的最后一个,直到有goroutine调用Signal或者Broadcast

使用例子

Cond使用的到的场景并不是很多,比较少见(本人使用较少,可能是见识少),在golang的源码里有Server部分有使用。
Signal可以使用channel实现,Broadcast可以使用通过close channel的方式。

    mu := new(sync.Mutex)
    cond := sync.NewCond(mu)

    go func() {
        cond.L.Lock()
        defer cond.L.Unlock()

        fmt.Println("enter first")
        cond.Wait()
        fmt.Println("first, ", time.Now().String())
    }()

    go func() {
        cond.L.Lock()
        defer cond.L.Unlock()

        fmt.Println("enter second")
        cond.Wait()
        fmt.Println("second, ", time.Now().String())
    }()

    go func() {
        cond.L.Lock()
        defer cond.L.Unlock()

        fmt.Println("enter third")
        cond.Wait()
        fmt.Println("third, ", time.Now().String())
    }()

    go func() {
        time.Sleep(time.Second)
        fmt.Println("signal one, ", time.Now().String())
        // first second third 会随机有个执行
        cond.Signal()
    }()

    go func() {
        time.Sleep(2 * time.Second)
        fmt.Println("signal all")
        // 剩下两个随机打印
        cond.Broadcast()
    }()

    time.Sleep(3 * time.Second)

关键字

Mutex、链表、goroutine park、goroutine unpark

源码分析

结构体

type Cond struct {
    noCopy noCopy  // go vet检查使用,检查是否被copy了

    // L is held while observing or changing the condition
    L Locker // 锁,一般在Wait调用前使用

    notify  notifyList 
    checker copyChecker // 运行时检查是否被拷贝了,如果拷贝了,直接panic
}
type notifyList struct {
    wait   uint32 // 当调用Wait的编码数
    notify uint32 // 当前通知Wait的编码书
    lock   uintptr // key field of the mutex 内部锁
    head   unsafe.Pointer // 链表的头,存储sudog
    tail   unsafe.Pointer // 链表的尾,存储sudog,wait加入,如果不是第一个,直接加入tail的next
}

创建Cond函数

locker一般是Mutex或者RWMutex

func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

Wait函数

func (c *Cond) Wait() {
    // 执行Wait前必须加锁
    c.checker.check() // 检查是否被copy,如果copy了,直接panic
    // 将notify的wait加一,并返回wait加一前的数
    // notifyListAdd的具体实现在runtime/sema.go里的notifyListAdd函数
    t := runtime_notifyListAdd(&c.notify)
    // 解锁,让其它Wait函数能进入
    c.L.Unlock()
    //notifyListWait的具体实现在runtime/sema.go里的notifyListWait函数
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

checker.check函数

  1. 首次进入,在第二个条件判断时,将c指针地址存入了c值,返回true,导致条件不满足,返回
  2. 之后进入,第一个条件就不满足了,直接返回
  3. 如果Cond被赋值给另一个变量,因为*c存储的是上一个cond的指针值,从而导致第一个判断返回true,第二个条件cas也会返回false,因为不等于0,第三个同第一个,从满足条件,触发panic,从而保证了Cond不能被复制(前提是前一个Cond变量已经执行了Wait、Broadcast、Signal中任意一个,否则不会panic)
func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) && // *c存储的值转化为uintptr是否与c指针地址相等,初始是不相等的,
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		// cas操作,如果c存储的值为0,则将c指针地址存入c,第一次执行这个操作,是成功的,然后返回,下回进入,因为*c存的值和c的地址是同一个,所以第一个检查就会返回
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		// 同第一个
		panic("sync.Cond is copied")
	}
}

按照进入wait的顺序,将goroutine加入链表里面,到时通知也是顺序进行通知的

func notifyListWait(l *notifyList, t uint32) {
    加锁
    lockWithRank(&l.lock, lockRankNotifyList)

    // Return right away if this ticket has already been notified.
    // t小于l的notify,直接解锁返回
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    // 下面的即把当前goroutine放入链表的尾部,并挂起当前goroutine,sudog后续会花一个章节单独讲
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
    releaseSudog(s)
}

Signal函数

func (c *Cond) Signal() {
    c.checker.check() // 检查是否被copy
    runtime_notifyListNotifyOne(&c.notify) // 通知第c.notify的notify个goroutine,进行释放,并且将notify加一
}
func notifyListNotifyOne(l *notifyList) {
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock at all.
    // wait跟notify相同,没有什么可以通知的,直接返回
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    // 加锁
    lockWithRank(&l.lock, lockRankNotifyList)

    // Re-check under the lock if we need to do anything.
    // double check
    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    // Update the next notify ticket number.
    // 将notify加一,原始notify已经存储在t
    atomic.Store(&l.notify, t+1)

    // Try to find the g that needs to be notified.
    // If it hasn't made it to the list yet we won't find it,
    // but it won't park itself once it sees the new notify number.
    //
    // This scan looks linear but essentially always stops quickly.
    // Because g's queue separately from taking numbers,
    // there may be minor reorderings in the list, but we
    // expect the g we're looking for to be near the front.
    // The g has others in front of it on the list only to the
    // extent that it lost the race, so the iteration will not
    // be too long. This applies even when the g is missing:
    // it hasn't yet gotten to sleep and has lost the race to
    // the (few) other g's that we find on the list.
    // 将第t个sudog释放
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}

Broadcast函数

释放所有掉用了Wait的goroutine

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

循环链表,一次释放,释放goroutine实现在readyWithTime(后续将会有一篇详细讲解释放goroutine),并将notify设置为wait

func notifyListNotifyAll(l *notifyList) {
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    // Pull the list out into a local variable, waiters will be readied
    // outside the lock.
    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil

    // Update the next ticket to be notified. We can set it to the current
    // value of wait because any previous waiters are already in the list
    // or will notice that they have already been notified when trying to
    // add themselves to the list.
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // Go through the local list and ready all waiters.
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}
原网站

版权声明
本文为[ase2014]所创,转载请带上原文链接,感谢
https://blog.csdn.net/u014763610/article/details/116573900