当前位置:网站首页>Async的线程池使用的哪个?
Async的线程池使用的哪个?
2022-08-03 16:24:00 【听风听雨听世界】
前言
在Spring中我们经常会用到异步操作,注解中使用 @EnableAsync 和 @Async 就可以使用它了。但是最近发现在异步中线程号使用的是我们项目中自定义的线程池 ThreadPoolTaskExecutor 而不是之前熟悉的 SimpleAsyncTaskExecutor
那么来看一下他的执行过程吧。
正文
- 首先要使异步生效,我们得在启动类中加入
@EnableAsync那么就点开它看看。它会使用@Import注入一个AsyncConfigurationSelector类,启动是通过父类可以决定它使用的是配置类ProxyAsyncConfiguration。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public AsyncConfigurationSelector() {
}
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{
ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[]{
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
default:
return null;
}
}
}
- 点开能够看到注入一个
AsyncAnnotationBeanPostProcessor。它实现了BeanPostProcessor接口,因此它是一个后处理器,用于将Spring AOP的Advisor应用于给定的bean。从而该bean上通过异步注解所定义的方法在调用时会被真正地异步调用起来。
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
public ProxyAsyncConfiguration() {
}
@Bean(
name = {
"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
)
@Role(2)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor的父类实现了BeanFactoryAware,那么会在AsyncAnnotationBeanPostProcessor实例化之后回调setBeanFactory()来实例化切面AsyncAnnotationAdvisor。
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//定义一个切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
AsyncAnnotationAdvisor构造和声明切入的目标(切点)和代码增强(通知)。
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//通知
this.advice = buildAdvice(executor, exceptionHandler);
//切入点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
- 通知就是最终要执行的。
buildAdvice用于构建通知,主要是创建一个AnnotationAsyncExecutionInterceptor类型的拦截器,并且配置好使用的执行器和异常处理器。真正的异步执行的代码在AsyncExecutionAspectSupport中!
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//配置拦截器
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
- 配置拦截器,通过参数配置自定义的执行器和异常处理器或者使用默认的执行器和异常处理器。
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//默认执行器
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
getDefaultExecutor()方法,用来查找默认的执行器,父类AsyncExecutionAspectSupport首先寻找唯一一个类型为TaskExecutor的执行器并返回,若存在多个则寻找默认的执行器taskExecutor,若无法找到则直接返回null。子类AsyncExecutionInterceptor重写getDefaultExecutor方法,首先调用父类逻辑,返回null则配置一个名为SimpleAsyncTaskExecutor的执行器
/** * 父类 * 获取或构建此通知实例的默认执行器 * 这里返回的执行器将被缓存以供后续使用 * 默认实现搜索唯一的TaskExecutor的bean * 在上下文中,用于名为“taskExecutor”的Executor bean。 * 如果两者都不是可解析的,这个实现将返回 null */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 搜索唯一的一个TaskExecutor类型的bean并返回
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
//找不到唯一一个bean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
//未找到异常时搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
/** * 子类 * 如父类为null则重新实例化一个名为SimpleAsyncTaskExecutor的执行器 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
所以,到了这一步就可以理解为什么异步线程名默认叫 SimpleAsyncTaskExecutor-xx ,为什么有了自己的线程池有可能异步用到了自己的线程池配置。
我们有这个切入点之后,每次请求接口执行异步方法前都会执行 AsyncExecutionInterceptor#invoke() , determineAsyncExecutor 用来决策使用哪个执行器
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//在缓存的执行器中选择一个对应方法的执行器
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
//获取@Async注解中的value(指定的执行器)
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
if (StringUtils.hasLength(qualifier)) {
//获取指定执行器的bean
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
//选择默认的执行器
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
//将执行器进行缓存
this.executors.put(method, executor);
}
return (AsyncTaskExecutor)executor;
}
当有了执行器调用 doSubmit 方法将任务加入到执行器中。
异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:
不复用线程,也就是说为每个任务新起一个线程。但是可以通过
concurrencyLimit属性来控制并发线程数量,但是默认情况下不做限制(concurrencyLimit取值为-1)。
因此,如果我们使用异步任务,一定不能采用默认执行器的配置,以防OOM异常!最好的方式是指定执行器!
总结
本文主要以看源码的方式来了解异步注解 @Async 是如何在项目中选择线程以及使用线程的,尽量给异步任务指定一个独有线程池,这样会的避免不与其他业务共用线程池而造成影响。
边栏推荐
猜你喜欢

STM32的HAL和LL库区别和性能对比

2年开发经验去面试,吊打面试官,即将面试的程序员这些笔记建议复习

To add digital wings to education, NetEase Yunxin released the overall solution of "Internet + Education"

#夏日挑战赛# HarmonyOS 实现一个绘画板

MySQL窗口函数

从零开始搭建MySQL主从复制架构

机器人开发--Universal Scene Description(USD)

C专家编程 第1章 C:穿越时空的迷雾 1.6 它很棒,但它符合标准吗

node连接mongoose数据库流程

设置海思芯片MMZ内存、OS内存详解
随机推荐
13 and OOM simulation
To add digital wings to education, NetEase Yunxin released the overall solution of "Internet + Education"
MySQL相关介绍
[QT] Qt project demo: data is displayed on the ui interface, double-click the mouse to display specific information in a pop-up window
leetcode:202. 快乐数
《社会企业开展应聘文职人员培训规范》团体标准在新华书店上架
将 Windows 事件日志错误加载到 SQL 表中
Kubernetes 笔记 / 入门 / 生产环境 / 用部署工具安装 Kubernetes / 用 kubeadm 启动集群 / 安装 kubeadm
83. Remove Duplicates from Sorted List
Common distributed theories (CAP, BASE) and consensus protocols (Gosssip, Raft)
一文看懂推荐系统:召回03:基于用户的协同过滤(UserCF),要计算用户之间的相似度
【QT】Qt 给已经开发好的程序快速封装成动态库
C语言01、数据类型、变量常量、字符串、转义字符、注释
新版本 MaxCompute 的SQL 中支持的 EXTRACT 函数有什么作用?
Some optional strategies and usage scenarios for PWA application Service Worker caching
Introduction to the advantages of the new generation mesh network protocol T-Mesh wireless communication technology
QT QT 】 【 to have developed a good program for packaging into a dynamic library
组件通信-父传子组件通信
Small Tools (4) integrated Seata1.5.2 distributed transactions
leetcode-693.交替位二进制数