Go Language: Concurrent Programming

2022-04-21 16:24:00 The breeze of the coast star

Timer

We often want to run Go code at some later time, or repeatedly at a fixed interval. Go's built-in timer and ticker features make both easy to implement.

package main

import "time"
import "fmt"

func main() {

    // A timer represents a single event at some point in the future. You tell
    // the timer how long to wait, and it provides a channel that is notified
    // when that time is up. This timer will wait 2 seconds.
    timer1 := time.NewTimer(time.Second * 2)

    // `<-timer1.C` blocks on the timer's channel `C` until a value is sent
    // on it, which signals that the timer has expired.
    <-timer1.C
    fmt.Println("Timer 1 expired")

    // If you only wanted to wait, you could have used `time.Sleep`. One
    // reason a timer is useful is that you can cancel it before it expires.
    // Here is an example.
    timer2 := time.NewTimer(time.Second)
    go func() {
        <-timer2.C
        fmt.Println("Timer 2 expired")
    }()
    stop2 := timer2.Stop()
    if stop2 {
        fmt.Println("Timer 2 stopped")
    }
}

Ticker

A timer is for doing something once at some point in the future; a ticker is for doing something repeatedly at a fixed interval. Here is an example of a ticker that ticks periodically until we stop it.

package main

import "time"
import "fmt"

func main() {

    // Tickers use a mechanism similar to timers: a channel that values are
    // sent on. Here we use the built-in `range` on that channel to iterate
    // over the values as they arrive every 500ms.
    ticker := time.NewTicker(time.Millisecond * 500)
    go func() {
        for t := range ticker.C {
            fmt.Println("Tick at", t)
        }
    }()

    // A ticker can be stopped just like a timer. Once a ticker is stopped,
    // no more values are sent on its channel. `Stop` does not close the
    // channel, though. We stop this ticker after 1500ms.
    time.Sleep(time.Millisecond * 1500)
    ticker.Stop()
    fmt.Println("Ticker stopped")
}

Worker pool

In this example we'll see how to implement a worker pool using goroutines and channels.

package main

import "fmt"
import "time"

// This is the worker, which we will run as several concurrent instances. Each
// worker receives tasks on the `jobs` channel and sends the corresponding
// results on `results`. It sleeps 1s per job to simulate an expensive task.
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "processing job", j)
        time.Sleep(time.Second)
        results <- j * 2
    }
}

func main() {

    // To use the pool of workers we need to send them work and collect
    // their results, so we make 2 channels.
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // This starts 3 workers. They block at first because no jobs have
    // been sent yet.
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // Here we send 9 `jobs` and then `close` that channel to indicate
    // that this is all the work there is.

    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)

    // Finally, we collect the results of all the jobs.
    for a := 1; a <= 9; a++ {
        <-results
    }
}
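
The listing above collects results by counting to 9, which ties the collecting loop to the number of jobs sent. An alternative is to let a `sync.WaitGroup` track the workers and close the results channel once they have all returned. The following is a rough sketch of that variant, not the original author's method; `sumDoubled` is a hypothetical helper that sums the doubled inputs so the outcome is easy to check.

```go
package main

import (
	"fmt"
	"sync"
)

// sumDoubled is a hypothetical helper: it fans inputs out to `workers`
// goroutines, each of which doubles a job, and returns the sum of the results.
func sumDoubled(inputs []int, workers int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- j * 2
			}
		}()
	}

	// Feed the jobs, then close the channel so the workers' range loops end.
	go func() {
		for _, j := range inputs {
			jobs <- j
		}
		close(jobs)
	}()

	// Close results once every worker has returned, so the loop below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(sumDoubled([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}, 3)) // prints 90
}
```

With this shape the collecting loop needs no job count: it simply ranges until `results` is closed.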

Rate limiting

Rate limiting is an important mechanism for controlling resource utilization and maintaining quality of service. Go supports it elegantly with goroutines, channels, and tickers.

package main

import "time"
import "fmt"

func main() {

    // First we'll look at basic rate limiting. Suppose we want to limit how
    // fast we handle incoming requests. We serve them from a single channel.
    requests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)

    // The `limiter` channel receives a value every 200ms. It is the
    // regulator in our rate-limiting scheme.
    limiter := time.Tick(time.Millisecond * 200)

    // By blocking on a receive from the `limiter` channel before serving
    // each request, we limit ourselves to 1 request every 200ms.
    for req := range requests {
        <-limiter
        fmt.Println("request", req, time.Now())
    }

    // We may want to allow short bursts of requests while preserving the
    // overall rate limit. We can do this by buffering the limiter channel.
    // This `burstyLimiter` channel allows bursts of up to 3 events.
    burstyLimiter := make(chan time.Time, 3)

    // Fill up the channel to represent the allowed burst.
    for i := 0; i < 3; i++ {
        burstyLimiter <- time.Now()
    }

    // Every 200ms we try to add a new value to `burstyLimiter`,
    // up to its limit of 3.
    go func() {
        for t := range time.Tick(time.Millisecond * 200) {
            burstyLimiter <- t
        }
    }()

    // Now simulate 5 more incoming requests. The first 3 of them
    // benefit from the burst capacity of `burstyLimiter`.
    burstyRequests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        burstyRequests <- i
    }
    close(burstyRequests)
    for req := range burstyRequests {
        <-burstyLimiter
        fmt.Println("request", req, time.Now())
    }
}

Atomic counter

The primary mechanism for managing state in Go is communication over channels, as we saw in the worker pool example, but there are other options. Here we'll look at using the sync/atomic package for an atomic counter accessed by multiple goroutines.

package main

import "fmt"
import "time"
import "sync/atomic"
import "runtime"

func main() {

    // We use an unsigned integer to represent our (always non-negative) counter.
    var ops uint64 = 0

    // To simulate concurrent updates, we start 50 goroutines that each
    // increment the counter in a loop, yielding after every increment.
    for i := 0; i < 50; i++ {
        go func() {
            for {
                // Use `AddUint64` to atomically increment the counter,
                // passing the memory address of `ops` with the `&` syntax.
                atomic.AddUint64(&ops, 1)

                // Yield so that other goroutines get a chance to run.
                runtime.Gosched()
            }
        }()
    }

    // Wait a second to let some increments accumulate.
    time.Sleep(time.Second)

    // To safely read the counter while other goroutines are still updating
    // it, we copy the current value into `opsFinal` with `LoadUint64`.
    // As above, we pass this function the memory address `&ops`.
    opsFinal := atomic.LoadUint64(&ops)
    fmt.Println("ops:", opsFinal)
}
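
The count printed by the program above varies from run to run, because the goroutines loop forever and are simply cut off when `main` exits. To check that no increments are lost, it can help to give each goroutine a fixed amount of work and wait for all of them. The following is a minimal sketch along those lines; `countAtomically` and its parameters are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countAtomically is a hypothetical helper: it starts `goroutines`
// goroutines that each add 1 to a shared counter `perGoroutine` times,
// waits for them all, and returns the final count.
func countAtomically(goroutines, perGoroutine int) uint64 {
	var ops uint64
	var wg sync.WaitGroup

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				atomic.AddUint64(&ops, 1)
			}
		}()
	}

	// Wait until every goroutine has finished its increments.
	wg.Wait()
	return atomic.LoadUint64(&ops)
}

func main() {
	fmt.Println("ops:", countAtomically(50, 1000)) // prints "ops: 50000"
}
```

Replacing `atomic.AddUint64(&ops, 1)` with a plain `ops++` would introduce a data race, and the total would usually come out lower.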

Copyright notice
This article was written by [The breeze of the coast star]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204211602430624.html