Executor框架

继承关系图

image-20260510213334831

  1. 被框架执行的任务需要实现的 Runnable 接口Callable接口
  2. 框架能执行任务需要实现ExecutorService接口
  3. 任务执行完毕的结果是 FutureTask 对象

三种线程池

主要就是上面类图的三个主要实现类

1. ScheduledThreadPoolExecutor

核心特征:ThreadPoolExecutor的子类,专为定时/周期性任务设计

使用示例:构造线程池,提交任务

image-20260510214641504

主要有三种任务:

  1. schedule(task, delay, unit):在指定延迟后执行一次任务。
  2. scheduleAtFixedRate(task, initialDelay, period, unit):固定频率执行。任务开始时间严格按initialDelay, initialDelay+period, ...推进,不受任务自身执行时间影响。
  3. scheduleWithFixedDelay(task, initialDelay, delay, unit):固定间隔执行。下次任务开始时间 = 上次任务执行结束的时间 + delay

Tips:如果异常没有捕获的话,会静默取消,不会再往后执行;安全实践:对于周期任务,务必在其run()方法内部使用try-catch块捕获所有Throwable,防止静默失败。

2. ForkJoinPool

核心特征:为分治算法优化的线程池,核心是工作窃取(Work-Stealing)

感觉没必要学

3. ThreadPoolExecutor

前言:Executors类的快捷静态方法可以额构造ThreadPoolExecutor,但一般用ThreadPoolExecutor的构造方法自定义参数使用

Tips:Spring框架的ThreadPoolTaskExecutor底层也是使用的ThreadPoolExecutor

一般用ThreadPoolTaskExecutor,使用方法先@Bean交给容器管理:

image-20260510233854185

然后使用@Async 注解简化使用

image-20260510234037730

ThreadPoolExecutor有两种提交方式:

上面简化使用的时候👆🏻会通过返回值判断,如果是void则是execute否则是submit

方法 参数 返回值 异常处理
execute(Runnable) 只接受 Runnable 没有返回值void 任务内的未捕获异常会直接抛给线程的 UncaughtExceptionHandler,调用方无法感知。
submit(Runnable) 接受 Runnable 返回 Future<?>,可以调用 get() 获取 null(任务成功)或捕获异常。 异常会被封装在 Future.get() 中抛出(ExecutionException)。
submit(Callable<T>) 接受 Callable 返回 Future<T>,通过 get() 获取实际返回值。 同上。

核心参数:

corePoolSize 核心线程数
maximumPoolSize 最大运行线程
workQueue 达到核心放到队列
keepAliveTime 非核心存活时间
TimeUnit 上面的时间
threadFactory 默认即可 创建线程的
handler 拒绝策略

工作流程:

  1. 新任务提交的时候,如果当前工作线程数 < corePoolSize,就会用threadFactory创建新线程执行任务,这时创建的都是核心线程,永久存在
  2. 否则就进入workQueue排队还扛得住,等待核心线程释放执行
  3. 队列也满了,就看当前工作线程是不是<maximumPoolSize,是的话threadFactory创建临时线程执行任务(执行的当前任务,队列里的等待核心线程空闲时被执行),空闲超过 keepAliveTime 就会被回收
  4. 当前工作线程>maximumPoolSize,就会执行handler拒绝策略

拒绝策略:

  1. ThreadPoolExecutor.AbortPolicy 抛出异常

  2. ThreadPoolExecutor.CallerRunsPolicy 在调用者线程里运行

  3. ThreadPoolExecutor.DiscardPolicy 直接丢弃

  4. ThreadPoolExecutor.DiscardOldestPolicy 丢弃最早未处理【**DiscardOldestPolicy 丢弃的正是队列头部那个等待最久的任务**】

拒绝策略进阶:如果不想丢弃任务又不想在调用者线程里去运行,可以实现RejectedExecutionHandler重写rejectedExecution方法,来自定义拒绝行为,可以把他存到数据库里;然后再重写LinkedBlockingQueue队列的take方法,即从队列取任务时先从数据里去任务

任务是什么?怎么存?

工作队列:

  1. LinkedBlockingQueue 链表队列,如果不指定容量,等同于无界队列【OOM风险,一定要指定容量】

  2. SynchronousQueue 同步队列,无容量,只是为了填参,实际行为就是不会队列等待,直接创建非核心线程

  3. ArrayBlockingQueue 数组队列,与有界 LinkedBlockingQueue 行为一致

  4. PriorityBlockingQueue 优先级队列

使用策略:需要缓冲大量任务时使用1和3;需要高吞吐,少等待的场景就会使用2;需要按优先级执行任务就是4

最佳实践:

  1. excutor.scheduleAtFixedRate可以获取线程池实时参数

    image-20260511004816434

  2. 不同的业务类别使用不同的线程池

  3. 核心线程数设置策略:

    • CPU密集型:N(N为CPU核心数)不再是N+1了,多出来的1是为了处理缺页中断,单处理中断仍需要占用CPU核心

    • IO密集型:2N

      可以再乘以一负载因子,一般0.75,因为不止一个线程池,线程太多,切换上下文开销大

  4. 一般要重写ThreadFactory的newThread方法,主要是自定义次线程名字,方便日志记录

CompletableFuture

默认使用 ForkJoinPool.commonPool() 执行异步任务(当不指定线程池时),这可能导致公共池线程被阻塞,影响其他并行流操作。因此在实际项目中,强烈建议自定义 ThreadPoolExecutor 并与 CompletableFuture 结合使用

创建

有两个静态方法可以执行任务:

  1. runAsync不允许有返回值

  2. supplyAsync有返回值

image-20260511143110338

任务编排

链式依赖:thenCompose

image-20260511150732271

组合:thenCombine

image-20260511150903225

竞速:applyToEither

image-20260511151033846

多任务并行:allOf

image-20260511151117220

处理异常

使用Handle方法处理异常

image-20260511144335528

结果转换与消费then

对执行完获得的Future结果可以进一步处理:

image-20260511143614813

大概有以下几种处理的方式:

image-20260511145338123

image-20260511152045546

动态线程池

开源框架Dynamic TP,支持多种配置中心,提供告警功能

自己实现:

  1. 使用Nacos接收配置
  2. 当配置变化时,使用**@NacosConfigListener(dataId = “dynamic-thread-pool-demo”, groupId = “DEFAULT_GROUP”)**监听变化,执行reload操作
  3. image-20260515121038983

image-20260515121309662

执行:image-20260515121544003

池化技术

空间换时间:提前占用一部分内存和资源存放对象,换取创建时的 CPU 开销和时间延迟。

资源管控:限制资源的最大使用量,防止无节制创建导致系统崩溃(如数据库连接耗尽、线程过多导致上下文切换风暴)。

复用机制:对象使用完毕后不销毁,而是放回池中待下次分配,减少 GC 压力。