发布于 

Async annotation learn more

[TOC]

@Async annotation

本人通过一步一步debug的方式,来给你揭露@Async实现的原理。

@Async Demo

如果碰到一个新的注解,有什么功能,实现机制。要怎么去做?

但是我认为不论是从什么角度去下手的,最后一定是会落到源码里面的。

因此,我们写个最简单的例子吧。
首先启动类:

@SpringBootApplication
public class AppApplication {

public static void main(String[] args) {
SpringApplication.run(AppApplication.class, args);
}
}

再整个业务类:在需要异步处理的逻辑上加伤我们的@Async注解

@Service
@Slf4j
public class HelloService {

@Async
public void asyncHello() {
log.info("thread name:{}", Thread.currentThread().getName());
}
}

最后加上Controller来调用业务:

@RestController
@RequestMapping("/noah")
@Slf4j
public class HelloController {

@Resource
HelloService helloService;

@GetMapping("/hello")
public String helloAsync() {
log.info("thread name:{}", Thread.currentThread().getName());
helloService.asyncHello();
return "hello noah";
}
}

最后我们通过线程名来看是否异步执行了。

2022-01-09 19:30:05 [http-nio-8080-exec-2] INFO  [com.noah.async.controller.HelloController         ] >>> thread name:http-nio-8080-exec-2 
2022-01-09 19:30:05 [http-nio-8080-exec-2] INFO [com.noah.async.service.HelloService ] >>> thread name:http-nio-8080-exec-2

我们发现怎么还是tomcat的线程。这可以说明@Aysnc注解没有生效

@Async failure

当我们平常在写代码,发现代码没有按照我们期望生效的时候,怎么办呢?

  1. 当然是Google开发工程师啦~~
  2. 我们从源码分析肯定也能查到为啥没有生效

失效原因:

  • @SpringBootApplication 启动类没有添加@EnableAsync注解
  • 没有走 Spring 的代理类。因为 @Transactional@Async 注解的实现都是基于 Spring 的 AOP,而 AOP 的实现是基于动态代理模式实现的。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过 Spring 容器管理。

很显然,我这个情况符合第一种情况,没有添加 @EnableAsync 注解。

另外一个原因,我也很感兴趣,但是现在我的首要任务是把 Demo 搭建好,所以不能被其他信息给诱惑了。

很多同学带着问题去查询的时候,本来查的问题是@Async 注解为什么没有生效,结果慢慢的就走偏了,十五分钟后问题就逐渐演变为了 SpringBoot 的启动流程。

再过半小时,网页上就显示的是一些面试必背八股文之类的东西…

我说这个意思就是,查问题就好好查问题。查问题的过程中肯定会由这个问题引发的自己更加感兴趣的问题。但是,记录下来,先不要让问题发散。

这个道理,就和带着问题去看源码一样,看着看着,可能连自己的问题是什么都不知道了。

好了,接下来我们在代码加上@Async

@SpringBootApplication
@EnableAsync
public class AppApplication {

public static void main(String[] args) {
SpringApplication.run(AppApplication.class, args);
}
}

看下运行效果:

2022-01-09 20:07:33 [http-nio-8080-exec-2] INFO  [com.noah.async.controller.HelloController         ] >>> thread name:http-nio-8080-exec-2 
2022-01-09 20:07:33 [task-2] INFO [com.noah.async.service.HelloService ] >>> thread name:task-2

好了,符合我们的运行期望,但是我们要知道是怎么样配置的线程池执行的,才放心。

@Async Thread Pool

正常来说,要找到这个线程池,这时候就要去看源码了。

但是我们可以换个思路,我们把线程池压垮,走到线程池的拒绝策略就会把异常抛出吧?

于是我们把程序,稍微修改了下:

image-20220109224910363

上面是控制器的逻辑,我们传递指定任务大小给线程池。

下面是service逻辑,增加睡眠1s的逻辑

image-20220109225045204

期望能够因为队列满了,把异常信息打出来。

但是结果不是我们期望那样,直接看结果。

下图是初始化的堆大小:

image-20220109230446138

堆大小:500MB,已经使用才50MB。

当我们让线程池仍了1000w数据之后。结果如下图。

image-20220109230647364

image-20220109230911849

从上面的数据我们可以大胆猜测下,@Async的线程池配置这样的,核心线程数是8。队列长度是Integer.MAX_VALUE。

为什么不能最大线程数是8呢?可以自己思考下。

@Async还在能愉快的消费这数据,到这里,你是否认为:哇,@Async太nb了。千万级别的任务都能处理过来。

你这样认为就大错特错了!!

这里,我们可以引出:

@Async默认线程池有导致内存溢出的风险。

到处我们骚想法:通过抛出异常来看@Async线程池的配置落空了。

@Async Source code

捷径无法实现,那接下来我们认认真真开始啃源码吧。

image-20220110150226488

Annotation that marks a method as a candidate for asynchronous execution. Can also be used at the type level, in which case all of the type's methods are considered as asynchronous.

将方法标记为异步执行候选的注解。 也可以在类型级别使用,在这种情况下,所有类型的方法都被认为是异步的。

简单来说:这个注解可以作用在方法和类级别上。

In terms of target method signatures, any parameter types are supported. However, the return type is constrained to either void or java.util.concurrent.Future.

在目标方法签名方面,支持任何参数类型。 但是,返回类型被限制为 void 或 java.util.concurrent.Future。

当我们看到target单词的时候,我们第一反应应该是要出现一个代理对象的概率。但是上面这句话的重点,是在转折后面。返回类型只能是void或者Future。

假设我们一定要返回Integer值会怎么样呢?

@Async
public Integer asyncReturn() {
return 1;
}

结论就是返回一个null值,这样就很容易就有空指针。

留个TODO:我们返回为int的基本类型,可以学习了解到AOP的执行流程。

进阶TODO:@trasaction执行流程是怎么样的呢?

image-20220111091851964

idea还是yyds,直接帮我们标记出来。

接下来,我们看看注解的value属性

A qualifier value for the specified asynchronous operation(s).
May be used to determine the target executor to be used when executing this method, matching the qualifier value (or the bean name) of a specific Executor or TaskExecutor bean definition.
When specified on a class level @Async annotation, indicates that the given executor should be used for all methods within the class. Method level use of Async#value always overrides any value set at the class level.

指定方法或者类上面执行任务的线程池配置。根据bean名称

我们总结下,我们现在至今了解的信息

  • 除了 @Async 注解之外,还需要加上 @EnableAsync 注解,比如加在启动类上。
  • 然后把这个默认的线程池当做黑盒测试了一把,我怀疑它的核心线程数默认是 8,队列长度无线长。有内存溢出的风险。
  • 通过阅读 @Async 上的注解,我发现返回值只能是 void 或者 Future 类型,否则即使返回了其他值,不会报错,但是返回的值是 null,有空指针风险。
  • @Async 注解中有一个 value 属性,看注释应该是可以指定自定义线程池的。

接下来,我们把我们还需要了解@Async的问题列下

  1. 默认线程池的配置到底是怎么样的?
  2. 源码是怎么支持只返回void或者Future的?
  3. value属性是干什么用的?

我们通过value属性,被那些方法调用了,可以直接定位到下面的代码:

根据方法名或者指定线程池,或者返回默认线程池。

@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}

接下来,我们来卷卷源码:

根据方法返回指定的执行器,空字符串也是返回默认的执行器。see also看方法名,我们也知道:决定异步执行器(方法)

Return the qualifier or bean name of the executor to be used when executing the given method, specified via Async.value at the method or declaring class level. If @Async is specified at both the method and class level, the method's #value takes precedence (even if empty string, indicating that the default executor should be used preferentially).

See Also:
determineAsyncExecutor(Method)

我们一步步debug进来,可以看到默认线程池的配置。

beanName:applicationTaskExecutor

image-20220111115256961

现在我是直接从 BeanFactory 获取到了这个线程池的 Bean,那么这个 Bean 是什么时候注入的呢?

  1. org.springframework.beans.factory.support.AbstractBeanFactory#getBean(java.lang.String),打上断点。找beanName=applicationTaskExecutor
  2. 全局寻找applicationTaskExecutor
  3. 把spring管理的线程池全部找出来

@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

private final Pool pool = new Pool();

/**
* Prefix to use for the names of newly created threads.
*/
private String threadNamePrefix = "task-";

public Pool getPool() {
return this.pool;
}

public String getThreadNamePrefix() {
return this.threadNamePrefix;
}

public void setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}

public static class Pool {

/**
* Queue capacity. An unbounded capacity does not increase the pool and therefore
* ignores the "max-size" property.
*/
private int queueCapacity = Integer.MAX_VALUE;

/**
* Core number of threads.
*/
private int coreSize = 8;

/**
* Maximum allowed number of threads. If tasks are filling up the queue, the pool
* can expand up to that size to accommodate the load. Ignored if the queue is
* unbounded.
*/
private int maxSize = Integer.MAX_VALUE;

/**
* Whether core threads are allowed to time out. This enables dynamic growing and
* shrinking of the pool.
*/
private boolean allowCoreThreadTimeout = true;

/**
* Time limit for which threads may remain idle before being terminated.
*/
private Duration keepAlive = Duration.ofSeconds(60);

}

}

org.springframework.boot.autoconfigure.task.TaskExecutionProperties,一步步debug你会定位到这个配置属性类。


接下来我们一起讨论,为什么只能支持void或者Futureimage-20220111132828009

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

上面是拦截器逻辑,真实aop执行的逻辑。我们认真读源码,发现返回逻辑就知道了。

这也是AOP执行的逻辑。我们留下的TODO要在这里展开。

image-20220111141400793

我们展示来看下,真的异步执行的方法doSubmit()逻辑。这里我们可以小小的装个13,为什么呢?因为你这里都直接return null;为什么还要调用executor的submit,来构建future对象。用execute方法直接返回void少一次对象的创建。


image-20220111150539575

value的作用,自定义线程池。隔离不同业务属性

@Async more

我们来解决我们的todo事项:

留个TODO:我们返回为int的基本类型,可以学习了解到AOP的执行流程。

进阶TODO:@Transactional执行流程是怎么样的呢?

image-20220111171444764

首先我们定义了两个Interceptor。

接下来我们讨论的Cglib的动态代理流程

org.springframework.aop.framework.CglibAopProxy.DynamicAdvisedInterceptor

image-20220111174031907

image-20220111175321737

获取通过代理对象,执行的拦截器顺序。

org.springframework.aop.framework.ReflectiveMethodInvocation#proceed

image-20220111174227877

这是proceed的处理逻辑,只有拦截器的代码逻辑执行完了,才执行本身的方法。

接下来我们看2个具体的拦截器执行流程:

org.springframework.transaction.interceptor.TransactionInterceptor#invoke

image-20220111174431855

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

image-20220111174508266