Rust: how to implement a thread pool?
2022-04-23 18:04:00 【Xu Yeping】
This article is taken from the CSDN post "Rust in practice: implementing a thread work pool (ThreadPool)" by firefantasy.
It is the simplest and clearest explanation of Rust thread-pool design, with demonstration code, that I have read. Thanks to the author for the hard work, and I recommend the original article to readers.
How to implement a thread pool
Thread pool: a pattern for using threads. Too many threads introduce scheduling overhead, which in turn hurts cache locality and overall performance. A thread pool maintains a set of threads that wait for a supervisor to assign them concurrent tasks, which avoids the cost of creating and destroying threads for short-lived work. The pool keeps the processor cores fully utilized while preventing over-scheduling. The appropriate number of threads depends on the number of available processors and cores, the amount of memory, the number of network sockets, and so on. For compute-intensive tasks, for example, a thread count around the CPU count plus 2 is usually appropriate; more threads only add thread-switching overhead.
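As a side note (not part of the original article), here is a minimal sketch of deriving a default worker count from the hardware, using std::thread::available_parallelism from the standard library:

    use std::thread;

    fn default_worker_count() -> usize {
        // Fall back to a single worker if the parallelism hint is unavailable.
        thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1)
    }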
How should the thread pool type Pool be defined? First, the maximum number of threads must be a field of the pool, and Pool::new should create that many threads when it is called.
1 Thread pool Pool
pub struct Pool {
    max_workers: usize, // the maximum number of worker threads
}

impl Pool {
    fn new(max_workers: usize) -> Pool {
        Pool { max_workers } // fleshed out below
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + 'static + Send,
    {
        // fleshed out below
    }
}
execute is used to submit a task. The bound F: FnOnce() + 'static + Send is exactly what thread::spawn requires of the closure it runs, meaning F is a closure that can be handed to a thread and executed there.
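For reference, a minimal sketch (not from the original article) showing that thread::spawn imposes the same bounds, i.e. the moved-in closure must be FnOnce, Send and 'static:

    use std::thread;

    fn spawn_demo() {
        // The closure takes ownership of `greeting`, so it is 'static,
        // and String is Send, so the closure is Send as well.
        let greeting = String::from("hello from a spawned thread");
        let handle = thread::spawn(move || println!("{}", greeting));
        handle.join().unwrap();
    }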
The next thing that naturally comes to mind is to add an array of threads to Pool and let those threads execute the tasks, something like Vec<Thread>. The threads here have to stay alive: each one is an entity that keeps accepting tasks and executing them.
It can be seen as a Worker: a thread that continuously fetches a task and executes it.
struct Worker {
    _id: usize, // worker id
}
How do we send tasks to the Workers for execution? mpsc (multi-producer, single-consumer) channels meet our need: let (tx, rx) = mpsc::channel() gives us a sender/receiver pair.
Put the sender inside Pool and give the receiver to the Workers. Pool sends tasks through the channel, and the workers consume and execute them.
One point deserves special attention: the receiving end of the channel must be shared safely among multiple threads, so it needs to be wrapped in Arc<Mutex<T>>; in other words, a lock is used to resolve the concurrent access.
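A minimal sketch of that wiring (the names and the placeholder message type here are illustrative, not from the original article):

    use std::sync::{mpsc, Arc, Mutex};

    fn wire_up_channel() {
        // One sender (kept by the Pool), one receiver shared by all workers.
        // String stands in for the Message enum defined in the next section.
        let (sender, receiver) = mpsc::channel::<String>();
        let shared_receiver = Arc::new(Mutex::new(receiver));

        // Each worker gets its own Arc handle pointing at the same receiver.
        let receiver_for_worker = Arc::clone(&shared_receiver);
        let _ = (sender, receiver_for_worker);
    }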
2 The complete definition of Pool
pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>,
}
Now it is time to define the Message that we will send to the Workers.
Define the following enum:
type Job = Box<dyn FnOnce() + 'static + Send>;

enum Message {
    ByeBye,
    NewJob(Job),
}
Job is the closure that a Worker will execute, and ByeBye tells a Worker that it can stop what it is doing and exit its thread.
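As a quick illustration (the function name here is hypothetical, not from the original article) of how a closure is boxed into a Job and wrapped in a Message, using the two types just defined:

    fn make_message() -> Message {
        // Any FnOnce() closure that is 'static + Send can be boxed up as a Job.
        let job: Job = Box::new(|| println!("running a queued task"));
        Message::NewJob(job)
    }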
All that remains is the concrete logic of Worker and Pool.
3 The implementation of Worker
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn(move || {
            loop {
                // Lock, take one message, and release the lock before running the job.
                let message = receiver.lock().unwrap().recv().unwrap();
                match message {
                    Message::NewJob(job) => {
                        println!("do job from worker[{}]", id);
                        job();
                    },
                    Message::ByeBye => {
                        println!("ByeBye from worker[{}]", id);
                        break;
                    },
                }
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}
In let message = receiver.lock().unwrap().recv().unwrap();, the lock is acquired and a message is received from receiver; the MutexGuard is only a temporary inside the let statement, so Rust drops it, and thereby releases the lock, as soon as the statement ends.
But if it were written as

while let Ok(message) = receiver.lock().unwrap().recv() {
    // handle the message
}

then the whole while let block is one scope: with while let (as with if let and match), the temporaries created in the condition live until the end of the block, so the lock would only be released when the loop body finishes. That holds the lock much longer than the plain let above.
Rust's Mutex has no corresponding unlock method; unlocking is handled by the lifetime of the guard returned by lock().
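A minimal sketch (not from the original article) of how the guard's scope controls when the lock is released:

    use std::sync::Mutex;

    fn guard_scope_demo(counter: &Mutex<i32>) {
        {
            let mut guard = counter.lock().unwrap();
            *guard += 1;
            // Inside the scope, drop(guard) would release the lock even earlier.
        } // the guard is dropped here, which releases the lock
    }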
We implement the Drop trait for Pool so that, when the Pool is destroyed, the worker threads are automatically shut down.
impl Drop for Pool {
    fn drop(&mut self) {
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}
Why does the drop method use two loops instead of doing both things in a single loop, like this?
for w in self.workers.iter_mut() {
    if let Some(t) = w.t.take() {
        self.sender.send(Message::ByeBye).unwrap();
        t.join().unwrap();
    }
}
There is a trap here that can cause a deadlock. Suppose there are two Workers and we iterate over them in a single loop, sending the termination message and then calling join immediately.
We expect the first worker to receive the message and finish, but it may be the second worker that picks the message up. The first worker then never receives a message, and the join that follows blocks forever, producing a deadlock.
Notice that the JoinHandle is wrapped inside an Option. There are two points to note here:
- t.join() needs to take ownership of t.
- In our case, self.workers can only be iterated by reference in the for loop.
So we let Worker hold an Option<JoinHandle<()>>. We can then call take on the Option to move the Some value out, leaving None in its place.
In other words, a running worker holds the Some variant; when a worker is cleaned up, Some is replaced with None, so the Worker no longer owns a runnable thread.
struct Worker {
    _id: usize,
    t: Option<JoinHandle<()>>,
}
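A stand-alone illustration (not from the original article) of what Option::take does:

    fn take_demo() {
        let mut slot: Option<i32> = Some(42);
        let value = slot.take(); // moves the value out of the Option
        assert_eq!(value, Some(42));
        assert_eq!(slot, None); // None is left behind in its place
    }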
4 Summary of key points
- Mutex relies on the guard's lifetime to release the lock; when using it, pay attention to how long the lock is held.
- Vec<Option<T>> solves the case where ownership of a T has to be moved out.
5 Complete code
use std::thread::{self, JoinHandle};
use std::sync::{Arc, mpsc, Mutex};

type Job = Box<dyn FnOnce() + 'static + Send>;

enum Message {
    ByeBye,
    NewJob(Job),
}

struct Worker {
    _id: usize,
    t: Option<JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();
                match message {
                    Message::NewJob(job) => {
                        println!("do job from worker[{}]", id);
                        job();
                    },
                    Message::ByeBye => {
                        println!("ByeBye from worker[{}]", id);
                        break;
                    },
                }
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}

pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>,
}

impl Pool {
    pub fn new(max_workers: usize) -> Pool {
        if max_workers == 0 {
            panic!("max_workers must be greater than zero!")
        }

        let (tx, rx) = mpsc::channel();
        let mut workers = Vec::with_capacity(max_workers);
        let receiver = Arc::new(Mutex::new(rx));
        for i in 0..max_workers {
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }

        Pool { workers, max_workers, sender: tx }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + 'static + Send,
    {
        let job = Message::NewJob(Box::new(f));
        self.sender.send(job).unwrap();
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        // iter_mut: we only borrow the workers; take() moves each handle out.
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn it_works() {
        let p = Pool::new(4);
        p.execute(|| println!("do new job1"));
        p.execute(|| println!("do new job2"));
        p.execute(|| println!("do new job3"));
        p.execute(|| println!("do new job4"));
    }
}
Original post: Rust in practice: implementing a thread work pool (ThreadPool)
Author: firefantasy
Source: CSDN (Chinese programmer forum)