线程池+ CompletableFuture
Executor框架
继承关系图

- 被框架执行的任务需要实现的
Runnable接口 或Callable接口 - 框架能执行任务需要实现
ExecutorService接口 - 任务执行完毕的结果是
FutureTask对象
三种线程池
主要就是上面类图的三个主要实现类
1. ScheduledThreadPoolExecutor
核心特征:ThreadPoolExecutor的子类,专为定时/周期性任务设计
使用示例:构造线程池,提交任务

主要有三种任务:
schedule(task, delay, unit):在指定延迟后执行一次任务。scheduleAtFixedRate(task, initialDelay, period, unit):固定频率执行。任务开始时间严格按initialDelay, initialDelay+period, ...推进,不受任务自身执行时间影响。scheduleWithFixedDelay(task, initialDelay, delay, unit):固定间隔执行。下次任务开始时间 = 上次任务执行结束的时间 +delay。
Tips:如果异常没有捕获的话,会静默取消,不会再往后执行;安全实践:对于周期任务,务必在其
run()方法内部使用try-catch块捕获所有Throwable,防止静默失败。
2. ForkJoinPool
核心特征:为分治算法优化的线程池,核心是工作窃取(Work-Stealing)
感觉没必要学
3. ThreadPoolExecutor
前言:Executors类的快捷静态方法可以额构造ThreadPoolExecutor,但一般用ThreadPoolExecutor的构造方法自定义参数使用
Tips:Spring框架的ThreadPoolTaskExecutor底层也是使用的ThreadPoolExecutor
一般用ThreadPoolTaskExecutor,使用方法先@Bean交给容器管理:

然后使用@Async 注解简化使用

ThreadPoolExecutor有两种提交方式:
上面简化使用的时候👆🏻会通过返回值判断,如果是void则是
execute否则是submit
| 方法 | 参数 | 返回值 | 异常处理 |
|---|---|---|---|
execute(Runnable) |
只接受 Runnable |
没有返回值(void) |
任务内的未捕获异常会直接抛给线程的 UncaughtExceptionHandler,调用方无法感知。 |
submit(Runnable) |
接受 Runnable |
返回 Future<?>,可以调用 get() 获取 null(任务成功)或捕获异常。 |
异常会被封装在 Future.get() 中抛出(ExecutionException)。 |
submit(Callable<T>) |
接受 Callable |
返回 Future<T>,通过 get() 获取实际返回值。 |
同上。 |
核心参数:
| corePoolSize | 核心线程数 |
|---|---|
| maximumPoolSize | 最大运行线程 |
| workQueue | 达到核心放到队列 |
| keepAliveTime | 非核心存活时间 |
| TimeUnit | 上面的时间 |
| threadFactory | 默认即可 创建线程的 |
| handler | 拒绝策略 |
工作流程:
- 新任务提交的时候,如果当前工作线程数 < corePoolSize,就会用threadFactory创建新线程执行任务,这时创建的都是核心线程,永久存在
- 否则就进入workQueue排队还扛得住,等待核心线程释放执行
- 队列也满了,就看当前工作线程是不是<maximumPoolSize,是的话threadFactory创建临时线程执行任务(执行的当前任务,队列里的等待核心线程空闲时被执行),空闲超过 keepAliveTime 就会被回收
- 当前工作线程>maximumPoolSize,就会执行handler拒绝策略
拒绝策略:
ThreadPoolExecutor.AbortPolicy抛出异常ThreadPoolExecutor.CallerRunsPolicy在调用者线程里运行ThreadPoolExecutor.DiscardPolicy直接丢弃ThreadPoolExecutor.DiscardOldestPolicy丢弃最早未处理【**DiscardOldestPolicy丢弃的正是队列头部那个等待最久的任务**】
拒绝策略进阶:如果不想丢弃任务又不想在调用者线程里去运行,可以实现RejectedExecutionHandler重写rejectedExecution方法,来自定义拒绝行为,可以把他存到数据库里;然后再重写LinkedBlockingQueue队列的take方法,即从队列取任务时先从数据里去任务
任务是什么?怎么存?
工作队列:
LinkedBlockingQueue链表队列,如果不指定容量,等同于无界队列【OOM风险,一定要指定容量】SynchronousQueue同步队列,无容量,只是为了填参,实际行为就是不会队列等待,直接创建非核心线程ArrayBlockingQueue数组队列,与有界LinkedBlockingQueue行为一致PriorityBlockingQueue优先级队列
使用策略:需要缓冲大量任务时使用1和3;需要高吞吐,少等待的场景就会使用2;需要按优先级执行任务就是4
最佳实践:
excutor.scheduleAtFixedRate可以获取线程池实时参数

不同的业务类别使用不同的线程池
核心线程数设置策略:
CPU密集型:N(N为CPU核心数)不再是N+1了,多出来的1是为了处理缺页中断,单处理中断仍需要占用CPU核心
IO密集型:2N
可以再乘以一负载因子,一般0.75,因为不止一个线程池,线程太多,切换上下文开销大
一般要重写ThreadFactory的newThread方法,主要是自定义次线程名字,方便日志记录
CompletableFuture
默认使用 ForkJoinPool.commonPool() 执行异步任务(当不指定线程池时),这可能导致公共池线程被阻塞,影响其他并行流操作。因此在实际项目中,强烈建议自定义 ThreadPoolExecutor 并与 CompletableFuture 结合使用
创建
有两个静态方法可以执行任务:
runAsync不允许有返回值supplyAsync有返回值

任务编排
链式依赖:thenCompose

组合:thenCombine

竞速:applyToEither

多任务并行:allOf

处理异常
使用Handle方法处理异常

结果转换与消费then
对执行完获得的Future结果可以进一步处理:

大概有以下几种处理的方式:

动态线程池
开源框架Dynamic TP,支持多种配置中心,提供告警功能
自己实现:
- 使用Nacos接收配置
- 当配置变化时,使用**@NacosConfigListener(dataId = “dynamic-thread-pool-demo”, groupId = “DEFAULT_GROUP”)**监听变化,执行reload操作


执行:
池化技术
空间换时间:提前占用一部分内存和资源存放对象,换取创建时的 CPU 开销和时间延迟。
资源管控:限制资源的最大使用量,防止无节制创建导致系统崩溃(如数据库连接耗尽、线程过多导致上下文切换风暴)。
复用机制:对象使用完毕后不销毁,而是放回池中待下次分配,减少 GC 压力。


