Thread pool status + ThreadPoolExecutor
2022-04-22 04:38:00 【Fairy wants carry】
Contents
ThreadPoolExecutor constructor
ExecutorService, the interface behind the thread pool
Q: submit and execute both submit tasks. What is the difference?
Submitting tasks with ThreadPoolExecutor (submit + invokeAll + invokeAny)
The shutdown method of the thread pool
Thread pool state
ThreadPoolExecutor keeps its status in a single int, which has 32 bits: the high 3 bits store the thread pool state and the low 29 bits store the number of threads.

RUNNING: accepts new tasks and processes queued tasks;
SHUTDOWN: does not accept new tasks, but tasks that were already submitted, including queued ones, are still executed;
STOP: does not accept new tasks, does not process queued tasks, and uses interrupt to stop the tasks that are currently running;
When shutdownNow() is called on the thread pool, its state changes from RUNNING to STOP.
TIDYING: all tasks have finished and the pool has 0 threads; the pool is about to enter the terminated state;
TERMINATED: the final state.

Both pieces of information (pool state + number of threads) are stored in one atomic variable. Purpose: both can be updated with a single CAS operation instead of two.
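The bit layout described above can be sketched as follows. This is a minimal illustration modeled on the layout the JDK source uses; the class name CtlSketch is made up for this example and the code is not the library's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: pack a pool state (high 3 bits) and a thread count (low 29 bits) into one int. */
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 low bits hold the count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // mask for the low 29 bits

    // States occupy the high 3 bits and are ordered numerically.
    static final int RUNNING    = -1 << COUNT_BITS;        // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // 000
    static final int STOP       =  1 << COUNT_BITS;        // 001
    static final int TIDYING    =  2 << COUNT_BITS;        // 010
    static final int TERMINATED =  3 << COUNT_BITS;        // 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

        // One CAS adds a thread to the count without touching the state bits.
        int c = ctl.get();
        ctl.compareAndSet(c, c + 1);

        // One CAS changes the state and keeps the count.
        c = ctl.get();
        ctl.compareAndSet(c, ctlOf(SHUTDOWN, workerCountOf(c)));

        System.out.println(runStateOf(ctl.get()) == SHUTDOWN); // true
        System.out.println(workerCountOf(ctl.get()));          // 1
    }
}
```

Because the states are ordered (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), a check such as "has the pool at least reached STOP" becomes a single integer comparison.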

ThreadPoolExecutor constructor
ThreadPoolExecutor is essentially an implementation of the ExecutorService interface. Its constructor parameters are:
corePoolSize: number of core threads
maximumPoolSize: maximum number of threads
keepAliveTime: how long an idle emergency thread (a thread beyond corePoolSize) stays alive; used to shrink the pool after the peak
unit: time unit of keepAliveTime, also for emergency threads
threadFactory: thread factory
handler: rejection policy
workQueue: task queue
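As an illustration of the parameters listed above, here is a small sketch that passes all seven to the constructor in the order the JDK declares them. The sizes and the class name ConstructorSketch are arbitrary choices for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorSketch {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                60L,                                    // keepAliveTime for threads beyond the core
                TimeUnit.SECONDS,                       // unit of keepAliveTime
                new ArrayBlockingQueue<>(10),           // workQueue (bounded, 10 tasks)
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler (rejection policy)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize()); // 2 / 4
        pool.shutdown();
    }
}
```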
Emergency threads:
When concurrency is high enough that the blocking queue is full, the pool creates emergency threads to run tasks. Once the peak is over, an emergency thread that has been idle for keepAliveTime is terminated.
Condition:
The queue must be bounded. When all core threads are busy, new tasks first wait in the blocking queue, and emergency threads are created only when the queue cannot hold more tasks. Benefit: this helps reduce CPU usage and context switching.

When a task is added through execute(), the thread pool makes the following decisions:
1. If the number of running threads is less than corePoolSize, a new thread is created to run the task, even if other threads are idle;
2. If the number of threads has reached corePoolSize and the blocking queue is not full, the task is put in the queue and no thread is added; a thread takes it from the queue when it becomes free;
3. If the blocking queue is full and the number of threads in the pool is less than maximumPoolSize, a new (emergency) thread is created to run the task;
4. If the blocking queue is full and the number of threads in the pool equals maximumPoolSize, the new task is rejected according to the policy specified by handler.
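The decision sequence above can be observed with a deliberately tiny pool. The sketch below assumes 1 core thread, at most 2 threads, and a queue that holds 1 task; every task blocks on a latch so that threads stay busy. The class name and sizes are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowSketch {
    /** Submits four blocking tasks and records what the pool did with each one. */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                       // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task" + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {   // thrown by the default AbortPolicy
                log.add("task" + i + ": rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        // task1: threads=1 queued=0   (core thread created)
        // task2: threads=1 queued=1   (queued)
        // task3: threads=2 queued=1   (queue full, emergency thread created)
        // task4: rejected             (queue full, pool at maximumPoolSize)
        run().forEach(System.out::println);
    }
}
```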
Rejection policy:
When tasks keep arriving while all threads are busy, the queue is full, and the pool has already reached maximumPoolSize, the rejection policy is applied. (Are emergency threads also a rejection policy? No: they are created before any task is rejected.)
ThreadPoolExecutor provides four built-in policies;

1. AbortPolicy: rejects the task and throws RejectedExecutionException. The task is given up so that the pool keeps running normally. This is the default policy;
2. CallerRunsPolicy: the rejected task is run by the thread that called execute. This policy provides a simple feedback control mechanism that slows down the rate at which new tasks are submitted.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
This policy clearly does not want to give up the task. Because the pool has no free resources, the thread that called execute runs the task itself. (No task is dropped, but the calling thread is blocked while it runs the task. If the caller cannot make progress either, the program may stall, so whether this is acceptable depends on the business scenario.)
3. DiscardPolicy: a task that cannot be executed is silently discarded
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
This policy is almost the same as AbortPolicy: it also drops the task, but it does not throw an exception.
4. DiscardOldestPolicy: if the executor has not been shut down, the task at the head of the work queue is discarded and execution of the new task is retried (if that fails again, the process repeats)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}
This policy is a little more complicated: as long as the pool is not shut down, it first drops the oldest task waiting in the queue and then tries to run the new task again. Use it with care, because a queued task is lost without notice.
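To make the CallerRunsPolicy behaviour concrete, the sketch below fills a pool that has one thread and no queue capacity, then submits a second task and records which thread ran it. The class and method names are invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsSketch {
    /** Returns the thread that ended up running the rejected task. */
    static Thread threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();

        // Occupies the only thread of the pool.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // No free thread and no queue capacity: the policy runs this task in the caller.
        pool.execute(() -> ranOn.set(Thread.currentThread()));

        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(threadOfRejectedTask() == Thread.currentThread()); // true
    }
}
```

While the caller is busy running the rejected task it cannot submit more work, which is the feedback effect described above.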
How other frameworks handle rejection:
Dubbo: writes a log entry, which makes the problem easier to track down;
Netty: creates a new thread to run the task;
PinPoint: tries each policy in turn;
ActiveMQ: the task waits with a timeout (60 s); if other tasks complete during that time the task can be executed, otherwise it fails;
workQueue, the task queue:
Purpose: determines how waiting tasks are queued.
Three kinds of waiting queue are commonly used with ThreadPoolExecutor: 1. SynchronousQueue, 2. LinkedBlockingQueue, 3. ArrayBlockingQueue
Bounded queues:
SynchronousQueue: has no capacity. Every insert must wait for another thread to remove the task from the queue, otherwise the insert blocks;
ArrayBlockingQueue: backed by an array with a fixed capacity; orders tasks first in, first out (FIFO);
Unbounded queue:
LinkedBlockingQueue: the capacity can be specified or omitted; if omitted, the maximum is Integer.MAX_VALUE
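The difference between the three queues shows up in what a non-blocking offer does, which is the call execute() relies on when it tries to queue a task. A small sketch (the class name and the probe method are invented for this example):

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueSketch {
    /** Returns four observations, in the order they are made below. */
    static boolean[] probe() {
        Runnable task = () -> { };

        BlockingQueue<Runnable> handoff = new SynchronousQueue<>();
        boolean handoffAccepted = handoff.offer(task);   // no thread is waiting to take it

        BlockingQueue<Runnable> bounded = new ArrayBlockingQueue<>(1);
        boolean firstAccepted = bounded.offer(task);     // fits
        boolean secondAccepted = bounded.offer(task);    // capacity of 1 already used

        BlockingQueue<Runnable> linked = new LinkedBlockingQueue<>();
        boolean defaultIsMax = linked.remainingCapacity() == Integer.MAX_VALUE;

        return new boolean[] { handoffAccepted, firstAccepted, secondAccepted, defaultIsMax };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probe())); // [false, true, false, true]
    }
}
```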
newCachedThreadPool

Characteristics:
The number of core threads is 0, the maximum is Integer.MAX_VALUE, and the idle lifetime of an emergency thread is 60 s;
This means every thread is an emergency thread, and emergency threads can be created without limit;
The task queue is a SynchronousQueue: a task is handed over only when a thread is there to take it (goods and payment change hands at the same moment), because the queue has no capacity and cannot hold tasks;
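These characteristics can be read back from the pool itself. The sketch below relies on Executors.newCachedThreadPool() returning a plain ThreadPoolExecutor, which is how the JDK implements it; the class name and the describe method are invented for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolSketch {
    /** Summarizes the configuration of a cached thread pool. */
    static String describe() {
        ExecutorService es = Executors.newCachedThreadPool();
        ThreadPoolExecutor pool = (ThreadPoolExecutor) es;
        String summary = "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + " queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return summary;
    }

    public static void main(String[] args) {
        // core=0 max=2147483647 keepAlive=60s queue=SynchronousQueue
        System.out.println(describe());
    }
}
```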
Scenarios and evaluation:

Common methods of thread pool

1. getCorePoolSize(): returns the number of core threads in the pool. The value does not change unless setCorePoolSize is called; it is the corePoolSize set in the constructor;
2. getMaximumPoolSize(): returns the maximum number of threads in the pool. The value does not change unless setMaximumPoolSize is called; it is the maximumPoolSize set in the constructor;
3. getLargestPoolSize(): returns the largest number of threads that have ever been in the pool at the same time (the high-water mark);
4. getPoolSize(): returns the current number of threads in the pool;
5. getActiveCount(): returns the approximate number of threads that are actively executing tasks;
6. prestartAllCoreThreads(): starts all core threads. Whether or not there are tasks to run, the pool creates new threads until the number of threads reaches corePoolSize;
7. prestartCoreThread(): starts one core thread (same as above);
8. allowCoreThreadTimeOut(true): allows core threads to exit after they have been idle for keepAliveTime;
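A short sketch of a few of these methods on a fresh pool, assuming 2 core threads and at most 4 threads (sizes and names are arbitrary). Note that allowCoreThreadTimeOut(true) requires a keepAliveTime greater than zero.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatsSketch {
    /** Returns {pool size before prestart, threads started, pool size after, largest pool size}. */
    static int[] stats() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        int before = pool.getPoolSize();               // threads are created lazily
        int started = pool.prestartAllCoreThreads();   // returns how many threads were started
        int after = pool.getPoolSize();
        int largest = pool.getLargestPoolSize();
        pool.allowCoreThreadTimeOut(true);             // idle core threads may now exit as well
        pool.shutdown();
        return new int[] { before, started, after, largest };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(stats())); // [0, 2, 2, 2]
    }
}
```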
ExecutorService, the interface behind the thread pool
As mentioned earlier, ThreadPoolExecutor is an implementation class of this interface.
Common methods of the interface:
1. Future<?> submit(Runnable task): submits a Runnable task to the thread pool and returns a Future. Because a Runnable has no return value, calling get() on the Future returns null;
2. <T> Future<T> submit(Callable<T> task): submits a Callable task to the thread pool and returns a Future. Calling get() on the Future returns the value returned by the Callable;
3. <T> Future<T> submit(Runnable task, T result): submits a Runnable task to the thread pool and returns a Future. Calling get() on the Future returns the result argument that was passed in;
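The third overload is the least obvious of the three, so here is a minimal sketch of it: the Runnable writes into an object, and the same object is passed as result so that get() hands it back once the task has finished. The class name and the StringBuilder holder are choices made for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitResultSketch {
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            StringBuilder holder = new StringBuilder();
            // The task fills the holder; get() returns the holder after the task completes.
            Future<StringBuilder> future = pool.submit(() -> { holder.append("done"); }, holder);
            return future.get().toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // done
    }
}
```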

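RunnableTest:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RunnableTest {
    // 5 core threads, at most 50 threads, 300 s keep-alive, bounded queue of 50 tasks
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(50),
            new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    return new Thread(r, "schema_task_pool_" + r.hashCode());
                }
            }, new ThreadPoolExecutor.DiscardOldestPolicy());

    public static void callableTest() {
        int a = 1;
        // Callable: the value returned by call() is available through Future.get()
        Future<Boolean> future = threadPool.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                int b = a + 100;
                System.out.println(b);
                return true;
            }
        });
        try {
            System.out.println("future.get");
            Boolean boolean1 = future.get();
            System.out.println(boolean1);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException...");
            e.printStackTrace();
        } catch (ExecutionException e) {
            System.out.println("execute exception...");
            e.printStackTrace();
        }
    }

    public static void runnableTest() {
        int a = 1;
        // Runnable: there is no return value, so Future.get() returns null
        Future<?> future1 = threadPool.submit(new Runnable() {
            @Override
            public void run() {
                int b = a + 100;
                System.out.println(b);
            }
        });
        try {
            System.out.println("future.get");
            // Wait at most 900 ms for the task to finish
            Object x = future1.get(900, TimeUnit.MILLISECONDS);
            System.out.println(x); // null
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            System.out.println("execute exception...");
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        callableTest();
        runnableTest();
        threadPool.shutdown(); // let the JVM exit
    }
}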
Q: submit and execute both submit tasks. What is the difference?
submit: accepts two parameter types, Runnable and Callable, and returns a Future
execute: accepts only a Runnable and has no return value
Going further: why are there both Runnable and Callable?
Callable.call() returns a value and is declared to throw Exception, so a failure can be passed to the caller and handled outside the task (used together with FutureTask, the result of the asynchronous execution can be obtained)
Runnable.run() returns nothing and cannot throw checked exceptions, so exceptions have to be handled inside the task
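To illustrate how a failure inside a Callable reaches the caller, the sketch below submits a task that throws a checked exception and reads it back from the Future. The class name and the "boom" message are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExceptionSketch {
    static String failureMessage() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> future = pool.submit((Callable<Integer>) () -> {
            throw new Exception("boom");      // checked exception: allowed in call(), not in run()
        });
        try {
            future.get();
            return "no failure";
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage()); // boom
    }
}
```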
newSingleThreadExecutor

Single-threaded executor:
Use case:
Multiple tasks should run one after another, in series. When there is more than one task, the extra tasks wait in an unbounded queue. When the tasks are finished, the single thread that runs them is not released;
Q: What is the difference between a single thread you create yourself and the single thread in this thread pool?
With a thread you create yourself to run tasks in series, if a task throws an exception the thread dies and nothing takes over;
With the single thread in the thread pool there is a remedy: if a task submitted with execute throws and kills the thread, the pool creates a new thread so that the remaining tasks still run (with submit, the exception is captured in the Future and the thread survives);
newSingleThreadExecutor: the number of threads is always 1 and cannot be modified
This single-threaded executor does not return the thread pool object directly but wraps it, so the methods specific to the implementation class (ThreadPoolExecutor) are not exposed; only the ExecutorService interface is exposed;
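The recovery behaviour can be checked by comparing the threads that run two consecutive tasks. This sketch uses execute rather than submit so that the exception actually escapes the task, and installs an uncaught-exception handler only to keep the output short; all names here are invented for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ReplacementSketch {
    /** Returns true if the task after the failing one ran on a different thread. */
    static boolean replacedAfterFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            // Print one line instead of a full stack trace when a thread dies.
            t.setUncaughtExceptionHandler((th, ex) ->
                    System.out.println(th.getName() + " died: " + ex.getMessage()));
            return t;
        });
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();

        pool.execute(() -> {
            first.set(Thread.currentThread());
            throw new IllegalStateException("Error");   // kills the thread running this task
        });
        pool.execute(() -> second.set(Thread.currentThread()));

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return second.get() != null && second.get() != first.get();
    }

    public static void main(String[] args) {
        System.out.println(replacedAfterFailure()); // true
    }
}
```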

newFixedThreadPool:
The object returned is a ThreadPoolExecutor, so it can be cast and its settings can be changed through the ThreadPoolExecutor methods;
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Difference:
package com.example.juc.ThreadPool.FixAndSingle;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author diao 2022/4/21
 */
public class CompareFAS {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es2 = Executors.newSingleThreadExecutor();
        testThread(es1);
        TimeUnit.SECONDS.sleep(1);
        testThread(es2);
        // Shut both pools down so that the JVM can exit
        es1.shutdown();
        es2.shutdown();
    }

    private static void testThread(ExecutorService e) {
        // submit stores the exception in the returned Future, so nothing is printed for this task
        e.submit(() -> {
            throw new IllegalStateException("Error");
        });
        // The next task still runs in both pools
        e.submit(() -> {
            System.out.println("running....");
        });
    }
}
Note: as the source below shows, newSingleThreadExecutor returns a ThreadPoolExecutor wrapped in a FinalizableDelegatedExecutorService;
so with newSingleThreadExecutor the ThreadPoolExecutor methods cannot be called directly, because the object returned only exposes ExecutorService;
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
Test:
package com.example.juc.ThreadPool.FixAndSingle;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author diao 2022/4/21
 */
public class CompareTest {
    public static void main(String[] args) {
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es2 = Executors.newSingleThreadExecutor();
        // 1. Cast the newFixedThreadPool result to ThreadPoolExecutor: this works
        ThreadPoolExecutor tp1 = (ThreadPoolExecutor) es1;
        // Newer JDKs (9+) reject a corePoolSize above maximumPoolSize, so raise the maximum first
        tp1.setMaximumPoolSize(2);
        tp1.setCorePoolSize(2);
        System.out.println(tp1.getCorePoolSize());
        // 2. Cast the newSingleThreadExecutor result to ThreadPoolExecutor:
        //    this throws ClassCastException, because the pool is hidden behind a wrapper
        ThreadPoolExecutor tp2 = (ThreadPoolExecutor) es2;
        tp2.setCorePoolSize(2);
        System.out.println(tp2.getCorePoolSize());
    }
}
In other words, the result of newFixedThreadPool(n) can be cast to ThreadPoolExecutor, and its number of threads can be changed;
the result of newSingleThreadExecutor cannot: its number of threads stays 1, so tasks are executed strictly one at a time and never run concurrently with each other.
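A way to see the same difference without triggering a ClassCastException is an instanceof check before the cast. This is a small sketch with invented names; it raises the maximum before the core size because newer JDKs (9+) reject a corePoolSize above maximumPoolSize.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class WrapperCheckSketch {
    /** Tries to grow the pool to two threads; returns false if the pool is not reconfigurable. */
    static boolean tryResize(ExecutorService es) {
        if (!(es instanceof ThreadPoolExecutor)) {
            return false;                      // wrapped: only ExecutorService is exposed
        }
        ThreadPoolExecutor pool = (ThreadPoolExecutor) es;
        pool.setMaximumPoolSize(2);            // maximum first, then core
        pool.setCorePoolSize(2);
        return pool.getCorePoolSize() == 2;
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        ExecutorService single = Executors.newSingleThreadExecutor();
        System.out.println(tryResize(fixed));  // true
        System.out.println(tryResize(single)); // false
        fixed.shutdown();
        single.shutdown();
    }
}
```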
Submitting tasks with ThreadPoolExecutor (submit + invokeAll + invokeAny)

submit: submits a task; the returned Future holds the result of the task
package com.example.juc.ThreadPool.FixAndSingle;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author diao 2022/4/21
 */
@Slf4j(topic = "c.CompareTest02")
public class CompareTest02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Submit the task to the thread pool and use the Future to get its result
        Future<String> fu = pool.submit(() -> {
            log.debug("Task started...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "ok";
        });
        // get() blocks until the task has finished
        log.debug("{}", fu.get());
        pool.shutdown(); // let the JVM exit
    }
}

invokeAll: submits all tasks in the collection and waits until every one of them has finished
package com.example.juc.ThreadPool.FixAndSingle;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author diao 2022/4/21
 */
@Slf4j(topic = "c.CompareTest03")
public class CompareTest03 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // The thread pool runs all tasks; invokeAll returns when all of them are done
        List<Future<String>> futures = pool.invokeAll(Arrays.asList(
                // Each lambda is one task in the collection
                () -> {
                    log.debug("begin");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(2000);
                    return "3";
                }
        ));
        // The main thread prints the result of each task, in submission order
        futures.forEach(f -> {
            try {
                log.debug("{}", f.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
        pool.shutdown(); // let the JVM exit
    }
}
invokeAny: returns the result of whichever task completes first; the other tasks are cancelled
package com.example.juc.ThreadPool.FixAndSingle;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author diao 2022/4/21
 */
@Slf4j(topic = "c.CompareTestInvokeAny")
public class CompareTestInvokeAny {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(3);
        // Hand the tasks to the thread pool; the call returns as soon as one task succeeds
        Object result = es.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin 1");
                    Thread.sleep(1000);
                    log.debug("end 1");
                    return "1";
                },
                () -> {
                    log.debug("begin 2");
                    Thread.sleep(500);
                    log.debug("end 2");
                    return "2";
                },
                () -> {
                    log.debug("begin 3");
                    Thread.sleep(2000);
                    log.debug("end 3");
                    return "3";
                }
        ));
        log.debug("{}", result);
        es.shutdown(); // let the JVM exit
    }
}

The shutdown method of the thread pool
shutdown():
The state changes to SHUTDOWN. Threads that are executing tasks are not interrupted, but idle threads are; tasks already in the queue are still executed;
tryTerminate() then attempts to terminate the pool. shutdown() does not wait: the threads that are still executing finish their tasks on their own and then exit;

shutdownNow():
The state changes to STOP. All threads are interrupted, whether they are executing or idle; once the number of live threads reaches 0, tryTerminate() can terminate the pool;
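The two methods can be compared with a one-thread pool. In this sketch (all names invented for the example) shutdownNow() interrupts the running task and hands back the tasks that were still queued, while after shutdown() a new submission is rejected by the default AbortPolicy.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownSketch {
    /** Returns {number of queued tasks handed back, 1 if the running task was interrupted}. */
    static int[] stopNow() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.set(true);           // shutdownNow() interrupts running tasks
            }
        });
        pool.execute(() -> { });                 // waits in the queue
        pool.execute(() -> { });                 // waits in the queue

        try {
            started.await();                     // make sure the first task is running
            List<Runnable> neverRan = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { neverRan.size(), interrupted.get() ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] { -1, -1 };
        }
    }

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] r = stopNow();
        System.out.println(r[0] + " queued tasks returned, interrupted=" + (r[1] == 1));
        System.out.println(rejectsAfterShutdown()); // true
    }
}
```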

Other methods :

Copyright notice
This article was written by [Fairy wants carry]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/04/202204220433108654.html