Spring Boot: the @Async annotation
@Async
    // org.springframework.scheduling.annotation.Async
- org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

    /**
     * Determine the specific executor to use when executing the given method.
     * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
     * @return the executor to use (or {@code null}, but just if no default executor is available)
     */
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            // The bean name of the thread pool can be configured in the @Async annotation
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            this.executors.put(method, executor);
        }
        return executor;
    }

- When no qualifier is set, the default-executor lookup eventually reaches org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor:

    /**
     * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
     * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
     * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
     * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
     * for local use if no default could be found.
     * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
     */
    protected Executor getDefaultExecutor(BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }
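
The lookup order in the two methods above (per-method cache, then qualifier, then the default name, then a thread-per-task fallback) can be mimicked with plain JDK types. This is only a sketch, not Spring's code: the class `ExecutorResolutionSketch`, the string method keys, the `registry` map standing in for the `BeanFactory`, and the exception thrown for an unknown qualifier are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the resolution logic; not Spring's implementation.
class ExecutorResolutionSketch {
    // Plays the role of the BeanFactory: executors registered by bean name (assumption).
    private final Map<String, Executor> registry;
    // Plays the role of this.executors: one resolved executor per method key.
    private final Map<String, Executor> cache = new ConcurrentHashMap<>();
    // Fallback comparable in spirit to SimpleAsyncTaskExecutor: a new thread per task.
    static final Executor FALLBACK = task -> new Thread(task).start();

    ExecutorResolutionSketch(Map<String, Executor> registry) {
        this.registry = registry;
    }

    // qualifier may be null or empty, like an @Async without a value.
    Executor resolve(String methodKey, String qualifier) {
        return cache.computeIfAbsent(methodKey, key -> {
            if (qualifier != null && !qualifier.isEmpty()) {
                Executor qualified = registry.get(qualifier);
                if (qualified == null) {
                    throw new IllegalStateException("No executor named " + qualifier);
                }
                return qualified;
            }
            Executor byDefaultName = registry.get("taskExecutor");
            return byDefaultName != null ? byDefaultName : FALLBACK;
        });
    }
}
```

Because the result is cached per method key, a later call for the same key returns the executor resolved the first time, even if a different qualifier is passed.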

SimpleAsyncTaskExecutor itself is declared as follows:

    /**
     * {@link TaskExecutor} implementation that fires up a new Thread for each task,
     * executing it asynchronously.
     *
     * <p>Supports limiting concurrent threads through the "concurrencyLimit"
     * bean property. By default, the number of concurrent threads is unlimited.
     *
     * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
     * thread-pooling TaskExecutor implementation instead, in particular for
     * executing a large number of short-lived tasks.
     *
     * @author Juergen Hoeller
     * @since 2.0
     * @see #setConcurrencyLimit
     * @see SyncTaskExecutor
     * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
     * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
     */
    public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
            implements AsyncListenableTaskExecutor, Serializable {
        // ... omitted
    }

- Note: this implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, especially when executing a large number of short-lived tasks.
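
To make the "does not reuse threads" note concrete, the following JDK-only sketch counts how many distinct threads end up running a batch of tasks. The thread-per-task lambda is only an approximation of what SimpleAsyncTaskExecutor does, and `ThreadReuseSketch` is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadReuseSketch {
    // Runs `tasks` trivial tasks on the executor and counts the distinct threads used.
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has recorded its thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        Executor threadPerTask = task -> new Thread(task).start();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println("thread-per-task: " + distinctThreads(threadPerTask, 10));
        System.out.println("fixed pool of 2: " + distinctThreads(pool, 10));
        pool.shutdown();
    }
}
```

The thread-per-task executor uses one thread per submitted task, while the fixed pool never uses more than its two threads, which is why a pooled executor is the usual choice for many short-lived tasks.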
Customizing the thread pool for async execution
Approach 1
- Add the @EnableAsync annotation.
- MdcThreadPoolTaskExecutor, for MDC traceId log tracing.

    package cn.holelin.xx.config;
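
The general idea behind an MDC-aware executor is to capture the caller's context at submit time and restore it on the worker thread. The sketch below illustrates that idea with JDK types only. It is not the MdcThreadPoolTaskExecutor class mentioned above: the `ThreadLocal` named `TRACE_ID` stands in for the logging MDC, and `TraceContextSketch`, `wrap`, and `traceIdSeenByWorker` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TraceContextSketch {
    // Stand-in for the logging MDC: a per-thread traceId (assumption).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the caller's traceId now and restores it around the task later.
    static Runnable wrap(Runnable task) {
        String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                // Restore the old value so a pooled thread does not leak the id.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Returns the traceId observed inside the worker thread.
    static String traceIdSeenByWorker(String callerTraceId, boolean wrapped) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String[] seen = new String[1];
        TRACE_ID.set(callerTraceId);
        try {
            Runnable task = () -> seen[0] = TRACE_ID.get();
            Future<?> pending = pool.submit(wrapped ? wrap(task) : task);
            pending.get(); // wait for the worker before reading seen[0]
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }
}
```

Without the wrapper the worker thread sees no traceId at all, which is the gap such an executor is meant to close.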
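Approach 2
- Add the @EnableAsync annotation.
- Define a thread-pool bean in the Spring container. Naming the bean taskExecutor ensures it is picked up as the default: per the Javadoc above, Spring looks for a unique TaskExecutor bean, or otherwise for an Executor bean named "taskExecutor".

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("my-thread-");
        return executor;
    }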
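
One point worth checking with a configuration like this: a JDK ThreadPoolExecutor only grows beyond its core size once its queue is full, and ThreadPoolTaskExecutor's queue capacity is, to my knowledge, unbounded by default, so a max pool size of 100 may never be reached unless a queue capacity is also set. The sketch below demonstrates the JDK behaviour only; `PoolGrowthSketch` and its parameters are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthSketch {
    // Submits `tasks` blocking tasks and reports the largest pool size reached.
    static int largestPoolSize(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread until all tasks are submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks wait in the queue, the pool stays at core size.
        System.out.println(largestPoolSize(2, 10, Integer.MAX_VALUE, 20));
        // Queue of 5: once it is full, the pool grows toward max.
        System.out.println(largestPoolSize(2, 10, 5, 15));
    }
}
```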
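Getting the async return value
- To obtain the result of an asynchronous call, the method's return type must be Future. Use the static method org.springframework.scheduling.annotation.AsyncResult#forValue provided by Spring to create the return value.

    @Async
    public Future<String> getInfo(long id) throws InterruptedException {
        // %d is required here, otherwise id is silently dropped from the result
        return AsyncResult.forValue(String.format("info:%d", id));
    }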
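
The round trip can be tried without Spring. In this sketch, `CompletableFuture.completedFuture` plays the role that `AsyncResult.forValue` plays above, and `callAsync` roughly imitates what the proxy does: run the method on a pool thread and unwrap the Future it returns. `FutureResultSketch` and `callAsync` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureResultSketch {
    // What the async method body looks like: wrap the value in a completed Future.
    static Future<String> getInfo(long id) {
        return CompletableFuture.completedFuture(String.format("info:%d", id));
    }

    // Imitates the proxy: run the method on a pool thread and unwrap its Future.
    static String callAsync(long id) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> getInfo(id).get();
            Future<String> pending = pool.submit(task);
            return pending.get(); // blocks until the worker thread has finished
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAsync(42)); // prints "info:42"
    }
}
```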
Custom exception handling
Exception handling falls into two cases:
- When the return type is Future, an exception thrown inside the method propagates to the caller, and can be caught with try..catch around Future.get.
- When the return type is not Future, define a bean that implements the getAsyncUncaughtExceptionHandler method of the AsyncConfigurer interface and returns a custom exception handler.

    @Bean
    public AsyncConfigurer asyncConfigurer() {
        return new AsyncConfigurer() {
            @Override
            public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
                return new AsyncUncaughtExceptionHandler() {
                    @Override
                    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                        // Called back automatically when the target method throws; handle the exception here
                    }
                };
            }
        };
    }
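
Both cases can be reproduced with a plain ExecutorService. This is a sketch of the behaviour, not of Spring's internals: in the second case an `AtomicReference` stands in for the AsyncUncaughtExceptionHandler, and `AsyncErrorSketch` and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class AsyncErrorSketch {
    // Case 1: the caller holds a Future, so the failure surfaces from get().
    static String messageFromFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<String> pending = pool.submit(failing);
            pending.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown();
        }
    }

    // Case 2: nobody holds a Future, so the wrapper hands the failure to a handler.
    static String messageFromHandler() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> handled = new AtomicReference<>();
        try {
            Runnable voidMethod = () -> {
                throw new IllegalStateException("boom in void method");
            };
            Runnable guarded = () -> {
                try {
                    voidMethod.run();
                } catch (Throwable ex) {
                    handled.set(ex); // plays the role of handleUncaughtException
                }
            };
            pool.submit(guarded).get(); // wait only so the sketch can inspect the handler
            return handled.get().getMessage();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```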
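Thread pool isolation
The @Async annotation has a value attribute that specifies the bean name of the thread pool. When the target method runs, it executes on that pool.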
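
The effect of isolation can be illustrated with two named pools and a lookup by name. In this JDK-only sketch the `Map` stands in for the Spring container, and the pool names `orderPool` and `mailPool`, the thread name prefixes, and `PoolIsolationSketch` are all made up for the example.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolIsolationSketch {
    // A single-thread pool whose worker thread carries the given name prefix.
    static ExecutorService namedPool(String prefix) {
        return Executors.newSingleThreadExecutor(task -> new Thread(task, prefix + "1"));
    }

    // Runs a task on the pool registered under poolName; returns the worker thread's name.
    static String runOn(Map<String, ExecutorService> pools, String poolName) {
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pools.get(poolName).submit(whoAmI).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, ExecutorService> pools = Map.of(
                "orderPool", namedPool("order-"),
                "mailPool", namedPool("mail-"));
        System.out.println(runOn(pools, "orderPool")); // prints "order-1"
        System.out.println(runOn(pools, "mailPool"));  // prints "mail-1"
        pools.values().forEach(ExecutorService::shutdown);
    }
}
```

Since each group of tasks has its own threads, a backlog in one pool does not delay work submitted to the other.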
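How it works
- @Async is implemented with AOP. @EnableAsync registers a bean post-processor, AsyncAnnotationBeanPostProcessor, in the Spring container. While each bean is being created, this post-processor checks whether the bean's class is annotated with @Async or contains methods annotated with @Async. If so, it creates an AOP proxy for the bean and adds an advisor to the proxy: org.springframework.scheduling.annotation.AsyncAnnotationAdvisor. The advisor brings in an interceptor, AnnotationAsyncExecutionInterceptor, and the key code for asynchronous invocation lives in this interceptor's invoke method.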

    // org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }
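
The proxy-plus-interceptor pattern can be imitated with a JDK dynamic proxy. This is a rough sketch of the idea, not how Spring builds its proxies: `AsyncProxySketch`, the `Notifier` interface, and the thread name `async-worker` are hypothetical, and every intercepted call is treated as a void method.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncProxySketch {
    // Hypothetical service interface; JDK proxies require an interface.
    interface Notifier {
        void send(String message);
    }

    // Wraps target so that each call is submitted to the executor instead of run inline.
    static Notifier asyncProxy(Notifier target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            Callable<Object> call = () -> method.invoke(target, args);
            executor.submit(call);
            return null; // the caller returns immediately; fine for void methods only
        };
        return (Notifier) Proxy.newProxyInstance(
                Notifier.class.getClassLoader(), new Class<?>[] {Notifier.class}, handler);
    }

    // Returns the name of the thread that actually executed send().
    static String threadThatRan() {
        ExecutorService executor = Executors.newSingleThreadExecutor(
                task -> new Thread(task, "async-worker"));
        CountDownLatch done = new CountDownLatch(1);
        String[] ranOn = new String[1];
        Notifier target = message -> {
            ranOn[0] = Thread.currentThread().getName();
            done.countDown();
        };
        try {
            asyncProxy(target, executor).send("hello");
            done.await(); // wait for the worker before reading ranOn[0]
            return ranOn[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Calling `threadThatRan()` returns "async-worker", showing that the call made through the proxy ran on the executor's thread rather than on the caller's.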

    // org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            executor.submit(task);
            return null;
        }
    }
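
The return-type dispatch above can be exercised with JDK types alone. The sketch below keeps the same branch order but leaves out the ListenableFuture case, since that type is Spring-specific. `SubmitDispatchSketch` and `resultKind` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDispatchSketch {
    // Same branch order as doSubmit, minus the ListenableFuture case.
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        executor.submit(task); // fire and forget: the caller gets nothing back
        return null;
    }

    // Describes what the caller receives for a given declared return type.
    static String resultKind(Class<?> returnType) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Object result = submit(() -> "done", executor, returnType);
            if (result == null) {
                return "null";
            }
            return result instanceof CompletableFuture ? "CompletableFuture" : "Future";
        } finally {
            executor.shutdown();
        }
    }
}
```

A method declared as void therefore gets null back immediately, which is why its exceptions can only reach a handler and never the caller.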