构造函数
java的threadpool构造的时候支持7种参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize是线程池的核心线程数
maximumPoolSize是最大线程数
这两个数目的关系后面讲
keepAliveTime,unit是线程的存活时间
workQueue是阻塞队列
threadFactory是线程工厂(主要是用来给线程起名字,设置优先级等操作,jstack的时候可以快速找出自己线程池的线程)
handler这个是拒绝策略,主要是对于超过线程池最大容量后的操作
提交任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
提交任务的时候,我们看到,先看看是否满足corePoolSize,只要比corePoolSize小就加入线程池(addWorker下面讲,这里暂时理解为添加执行)。如果当前线程数量比核心线程数大,那么就会尝试往阻塞队列里加入任务,这里如果线程池不运行了就会采取拒绝策略。如果添加到阻塞队列也失败 ,就会以非核心添加执行,如果失败就是拒绝策略。
在addWorker中,第二个参数core的作用就是限制线程数。
wc >= (core ? corePoolSize : maximumPoolSize))
如果是核心,那么线程数不能超过corePoolSize,否则不能超过maximumPoolSize。
线程池添加任务的流程大概如下
先填满核心线程数,然后填满阻塞队列,然后继续增长线程到最大线程数。
如果说一个线程执行完毕了,那么怎么获取到下一个任务呢
在包装的任务中有如下代码
while (task != null || (task = getTask()) != null) {
task最开始赋值的是addWorker传入的,在执行完以后,就会while循环调用gettask。其中是会从阻塞队列里获取任务。
在getTask中
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
会根据线程数是否超过了核心线程数来判定获取任务的方式,如果是核心,那么会一直等队列里的数据,否则就会进行我们设置的时间等待,如果是超过了,我们上面看到的while循环就会退出,也就是线程数开始减少,一直到核心数量。
小操作
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池有线程池的状态,同时线程的个数也在变化,java线程池是通过一个int的不同位数来表达的,在更新状态的时候通过cas来做,以此减少了锁的开销。
扩展
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
线程池中任务执行的框架里,有beforeExecute,afterExecute两个空实现的方法,一个是在任务方法执行前,一个是在任务方法执行后,其中afterExecute是可以获取到异常的。如果我们提交的是submit方法。会先包装成一个RunnableFuture,如果有异常需要自己获取,否则就会看不到异常。一种解决方式就是改造线程池。