springboot实现异步调用@Async

tech2025-03-28  1

介绍

所谓的异步执行其实就是使用多线程的方式实现异步调用 异步有什么好处呢? 如果一个业务逻辑执行完成需要多个步骤,也就是调用多个方法去执行, 这个时候异步执行比同步执行相应更快。不过要注意异步请求的顺序和处理结果的顺序最好一致,不然就达不到效果了

注意事项

1、在默认情况下,未设置TaskExecutor时,默认是使用SimpleAsyncTaskExecutor这个线程池,但此线程不是真正意义上的线程池,因为线程不重用,每次调用都会创建一个新的线程。可通过控制台日志输出可以看出,每次输出线程名都是递增的。所以最好我们来自定义一个线程池。 2、调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为Spring在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。其他的注解如@Cache等也是一样的道理,说白了,就是Spring的代理机制造成的。所以在开发中,最好把异步服务单独抽出一个类来管理。下面会重点讲述

什么情况下会导致@Async异步方法会失效?

1.不要在本类中异步调用。即一个方法是异步方法,然后用另一个方法调用这个异步方法。 2.不要有返回值,使用void 3.不能使用本类的私有方法或者非接口化加注@Async,因为代理不到失效 4.异步方法不能使用static修饰 5.异步类需使用@Component注解,不然将导致spring无法扫描到异步类 6.SpringBoot框架必须在启动类中增加@EnableAsync注解 7.异步方法不要和事物注解同时存在。可以在事物的方法中调用另外一个类中的异步方法。在调用Async方法的方法上标注@Transactional是管理调用方法的事务的,在Async方法上标注@Transactional是管理异步方法的事务,事务因线程隔离 8.诸如以上几点的情况比如spring中的@Transactional还有cache注解也不能有以上几点情况,否则也会失效的,因为本质都是因为代理的机制导致的

9.类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象

10.在Async 方法上标注@Transactional是没用的。 在Async 方法调用的方法上标注@Transactional 有效

定义一个线程池

@Configuration public class AsyncTaskPoolConfig { @Bean(name = "taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(200); executor.setQueueCapacity(50); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskExecutor-ws-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }

PS:使用@Async 注解时,需要注意一下几点:

当项目中只有一个线程池时,我们只需要写 @Async 即可将需要异步的方法进行异步;当项目中存在多个线程池,我们在写异步时,需要注意如果只写@Async注解没有任何属性则将此方法的执行异步到带有 @Primary 注解修饰的线程池中执行。还可以将方法异步到指定的线程池中,如 @Async(“threadPool”)则是将此方法异步到threadPool 线程池中执行。 package com.xsrt.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class AsyncConfig { @Bean("threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()*2); taskExecutor.setMaxPoolSize(60); taskExecutor.setQueueCapacity(20000); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setThreadGroupName("Task-"); taskExecutor.setThreadNamePrefix("Async-"); taskExecutor.setBeanName("threadPoolTaskExecutor"); return taskExecutor; } }

如果代码中需要多个线程池,可以按照如下方式配置

package com.xsrt.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class AsyncConfig { @Bean("threadPoolTaskExecutor") @Primary //指定当前线程池为主线程池 public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()*2); taskExecutor.setMaxPoolSize(60); taskExecutor.setQueueCapacity(2000); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setThreadGroupName("Task-"); taskExecutor.setThreadNamePrefix("Async-"); taskExecutor.setBeanName("threadPoolTaskExecutor"); return taskExecutor; } @Bean("threadPool")//其他线程池 public ThreadPoolTaskExecutor threadPool(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()*2); taskExecutor.setMaxPoolSize(60); taskExecutor.setQueueCapacity(200); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setThreadGroupName("Task-"); taskExecutor.setThreadNamePrefix("Pool-"); taskExecutor.setBeanName("threadPoolTaskExecutor"); return taskExecutor; } }

异步方法调用如下

package com.ccbobe.websocket.async; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Slf4j @Service public class AsyncService { @Async public String asyncPrimary(){ log.info("执行 Primary 异步......"); return null; } @Async("threadPool") public String asyncPools(){ log.info("执行 threadPool 异步......"); return null; } }

使用@Async和@EnableAsync注解

首先使用@EnableAsync注解开启异步调用功能,该注解可以放置的位置有:

启动类上方

@EnableAsync @SpringBootApplication public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); }

调用异步的当前类上方

@EnableAsync @RestController public class TestAsyncController(){}

在配置类上方使用

@Configuration @EnableAsync public class AsyncTaskPoolConfig { @Bean(name = "taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(200); executor.setQueueCapacity(50); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskExecutor-ws-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }

编写异步请求

在异步执行的方法上添加注解:@Async

Component @Log4j2 public class AsyncTask { //这里注入的是dubbo的服务,和异步请求没有多大关系 @Reference(check = false) private AuthorFacade authorFacade; /** * 获取作者信息 * * @param authorId 作者ID * @return 作者信息 */ @Async("taskExecutor") public Future<AuthorDTO> getAuthor(String authorId){ try { System.out.println("执行线程的名字:"+Thread.currentThread().getName()); return new AsyncResult<AuthorDTO>(authorFacade.getAuthor(authorId)); } catch (Exception e) { e.printStackTrace(); return null; } } }

在service里调用异步执行的方法

/** * 异步调用获取文章信息 * * @param articleId 文章ID * @return 文章信息 */ @Override public Map getArticleDetailAsync(String articleId){ //同步调用获取文章信息 ArticleDTO articleDTO = articleFacade.getArticle(articleId); //异步调用获取作者信息 Future authorFuture = asyncTask.getAuthor(articleDTO.getAuthorId());

Map<String,Object> map=new HashMap<>(10); map.put("article",articleDTO); try{ map.put("author",authorFuture.get()); }catch (Exception e){ log.error(e.getMessage()); } return map; }

PS:1、当一个类去调用标注了异步注解的方法时,当前类其实就是主线程,而调用标注异步注解的方法其实就相当于一个子线程,只有当主线程运行完了,其实可以用通过一些例子来证实的,给异步执行的方法阻塞几秒钟,查看下线程的执行情况 2、实际上,@Async还有一个参数,通过Bean名称来指定调用的线程池-比如上例中设置的线程池参数不满足业务需求,可以另外定义合适的线程池,调用时指明使用这个线程池-缺省时springboot会优先使用名称为'taskExecutor'的线程池,如果没有找到,才会使用其他类型为TaskExecutor或其子类的线程池。

进阶

有时候我们不止希望异步执行任务,还希望任务执行完成后会有一个返回值,在java中提供了Future泛型接口,用来接收任务执行结果,springboot也提供了此类支持,使用实现了ListenableFuture接口的类如AsyncResult来作为返回值的载体。比如上例中,我们希望返回一个类型为String类型的值,可以将返回值改造为:

@Async public ListenableFuture<String> sayHello(String name) { String res = name + ":Hello World!"; LoggerFactory.getLogger(Hello.class).info(res); return new AsyncResult<>(res); }

调用返回值:

@Autowired private Hello hello; // 阻塞调用 hello.sayHello("yan").get(); // 限时调用 hello.sayHello("yan").get(1, TimeUnit.SECONDS)
最新回复(0)