Java中线程池是运用最多的并发框架,几乎所有并发的程序都可以使用线程池来完成。阿里巴巴Java开发手册中明确指出:
线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。
与文无关
在实际的生产环境中,线程的数量必须得到控制,盲目的大量创建线程对系统性能是有伤害的,合理使用线程好处:
减少在创建和销毁现场上所消耗的时间和系统资源
提高响应速度,无需创建可以直接运行
提高线程的可管理性。使用线程池可以进行统一分配,调优和监控,但是要做到合理利用线程池,必须对其原理了如指掌。
线程池工作原理
当向线程池提交一个任务的时候。
先看线程池中的核心线程是否有空闲的,如果有创建一个工作线程来执行任务。如果核心线程都在工作,那么进入下一步
判断任务队列是否满了,如果任务队列未满,则把任务存储到任务队列,执行下一步。如果满了,执行拒绝策略。
添加到任务队列之后,再判断核心线程是否有空闲的,如果没有空闲的,那么尝试创建新的非核心线程执行任务。
// 线程池处理步骤 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 总共有是三个步骤 */ int c = ctl.get(); // 如果正在运行的线程数小于核心线程数,则创建工作线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池正在运行,把任务加入任务队列成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池已经关闭,那么从任务队列移除任务,并且执行拒绝策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 可供使用的工作线程为0,那么创建新的非核心线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果线程池不在运行或者未能加入任务队列,执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
image.png
创建线程池
JDK内部已经提供了Executors类,它扮演者线程池工厂的角色,通过它可以取得拥有特定功能的线程池,但是我们最好手动创建线程池。原因如下:
Executors内部也是直接构造线程池对象,没有额外的操作
手动创建线程池,我们更明白线程池的参数,方便调优。
Executors创建的线程池有可能导致OOM异常。
虽然不建议直接使用Executors直接创建线程池,但是我们可以看一下它给我们提供了那些工厂方法:
// 返回一个可根据实际情况调整线程数量的线程池 // 它是大小无界的线程池,适合执行很多短期一步的小程序,或是负载比较轻的服务器。 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 返回一个固定线程数量的线程池 // 适合负载比较重的服务器 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // 返回一个固定线程数量的线程池对象,ScheduledThreadPoolExecutor对象可以定时执行某任务 // 适合于多个后台线程执行周期任务。 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // 返回只有一个线程的线程池。 // 适合于单个线程顺序的执行任务 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 返回只有一个线程的ScheduledThreadPoolExecutor对象。 // 单个线程执行周期任务 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
本质上,我们可以通过ThreadPoolExecutor来创建线程池:
new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
参数如下:
corePoolSize: 线程池的基本大小。当提交一个任务的时候,线程池就会创建一个新的线程执行任务,即使核心线程池中有空闲线程,也会新建,知道线程池中的数量等于corePoolSize就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有的线程。
maximumPoolSize:线程池允许创建的最大线程数。当使用无界队列的时候,这个参数就没什么效果了。
keepAliveTime:线程池的工作线程空闲以后,保持存活的时间,如果任务多,并且任务执行时间段,可以调大时间,提高线程的利用率。
unit 保活时间的单位
workQueue: 任务队列,用于保持或等待执行的任务阻塞队列。有如下队列可供选择:
ArrayBlockingQueue: 基于数组结构的有界队列,此队列按FIFO原则对元素进行排序
LinkedBlockingQueue: 基于链表的阻塞队列,FIFO原则,吞吐量通常高于ArrayBlockingQueue.
SynchronousQueue: 不存储元素的阻塞队列。每个插入必须要等到另一个线程调用移除操作。
PriorityBlockingQueue: 具有优先级的无阻塞队列
threadFactory:用于设置创建线程的工厂。
handler:拒绝策略,当队列线程池都满了,必须采用一种策略来处理还要提交的任务。在实际应用中,我们可以将信息记录到日志,来分析系统的负载和任务丢失情况JDK中提供了4中策略:
AbortPolicy: 直接抛出异常
CallerRunsPolicy: 只用调用者所在的线程来运行任务
DiscardOldestPolicy: 丢弃队列中最老的一个人任务,并执行当前任务。
DiscardPolicy: 直接丢弃新进来的任务
知道如上参数,再去分析Executors框架,聪明的你一定知道是怎么回事了。
执行任务
可以使用两个方法:
execute() 提交不需要返回值的任务,无法判断是否执行成功,具体步骤上面我们有分析
submit() 提交有返回值的任务,该方法返回一个future的对象,通过future对象可以判断任务是否执行成功。future的get方法会阻塞当前线程直到任务完成。
关闭线程池
两个方法:
shutdown() 通知线程该结束了,尝试用终端来停止线程,如果线程对中断不响应的话,那么这个方法无法关闭线程池。
shutdownNow() 看名字就知道是立刻关闭线程池,类似于线程的stop方法,不等待任务执行完成就关闭线程。
扩展线程池
有时候需要对线程池做一些扩展,比如知道线程池的开始结束时间,线程池的运行统计等信息。这个时候好在ThreadPoolExecutor给我们提供了三个方法进行扩展:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
可以监控的属性:
taskCount: 线程池需要执行的任务数量
completedTaskCount: 已经完成的任务数量
largestPoolSize: 线程池中曾经创建的最大的线程数量
getPoolSize: 线程池的线程数量
getActiveCount: 活动的线程数
合理配置线程池
线程池中线程的数量过大和过小都无法使系统的性能发挥到最优,确定线程池的大小可以考虑下面的角度:
任务性质:CPU密集,IO密集,和混合密集
任务执行时间:长,中,低
任务优先级:高,中,低
任务的依赖性:是否依赖其它资源,如数据库连接
建议使用有界队列,防止撑爆内存
在Java中,获取CPU数量:
Runtime.getRuntime().availableProcessors();
线程池计算公式:
N = CPU数量 U = 目标CPU使用率, 0 <= U <= 1 W/C = 等待(wait)时间与计算(compute)时间的比率 线程池数量 = N * U * (1 + W/C)
线程调度
在多线程竞争的情况下,肯定要涉及到线程调度的问题。线程调度是指系统为线程分配处理器的过程,主要调度方式有两种。
协同步式线程调度(Cooperative Threads-Schedulin) :线程的执行时间由线程本身控制,执行完任务之后通知系统切换到另外一个线程上。 实现简单,但是如果一个线程坚持不让出CPU,那么会导致整个系统崩溃。
抢占式线程调度(Preemptive Threads-Schedulng):由系统分配时间,线程切换不由线程本身决定,这种情况下线程的执行时间是系统可控的,也不会有某线程出现问题导致进程阻塞的问题。
Java使用的是抢占式线程调度。
最后
线程池与普通线程的区别不会太多,只是更好的利用了系统资源,任何使用到线程的地方都可以使用线程池来替代。关于线程池,我们就说到这里。
参考
《Java并发编程的艺术》
《Java高并发程序设计》