当前位置:网站首页>Rust: how to implement a thread pool?
Rust: how to implement a thread pool?
2022-04-23 18:04:00 【Xu Yeping】
In this paper, from CSDN post 《rust actual combat - Implement a thread work pool ThreadPool》, author :firefantasy.
This is the simplest I've ever read 、 The clearest description rust Thread pool design principle and demonstration code , Thank the author for his hard work , And recommend the article to readers .
How to implement a thread pool
Thread pool : A thread usage mode . Too many threads cause scheduling overhead , And then affect cache locality and overall performance . The thread pool maintains multiple threads , Waiting for supervisor to assign concurrent tasks . This avoids the cost of creating and destroying threads when processing short tasks . The thread pool not only ensures full utilization of the kernel , It also prevents overscheduling . The number of available threads should depend on the number of concurrent processors available 、 Processor kernel 、 Memory 、 The Internet sockets Such as the number of . for example , For computing intensive tasks , Number of threads cpu Number +2 More appropriate , Too many threads can cause extra thread switching overhead .
How to define thread pool Pool Well , First, the maximum number of threads must be an attribute of the thread pool , And in new Pool Creates the specified thread when .
1 Thread pool Pool
pub struct Pool {
max_workers: usize, // Define the maximum number of threads
}
impl Pool {
fn new(max_workers: usize) -> Pool {
}
fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send {
}
}
use execute To perform the task ,F: FnOnce() + "static + Send
It's using thread::spawn
Thread execution needs to meet trait
, representative F Is a closure function that can be executed in a thread .
Another point naturally comes to mind in Pool Add an array of threads , This thread array is used to execute tasks . such as Vec<Thread> balabala
. The thread here is alive , It is an entity that constantly accepts tasks and then executes them .
It can be seen as a thread that continuously executes the acquisition task and executes Worker.
struct Worker where
{
_id: usize, // worker Number
}
How to send the task to Worker How about execution ?mpsc(multi producer single consumer) Multiple producers and single consumers can meet our needs ,let (tx, rx) = mpsc::channel()
A pair of sender and receiver can be obtained .
Add sender to Pool Inside , Add the receiver to Worker Inside .Pool adopt channel Send tasks to multiple worker Consumption execution .
There is one point that needs special attention ,channel The receiving end of receiver It needs to be shared safely among multiple threads , So we need to use Arc<Mutex::<T>>
Come and wrap it up , That is to use locks to solve concurrency conflicts .
2 Pool Complete definition
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
It's time to define what we're going to send Worker The news of Message 了
Define the following enumeration values
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
Job Is a message to send to Worker The closure function executed , here ByeBye Used to inform Worker You can terminate the current execution , Exit thread .
Only the implementation is left Worker and Pool The concrete logic of .
3 Worker The implementation of the
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let receiver = receiver.lock().unwrap();
let message= receiver.recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
let message = receiver.lock().unwrap().recv().unwrap();
Get the lock from here receiver Get the message body , then let message After the end rust The lock is automatically released during the life cycle of the .
But if it's written as
while let message = receiver.lock().unwrap().recv().unwrap() {
};
while let The whole parenthesis is a scope , After the scope ends , The lock will release , Better than the top let message Lock for a long time .
rust Of mutex The lock has no corresponding unlock Method , from mutex Life cycle management of .
We give Pool Realization Drop trait, Give Way Pool Be destroyed when the , Automatically pause worker Thread execution .
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
drop Method uses two loops , Instead of doing two things in a cycle ?
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
self.sender.send(Message::ByeBye).unwrap();
t.join().unwrap();
}
}
There is a trap that can cause deadlock , For example, two. Worker, Iterate over all... In a single loop Worker, After sending the termination information to the channel , Call directly join,
We expect to be the first worker To receive a message , And when he's done . When the situation may be the second worker Got the message , first worker No access to , What's next join It will block and cause deadlock .
Notice no ,Worker Is packaged in Option Internal , Here are two points to note
- t.join Need to hold t The ownership of the
- In our case ,self.workers Can only be used as a reference for Loop iteration .
Let's consider Worker hold Option<JoinHandle<()>>
, Follow up can be done by Option
On the call take
Methods will Some
The value of the variant is moved out , And leave it in its original position None variant .
In other words , Let the running worker
hold Some
A variation of the , clear worker
when , have access to None
Replace Some
, So that Worker Lose a thread that can run
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
4 Summary of key points
Mutex
Rely on lifecycle management to release locks , When using, pay attention to whether the lock is overdueVec<Option<T>>
It can solve the need of T The scene of ownership
5 Complete code
use std::thread::{
self, JoinHandle};
use std::sync::{
Arc, mpsc, Mutex};
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
impl Pool where {
pub fn new(max_workers: usize) -> Pool {
if max_workers == 0 {
panic!("max_workers must be greater than zero!")
}
let (tx, rx) = mpsc::channel();
let mut workers = Vec::with_capacity(max_workers);
let receiver = Arc::new(Mutex::new(rx));
for i in 0..max_workers {
workers.push(Worker::new(i, Arc::clone(&receiver)));
}
Pool {
workers: workers, max_workers: max_workers, sender: tx }
}
pub fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send
{
let job = Message::NewJob(Box::new(f));
self.sender.send(job).unwrap();
}
}
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let p = Pool::new(4);
p.execute(|| println!("do new job1"));
p.execute(|| println!("do new job2"));
p.execute(|| println!("do new job3"));
p.execute(|| println!("do new job4"));
}
}
original text :rust actual combat - Implement a thread work pool ThreadPool
author :firefantasy
source :CSDN—— Chinese programmer Forum
版权声明
本文为[Xu Yeping]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204230544498195.html
边栏推荐
- 由tcl脚本生成板子对应的vivado工程
- Jenkspy package installation
- Go language JSON package usage
- ES6
- _ FindText error
- Go的Gin框架学习
- C language input and output (printf and scanf functions, putchar and getchar functions)
- Operation of 2022 mobile crane driver national question bank simulation examination platform
- cartographer_ There is no problem compiling node, but running the bug that hangs directly
- Auto. JS custom dialog box
猜你喜欢
Nodejs安装
Gobang game based on pyGame Library
Dock installation redis
7-21 wrong questions involve knowledge points.
GDAL + ogr learning
Data stream encryption and decryption of C
Halo open source project learning (II): entity classes and data tables
C# 的数据流加密与解密
Implementation of k8s redis one master multi slave dynamic capacity expansion
2022江西储能技术展会,中国电池展,动力电池展,燃料电池展
随机推荐
C [file operation] read TXT text by line
2022 Jiangxi Photovoltaic Exhibition, China distributed Photovoltaic Exhibition, Nanchang solar energy utilization Exhibition
Nanotechnology + AI enabled proteomics | Luomi life technology completed nearly ten million US dollars of financing
C language implements memcpy, memset, strcpy, strncpy, StrCmp, strncmp and strlen
ArcGIS table to excel exceeds the upper limit, conversion failed
Clion installation tutorial
re正則錶達式
2022 tea artist (primary) examination simulated 100 questions and simulated examination
Re expression régulière
Flash - Middleware
[UDS unified diagnostic service] (Supplement) v. detailed explanation of ECU bootloader development points (2)
587. Install fence / Sword finger offer II 014 Anagrams in strings
Qt读写XML文件(含源码+注释)
Install pyshp Library
C language array processing batch data
Halo open source project learning (II): entity classes and data tables
SSD硬盘SATA接口和M.2接口区别(详细)总结
Eigen learning summary
Identification verification code
消费者灰度实现思路