当前位置:网站首页>线程池状态+ThreadPoolExecutor
线程池状态+ThreadPoolExecutor
2022-04-22 04:33:00 【Fairy要carry】
目录
线程池执行execute()方法,添加一个任务时,线程池会作出以下判断:
ThreadPoolExecutor中的提交任务(submit+invokeAll+invokeAny)
int中有32位,高3位表示线程状态,低29位表示线程数量;
线程池状态

Running:运行提交并且处理任务;
SHUTDOWN:新任务不接受,以前的还是照常执行;
stop状态:会利用Interrupt方法打断任务,就算你是正在执行的任务,也会被抛弃;
——>当我们调用线程池中的shutdownNow()方法,线程池状态就会由Running变为stop状态
TIDYING状态:线程全部执行完毕了,池中线程为0,马上会进入终结状态;
TERMINDATED:终结状态;

我们会将这些信息(线程状态+线程数量)储存到一个原子变量中;——>目的:减少一次CAS操作;

ThreadPoolExecutor的构造方法
本质上TreadPoolExecutor是实现了ExecutorService接口:
corePoolSize:核心线程数目
maximumPoolSize:最大线程数目
KeepAliveTime:生存时间——针对救急线程——>(用于高峰期一过)
unit:时间单位-针对救急线程
threadFactory:线程工厂
handler:拒绝策略
workQueue:任务队列
救急线程:
当并发量足够大的时候,(阻塞队列都装不下),就会考虑救急线程来执行任务;但是执行任务完之后,会在指定的生存时间过后挂掉;
条件:
满足有界队列,也就是说当你的线程池中的所有线程都在忙时,新进的任务需要再阻塞队列中进行等待;——>好处:有利于降低CPU的使用,以及降低上下文切换;

线程池执行execute()方法,添加一个任务时,线程池会作出以下判断:
1.如果有空闲线程,则直接执行该任务;
2.如果没有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务;
3.如果没有空闲线程,且当前的线程数等于corePoolSize,同时阻塞队列未满,则将任务入队列,而不添加新的线程;
4.如果没有空闲线程,且阻塞队列已满,同时池中的线程数小于maximumPoolSize ,则创建新的线程执行任务;
5.如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于maximumPoolSize ,则根据构造函数中的 handler 指定的策略来拒绝新的任务。
拒绝策略:
当任务不断过来,但我们的线程又都在忙的时候,我们就采取的是拒绝策略(救急线程是不是也是一种拒绝策略呢?)
在ThreadPoolExecutor提供四种处理策略;

1、AbortPolicy:处理任务遭到拒绝抛出异常,意思就是放弃以保证正常运行;
2、CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
3.DiscardPolicy:不能执行的任务将被删除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
4.DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
场景:
Dubbo:会记录日志,方便查找信息;
Netty:创建一个新的线程来执行任务;
PinPoint:尝试每一种策略;
ActiveMQ:任务会进行一个超时等待(60s),如果在这期间有其他任务被执行完了,那么任务就可以执行,否则挂掉;
workQueue任务队列:
作用:决定了缓存任务的排队策略2.
ThreadPoolExecutor线程池提供了三种等待队列:1.synchronousQueue、2.LinkedBlockingQueue、3.ArrayBlockingQueue
有界队列:
synchronousQueue:每次插入任务的时候,必须等待另一个线程调用移除该任务队列中的任务,否则阻塞状态;
ArrayBlockingQueue:(由数组支持的)按照先进先出的FIFO的规则进行排序;
无界队列:
LinkedBlockingQueue:可以指定也可以不指定容量,最大Integer.MAX_VALUE
newCachedThreadPool

特点:
核心线程数为0,最大为Integer.MAX_VALUE,救急线程空闲生存时间为60S;
说明可以无限创建救急线程;
里面阻塞队列中的任务队列用的是synchronousQueue:意思就是只有先看到了线程来了,任务才会给它,意思就是双向奔赴,因为它是没有容量的,放不进任务;
场景与评价:

线程池常用方法

1. getCorePoolSize():返回线程池的核心线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
2.getMaximumPoolSize():返回线程池的最大线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
3.getLargestPoolSize():记录了曾经出现的最大线程个数(水位线);
4.getPoolSize():线程池中当前线程的数量;
5.getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks;
6.prestartAllCoreThreads():会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize;
7.prestartCoreThread():会启动一个核心线程(同上);
allowCoreThreadTimeOut(true):允许核心线程在KeepAliveTime时间后,退出;
线程池底层接口ExecutorService接口
之前我们所说的ThreadPoolExecutor是该接口的实现类
该接口常用方法:
1.Future<?> submit(Runnable task):提交Runnable任务到线程池,返回Future对象,由于Runnable没有返回值,也就是说调用Future对象get()方法返回null;
2.<T> Future<T> submit(Callable<T> task):提交Callable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Callable的返回值;
3.<T> Future<T> submit(Runnable task,T result):提交Runnable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Runnable的参数值;

RunnableTest:
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(50),
new ThreadFactory(){ public Thread newThread(Runnable r) {
return new Thread(r, "schema_task_pool_" + r.hashCode());
}}, new ThreadPoolExecutor.DiscardOldestPolicy());
public static void callableTest() {
int a = 1;
//callable
Future<Boolean> future = threadPool.submit(new Callable<Boolean>(){
@Override
public Boolean call() throws Exception {
int b = a + 100;
System.out.println(b);
return true;
}
});
try {
System.out.println("feature.get");
Boolean boolean1 = future.get();
System.out.println(boolean1);
} catch (InterruptedException e) {
System.out.println("InterruptedException...");
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execute exception...");
e.printStackTrace();
}
}
public static void runnableTest() {
int a = 1;
//runnable
Future<?> future1 = threadPool.submit(new Runnable(){
@Override
public void run() {
int b = a + 100;
System.out.println(b);
}
});
try {
System.out.println("feature.get");
Object x = future1.get(900,TimeUnit.MILLISECONDS);
System.out.println(x);//null
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execute exception...");
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
问:submit和execute都是提交任务,有什么区别?
submit:可以接收两种参数类型->Runnable+Callable,有返回值
execute:一种Runnable类型,并且execute没有返回值
继续探究:为啥要有Runnable与Callable两种方式呢?
因为Callable有返回值,所以可以抛出异常,外部消化(与FutureTask配合使用可以获取异步执行结果)
而Runnable没有返回结果,如果出现异常只能内部消化
newSingleThreadExecutor

单线程执行器:
使用场景:
希望多个任务排队执行,就跟串行性质一样,当任务>1,执行完毕就会被放到无界队列排队,任务执行完毕,这唯一执行任务的线程也不会释放;
问:自己创建一个单线程和线程池里面的一个单线程有什么区别?
自己创建的单线程串行执行任务,如果任务执行出现异常,线程就无了,没有任何补救措施;
线程池中的单个线程,你出现异常他会有补救措施,创建一个新的线程,因为任务并没有执,所以他会创建一个新的线程;
newSingleThreadExecutor:线程数始终为1,不可修改
这个单线程执行器,他不会将线程池对象直接返回,而是用了一个包装,也就是说没有暴露实现类的特有方法(ThreadPoolExecutor实现类),暴露的是ExecutorService接口;

newFixedThreadPool:
对外暴露的就是ThreadPoolExecutor对象,可以修改线程池中的方法;
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

区别:
package com.example.juc.ThreadPool.FixAndSingle;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author diao 2022/4/21
*/
public class CompareFAS {
public static void main(String[] args) throws InterruptedException {
ExecutorService es1 = Executors.newFixedThreadPool(1);
ExecutorService es2 = Executors.newSingleThreadExecutor();
testThread(es1);
TimeUnit.SECONDS.sleep(1);
testThread(es2);
}
private static void testThread(ExecutorService e){
e.submit(()->{
throw new IllegalStateException("Error");
});
e.submit(()->{
System.out.println("running....");
});
}
}
*: 不难发现,newSingleThreadExecutor中返回了一个由XXXExecutorService()封装的的ThreadPoolExecutor;
所以我们在newSingleThreadExecutor中,不能直接调用线程池的方法,毕竟它返回的是ExecutorService;
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
测试:
package com.example.juc.ThreadPool.FixAndSingle;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author diao 2022/4/21
*/
public class CompareTest {
public static void main(String[] args) {
ExecutorService es1 = Executors.newFixedThreadPool(1);
ExecutorService es2 = Executors.newSingleThreadExecutor();
//1.首先newFixedThreadPool先转化为ThreadPoolExecutor
ThreadPoolExecutor tp1= (ThreadPoolExecutor) es1;
tp1.setCorePoolSize(2);
System.out.println(tp1.getCorePoolSize());
//2.然后我们将newSingleThreadExecutor转为ThreadPoolExecutor
ThreadPoolExecutor tp2= (ThreadPoolExecutor) es2;
tp2.setCorePoolSize(2);
System.out.println(tp2.getCorePoolSize());
}
}
也就是说,newFixedThreadPool(n)是可以通过强转变为ThreadPoolExecutor的,可以自行指定线程数;
而另外一个叫newSingleThreadExecutor是不可以的,线程数为1(一个个·来·);——>(完全线程安全)
ThreadPoolExecutor中的提交任务(submit+invokeAll+invokeAny)

submit:提交任务方法,返回值Future为任务执行结果
package com.example.juc.ThreadPool.FixAndSingle;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author diao 2022/4/21
*/
@Slf4j(topic = "c.CompareTest02")
public class CompareTest02 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
//2.将任务提交到线程池中,用Future获取任务执行的结果
Future<String> fu = pool.submit(() -> {
log.debug("开始提交任务了...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "ok";
});
log.debug("{}",fu.get());
}
}

invokeAll方法:提交所有的任务
package com.example.juc.ThreadPool.FixAndSingle;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author diao 2022/4/21
*/
@Slf4j(topic = "c.CompareTest03")
public class CompareTest03 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
//线程池执行所有任务
List<Future<String>> futures = pool.invokeAll(Arrays.asList(
//一个任务,任务进入集合中
() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
},
()->{
log.debug("begin");
Thread.sleep(500);
return "2";
},
()->{
log.debug("begin");
Thread.sleep(2000);
return "3";
}
));
//主线程打印线程池中的任务内容
futures.forEach(f->{
try {
log.debug("{}",f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
}
invokeAny:哪个任务率先完成就返回哪个任务
package com.example.juc.ThreadPool.FixAndSingle;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author diao 2022/4/21
*/
@Slf4j(topic = "c.CompareTestInvokeAny")
public class CompareTestInvokeAny {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(3);
//将任务放到线程池中
Object result = es.invokeAny(Arrays.asList(
() -> {
log.debug("begin 1");
Thread.sleep(1000);
log.debug("end 1");
return "1";
},
() -> {
log.debug("begin 2");
Thread.sleep(500);
log.debug("end 2");
return "2";
},
() -> {
log.debug("begin 3");
Thread.sleep(2000);
log.debug("end 3");
return "3";
}
));
log.debug("{}",result);
}
}

线程池中的shutdown方法
正在执行的线程不会被打断,但是空闲状态的线程会被打断;
然后tryTerminate尝试终结,终结之后,那些正在执行的线程就让他自己执行完挂掉;

shutdownNow():
所有线程,不管是正在执行还是空闲的线程全部打断,也就是0活数->tryTerminate()返回true;

其他方法:

版权声明
本文为[Fairy要carry]所创,转载请带上原文链接,感谢
https://blog.csdn.net/weixin_57128596/article/details/124309662
边栏推荐
- Reenter leetcode (392. Judgment subsequence)
- spark 安装与使用 educoder
- [twelfth database operation - stored procedure]
- Statistics of authoritative institutions: the best data center network company in 2021, China Huawei and H3C, were listed
- What are the main aspects of mobile app testing?
- 树莓派4B编译paddlelite(详细步骤2022年)
- 论文阅读 (47):DTFD-MIL: Double-Tier Feature Distillation Multiple Instance Learning for Histopathology..
- 5_ Data analysis - Data Visualization
- L1-051 discount (5 points)
- 软件测试成行业“薪”贵?
猜你喜欢

02-SparkSQL

DS18B20 temperature sensor based on 51 single chip microcomputer

Solve the problem of Chinese garbled code in idea (garbled code in configuration file)

sqlilabs(25a-26)

Introduction to C - parallel programming

5_ Data analysis - Data Visualization

pipeline communication

SCI paper writing -- word template of IEEE Journal (also available in latex)

01-Read&Write

How does IOT platform realize business configuration center
随机推荐
2022山东省安全员C证特种作业证考试题库及答案
Job scheduling, intermediate scheduling, process scheduling
一文告诉你分析即服务(AaaS)到底是什么
Zuo Chengyun - Dachang brushing class - the point with the most rope coverage
MUI-弹出菜单
DNS domain name system - directory service of the Internet
DOM事件流和事件委托
Reenter leetcode (392. Judgment subsequence)
Chapter 8 of C language programming (fifth edition of Tan Haoqiang) is good at using pointer exercises to analyze and answer
How to combine acrobat Pro DC with other files to create a single PDF file?
L1-046 divide singles (20 points)
Shell variables $, $@, $0, $1, $2, ${},%% use explanation and easy-to-use shell formatting tools
Jeesite export Excel
Filebeat collects log data and transfers it to redis. Different es indexes are created according to log fields through logstash
论文阅读 (48):A Library of Optimization Algorithms for Organizational Design
How much do you know about the testing methods of software testing?
手机软件(App)测试主要有哪些方面?
Solve the problem of Chinese garbled code in idea (garbled code in configuration file)
02-SparkSQL
Pod of kubernetes cluster said, can I die with dignity?