参考文献

@Async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// org.springframework.scheduling.annotation.Async
// Usable on types and on methods (see @Target) and retained at runtime (see @Retention).
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

/**
* A qualifier value for the specified asynchronous operation(s).
* <p>May be used to determine the target executor to be used when executing
* the asynchronous operation(s), matching the qualifier value (or the bean
* name) of a specific {@link java.util.concurrent.Executor Executor} or
* {@link org.springframework.core.task.TaskExecutor TaskExecutor}
* bean definition.
* <p>When specified on a class-level {@code @Async} annotation, indicates that the
* given executor should be used for all methods within the class. Method-level use
* of {@code Async#value} always overrides any value set at the class level.
* <p>Defaults to the empty string when no qualifier is given.
* @since 3.1.2
*/
String value() default "";

}
  • org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /**
    * Determine the specific executor to use when executing the given method.
    * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
    * <p>Resolved executors are remembered per method in {@code this.executors}, so the
    * resolution below only runs when that map holds no entry for the method yet.
    * @param method the method for which an executor is wanted
    * @return the executor to use (or {@code null}, but just if no default executor is available)
    */
    @Nullable
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    // Reuse the executor resolved earlier for this method, if there is one.
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
    Executor targetExecutor;
    // The name of the executor (thread pool) can be configured on the @Async annotation.
    String qualifier = getExecutorQualifier(method);
    if (StringUtils.hasLength(qualifier)) {
    // A qualifier was given: look up the matching executor through the bean factory.
    targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
    }
    else {
    // No qualifier: fall back to the default executor.
    targetExecutor = this.defaultExecutor.get();
    }
    if (targetExecutor == null) {
    // Nothing resolved; nothing is stored in the per-method map in this case.
    return null;
    }
    // Use the executor directly when it already is an AsyncListenableTaskExecutor,
    // otherwise wrap it in a TaskExecutorAdapter.
    executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
    this.executors.put(method, executor);
    }
    return executor;
    }
  • 最终会调用到org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor这个方法中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
    * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
    * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
    * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
    * for local use if no default could be found.
    * @param beanFactory the bean factory handed on to the superclass lookup (may be {@code null})
    * @return the executor found by the superclass, or a new {@link SimpleAsyncTaskExecutor}
    * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
    */
    @Override
    @Nullable
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    // Delegate the lookup to the superclass first.
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    // When the lookup yields nothing, a fresh SimpleAsyncTaskExecutor is created instead.
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }

    img

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * {@link TaskExecutor} implementation that fires up a new Thread for each task,
    * executing it asynchronously.
    *
    * <p>Supports limiting concurrent threads through the "concurrencyLimit"
    * bean property. By default, the number of concurrent threads is unlimited.
    *
    * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
    * thread-pooling TaskExecutor implementation instead, in particular for
    * executing a large number of short-lived tasks.
    *
    * @author Juergen Hoeller
    * @since 2.0
    * @see #setConcurrencyLimit
    * @see SyncTaskExecutor
    * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
    * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
    */
    @SuppressWarnings("serial")
    public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
    implements AsyncListenableTaskExecutor, Serializable {
    // ... (class body omitted in this excerpt)
    }
    • 注意:这个实现不重用线程!考虑一个线程池的TaskExecutor实现,特别是在执行大量短期任务时。

自定义异步处理的线程池

方式一

  • 添加@EnableAsync注解
  • 自定义MdcThreadPoolTaskExecutor:把提交任务的线程中的MDC上下文(例如traceId)传递到执行任务的异步线程,便于日志追踪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

import lombok.NonNull;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * {@link ThreadPoolTaskExecutor} that carries the SLF4J {@link MDC} of the submitting
 * thread (for example a trace id) over to the thread that runs the task.
 *
 * <p>The MDC is snapshotted at the moment a task is handed to {@link #execute(Runnable)},
 * {@link #submit(Runnable)} or {@link #submit(Callable)}. While the task runs, the running
 * thread's MDC is replaced by that snapshot; once the task finishes (normally or with an
 * exception) the thread's previous MDC is put back.
 *
 * <p>NOTE(review): only the three methods listed above are overridden. Other entry points
 * inherited from the parent class (e.g. {@code submitListenable}) are not decorated in this
 * class; verify whether they need the same treatment.
 *
 * @author HoleLin
 */
public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    /**
     * Snapshots the MDC of the calling thread.
     *
     * @return the copy returned by {@link MDC#getCopyOfContextMap()}; the wrappers below
     *         treat a {@code null} snapshot as "no context"
     */
    private Map<String, String> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    /**
     * Executes the command with the caller's MDC installed for the duration of the run.
     *
     * @param command the task to execute
     */
    @Override
    public void execute(@NonNull Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    /**
     * Submits the task with the caller's MDC installed for the duration of the run.
     *
     * @param task the task to submit
     * @return the future returned by the parent implementation
     */
    @NonNull
    @Override
    public Future<?> submit(@NonNull Runnable task) {
        return super.submit(wrap(task, getContextForTask()));
    }

    /**
     * Submits the task with the caller's MDC installed for the duration of the call.
     *
     * @param <T>  result type of the task
     * @param task the task to submit
     * @return the future returned by the parent implementation
     */
    @NonNull
    @Override
    public <T> Future<T> submit(@NonNull Callable<T> task) {
        return super.submit(wrap(task, getContextForTask()));
    }

    /**
     * Decorates a callable so that it runs with the given MDC snapshot.
     *
     * @param <T>     result type of the task
     * @param task    the callable to decorate
     * @param context MDC snapshot to install while the task runs; {@code null} clears the MDC
     * @return a callable that installs {@code context}, calls {@code task}, and then restores
     *         whatever MDC the running thread had before
     */
    private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {
        return () -> {
            // Remember the running thread's own MDC so it can be put back afterwards.
            final Map<String, String> previous = MDC.getCopyOfContextMap();
            applyContext(context);
            try {
                return task.call();
            } finally {
                applyContext(previous);
            }
        };
    }

    /**
     * Decorates a runnable so that it runs with the given MDC snapshot.
     *
     * @param runnable the runnable to decorate
     * @param context  MDC snapshot to install while the task runs; {@code null} clears the MDC
     * @return a runnable that installs {@code context}, runs {@code runnable}, and then
     *         restores whatever MDC the running thread had before
     */
    private Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            // Remember the running thread's own MDC so it can be put back afterwards.
            final Map<String, String> previous = MDC.getCopyOfContextMap();
            applyContext(context);
            try {
                runnable.run();
            } finally {
                applyContext(previous);
            }
        };
    }

    /**
     * Makes the given map the current thread's MDC.
     *
     * @param context the map to install; {@code null} clears the MDC instead
     */
    private static void applyContext(final Map<String, String> context) {
        if (context == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(context);
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.holelin.xx.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Configuration of the thread pool used for asynchronous execution.
*
* <p>Supplies an {@code MdcThreadPoolTaskExecutor} through
* {@link AsyncConfigurer#getAsyncExecutor()}.
*
* @author HoleLin
*/
@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {

/**
* Builds, configures and initializes the executor for asynchronous execution.
*
* @return the initialized {@code MdcThreadPoolTaskExecutor}
*/
@Override
public Executor getAsyncExecutor() {
MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
// 5 * 60 seconds = 5 minutes
executor.setKeepAliveSeconds(5 * 60);
executor.setQueueCapacity(1000);
// Custom rejection policy: only logs an error; the handler does not run the rejected task.
executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("当前任务线程池队列已满."));
// Alternatively, pick one of the predefined rejection policies:
// Reject the task and throw a RejectedExecutionException
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// Discard the task without throwing an exception
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// Let the calling thread run the task itself
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Discard the task at the head of the queue, then retry executing the new task (repeating as needed)
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// On shutdown, wait for tasks to complete
executor.setWaitForTasksToCompleteOnShutdown(true);
// After shutdown / shutdownNow, wait 3 seconds for termination
executor.setAwaitTerminationSeconds(3);
// NOTE(review): the executor is created and initialized by hand in this method; confirm that
// the container also shuts it down, otherwise the two shutdown settings above never apply.

// Thread name prefix
executor.setThreadNamePrefix("Async-");
executor.initialize();

return executor;
}
}

方式二

  • 添加@EnableAsync注解

  • Spring容器中定义一个线程池类型的bean:若容器中存在唯一的TaskExecutor类型的bean,则直接使用它;否则会查找名称为taskExecutor的Executor类型的bean,此时bean名称必须是taskExecutor

    1
    2
    3
    4
    5
    6
    7
    8
    /**
     * Declares the thread pool bean; the method name doubles as the bean name
     * {@code taskExecutor}.
     *
     * @return a {@link ThreadPoolTaskExecutor} with core size 10, max size 100 and
     *         thread names starting with {@code my-thread-}
     */
    @Bean
    public Executor taskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("my-thread-");
        pool.setCorePoolSize(10);
        pool.setMaxPoolSize(100);
        return pool;
    }

获取异步返回值

  • 若需取异步执行结果,方法返回值必须为Future类型,使用Spring提供的静态方法org.springframework.scheduling.annotation.AsyncResult#forValue创建返回值

    1
    2
    3
    4
    @Async
    public Future<String> getInfo(long id) throws InterruptedException {
       return AsyncResult.forValue(String.format("info:", id));
    }

自定义异常处理

异常处理分2种情况

  • 当返回值是Future的时候,方法内部有异常的时候,异常会向外抛出,可以对Future.get采用try..catch来捕获异常

  • 当返回值不是Future的时候,可以自定义一个bean,实现AsyncConfigurer接口中的getAsyncUncaughtExceptionHandler方法,返回自定义的异常处理器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
     * Declares an {@link AsyncConfigurer} bean that supplies the handler for exceptions
     * escaping from asynchronous methods.
     *
     * @return a configurer whose {@code getAsyncUncaughtExceptionHandler()} returns a
     *         handler that currently does nothing
     */
    @Bean
    public AsyncConfigurer asyncConfigurer() {
        final AsyncUncaughtExceptionHandler uncaughtHandler = (ex, method, params) -> {
            // Called back automatically when the target method throws during execution;
            // the exception can be handled here.
        };
        return new AsyncConfigurer() {
            @Nullable
            @Override
            public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
                return uncaughtHandler;
            }
        };
    }

线程池隔离

  • @Async注解有个value参数,用来指定线程池的bean名称;目标方法运行的时候,就会采用指定的线程池来执行

原理

  • 内部使用Aop实现的,@EnableAsync会引入一个bean后置处理器:AsyncAnnotationBeanPostProcessor,将其注册到Spring容器,这个bean后置处理器在所有bean创建过程中,判断bean的类上是否有@Async注解或者类中是否有@Async标注的方法,如果有,会通过aop给这个bean生成代理对象,会在代理对象中添加一个切面:org.springframework.scheduling.annotation.AsyncAnnotationAdvisor,这个切面中会引入一个拦截器:AnnotationAsyncExecutionInterceptor,方法异步调用的关键代码就是在这个拦截器的invoke方法中实现的.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
    // Wraps the intercepted call in a Callable and hands it to the executor chosen for the method.
    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
    // Work out the method as declared on the target class (most specific method, bridge resolved).
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // Pick the executor for that method; without one the call cannot be dispatched.
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
    throw new IllegalStateException(
    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    // The task that performs the actual invocation.
    Callable<Object> task = () -> {
    try {
    Object result = invocation.proceed();
    if (result instanceof Future) {
    // The target returned a Future: wait for it and pass its value on.
    return ((Future<?>) result).get();
    }
    }
    catch (ExecutionException ex) {
    // Failure surfaced by Future#get: report the underlying cause.
    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    }
    catch (Throwable ex) {
    // Any other failure is reported as-is.
    handleError(ex, userDeclaredMethod, invocation.getArguments());
    }
    // Reached for non-Future results and after handleError returns normally.
    return null;
    };

    // Submit according to the declared return type of the invoked method.
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
    // Submits the task to the executor in the way that matches the method's declared return type.
    @Nullable
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
    // CompletableFuture: run through supplyAsync on the given executor; any failure of the
    // task is rethrown wrapped in a CompletionException.
    return CompletableFuture.supplyAsync(() -> {
    try {
    return task.call();
    }
    catch (Throwable ex) {
    throw new CompletionException(ex);
    }
    }, executor);
    }
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
    // ListenableFuture: the executor is cast to AsyncListenableTaskExecutor and
    // submitListenable is used.
    return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    else if (Future.class.isAssignableFrom(returnType)) {
    // Any other Future: plain submit, returning the executor's Future.
    return executor.submit(task);
    }
    else {
    // Every other return type: submit the task and return null to the caller.
    executor.submit(task);
    return null;
    }
    }