当前位置:网站首页>Rust: how to implement a thread pool?
Rust: how to implement a thread pool?
2022-04-23 18:04:00 【Xu Yeping】
In this paper, from CSDN post 《rust actual combat - Implement a thread work pool ThreadPool》, author :firefantasy.
This is the simplest I've ever read 、 The clearest description rust Thread pool design principle and demonstration code , Thank the author for his hard work , And recommend the article to readers .
How to implement a thread pool
Thread pool : A thread usage mode . Too many threads cause scheduling overhead , And then affect cache locality and overall performance . The thread pool maintains multiple threads , Waiting for supervisor to assign concurrent tasks . This avoids the cost of creating and destroying threads when processing short tasks . The thread pool not only ensures full utilization of the kernel , It also prevents overscheduling . The number of available threads should depend on the number of concurrent processors available 、 Processor kernel 、 Memory 、 The Internet sockets Such as the number of . for example , For computing intensive tasks , Number of threads cpu Number +2 More appropriate , Too many threads can cause extra thread switching overhead .
How to define thread pool Pool Well , First, the maximum number of threads must be an attribute of the thread pool , And in new Pool Creates the specified thread when .
1 Thread pool Pool
pub struct Pool {
max_workers: usize, // Define the maximum number of threads
}
impl Pool {
fn new(max_workers: usize) -> Pool {
}
fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send {
}
}
use execute To perform the task ,F: FnOnce() + "static + Send
It's using thread::spawn
Thread execution needs to meet trait
, representative F Is a closure function that can be executed in a thread .
Another point naturally comes to mind in Pool Add an array of threads , This thread array is used to execute tasks . such as Vec<Thread> balabala
. The thread here is alive , It is an entity that constantly accepts tasks and then executes them .
It can be seen as a thread that continuously executes the acquisition task and executes Worker.
struct Worker where
{
_id: usize, // worker Number
}
How to send the task to Worker How about execution ?mpsc(multi producer single consumer) Multiple producers and single consumers can meet our needs ,let (tx, rx) = mpsc::channel()
A pair of sender and receiver can be obtained .
Add sender to Pool Inside , Add the receiver to Worker Inside .Pool adopt channel Send tasks to multiple worker Consumption execution .
There is one point that needs special attention ,channel The receiving end of receiver It needs to be shared safely among multiple threads , So we need to use Arc<Mutex::<T>>
Come and wrap it up , That is to use locks to solve concurrency conflicts .
2 Pool Complete definition
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
It's time to define what we're going to send Worker The news of Message 了
Define the following enumeration values
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
Job Is a message to send to Worker The closure function executed , here ByeBye Used to inform Worker You can terminate the current execution , Exit thread .
Only the implementation is left Worker and Pool The concrete logic of .
3 Worker The implementation of the
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let receiver = receiver.lock().unwrap();
let message= receiver.recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
let message = receiver.lock().unwrap().recv().unwrap();
Get the lock from here receiver Get the message body , then let message After the end rust The lock is automatically released during the life cycle of the .
But if it's written as
while let message = receiver.lock().unwrap().recv().unwrap() {
};
while let The whole parenthesis is a scope , After the scope ends , The lock will release , Better than the top let message Lock for a long time .
rust Of mutex The lock has no corresponding unlock Method , from mutex Life cycle management of .
We give Pool Realization Drop trait, Give Way Pool Be destroyed when the , Automatically pause worker Thread execution .
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
drop Method uses two loops , Instead of doing two things in a cycle ?
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
self.sender.send(Message::ByeBye).unwrap();
t.join().unwrap();
}
}
There is a trap that can cause deadlock , For example, two. Worker, Iterate over all... In a single loop Worker, After sending the termination information to the channel , Call directly join,
We expect to be the first worker To receive a message , And when he's done . When the situation may be the second worker Got the message , first worker No access to , What's next join It will block and cause deadlock .
Notice no ,Worker Is packaged in Option Internal , Here are two points to note
- t.join Need to hold t The ownership of the
- In our case ,self.workers Can only be used as a reference for Loop iteration .
Let's consider Worker hold Option<JoinHandle<()>>
, Follow up can be done by Option
On the call take
Methods will Some
The value of the variant is moved out , And leave it in its original position None variant .
In other words , Let the running worker
hold Some
A variation of the , clear worker
when , have access to None
Replace Some
, So that Worker Lose a thread that can run
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
4 Summary of key points
Mutex
Rely on lifecycle management to release locks , When using, pay attention to whether the lock is overdueVec<Option<T>>
It can solve the need of T The scene of ownership
5 Complete code
use std::thread::{
self, JoinHandle};
use std::sync::{
Arc, mpsc, Mutex};
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
impl Pool where {
pub fn new(max_workers: usize) -> Pool {
if max_workers == 0 {
panic!("max_workers must be greater than zero!")
}
let (tx, rx) = mpsc::channel();
let mut workers = Vec::with_capacity(max_workers);
let receiver = Arc::new(Mutex::new(rx));
for i in 0..max_workers {
workers.push(Worker::new(i, Arc::clone(&receiver)));
}
Pool {
workers: workers, max_workers: max_workers, sender: tx }
}
pub fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send
{
let job = Message::NewJob(Box::new(f));
self.sender.send(job).unwrap();
}
}
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let p = Pool::new(4);
p.execute(|| println!("do new job1"));
p.execute(|| println!("do new job2"));
p.execute(|| println!("do new job3"));
p.execute(|| println!("do new job4"));
}
}
original text :rust actual combat - Implement a thread work pool ThreadPool
author :firefantasy
source :CSDN—— Chinese programmer Forum
版权声明
本文为[Xu Yeping]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204230544498195.html
边栏推荐
- MySQL_ 01_ Simple data retrieval
- Implementation of k8s redis one master multi slave dynamic capacity expansion
- Crawl lottery data
- Go的Gin框架学习
- C language loop structure program
- Nat Commun|在生物科学领域应用深度学习的当前进展和开放挑战
- [UDS unified diagnostic service] IV. typical diagnostic service (4) - online programming function unit (0x34-0x38)
- C byte array (byte []) and string are converted to each other
- 解决报错max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
- 2022 Jiangxi Photovoltaic Exhibition, China distributed Photovoltaic Exhibition, Nanchang solar energy utilization Exhibition
猜你喜欢
.104History
Go对文件操作
From source code to executable file
2022 Jiangxi Photovoltaic Exhibition, China distributed Photovoltaic Exhibition, Nanchang solar energy utilization Exhibition
Docker installation MySQL
Halo 开源项目学习(二):实体类与数据表
Random number generation of C #
Remember using Ali Font Icon Library for the first time
Go language JSON package usage
Docker 安装 MySQL
随机推荐
Summary of floating point double precision, single precision and half precision knowledge
C1 notes [task training chapter I]
Reptile efficiency improvement method
587. Install fence / Sword finger offer II 014 Anagrams in strings
Detailed deployment of flask project
读取excel,int 数字时间转时间
Anchor location - how to set the distance between the anchor and the top of the page. The anchor is located and offset from the top
Fashion classification case based on keras
2022 Jiangxi energy storage technology exhibition, China Battery exhibition, power battery exhibition and fuel cell Exhibition
C language loop structure program
Crawl lottery data
Yolov4 pruning [with code]
Go语言JSON包使用
Flash operates on multiple databases
Nat Commun|在生物科学领域应用深度学习的当前进展和开放挑战
Read excel, int digital time to time
Process management command
Array rotation
2022 judgment questions and answers for operation of refrigeration and air conditioning equipment
Install pyshp Library