一、核心概念
1.1 什么是异步调用?
异步调用允许方法在不阻塞调用线程的情况下执行。调用方发起请求后立即返回,实际的业务逻辑在另一个线程中执行。
1.2 为什么需要异步调用?
- 提升响应速度:将耗时操作(如发送邮件、调用外部 API、生成报告)移出主请求线程,缩短接口响应时间。
- 提高吞吐量:主请求线程能更快地处理后续请求。
- 解耦业务逻辑:将非核心、可延迟的操作分离。
1.3 Spring Boot 中的 @Async
@Async
是 Spring 提供的注解,用于标记一个方法为异步执行。- 基于 Spring AOP 和 Java 线程池实现。
- 方法执行由
TaskExecutor
(任务执行器)管理。
1.4 核心组件
组件 | 说明 |
---|---|
@Async |
标记异步方法的注解。 |
@EnableAsync |
启用 Spring 的异步方法执行功能(Spring Boot 通常自动配置)。 |
TaskExecutor |
任务执行器接口,Spring 使用 ThreadPoolTaskExecutor 作为默认实现。 |
Future<T> / CompletableFuture<T> |
异步方法的返回类型,用于获取执行结果或状态。 |
1.5 返回类型
返回类型 | 说明 |
---|---|
void |
纯异步,不关心结果。 |
Future<T> |
可通过 get() 阻塞获取结果。 |
CompletableFuture<T> |
Java 8+ 提供,支持更强大的异步编程(链式调用、组合等)。 |
ListenableFuture<T> |
Spring 扩展,支持回调。 |
二、详细操作步骤(适合快速实践)
步骤 1:启用异步功能
虽然 Spring Boot 会自动配置 @Async
(如果检测到相关类),但显式启用更清晰。
// Application.java
@SpringBootApplication
@EnableAsync // 启用异步支持
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
步骤 2:定义异步方法
创建一个服务类,其中包含 @Async
标记的方法。
// AsyncService.java
@Service
@Slf4j
public class AsyncService {
// ==================== 1. 返回 void 的异步方法 ====================
@Async
public void sendEmail(String to, String subject, String content) {
log.info("开始发送邮件到: {}, 主题: {}", to, subject);
try {
// 模拟耗时操作 (如调用邮件服务)
Thread.sleep(3000);
log.info("邮件发送成功: {}", subject);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("发送邮件被中断", e);
}
}
// ==================== 2. 返回 Future 的异步方法 ====================
@Async
public Future<String> generateReport(String reportId) {
log.info("开始生成报告: {}", reportId);
try {
Thread.sleep(5000); // 模拟生成耗时
String reportData = "Report Data for " + reportId;
log.info("报告生成完成: {}", reportId);
return new AsyncResult<>(reportData); // AsyncResult 是 Future 的实现
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("生成报告被中断", e);
return new AsyncResult<>("生成失败");
}
}
// ==================== 3. 返回 CompletableFuture 的异步方法 ====================
@Async
public CompletableFuture<String> fetchDataFromApi(String apiEndpoint) {
log.info("开始调用外部 API: {}", apiEndpoint);
return CompletableFuture.supplyAsync(() -> {
try {
// 模拟 API 调用
Thread.sleep(2000);
return "Data from " + apiEndpoint;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("API 调用失败", e);
}
});
// 注意:CompletableFuture.supplyAsync 默认使用 ForkJoinPool.commonPool()
// 如果希望使用 Spring 的 TaskExecutor,需要传入:
// return CompletableFuture.supplyAsync(() -> { ... }, taskExecutor);
}
}
步骤 3:在 Controller 或 Service 中调用异步方法
// UserController.java
@RestController
@RequestMapping("/api/users")
@Slf4j
public class UserController {
@Autowired
private AsyncService asyncService;
@Autowired
private UserService userService; // 假设的业务服务
// ==================== 1. 调用 void 方法 ====================
@PostMapping("/{id}/notify")
public ResponseEntity<String> notifyUser(@PathVariable Long id) {
User user = userService.findById(id);
if (user == null) {
return ResponseEntity.notFound().build();
}
// 异步发送邮件,立即返回
asyncService.sendEmail(user.getEmail(), "欢迎通知", "欢迎加入我们的平台!");
log.info("已触发邮件发送任务,用户ID: {}", id);
return ResponseEntity.ok("通知已发送(异步)");
}
// ==================== 2. 调用 Future 方法 ====================
@GetMapping("/report/{reportId}")
public ResponseEntity<?> getReport(@PathVariable String reportId) throws ExecutionException, InterruptedException {
log.info("请求报告: {}", reportId);
// 启动异步任务
Future<String> future = asyncService.generateReport(reportId);
// 可以做其他事情...
log.info("正在执行其他操作...");
// 阻塞等待结果(不推荐在 Web 请求中长时间阻塞)
String reportData = future.get(); // get() 会阻塞直到任务完成
log.info("获取到报告数据");
return ResponseEntity.ok(reportData);
}
// ==================== 3. 调用 CompletableFuture 方法 ====================
@GetMapping("/data/{endpoint}")
public CompletableFuture<ResponseEntity<String>> fetchData(@PathVariable String endpoint) {
log.info("请求外部数据: {}", endpoint);
// 返回 CompletableFuture,Spring MVC 会自动处理
return asyncService.fetchDataFromApi(endpoint)
.thenApply(data -> ResponseEntity.ok(data))
.exceptionally(ex -> {
log.error("获取数据失败", ex);
return ResponseEntity.status(500).body("获取数据失败: " + ex.getMessage());
});
}
}
步骤 4:自定义线程池(强烈推荐)
默认线程池(SimpleAsyncTaskExecutor
)为每个任务创建新线程,性能差。应配置 ThreadPoolTaskExecutor
。
// AsyncConfig.java
@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig {
@Value("${async.core-pool-size:5}")
private int corePoolSize;
@Value("${async.max-pool-size:10}")
private int maxPoolSize;
@Value("${async.queue-capacity:100}")
private int queueCapacity;
@Value("${async.keep-alive-seconds:60}")
private int keepAliveSeconds;
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize); // 核心线程数
executor.setMaxPoolSize(maxPoolSize); // 最大线程数
executor.setQueueCapacity(queueCapacity); // 队列容量
executor.setKeepAliveSeconds(keepAliveSeconds); // 空闲线程存活时间
executor.setThreadNamePrefix("Async-"); // 线程名前缀
executor.setRejectedExecutionHandler(new ThreadPoolTaskExecutor.CallerRunsPolicy());
// 拒绝策略:由调用线程执行任务(避免丢弃)
executor.initialize(); // 必须调用 initialize()
return executor;
}
// ==================== 4.1. 自定义 CompletableFuture 的 Executor ====================
@Bean(name = "completableFutureExecutor")
public Executor completableFutureExecutor() {
// 可以为 CompletableFuture 创建专用线程池
return taskExecutor(); // 复用上面的线程池,或创建新的
}
}
# application.yml
# 自定义线程池参数
async:
core-pool-size: 5
max-pool-size: 20
queue-capacity: 100
keep-alive-seconds: 60
修改 AsyncService
以使用自定义线程池:
// AsyncService.java
@Service
@Slf4j
public class AsyncService {
@Autowired
@Qualifier("taskExecutor") // 指定使用的 Executor Bean
private Executor taskExecutor;
@Async("taskExecutor") // 指定使用名为 "taskExecutor" 的 Bean
public void sendEmail(String to, String subject, String content) {
// ... 方法体不变
}
@Async("taskExecutor")
public Future<String> generateReport(String reportId) {
// ... 方法体不变
}
@Async("completableFutureExecutor") // 如果有专用线程池
public CompletableFuture<String> fetchDataFromApi(String apiEndpoint) {
// 注意:CompletableFuture.supplyAsync 需要传入 Executor
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
return "Data from " + apiEndpoint;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("API 调用失败", e);
}
}, taskExecutor); // 显式传入 Spring 的 taskExecutor
}
}
步骤 5:测试异步调用
启动应用并测试:
调用
notifyUser
:POST /api/users/1/notify
- 响应应立即返回。
- 查看日志,确认
Async-
前缀的线程执行了sendEmail
。
调用
getReport
:GET /api/report/test123
- 响应会在约 5 秒后返回。
- 日志显示任务启动后立即执行其他操作,然后阻塞等待。
调用
fetchData
:GET /api/data/external
- 响应会在约 2 秒后返回。
- 利用
CompletableFuture
的非阻塞性。
三、常见错误与解决方案
错误现象 | 原因分析 | 解决方案 |
---|---|---|
@Async 方法不异步 |
1. 未加 @EnableAsync 2. 在同一个类中直接调用 @Async 方法3. 方法不是 public 4. Bean 未被 Spring 管理 |
1. 确保 @EnableAsync 2. 通过注入的 Bean 调用 3. 方法必须是 public 4. 使用 @Service , @Component 等 |
TaskRejectedException |
线程池队列已满且达到最大线程数 | 1. 增大队列容量或最大线程数 2. 使用合适的拒绝策略(如 CallerRunsPolicy )3. 优化任务或增加资源 |
AsyncUncaughtExceptionHandler 处理未捕获异常 |
void 方法抛出异常未被捕获 |
实现 AsyncUncaughtExceptionHandler 并配置 |
CompletableFuture 使用默认线程池 |
supplyAsync() 未传入 Executor |
显式传入自定义的 Executor |
四、注意事项
- 调用位置:
@Async
方法必须由 Spring 管理的 Bean 通过代理调用。不能在同一个类中直接调用。
// 错误! public class MyService { @Async public void asyncMethod() { ... } public void syncMethod() { asyncMethod(); // 直接调用,不会异步! } } // 正确! @Service public class MyService { @Autowired private MyService self; // 自注入或通过其他 Bean 调用 @Async public void asyncMethod() { ... } public void syncMethod() { self.asyncMethod(); // 通过代理调用 } }
- 方法可见性:
@Async
方法必须是public
。
- 返回类型:
void
方法的异常无法直接捕获,需配置AsyncUncaughtExceptionHandler
。Future
的get()
会阻塞,慎用。
- 事务传播:
@Async
方法默认不在调用方的事务中执行。如果需要,需在异步方法上添加@Transactional
。
- 线程上下文丢失:
- MDC(日志上下文)、SecurityContext 等可能在异步线程中丢失。需要手动传递或配置。
五、使用技巧
5.1 配置 AsyncUncaughtExceptionHandler
处理 void
方法的未捕获异常。
// CustomAsyncExceptionHandler.java
@Component
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("异步方法 {} 执行出错, 参数: {}, 错误: ", method.getName(), Arrays.toString(params), ex);
// 可发送告警、记录到特殊日志等
}
}
// AsyncConfig.java
@Configuration
@EnableAsync
public class AsyncConfig {
// ... 线程池配置
@Autowired
private CustomAsyncExceptionHandler customAsyncExceptionHandler;
@Override
public void configureAsynchronousProcessing(AsyncConfigurer configurer) {
configurer.setExecutor(taskExecutor());
configurer.setAsyncUncaughtExceptionHandler(customAsyncExceptionHandler);
}
}
5.2 传递 MDC 上下文
// MDC 传递装饰器
public class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
Map<String, String> contextMap = MDC.getCopyOfContextMap(); // 复制当前 MDC
return () -> {
try {
if (contextMap != null) {
MDC.setContextMap(contextMap); // 设置到异步线程
}
runnable.run();
} finally {
MDC.clear(); // 清理
}
};
}
}
// AsyncConfig.java
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// ... 配置 corePoolSize, maxPoolSize 等
executor.setTaskDecorator(new MdcTaskDecorator()); // 设置装饰器
executor.initialize();
return executor;
}
5.3 监控线程池
// ThreadPoolMonitor.java
@Component
@Slf4j
public class ThreadPoolMonitor {
@Autowired
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Scheduled(fixedRate = 10000) // 每10秒监控一次
public void monitor() {
if (taskExecutor != null) {
log.info("Async ThreadPool - Active: {}, Pool Size: {}, Max Pool Size: {}, Queue Size: {}",
taskExecutor.getActiveCount(),
taskExecutor.getPoolSize(),
taskExecutor.getMaxPoolSize(),
taskExecutor.getQueue().size());
}
}
}
5.4 使用 @Async
的 value 属性
指定不同的线程池 Bean。
@Async("emailExecutor")
public void sendEmailAsync(String to, String content) { ... }
@Async("reportExecutor")
public Future<String> generateReportAsync(String id) { ... }
六、最佳实践
- 始终配置自定义线程池:避免使用默认的
SimpleAsyncTaskExecutor
。 - 合理设置线程池参数:根据业务负载调整
corePoolSize
,maxPoolSize
,queueCapacity
。 - 选择合适的拒绝策略:
CallerRunsPolicy
通常比AbortPolicy
更友好。 - 避免在异步方法中处理关键事务:除非明确需要。
- 监控异步任务:记录执行时间、失败率。
- 处理异常:特别是
void
方法的异常。 - 传递上下文:确保日志、安全等上下文在异步线程中可用。
七、性能优化
- 线程池调优:
- CPU 密集型任务:
corePoolSize
≈ CPU 核心数。 - IO 密集型任务:
corePoolSize
可更大(如 2 * CPU 核心数)。 - 避免
maxPoolSize
过大导致线程过多,消耗资源。
- CPU 密集型任务:
- 队列选择:
LinkedBlockingQueue
:无界队列(小心 OOM)。ArrayBlockingQueue
:有界队列,需配合拒绝策略。
- 减少线程创建开销:
- 预热线程池(
prestartAllCoreThreads()
)。
- 预热线程池(
- 异步日志:
- 确保日志框架(如 Logback)配置了异步 Appender,避免日志 I/O 阻塞异步线程。
- 避免过度异步:
- 并非所有操作都适合异步。权衡复杂性和收益。
总结
@Async
是 Spring Boot 中实现异步调用的便捷方式。
关键步骤:
@EnableAsync
启用。@Async
标记方法。- 配置自定义
ThreadPoolTaskExecutor
(最重要!)。 - 注意调用方式(避免自调用)。
- 处理异常和上下文传递。
推荐配置:
- 使用有界队列 +
CallerRunsPolicy
拒绝策略。 - 监控线程池状态。
- 为
CompletableFuture
指定自定义Executor
。
本文基于 Spring Boot 3.x 编写。