线程池
作用:
- 线程复用 控制最大并发数
- 实现任务缓存策略以及拒绝策略
- 定期执行 周期执行
- 隔离不同业务的线程执行环境
解决了两个问题:
1:通过减少任务间的调度开销 (主要是通过线程池中的线程被重复使用的方式),来提高大量任务时的执行性能; 2:提供了一种方式来管理线程和消费,维护基本数据统计等工作
线程池决定了任务的执行策略:
- 什么线程
- 什么顺序
- 多少任务执行
- 多少任务等待
- 如何放弃以及通知放弃
- 任务执行前操作
Executor框架
public interface Executor {
void execute(Runnable command);
}
ExecutorService继承了Executor,增加了一些方法
public interface ExecutorService extends Executor {
// 平缓关闭
void shutdown();
// 粗暴关闭
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
}
execute&submit
使用这两个方法一定要注意,execute会将runnable交给worker去执行,出现异常会打印异常栈。
而submit方法则会将runnable包装成一个ScheduledFutureTask 这个类会将异常吞掉,不会打印异常栈。
Callable
- 拥有返回值
Future
用来执行一些较长时间的计算,通过get来获取结果(阻塞或者超时)
用于异步获取执行结果或取消执行任务的场景
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
});
new Thread(futureTask).start();
System.out.println(futureTask.get());
Future<String> future = pool.submit(() -> {
Thread.sleep(3000);
return "java";
});
String s = future.get();
Future模式
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyFuture myFuture = new MyFuture();
// 在这里 main thread 可以做其他事情
// 下一行代码将阻塞直到结果可用
System.out.println(myFuture.getData());
}
}
class MyFuture{
private volatile boolean FLAG = false;
private String data;
public MyFuture() {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("future 任务开始 睡眠 3000ms");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("future 任务结束");
setData("jntm");
}
}).start();
}
private synchronized void setData(String data){
if (FLAG){
return;
}
this.data = data;
FLAG = true;
notify();
}
public synchronized String getData(){
while (!FLAG){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return data;
}
}
CompletableFuture
使用观察者模式来实现
有意思的是这个类跟Stream的终结流有点像,只有终点调用了取值的操作,整个调用兰才会被执行
某些方法可以传递一个线程池参数,当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈
// 此处 cf1 跟 cf2并发
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println("cf1");
return "cf1";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
System.out.println("cf2");
return "cf2";
});
// cf3等待c1执行完毕
CompletableFuture<String> cf3 = cf1.thenApply(result -> {
System.out.println("cf3");
return "cf3";
});
// cf4 等待 cf1跟cf2执行完毕
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {
System.out.println("cf4");
return "cf4";
});
// cf5等待cf2执行完毕
CompletableFuture<String> cf5 = cf2.thenApply(result -> {
System.out.println("cf5");
return "cf5";
});
// cf6等待 3 4 5执行完毕
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
System.out.println(cf6.thenApply(v -> {
try {
return cf3.get() + cf4.get() + cf5.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}).get());
stateDiagram-v2
[*] --> cf1
[*] --> cf2
cf1 --> cf3
cf1 --> cf4
cf2 --> cf4
cf2 --> cf5
cf3 --> cf6
cf4 --> cf6
cf5 --> cf6
cf6 --> [*]
异常处理
CompletableFuture.completedFuture("")
.exceptionally(e -> {
e.printStackTrace();
return "";
});
CompletableFuture在回调方法中对异常进行了包装。大部分异常会封装成CompletionException后抛出,真正的异常存储在cause属性中,因此如果调用链中经过了回调方法处理那么就需要用Throwable.getCause()方法提取真正的异常。但是,有些情况下会直接返回真正的异常
初始化
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
// 将runnable包装成callable,内部是通过适配器的方式来实现的
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
get
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 这里会一直阻塞到任务完成
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
// 无限循环
for (;;) {
int s = state;
// 如果任务已经完成,返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 还未完成,让线程调度器重新调度,防止占着不放
else if (s == COMPLETING)
Thread.yield();
// 线程被打断,抛出异常
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 第一次运行,创建一些信息
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
run
public void run() {
// 状态不对
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 状态正确时进入
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行主体
result = c.call();
// 标记执行完成
ran = true;
} catch (Throwable ex) {
// 标记失败
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
cancel
public boolean cancel(boolean mayInterruptIfRunning) {
// 状态不对
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
// 通过设置中断位来停止线程
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
线程池分类
- ThreadPollExecutor
- ForkJoinPool
- 分解汇总的任务
- 用很少的线程可以执行很多的任务(子任务) TPE做不到先执行子任务
- CPU密集型
ThreadPollExecutor
Executors:线程池工厂(不推荐使用)
- newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。这个线程池的最大线程数能达到整数的最大值
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。同样 线程最大数也是整数最大值
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
- newWorkSealingPool jdk8引入 使用多个队列来减少竞争
这个线程工厂大部分都使用了无界队列 如果瞬间请求量大 很有可能造成oom
队列在线程池中起的作用:
请求数大于 coreSize 时,可以让任务在队列中排队,让线程池中的线程慢慢的消费请求,当线程消费完所有的线程后,会阻塞的从队列中拿数据,通过队列阻塞的功能,使线程不消亡
原理
- 运行状态图
生命周期
- 管理方法
调用shutdown方法,此时仍然可以接受新任务,但是新任务将不会被运行,待已运行的任务执行完毕,线程池就会被终止
自定义
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2/*实际运行线程数 (不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程)*/,
3/*最多允许创建的线程数*/,
0L /* 让线程存活的时间 0为永久 */,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4)/* 线程池的内部队列 */,
Executors.defaultThreadFactory()/* 产生线程的方式 */,
new ThreadPoolExecutor.DiscardOldestPolicy() /* 线程池满时的拒绝策略 */
);
参数:
- corePoolSize 如果等于0 则任务执行结束后就会销毁所有线程 如果大于0 任务执行后这些线程不会被销毁
- maximumPoolSize 能最大同时容纳的线程数 如果任务数量大于这个数 那么剩下的任务就要被缓存在一个阻塞队列中
- keepAliveTime 表示线程池中的线程空闲时间 多于corePoolSize数量的部分线程会被销毁
- 时间单位
- workQUeue 缓存队列
- threadFactory 定义线程池线程的产生方式
- handler 任务拒绝策略
值得注意的是,上述参数除了缓存队列,其他参数都是可以在运行时动态调整的。
为了能达到动态调整队列长度的目的:可以通过实现自己的阻塞队列来实现。
如何配置:
- CPU密集型
- IO密集型
实际应用中 很难确定每个应用到底是CPU密集还是IO密集 不如通过动态调整线程池的方式 边调整边观察 进行负载测试 从而得到适合特定业务场景下的最佳配置
自定义线程工厂 为线程指定有意义的名称和相应的序列号 方便出错排查
定义好拒绝策略 宁愿抛出异常 也不要使用DiscardPolicy 这个策略会静悄悄的抛弃任务
线程池预热:
- prestartCoreThread
- prestartAllCoreThreads
设置回收核心线程:
- allowCoreThreadTimeOut
线程池的大小
Ncpu = CPU数量 Ucpu = 预期CPU使用率 W/C = 等待时间/计算时间
最优大小等于 Ncpu * Ucpu * (1 + W/C)
但这个公式过于学术,在生产环境中,这些时间很难计算 更多地是靠场景根据经验来设置各个参数 或是根据监控来动态调整参数以观察效果
应用场景
- coreSize == maxSize
让线程一下子增加到 maxSize,并且不要回收线程,防止线程回收,避免不断增加回收的损耗
- maxSize 无界 + SynchronousQueue
当任务被消费时,才会返回,这样请求就能够知道当前请求是已经在被消费了,如果是其他的队列的话,我们只知道任务已经被提交成功了,但无法知道当前任务是在被消费中,还是正在队列中堆积
比较消耗资源,大量请求到来时,我们会新建大量的线程来处理请求
- maxSize 有界 + Queue 无界
对实时性要求不大,但流量忽高忽低的场景下,可以使用这种方式
当流量高峰时,大量的请求被阻塞在队列中,对于请求的实时性难以保证
- maxSize 有界 + Queue 有界
把队列从无界修改成有界,只要排队的任务在要求的时间内,能够完成任务即可
- keepAliveTime 设置无穷大
想要空闲的线程不被回收,我们可以设置 keepAliveTime 为无穷大值
线程池的公用和独立
查询和写入不公用线程池,如果公用的话,当查询量很大时,写入的请求可能会到队列中去排队,无法及时被处理
原则上来说,每个写入业务场景都独自使用自己的线程池,绝不共用,这样在业务治理、限流、熔断方面都比较容易 多个查询业务场景是可以公用线程池的
Wroker
在线程池中,最小的执行单位就是 Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 运行任务的线程
final Thread thread;
// 任务代码块
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
// 把自己作为一个代码块穿给线程
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 线程是通过线程工程创建的
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 这里就将任务的执行交给线程池了
runWorker(this);
}
...
}
任务提交
提交到线程池中的任务,务必要注意任务之间要没有依赖,否则很容易就会出现死锁问题
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果工作线程数小于coreSize
if (workerCountOf(c) < corePoolSize) {
// 则直接创建worker执行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池状态正常,并且工作队列还能入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池状态异常,尝试从队列移除任务
if (! isRunning(recheck) && remove(command))
// 移除成功就拒绝任务
reject(command);
// 工作线程为0
else if (workerCountOf(recheck) == 0)
// 直接创建worker执行
addWorker(null, false);
}
// 队列满了,如果线程数超过maxSize,拒绝任务
else if (!addWorker(command, false))
reject(command);
}
addWorker 方法首先是执行了一堆校验,然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 执行 Worker,所以 t.start () 会执行到 Worker 的 run 方法上,到runWorker 方法里
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 校验各种状态
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 把任务交给worker,此时要执行的任务就已经传入给worker里面的thread了
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 检查线程池状态
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程,执行worker
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 所以说在这里,在不断地取任务执行
// 如果要执行的task为空,则会去取一个task,取不到就阻塞
while (task != null || (task = getTask()) != null) {
// 锁住worker,防止一个任务多个线程执行
w.lock();
// 线程池stop了,让线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行前钩子函数
beforeExecute(wt, task);
try {
// 执行真正的任务
// 而执行这个任务的线程就是worker里面的thread
task.run();
// 执行后钩子函数
afterExecute(task, null);
} catch (Throwable ex) {
// 异常钩子函数
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
// 如果设置了线程超时时间,超过一定时间没有任务,超出coreSize部分的线程会被回收
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 检查线程池状态
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果线程数大于maxSize但是存活时间还没超过keepalive,则跳过后面取任务的部分
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 超过keepAliveTime时间取不到数据就返回,此时线程不再运行,结束了,JVM会回收掉
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
饱和策略
线程池的构建最后参数定义了当缓存队列满了之后,要如何处置提交的任务
- AbortPolicy:默认的饱和策略 抛出RejectedExecutionException
- CallerRunsPolicy:回退到由提交者执行
- DiscardPolicy:丢弃掉当前提交的任务
- DiscardOldestPolicy:丢弃掉等待队列中最老的任务
线程工厂
决定如何产生新线程,最常用来设置线程名称
public static final ThreadFactory SHORT_LIFE_POOL_THREAD_FACTORY = new ThreadFactory() {
private final AtomicInteger atomicInteger = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "short-life-thread-pool-" + atomicInteger.incrementAndGet());
}
};
ForkJoinPool
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
- 工作窃取(work-stealing):指某个线程从其他队列里窃取任务来执行
构造器
public ForkJoinPool() {...} // 创建一个拥有处理器数量的线程池
public ForkJoinPool(int parallelism){...} // 自定义并行度
使用
ForkJoinPool pool = new ForkJoinPool();
pool.execute(...);
execute 方法传递的任务有两个抽象子类:
- RecursiveAction 无返回值任务
- RecursiveTask 有返回值任务
class RaskDemo extends RecursiveAction {
/**
* 每个"小任务"最多只打印20个数
*/
private static final int MAX = 20;
private int start;
private int end;
public RaskDemo(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
//当end-start的值小于MAX时,开始打印
if((end-start) < MAX) {
for(int i= start; i<end;i++) {
System.out.println(Thread.currentThread().getName()+"i的值"+i);
}
}else {
// 将大任务分解成两个小任务
int middle = (start + end) / 2;
RaskDemo left = new RaskDemo(start, middle);
RaskDemo right = new RaskDemo(middle, end);
left.fork();
right.fork();
}
}
}
两个抽象子类的区别在于有返回值的任务在调用compute后最终会返回计算结果 无论是自己计算的 还是对子任务的合并
案例
线程池没有隔离导致任务饥饿
之前在维护视图库平台的时候,排查过一个问题:
视图库需要定期向上级平台定期发送一个 HTTP 保活请求,大概是几分钟一次,但通过日志以及数据库记录的保活请求发送时间发现,每次保活操作的执行都大大延迟了。
通过静态代码审查发现是保活操作跟另外一个像上级平台发送订阅通知数据的操作共用了同一个线程池,后者是大量短而快的任务,这就导致了保活操作要在队列中排队一段时间才会执行,所以执行时间都延迟了。
最后通过让保活操作自己单独使用线程池来解决问题。