
Rust: how to implement a thread pool?

2022-04-23 18:04:00 Xu Yeping

This article is adapted from the CSDN post "Rust in practice: implementing a thread pool (ThreadPool)" by firefantasy.

It is the simplest and clearest description of Rust thread-pool design, with demonstration code, that I have read. My thanks to the author for the hard work, and I recommend the article to readers.

How to implement a thread pool

A thread pool is a pattern of thread usage. Too many threads cause scheduling overhead, which in turn hurts cache locality and overall performance. A thread pool instead maintains a fixed set of threads that wait for a supervisor to assign them concurrent tasks, which avoids the cost of creating and destroying a thread for every short-lived task. The pool keeps the cores fully utilized while preventing over-scheduling. The appropriate number of threads depends on the number of available processors, processor cores, memory, network sockets, and so on. For compute-intensive tasks, for example, a thread count of about CPU count + 2 is appropriate; more threads only add context-switching overhead.
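As a sketch of that sizing rule (the heuristic is from the paragraph above; the constant +2 is a rule of thumb, not a hard requirement), std::thread::available_parallelism reports an estimate of the hardware parallelism available to the process:

```rust
use std::thread;

fn main() {
    // Estimate of the hardware threads available to this process;
    // fall back to 1 if the platform cannot report it.
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);

    // Heuristic from the text: for compute-bound work, a couple of
    // threads beyond the core count; more mostly adds switch overhead.
    let pool_size = cpus + 2;
    println!("cpus = {}, pool size = {}", cpus, pool_size);
    assert!(pool_size >= 3);
}
```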

How should we define the thread pool type Pool? First, the maximum number of threads must be a field of the pool, and Pool::new should create that many threads.

1 The thread pool Pool

pub struct Pool {
    max_workers: usize, // maximum number of worker threads
}

impl Pool {
    fn new(max_workers: usize) -> Pool {
        // ...
    }

    fn execute<F>(&self, f: F) where F: FnOnce() + 'static + Send {
        // ...
    }
}

execute is used to run a task. The bound F: FnOnce() + 'static + Send is the same trait bound that thread::spawn requires: it says F is a closure that can be moved to, and executed on, another thread.
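A minimal illustration of that bound (run_on_thread is a hypothetical helper used only for this demo, not part of the pool): any closure that is Send and owns all its captures ('static) can be handed to thread::spawn:

```rust
use std::thread;

// Hypothetical helper with the same bound as `execute`.
fn run_on_thread<F>(f: F)
where
    F: FnOnce() + 'static + Send,
{
    thread::spawn(f).join().unwrap();
}

fn main() {
    let msg = String::from("hello");
    // `move` transfers ownership of `msg` into the closure, which is
    // what makes the closure 'static: it borrows nothing from this frame.
    run_on_thread(move || println!("{} from another thread", msg));
}
```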

The next point comes naturally: add to Pool an array of threads that actually execute the tasks, something like Vec<Thread>. Each of these threads stays alive; it is an entity that repeatedly accepts a task and then executes it. We can model it as a Worker that loops forever, fetching a task and running it.

struct Worker {
    _id: usize, // worker id
}

How do we deliver tasks to a Worker for execution? An mpsc (multi-producer, single-consumer) channel meets our needs: let (tx, rx) = mpsc::channel() yields a sender/receiver pair.

Put the sender inside Pool and the receiver inside the Workers. Pool then sends tasks through the channel, and the workers consume and execute them.

One point needs special attention: the receiving end of the channel must be shared safely among multiple threads, so we wrap it in Arc<Mutex<T>>, i.e. we use a lock to resolve the concurrent access.
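This sharing pattern can be tried standalone before wiring it into the pool. Here is a sketch (names are illustrative) where two threads share one receiver through Arc<Mutex<..>>:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    // Receiver is not Clone, so share the single receiver via Arc<Mutex<..>>.
    let rx = Arc::new(Mutex::new(rx));

    let mut handles = Vec::new();
    for _ in 0..2 {
        let rx = Arc::clone(&rx);
        handles.push(thread::spawn(move || loop {
            // Lock only long enough to take one message; the temporary
            // guard is dropped at the end of this statement.
            let msg = rx.lock().unwrap().recv();
            match msg {
                Ok(n) => println!("got {}", n),
                Err(_) => break, // all senders dropped: channel closed
            }
        }));
    }

    for i in 0..4 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the workers exit their loops
    for h in handles {
        h.join().unwrap();
    }
}
```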

2 The complete definition of Pool

pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>,
}

Now it is time to define the Message type we will send to the Workers. Define the following enum:

type Job = Box<dyn FnOnce() + 'static + Send>;

enum Message {
    ByeBye,
    NewJob(Job),
}

A Job is a boxed closure sent to a Worker for execution; ByeBye tells a Worker to stop accepting work and exit its thread.

All that remains is the concrete logic of Worker and Pool.

3 The implementation of Worker

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn(move || loop {
            // Lock, take one message, and release the lock at the end of
            // this statement -- before the job runs.
            let message = receiver.lock().unwrap().recv().unwrap();
            match message {
                Message::NewJob(job) => {
                    println!("do job from worker[{}]", id);
                    job();
                }
                Message::ByeBye => {
                    println!("ByeBye from worker[{}]", id);
                    break;
                }
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}

In let message = receiver.lock().unwrap().recv().unwrap(); the lock is acquired, one message is received, and the temporary MutexGuard is dropped at the end of the statement, so Rust releases the lock automatically through the guard's lifetime, before the message is processed.
But if it were written as

while let Ok(message) = receiver.lock().unwrap().recv() {
    // handle the message
}

then the whole while let body is one scope for the temporary guard: the lock is only released when the body ends, so it is held while the job runs, much longer than in the let message form above.
Rust's Mutex has no corresponding unlock method; unlocking is managed entirely by the guard's lifetime.
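The guard-lifetime rule can be checked in isolation. In this small demo, if the first guard were still alive, the second lock() would block forever, since std::sync::Mutex is not re-entrant:

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(0);

    // The temporary MutexGuard from lock() dies at the end of this
    // statement, so the lock is free again immediately afterwards.
    let v = *m.lock().unwrap();
    assert_eq!(v, 0);

    // No deadlock here: the previous guard has already been dropped.
    *m.lock().unwrap() += 1;
    assert_eq!(*m.lock().unwrap(), 1);
    println!("final value = {}", *m.lock().unwrap());
}
```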

We implement the Drop trait for Pool, so that when the pool is destroyed, the worker threads are shut down automatically.

impl Drop for Pool {
    fn drop(&mut self) {
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}

Why does the drop method use two loops, instead of doing both things in a single loop, like this?

for w in self.workers.iter_mut() {
    if let Some(t) = w.t.take() {
        self.sender.send(Message::ByeBye).unwrap();
        t.join().unwrap();
    }
}

This single-loop version hides a deadlock trap. Suppose there are two Workers, and in one loop we iterate over them, sending a termination message into the channel and then calling join directly.
We expect the first worker to receive the message and finish, but it may be the second worker that grabs it; the first worker then never receives a message, and the subsequent join blocks forever, deadlocking.

Notice that the thread handle inside Worker is wrapped in an Option. Two points matter here:

  1. t.join() needs ownership of t.
  2. In our drop, self.workers can only be iterated by reference.

So we let Worker hold an Option<JoinHandle<()>>. We can then call take on the Option to move the Some value out, leaving None in its place.
In other words, a running worker holds the Some variant; when cleaning up a worker, we replace the Some with None, so the Worker gives up its runnable thread.

struct Worker {
    _id: usize,
    t: Option<JoinHandle<()>>,
}
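Option::take can also be seen in isolation. It moves the value out through a mutable reference and leaves None behind, which is exactly what drop needs (the String here is just a stand-in for the JoinHandle):

```rust
fn main() {
    // Stand-in for the worker's Option<JoinHandle<()>> field.
    let mut slot: Option<String> = Some(String::from("handle"));

    // take() moves the Some value out via &mut, leaving None in place,
    // so we gain ownership without moving `slot` itself.
    let owned = slot.take();
    assert_eq!(owned.as_deref(), Some("handle"));
    assert!(slot.is_none());

    // A second take() yields None: the value is already gone.
    assert!(slot.take().is_none());
    println!("take() demo passed");
}
```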

4 Summary of key points

  • Mutex relies on the guard's lifetime to release the lock; when using it, watch how long the guard stays alive.
  • Option<T> solves the problem of taking ownership of a T out of a structure you only hold by reference.

5 Complete code

use std::thread::{self, JoinHandle};
use std::sync::{mpsc, Arc, Mutex};

type Job = Box<dyn FnOnce() + 'static + Send>;

enum Message {
    ByeBye,
    NewJob(Job),
}

struct Worker {
    _id: usize,
    t: Option<JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn(move || loop {
            // The temporary guard is dropped at the end of this statement,
            // so the lock is released before the job is executed.
            let message = receiver.lock().unwrap().recv().unwrap();
            match message {
                Message::NewJob(job) => {
                    println!("do job from worker[{}]", id);
                    job();
                }
                Message::ByeBye => {
                    println!("ByeBye from worker[{}]", id);
                    break;
                }
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}

pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>,
}

impl Pool {
    pub fn new(max_workers: usize) -> Pool {
        if max_workers == 0 {
            panic!("max_workers must be greater than zero!")
        }
        let (tx, rx) = mpsc::channel();

        let mut workers = Vec::with_capacity(max_workers);
        let receiver = Arc::new(Mutex::new(rx));
        for i in 0..max_workers {
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }

        Pool {
            workers,
            max_workers,
            sender: tx,
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + 'static + Send,
    {
        let job = Message::NewJob(Box::new(f));
        self.sender.send(job).unwrap();
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        // Iterate by mutable reference: `for w in self.workers` would try
        // to move the Vec out of `&mut self` and fail to compile.
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn it_works() {
        let p = Pool::new(4);
        p.execute(|| println!("do new job1"));
        p.execute(|| println!("do new job2"));
        p.execute(|| println!("do new job3"));
        p.execute(|| println!("do new job4"));
    }
}


Original article: "Rust in practice: implementing a thread pool (ThreadPool)"
Author: firefantasy
Source: CSDN (Chinese programmer forum)

Copyright notice
This post was written by [Xu Yeping]; when reposting, please include a link to the original. Thank you.
https://yzsam.com/2022/04/202204230544498195.html