当前位置:网站首页>Rust:如何实现一个线程池?
Rust:如何实现一个线程池?
2022-04-23 05:45:00 【许野平】
本文转自CSDN博文《rust 实战 - 实现一个线程工作池 ThreadPool》,作者:firefantasy。
这是我读过的最简单、描述最清晰的 rust 线程池设计原理和示范代码,在此感谢作者辛勤付出,并向读者推荐该文。
如何实现一个线程池
线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。
1 线程池 Pool
pub struct Pool {
max_workers: usize, // 定义最大线程数
}
impl Pool {
fn new(max_workers: usize) -> Pool {
}
fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send {
}
}
用 execute 来执行任务,F: FnOnce() + "static + Send 是使用 thread::spawn 线程执行需要满足的 trait, 代表F是一个能在线程里执行的闭包函数。
另一点自然而然会想到在 Pool 添加一个线程数组, 这个线程数组就是用来执行任务的。比如Vec<Thread> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。
可以看作在一个线程里不断执行获取任务并执行的 Worker。
struct Worker where
{
_id: usize, // worker 编号
}
要怎么把任务发送给 Worker 执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,let (tx, rx) = mpsc::channel() 可以获取到一对发送端和接收端。
把发送端添加到 Pool 里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。
这里有一点需要特别注意,channel 的接收端 receiver 需要安全的在多个线程间共享,因此需要用Arc<Mutex::<T>> 来包裹起来,也就是用锁来解决并发冲突。
2 Pool 的完整定义
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
该是时候定义我们要发给Worker的消息Message了
定义如下的枚举值
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
Job 是一个要发送给 Worker 执行的闭包函数,这里 ByeBye 用来通知 Worker 可以终止当前的执行,退出线程。
只剩下实现 Worker 和 Pool 的具体逻辑了。
3 Worker的实现
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let receiver = receiver.lock().unwrap();
let message= receiver.recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
let message = receiver.lock().unwrap().recv().unwrap(); 这里获取锁后从receiver获取到消息体,然后 let message 结束后 rust 的生命周期会自动释放掉锁。
但如果写成
while let message = receiver.lock().unwrap().recv().unwrap() {
};
while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面 let message 要锁定久时间。
rust 的 mutex 锁没有对应的unlock方法,由 mutex 的生命周期管理。
我们给 Pool 实现 Drop trait, 让 Pool 被销毁时,自动暂停掉 worker 线程的执行。
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
drop方法里面用了两个循环,而不是在一个循环里做完两件事?
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
self.sender.send(Message::ByeBye).unwrap();
t.join().unwrap();
}
}
这里面隐藏了一个会造成死锁的陷阱,比如两个 Worker, 在单个循环里面迭代所有 Worker,再将终止信息发送给通道后,直接调用 join,
我们预期是第一个 worker 要收到消息,并且等他执行完。当情况可能是第二个 worker 获取到了消息,第一个worker没有获取到,那接下来的 join 就会阻塞造成死锁。
注意到没有,Worker 是被包装在 Option 内的,这里有两个点需要注意
- t.join 需要持有 t 的所有权
- 在我们这种情况下,self.workers 只能作为引用被for循环迭代。
这里考虑让 Worker 持有 Option<JoinHandle<()>>,后续可以通过在 Option 上调用 take方法将 Some 变体的值移出来,并在原来的位置留下 None 变体。
换而言之,让运行中的 worker 持有 Some 的变体,清理 worker 时,可以使用 None 替换掉 Some,从而让 Worker 失去可以运行的线程
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
4 要点总结
Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁Vec<Option<T>>可以解决某些情况下需要T所有权的场景
5 完整代码
use std::thread::{
self, JoinHandle};
use std::sync::{
Arc, mpsc, Mutex};
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,
NewJob(Job),
}
struct Worker where
{
_id: usize,
t: Option<JoinHandle<()>>,
}
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);
job();
},
Message::ByeBye => {
println!("ByeBye from worker[{}]", id);
break
},
}
}
});
Worker {
_id: id,
t: Some(t),
}
}
}
pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>
}
impl Pool where {
pub fn new(max_workers: usize) -> Pool {
if max_workers == 0 {
panic!("max_workers must be greater than zero!")
}
let (tx, rx) = mpsc::channel();
let mut workers = Vec::with_capacity(max_workers);
let receiver = Arc::new(Mutex::new(rx));
for i in 0..max_workers {
workers.push(Worker::new(i, Arc::clone(&receiver)));
}
Pool {
workers: workers, max_workers: max_workers, sender: tx }
}
pub fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send
{
let job = Message::NewJob(Box::new(f));
self.sender.send(job).unwrap();
}
}
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();
}
for w in self.workers {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let p = Pool::new(4);
p.execute(|| println!("do new job1"));
p.execute(|| println!("do new job2"));
p.execute(|| println!("do new job3"));
p.execute(|| println!("do new job4"));
}
}
原文:rust 实战 - 实现一个线程工作池 ThreadPool
作者:firefantasy
来源:CSDN——中国程序员论坛
版权声明
本文为[许野平]所创,转载请带上原文链接,感谢
https://yeping.blog.csdn.net/article/details/123322851
边栏推荐
- Record the installation and configuration of gestermer on TX2, and then use GST RTSP server
- 11.a==b?
- ThreadLocal. Threadlocalmap analysis
- Common sense of thread pool
- 从源代码到可执行文件的过程
- Addition, deletion, modification and query of MySQL table
- Custom exception class
- Why does the subscript of the array start from 0 instead of 1?
- The onnx model of yolov5 removes the transfer layer
- Generate excel template (drop-down selection, multi-level linkage)
猜你喜欢

Kibana search syntax

Algèbre linéaire chapitre 2 - matrice et son fonctionnement

Installation and usage skills of idea
![Unsupervised denoising - [tmi2022] ISCL: dependent self cooperative learning for unpaired image denoising](/img/cd/10793445e6867eeee613b6ba4b85cf.png)
Unsupervised denoising - [tmi2022] ISCL: dependent self cooperative learning for unpaired image denoising

List segmentation best practices

C language file operation

GDAL+OGR学习

Guaba and Computational Geometry

Import of data
![How to use comparative learning to do unsupervised - [cvpr22] training & [eccv20] image translation](/img/33/780b80693f70112eebc10941f7c134.png)
How to use comparative learning to do unsupervised - [cvpr22] training & [eccv20] image translation
随机推荐
Basic knowledge of network in cloud computing
渔网道路密度计算
Kalman filter and inertial integrated navigation
POI and easyexcel exercises
C3p0 database connection pool usage
9.Life, the Universe, and Everything
[leetcode 67] sum of two binary numbers
SQL -- data definition
ThreadLocal. Threadlocalmap analysis
Integration and induction of knowledge points of automatic control principle (Han min version)
[leetcode 290] word rules
Database - sorting data
2. Average length of words
5.The Simple Problem
Rainbow (DP)
卡尔曼滤波与惯性组合导航
Type conversion in C #
3. Continuous integer
Fact final variable and final variable
Reading of denoising papers - [cvpr2022] blind2blind: self supervised image denoising with visible blind spots