当前位置:网站首页>Rust: how to implement a thread pool?
Rust: how to implement a thread pool?
2022-04-23 18:04:00 【Xu Yeping】
In this paper, from CSDN post 《rust actual combat - Implement a thread work pool ThreadPool》, author :firefantasy.
This is the simplest I've ever read 、 The clearest description rust Thread pool design principle and demonstration code , Thank the author for his hard work , And recommend the article to readers .
How to implement a thread pool
Thread pool : A thread usage mode . Too many threads cause scheduling overhead , And then affect cache locality and overall performance . The thread pool maintains multiple threads , Waiting for supervisor to assign concurrent tasks . This avoids the cost of creating and destroying threads when processing short tasks . The thread pool not only ensures full utilization of the kernel , It also prevents overscheduling . The number of available threads should depend on the number of concurrent processors available 、 Processor kernel 、 Memory 、 The Internet sockets Such as the number of . for example , For computing intensive tasks , Number of threads cpu Number +2 More appropriate , Too many threads can cause extra thread switching overhead .
How to define thread pool Pool Well , First, the maximum number of threads must be an attribute of the thread pool , And in new Pool Creates the specified thread when .
1 Thread pool Pool
pub struct Pool {
max_workers: usize, // Define the maximum number of threads
}
impl Pool {
fn new(max_workers: usize) -> Pool {
}
fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send {
}
}
use execute To perform the task ,F: FnOnce() + "static + Send
It's using thread::spawn
Thread execution needs to meet trait
, representative F Is a closure function that can be executed in a thread .
Another point naturally comes to mind in Pool Add an array of threads , This thread array is used to execute tasks . such as Vec<Thread> balabala
. The thread here is alive , It is an entity that constantly accepts tasks and then executes them .
It can be seen as a thread that continuously executes the acquisition task and executes Worker.
struct Worker where
{
_id: usize, // worker Number
}
How to send the task to Worker How about execution ?mpsc(multi producer single consumer) Multiple producers and single consumers can meet our needs ,let (tx, rx) = mpsc::channel()
A pair of sender and receiver can be obtained .
Add sender to Pool Inside , Add the receiver to Worker Inside .Pool adopt channel Send tasks to multiple worker Consumption execution .
There is one point that needs special attention ,channel The receiving end of receiver It needs to be shared safely among multiple threads , So we need to use Arc<Mutex::<T>>
Come and wrap it up , That is to use locks to solve concurrency conflicts .
2 Pool Complete definition
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
It's time to define what we're going to send Worker The news of Message 了
Define the following enumeration values
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
Job Is a message to send to Worker The closure function executed , here ByeBye Used to inform Worker You can terminate the current execution , Exit thread .
Only the implementation is left Worker and Pool The concrete logic of .
3 Worker The implementation of the
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let receiver = receiver.lock().unwrap();
let message= receiver.recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
let message = receiver.lock().unwrap().recv().unwrap();
Get the lock from here receiver Get the message body , then let message After the end rust The lock is automatically released during the life cycle of the .
But if it's written as
while let message = receiver.lock().unwrap().recv().unwrap() {
};
while let The whole parenthesis is a scope , After the scope ends , The lock will release , Better than the top let message Lock for a long time .
rust Of mutex The lock has no corresponding unlock Method , from mutex Life cycle management of .
We give Pool Realization Drop trait, Give Way Pool Be destroyed when the , Automatically pause worker Thread execution .
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
drop Method uses two loops , Instead of doing two things in a cycle ?
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
self.sender.send(Message::ByeBye).unwrap();
t.join().unwrap();
}
}
There is a trap that can cause deadlock , For example, two. Worker, Iterate over all... In a single loop Worker, After sending the termination information to the channel , Call directly join,
We expect to be the first worker To receive a message , And when he's done . When the situation may be the second worker Got the message , first worker No access to , What's next join It will block and cause deadlock .
Notice no ,Worker Is packaged in Option Internal , Here are two points to note
- t.join Need to hold t The ownership of the
- In our case ,self.workers Can only be used as a reference for Loop iteration .
Let's consider Worker hold Option<JoinHandle<()>>
, Follow up can be done by Option
On the call take
Methods will Some
The value of the variant is moved out , And leave it in its original position None variant .
In other words , Let the running worker
hold Some
A variation of the , clear worker
when , have access to None
Replace Some
, So that Worker Lose a thread that can run
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
4 Summary of key points
Mutex
Rely on lifecycle management to release locks , When using, pay attention to whether the lock is overdueVec<Option<T>>
It can solve the need of T The scene of ownership
5 Complete code
use std::thread::{
self, JoinHandle};
use std::sync::{
Arc, mpsc, Mutex};
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
impl Pool where {
pub fn new(max_workers: usize) -> Pool {
if max_workers == 0 {
panic!("max_workers must be greater than zero!")
}
let (tx, rx) = mpsc::channel();
let mut workers = Vec::with_capacity(max_workers);
let receiver = Arc::new(Mutex::new(rx));
for i in 0..max_workers {
workers.push(Worker::new(i, Arc::clone(&receiver)));
}
Pool {
workers: workers, max_workers: max_workers, sender: tx }
}
pub fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send
{
let job = Message::NewJob(Box::new(f));
self.sender.send(job).unwrap();
}
}
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let p = Pool::new(4);
p.execute(|| println!("do new job1"));
p.execute(|| println!("do new job2"));
p.execute(|| println!("do new job3"));
p.execute(|| println!("do new job4"));
}
}
original text :rust actual combat - Implement a thread work pool ThreadPool
author :firefantasy
source :CSDN—— Chinese programmer Forum
版权声明
本文为[Xu Yeping]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204230544498195.html
边栏推荐
- [UDS unified diagnostic service] (Supplement) v. detailed explanation of ECU bootloader development points (2)
- Implementation of k8s redis one master multi slave dynamic capacity expansion
- 解决报错max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
- Amount input box, used for recharge and withdrawal
- Thirteen documents in software engineering
- I/O多路复用及其相关详解
- .104History
- 2022 tea artist (primary) examination simulated 100 questions and simulated examination
- Realsense selection comparison d455 d435i d415 t265 3D hardware comparison
- re正則錶達式
猜你喜欢
Theory and practice of laser slam in dark blue College - Chapter 2 (odometer calibration)
QTableWidget使用讲解
Re regular expression
C# 的数据流加密与解密
YOLOv4剪枝【附代码】
How to install jsonpath package
Fashion classification case based on keras
解决报错max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
Go对文件操作
Map basemap Library
随机推荐
Flash - Middleware
MySQL_ 01_ Simple data retrieval
_ FindText error
[UDS unified diagnostic service] (Supplement) v. detailed explanation of ECU bootloader development points (1)
Implementation of image recognition code based on VGg convolutional neural network
2022 judgment questions and answers for operation of refrigeration and air conditioning equipment
Detailed deployment of flask project
2022 Jiangxi Photovoltaic Exhibition, China Distributed Photovoltaic Exhibition, Nanchang Solar Energy Utilization Exhibition
.105Location
Generate verification code
Nat Commun|在生物科学领域应用深度学习的当前进展和开放挑战
C language loop structure program
Secure credit
2022江西储能技术展会,中国电池展,动力电池展,燃料电池展
Box pointer of rust
Implement a simple function to calculate the sum of all integers between M ~ n (m < n)
What are the relationships and differences between threads and processes
C byte array (byte []) and string are converted to each other
2022江西光伏展,中國分布式光伏展會,南昌太陽能利用展
Halo open source project learning (II): entity classes and data tables