这是线程池第二篇文章,上一篇文章写了如何创建线程池,这篇文章用代码来演示通过 ThreadPoolExecutor 和 Executors的静态工厂方法创建线程池,以及这些线程池的基本使用,然后重点讲解 ThreadPoolExecutor 的实现接口 ExecutorService 中的常用API,如何通过这些API来管理线程池。
1、ThreadPoolExecutor 创建线程池
从上篇文章《juc-12-线程池2-线程池的使用》中,我们学习到 ThreadPoolExecutor 几个重载的构造函数都会调用下面这个构造函数:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
这个构造函数指定 corePoolSize、maximumPoolSize、keepAliveTime 、workQueue、threadFactory 和 RejectedExecutionHandler
其他重载的构造函数中:
- 如果没有指定线程工厂threadFactory,则默认使用Executors.defaultThreadFactory();
- 如果没有指定拒绝策略 RejectedExecutionHandler,则默认使用 AbortPolicy,表示直接抛出异常。
提示: BlockingQueue阻塞队列的相关知识点,请看前面的文章《juc-10-阻塞队列》。
添加线程的规则:
- 如果线程小于corePoolSize的时候,即使线程有处于空闲状态,也会继续创建新的线程运行新的任务
- 如果等于大于corePoolSize,但是小于maxPoolSize,则将任务放入队列
- 如果队列已满,并且线程数小于maxPoolSize,创建新的线程来运行人任务。
- 如果队列已满,并且线程数大于或者等于maxPoolSize,则拒绝该任务,拒绝策略由
RejectedExecutionHandler
决定。
演示 ThreadPoolExecutor 创建线程池
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
int corePoolSize = 3;//核心线程数=3,
int maximumPoolSize = 5;//最大线程数=5
int keepAliveTime = 30;//超出核心线程数的线程空闲时间超过 30ms 就会被回收
int capacity = 10;//阻塞队列的容量=10
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(capacity);
// 创建线程池,
// 没有指定线程工厂threadFactory,则默认使用Executors.defaultThreadFactory()
// 没有指定拒绝策略 RejectedExecutionHandler,则默认使用 AbortPolicy,表示直接抛出异常
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue);
// 下面演示 taskCount 为同数值时的场景
// 1. 任务数 <= corePoolSize + capacity,线程池中只有 corePoolSize 个线程
// int taskCount = corePoolSize + capacity;
// 2. corePoolSize + capacity < 任务数 <= maximumPoolSize + capacity,这时 corePoolSize < 线程池中线程数 <= maximumPoolSize
// int taskCount = maximumPoolSize + capacity;
// 3. 任务数 > maximumPoolSize + capacity , 线程池无法接受你所提交的任务的时候,采取的拒绝策略
int taskCount = maximumPoolSize + capacity + 1;
// 模拟程序需要执行 taskCount 个任务
for (int i = 0; i < taskCount; i++) {
// 每个任务启动一条线程去处理
int index = i;
// 给线程池提交任务
threadPoolExecutor.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + "执行任务..." + index);
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPoolExecutor.shutdown();//关闭线程池
}
}
- 任务数 taskCount<= corePoolSize + capacity,运行结果如下:
此时,线程池中只有 corePoolSize 个线程,因为任务数量可以被 corePoolSize 个线程和 workQueue 所接受,线程数没有大于corePoolSize,所以线程执行完一个任务后,会通过 workQueue.take() 取出一个新任务继续执行。
pool-1-thread-1执行任务...0
pool-1-thread-3执行任务...2
pool-1-thread-2执行任务...1
pool-1-thread-1执行任务...3
pool-1-thread-2执行任务...4
pool-1-thread-3执行任务...5
pool-1-thread-1执行任务...6
pool-1-thread-3执行任务...8
pool-1-thread-2执行任务...7
pool-1-thread-2执行任务...9
pool-1-thread-1执行任务...10
pool-1-thread-3执行任务...11
pool-1-thread-2执行任务...12
- corePoolSize + capacity < 任务数 taskCount <= maximumPoolSize + capacity,运行结果如下:
这时 corePoolSize < 线程池中线程数 <= maximumPoolSize,可以看出线程池中最多出现了5条线程执行任务。线程数大于corePoolSize,所以线程执行完一个任务后,会通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 取出一个新任务继续执行。
pool-1-thread-2执行任务...1
pool-1-thread-4执行任务...13
pool-1-thread-5执行任务...14
pool-1-thread-3执行任务...2
pool-1-thread-1执行任务...0
pool-1-thread-3执行任务...3
pool-1-thread-4执行任务...4
pool-1-thread-5执行任务...7
pool-1-thread-1执行任务...6
pool-1-thread-2执行任务...5
pool-1-thread-3执行任务...8
pool-1-thread-4执行任务...10
pool-1-thread-2执行任务...9
pool-1-thread-1执行任务...12
pool-1-thread-5执行任务...11
- 任务数 taskCount > maximumPoolSize + capacity , 运行结果如下:
线程池无法接受你所提交的任务的时候,采取的拒绝策略
pool-1-thread-1执行任务...0
pool-1-thread-5执行任务...14
pool-1-thread-3执行任务...2
pool-1-thread-4执行任务...13
pool-1-thread-2执行任务...1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@568db2f2 rejected from java.util.concurrent.ThreadPoolExecutor@378bf509[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.xander.juc._11threadPool.ThreadPoolExecutorDemo.main(ThreadPoolExecutorDemo.java:42)
pool-1-thread-2执行任务...4
pool-1-thread-4执行任务...3
...
2、Executors 创建线程池
2.1 Executors.newFixedThreadPool
newFixedThreadPool(int nThreads) :用于创建固定线程数的线程池。
演示 newFixedThreadPool 的使用
public class NewFixedThreadPoolDemo {
public static void main(String[] args) {
// 创建固定5个线程的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 模拟程序需要执行1000个任务
for (int i = 0; i < 1000; i++) {
// 每个任务启动一条线程去处理
int index = i;
// 给线程池提交任务
executorService.submit(()->{
System.out.println(Thread.currentThread().getName() + "执行任务..."+ index);
});
}
executorService.shutdown();//关闭线程池
}
}
运行结果如下:
从线程名称可以看出,线程池中只有5条线程用于执行任务。
pool-1-thread-3执行任务...2
pool-1-thread-5执行任务...4
pool-1-thread-3执行任务...5
pool-1-thread-2执行任务...1
pool-1-thread-1执行任务...0
pool-1-thread-1执行任务...9
...
pool-1-thread-4执行任务...998
pool-1-thread-4执行任务...999
pool-1-thread-5执行任务...993
pool-1-thread-1执行任务...992
pool-1-thread-3执行任务...920
pool-1-thread-2执行任务...919
2.2 Executors.newSingleThreadExecutor
newSingleThreadExecutor() :用于创建只有一条线程的线程池。
public class NewSingleThreadExecutorDemo {
public static void main(String[] args) {
// 创建只有一条线程的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 模拟程序需要执行1000个任务
for (int i = 0; i < 1000; i++) {
// 每个任务启动一条线程去处理
int index = i;
// 给线程池提交任务
executorService.submit(()->{
System.out.println(Thread.currentThread().getName() + "执行任务..."+ index);
});
}
executorService.shutdown();//关闭线程池
}
}
运行结果如下:
从线程名称可以看出,线程池中只有1条线程用于执行任务。
pool-1-thread-1执行任务...0
pool-1-thread-1执行任务...1
pool-1-thread-1执行任务...2
pool-1-thread-1执行任务...3
...
pool-1-thread-1执行任务...996
pool-1-thread-1执行任务...997
pool-1-thread-1执行任务...998
pool-1-thread-1执行任务...999
2.3 Executors.newCachedThreadPool
newCachedThreadPool() :创建无界线程池,maximumPoolSize = Integer.MAX_VALUE 近似无界,具有自动回收多余线程的功能。
- corePoolSize = 0 说明核心线程数为0,
- maximumPoolSize = Integer.MAX_VALUE,最大可创建 Integer.MAX_VALUE 个线程;
- keepAliveTime 等于60s,因为核心线程数 corePoolSize 为0,所以只要线程池中的线程空闲超过 keepAliveTime 都会被自动回收;
- workQueue 是 SynchronousQueue 实例,这是一个不存储元素的阻塞队列,它的容量为 0,每一个put操作都要等待一个take操作,也就是说如果当前线程池中所有线程都正在忙,则为该任务创建一个新线程并将这个放入池中。
public class NewCachedThreadPoolDemo {
public static void main(String[] args) {
// 创建无界线程池,线程池中线程最大数量 maximumPoolSize = Integer.MAX_VALUE 近似无界,具有自动回收多余线程的功能
ExecutorService executorService = Executors.newCachedThreadPool();
// 模拟程序需要执行1000个任务
for (int i = 0; i < 1000; i++) {
// 每个任务启动一条线程去处理
int index = i;
// 给线程池提交任务
executorService.submit(()->{
try {
System.out.println(Thread.currentThread().getName() + "执行任务..."+ index);
// 模拟执行任务耗时 50ms,
// 用于演示池中所有线程都繁忙时,需要创建新的线程去执行新任务
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();//关闭线程池
}
}
运行结果如下:
从线程名称可以看出,当接收新任务时,如果池中所有线程都繁忙,会创建新的线程去执行新任务
pool-1-thread-1执行任务...0
pool-1-thread-6执行任务...5
pool-1-thread-3执行任务...2
pool-1-thread-2执行任务...1
pool-1-thread-9执行任务...8
...
pool-1-thread-785执行任务...953
pool-1-thread-784执行任务...952
pool-1-thread-783执行任务...951
pool-1-thread-782执行任务...950
2.4 Executors.newScheduledThreadPool
newScheduledThreadPool(int corePoolSize) :创建支持延迟任务、周期性执行任务的线程池。
public class ScheduledExecutorServiceDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// 创建 ScheduledExecutorService ,能执行延迟和周期性任务
ScheduledExecutorService schedule = Executors.newScheduledThreadPool(5);
System.out.println(Thread.currentThread().getName() + "开始执行:" + formatter.format(LocalDateTime.now()));
// testSchedule(schedule);
// testScheduleAtFixedRate1(schedule);
// testScheduleAtFixedRate2(schedule);
testScheduleWithFixedDelay(schedule);
}
/**
* 测试 schedule 方法,延迟执行,并且只执行一次
*
* @param schedule
*/
private static void testSchedule(ScheduledExecutorService schedule) {
schedule.schedule(() -> {
// 延迟1000ms,只执行一次
System.out.println(Thread.currentThread().getName() + "延迟1000ms,只执行一次:" + formatter.format(LocalDateTime.now()));
},
1000, TimeUnit.MILLISECONDS);
}
/**
* 测试 scheduleAtFixedRate,执行任务时间 2000ms 小于设置的 period 周期时长 3000ms
*
* @param schedule
*/
private static void testScheduleAtFixedRate1(ScheduledExecutorService schedule) {
schedule.scheduleAtFixedRate(() -> {
try {
// 周期性任务,延迟1000ms执行,并且每隔3000ms 开始一次任务
System.out.println("scheduleAtFixedRate: " + Thread.currentThread().getName() + " 开始时间" + formatter.format(LocalDateTime.now()));
TimeUnit.MILLISECONDS.sleep(2000);
System.out.println("scheduleAtFixedRate: " + Thread.currentThread().getName() + " 执行任务花费 2000ms 结束时间" + formatter.format(LocalDateTime.now()));
} catch (InterruptedException e) {
e.printStackTrace();
}
},
1000, 3000, TimeUnit.MILLISECONDS);
}
/**
* 测试 scheduleAtFixedRate,执行任务时间 4000ms 大于设置的 period 周期时长 3000ms
* 虽然执行任务时间大于设置的 period 周期时长,但是只有等上一个任务执行结束后,才会开启新的任务
*
* @param schedule
*/
private static void testScheduleAtFixedRate2(ScheduledExecutorService schedule) {
schedule.scheduleAtFixedRate(() -> {
try {
// 周期性任务,延迟1000ms执行,并且每隔3000ms 开始一次任务
System.out.println("scheduleAtFixedRate: " + Thread.currentThread().getName() + " 开始时间" + formatter.format(LocalDateTime.now()));
TimeUnit.MILLISECONDS.sleep(4000);
System.out.println("scheduleAtFixedRate: " + Thread.currentThread().getName() + " 执行任务花费 4000ms 结束时间" + formatter.format(LocalDateTime.now()));
} catch (InterruptedException e) {
e.printStackTrace();
}
},
1000, 3000, TimeUnit.MILLISECONDS);
}
/**
* 测试 scheduleWithFixedDelay
* 周期性任务,延迟1000ms执行,前一个任务结束 3000ms 后下一个任务开始执行
* @param schedule
*/
private static void testScheduleWithFixedDelay(ScheduledExecutorService schedule) {
schedule.scheduleWithFixedDelay(() -> {
try {
// 周期性任务,延迟1000ms执行,前一个任务结束 3000ms 后下一个任务开始执行
System.out.println("scheduleWithFixedDelay周期性任务," + Thread.currentThread().getName() + " 开始时间" + formatter.format(LocalDateTime.now()));
TimeUnit.MILLISECONDS.sleep(5000);
System.out.println("scheduleAtFixedRate周期性任务," + Thread.currentThread().getName() + " 执行任务花费 5000ms 结束时间" + formatter.format(LocalDateTime.now()));
} catch (InterruptedException e) {
e.printStackTrace();
}
},
1000, 3000, TimeUnit.MILLISECONDS);
}
}
测试 schedule 方法,延迟执行,并且只执行一次,运行结果如下:
main开始执行:2020-12-01 10:17:24
pool-1-thread-1延迟1000ms,只执行一次:2020-12-01 10:17:25
测试 scheduleAtFixedRate,执行任务时间 2000ms 小于设置的 period 周期时长 3000ms,运行结果如下:
从打印的时间可以看出,新任务执行结束过1s后,才开启新的任务,每次开启任务的时间间隔都是 3000ms。
main开始执行:2020-12-01 10:01:25
scheduleAtFixedRate: pool-1-thread-1 开始时间2020-12-01 10:01:26
scheduleAtFixedRate: pool-1-thread-1 执行任务花费 2000ms 结束时间2020-12-01 10:01:28
scheduleAtFixedRate: pool-1-thread-1 开始时间2020-12-01 10:01:29
scheduleAtFixedRate: pool-1-thread-1 执行任务花费 2000ms 结束时间2020-12-01 10:01:31
scheduleAtFixedRate: pool-1-thread-2 开始时间2020-12-01 10:01:32
scheduleAtFixedRate: pool-1-thread-2 执行任务花费 2000ms 结束时间2020-12-01 10:01:34
scheduleAtFixedRate: pool-1-thread-1 开始时间2020-12-01 10:01:35
scheduleAtFixedRate: pool-1-thread-1 执行任务花费 2000ms 结束时间2020-12-01 10:01:37
scheduleAtFixedRate: pool-1-thread-3 开始时间2020-12-01 10:01:38
测试 scheduleAtFixedRate,执行任务时间 4000ms 大于设置的 period 周期时长 3000ms,运行结果如下:
从打印的时间可以看出,虽然执行任务时间大于设置的 period 周期时长,但是只有等上一个任务执行结束,才会立即执行新的任务
main开始执行:2020-12-01 09:49:47
scheduleAtFixedRate: pool-1-thread-1 开始时间2020-12-01 09:49:48
scheduleAtFixedRate: pool-1-thread-1 执行任务花费 4000ms 结束时间2020-12-01 09:49:52
scheduleAtFixedRate: pool-1-thread-1 开始时间2020-12-01 09:49:52
scheduleAtFixedRate: pool-1-thread-1 执行任务花费 4000ms 结束时间2020-12-01 09:49:56
scheduleAtFixedRate: pool-1-thread-2 开始时间2020-12-01 09:49:56
scheduleAtFixedRate: pool-1-thread-2 执行任务花费 4000ms 结束时间2020-12-01 09:50:00
scheduleAtFixedRate: pool-1-thread-1 开始时间2020-12-01 09:50:00
scheduleAtFixedRate: pool-1-thread-1 执行任务花费 4000ms 结束时间2020-12-01 09:50:04
scheduleAtFixedRate: pool-1-thread-3 开始时间2020-12-01 09:50:04
测试 scheduleWithFixedDelay,运行结果如下:
从打印的时间可以看出,周期性任务,延迟1000ms执行,前一个任务结束 3000ms 后下一个任务开始执行。
main开始执行:2020-12-01 10:09:36
scheduleWithFixedDelay周期性任务,pool-1-thread-1 开始时间2020-12-01 10:09:37
scheduleAtFixedRate周期性任务,pool-1-thread-1 执行任务花费 5000ms 结束时间2020-12-01 10:09:42
scheduleWithFixedDelay周期性任务,pool-1-thread-1 开始时间2020-12-01 10:09:45
scheduleAtFixedRate周期性任务,pool-1-thread-1 执行任务花费 5000ms 结束时间2020-12-01 10:09:50
scheduleWithFixedDelay周期性任务,pool-1-thread-2 开始时间2020-12-01 10:09:53
scheduleAtFixedRate周期性任务,pool-1-thread-2 执行任务花费 5000ms 结束时间2020-12-01 10:09:58
scheduleWithFixedDelay周期性任务,pool-1-thread-1 开始时间2020-12-01 10:10:01
3、ExecutorService中的常用API
ThreadPoolExecutor 的继承关系图:
ThreadPoolExecutor 实现了 ExecutorService ,ExecutorService 是一个接口,它继承自Executor接口,所以 ExecutorService 包含 Executor 里面的方法,Executor 只有一个 void execute(Runnable command)
方法,接收 Runnable,无返回值。
下面看看 ExecutorService 接口定义的线程池的操作方法。
方法 | 描述 |
---|---|
void execute(Runnable command) | 提交Runnable新任务到线程池执行,无返回值, 下面3个重载的submit()方法将Runnable/Callable参数包装成FutureTask后,都通过执行execute()方法,向线程池提交任务 |
Future<T> submit(Callable<T> task) | 提交Callable新任务到线程池执行,返回Future实例,Future用于控制任务执行或者获取任务执行信息,Future返回的是 Callable 的 call() 方法执行的结果 |
Future<?> submit(Runnable task) | 提交Runnable新任务到线程池执行,返回Future实例,Future用于控制任务执行或者获取任务执行信息,Future.get()都返回null |
Future<T> submit(Runnable task, T result) | 提交Runnable新任务到线程池执行,返回Future实例,Future用于控制任务执行或者获取任务执行信息,Future.get()都返回给定的result |
boolean awaitTermination (long timeout, TimeUnit unit) |
阻塞直到 shutdown()方法执行后,所有任务都已执行完成,或直到发生超时,或直到当前线程中断(interrupted)(以先发生者为准) |
void shutdown() | 这个方法只是初始化整个线程池的关闭过程,执行这个方法之后,拒绝加入新任务,执行完当前线程中正在执行的任务和队列中的线程任务后,再进行停止。会把存量的任务都执行完毕,但是不会加入新任务。 |
List<Runnable> shutdownNow() | 通过抛出InterruptedException,中断所有正在执行的任务,并返回还未开始执行的任务列表 |
boolean isShutdown() | 如果 ExecutorService 已经 shut down,也就是已经执行了shutdown() 或 shutdownNow(),返回true |
boolean isTerminated() | 如果 shut down 后所有任务都已完成,则返回true, 注意,isTerminated() 永远不是 true ,除非先调用 shutdown() 或 shutdownNow() |
提示: Runnable、Future、RunnableFuture、FutureTask 和 Callable 之间的关系,还有 Callable 和 Future 的用法在前面文章《juc-01-创建线程》有讲过。
介绍完了各个API的作用,下面用demo来进一步说明这些API如何去使用。
3.1 void execute(Runnable command)
void execute(Runnable command) : 提交Runnable新任务到线程池执行,无返回值,3个重载的submit()方法将Runnable/Callable参数包装成FutureTask后,都是通过执行execute()方法,向线程池提交任务。
public class ExecuteDemo {
public static void main(String[] args) {
int corePoolSize = 3;//核心线程数=3,
int maximumPoolSize = 5;//最大线程数=5
int keepAliveTime = 30;//超出核心线程数的线程空闲时间超过 30ms 就会被回收
int capacity = 10;//阻塞队列的容量=10
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(capacity);
// 创建线程池,
// 没有指定线程工厂threadFactory,则默认使用Executors.defaultThreadFactory()
// 没有指定拒绝策略 RejectedExecutionHandler,则默认使用 AbortPolicy,表示直接抛出异常
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue);
// 模拟程序需要执行10个任务
for (int i = 0; i < 10; i++) {
// 每个任务启动一条线程去处理
int index = i;
// 给线程池提交任务
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName() + "执行任务..."+ index);
});
}
threadPoolExecutor.shutdown();//关闭线程池
}
}
运行结果:
pool-1-thread-1执行任务...0
pool-1-thread-3执行任务...2
pool-1-thread-3执行任务...4
pool-1-thread-2执行任务...1
pool-1-thread-3执行任务...5
pool-1-thread-2执行任务...6
pool-1-thread-2执行任务...8
pool-1-thread-2执行任务...9
pool-1-thread-1执行任务...3
pool-1-thread-3执行任务...7
3.2 submit 方法
通过下面的源码,可以看出 3个重载的submit方法 都通过调用 execute 方法,提交新任务到线程池执行
- submit(Runnable task): 通过Runnable新建FutureTask,FutureTask.get() 返回值设为 null,调用 execute 方法,提交Runnable新任务到线程池执行,最后返回 FutureTask;
- submit(Runnable task, T result): 通过Runnable 和 result 新建FutureTask,FutureTask.get() 返回值设为 result,调用 execute 方法,提交Runnable新任务到线程池执行,最后返回 FutureTask;
- submit(Callable task): 根据 Callable 新建一个 FutureTask 实例,FutureTask.get() 返回的是 Callable的call() 方法执行的结果,调用 execute 方法,提交生成的FutureTask新任务到线程池执行,最后返回 FutureTask;
3个重载的submit方法源码
// 根据 Runnable 和 value,新建一个 FutureTask 实例,FutureTask.get() 返回的是value
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 根据 Callable 新建一个 FutureTask 实例,FutureTask.get() 返回的是 Callable的call() 方法执行的结果
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//通过Runnable新建FutureTask,FutureTask.get() 返回值设为 null
RunnableFuture<Void> ftask = newTaskFor(task, null);
//调用 execute 方法,提交Runnable新任务到线程池执行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
//通过Runnable 和 result 新建FutureTask,FutureTask.get() 返回值设为 result
RunnableFuture<T> ftask = newTaskFor(task, result);
//调用 execute 方法,提交Runnable新任务到线程池执行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 根据 Callable 新建一个 FutureTask 实例,FutureTask.get() 返回的是 Callable的call() 方法执行的结果
RunnableFuture<T> ftask = newTaskFor(task);
//调用 execute 方法,提交 RunnableFuture 新任务到线程池执行
execute(ftask);
return ftask;
}
3.2.1 submit(Runnable task)
submit(Runnable task): 通过Runnable新建FutureTask,FutureTask.get() 返回值设为 null,调用 execute 方法,提交Runnable新任务到线程池执行,最后返回 FutureTask;
演示 Future submit(Runnable task) 的使用,所有 Future.get() 都返回 null
public class SubmitDemo {
public static void main(String[] args) {
int corePoolSize = 3;//核心线程数=3,
int maximumPoolSize = 5;//最大线程数=5
int keepAliveTime = 30;//超出核心线程数的线程空闲时间超过 30ms 就会被回收
int capacity = 10;//阻塞队列的容量=10
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(capacity);
// 创建线程池,
// 没有指定线程工厂threadFactory,则默认使用Executors.defaultThreadFactory()
// 没有指定拒绝策略 RejectedExecutionHandler,则默认使用 AbortPolicy,表示直接抛出异常
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue);
// 用于存放 submit 返回的 Future
List<Future> futureList = new ArrayList<>();
// 1. 演示 Future<T> submit(Runnable task) 的使用,所有 Future.get() 都返回 null
submitRunnableNoResult(threadPoolExecutor, futureList);
//遍历 futureList 打印任务执行结果
futureList.forEach(future -> {
try {
System.out.println("Future获取任务执行结果:" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();//关闭线程池
}
/**
* 演示 Future<T> submit(Runnable task) 的使用,所有 Future.get() 都返回 null
*
* @param threadPoolExecutor
* @param futureList
*/
private static void submitRunnableNoResult(ThreadPoolExecutor threadPoolExecutor, List<Future> futureList) {
for (int i = 0; i < 5; i++) {
// 每个任务启动一条线程去处理
int index = i;
// Future<?> submit(Runnable task) 给线程池提交任务
Future<?> future = threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "执行任务..." + index);
});
futureList.add(future);
}
}
}
运行结果,所有 Future.get() 都返回 null:
pool-1-thread-1执行任务...0
pool-1-thread-2执行任务...1
pool-1-thread-1执行任务...3
pool-1-thread-3执行任务...2
pool-1-thread-2执行任务...4
Future获取任务执行结果:null
Future获取任务执行结果:null
Future获取任务执行结果:null
Future获取任务执行结果:null
Future获取任务执行结果:null
3.2.2 submit(Runnable task, T result)
submit(Runnable task, T result): 通过Runnable 和 result 新建FutureTask,FutureTask.get() 返回值设为 result,调用 execute 方法,提交Runnable新任务到线程池执行,最后返回 FutureTask;
public class SubmitDemo {
public static void main(String[] args) {
int corePoolSize = 3;//核心线程数=3,
int maximumPoolSize = 5;//最大线程数=5
int keepAliveTime = 30;//超出核心线程数的线程空闲时间超过 30ms 就会被回收
int capacity = 10;//阻塞队列的容量=10
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(capacity);
// 创建线程池,
// 没有指定线程工厂threadFactory,则默认使用Executors.defaultThreadFactory()
// 没有指定拒绝策略 RejectedExecutionHandler,则默认使用 AbortPolicy,表示直接抛出异常
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue);
// 用于存放 submit 返回的 Future
List<Future> futureList = new ArrayList<>();
// 1. 演示 Future<T> submit(Runnable task) 的使用,所有 Future.get() 都返回 null
// submitRunnableNoResult(threadPoolExecutor, futureList);
// 2. 演示 Future<T> submit(Runnable task, T result) 的使用,所有 Future.get() 都返回submit时指定的 result
// submitRunnableWithResult(threadPoolExecutor, futureList);
//遍历 futureList 打印任务执行结果
futureList.forEach(future -> {
try {
System.out.println("Future获取任务执行结果:" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();//关闭线程池
}
/**
* 演示 Future<T> submit(Runnable task, T result) 的使用,Future.get() 都返回submit时指定的 result
*
* @param threadPoolExecutor
* @param futureList
*/
private static void submitRunnableWithResult(ThreadPoolExecutor threadPoolExecutor, List<Future> futureList) {
for (int i = 0; i < 5; i++) {
// 每个任务启动一条线程去处理
int index = i;
// Future<T> submit(Runnable task, T result) 给线程池提交任务
Future<?> future = threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "执行任务..." + index);
}, "hello"+index);
futureList.add(future);
}
}
}
运行结果,Future.get() 都返回submit时指定的 result:
pool-1-thread-1执行任务...0
pool-1-thread-2执行任务...1
pool-1-thread-1执行任务...3
pool-1-thread-3执行任务...2
pool-1-thread-2执行任务...4
Future获取任务执行结果:hello0
Future获取任务执行结果:hello1
Future获取任务执行结果:hello2
Future获取任务执行结果:hello3
Future获取任务执行结果:hello4
3.2.3 submit(Callable task)
submit(Callable task): 根据 Callable 新建一个 FutureTask 实例,FutureTask.get() 返回的是 Callable的call() 方法执行的结果,调用 execute 方法,提交生成的FutureTask新任务到线程池执行,最后返回 FutureTask;
public class SubmitDemo {
public static void main(String[] args) {
int corePoolSize = 3;//核心线程数=3,
int maximumPoolSize = 5;//最大线程数=5
int keepAliveTime = 30;//超出核心线程数的线程空闲时间超过 30ms 就会被回收
int capacity = 10;//阻塞队列的容量=10
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(capacity);
// 创建线程池,
// 没有指定线程工厂threadFactory,则默认使用Executors.defaultThreadFactory()
// 没有指定拒绝策略 RejectedExecutionHandler,则默认使用 AbortPolicy,表示直接抛出异常
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue);
// 用于存放 submit 返回的 Future
List<Future> futureList = new ArrayList<>();
// 1. 演示 Future<T> submit(Runnable task) 的使用,所有 Future.get() 都返回 null
// submitRunnableNoResult(threadPoolExecutor, futureList);
// 2. 演示 Future<T> submit(Runnable task, T result) 的使用,所有 Future.get() 都返回 result
// submitRunnableWithResult(threadPoolExecutor, futureList);
// 3. 演示 Future<T> submit(Callable<T> task) 的使用,Future返回的是 Callable 的 call() 方法执行的结果
// submitCallable(threadPoolExecutor, futureList);
//遍历 futureList 打印任务执行结果
futureList.forEach(future -> {
try {
System.out.println("Future获取任务执行结果:" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();//关闭线程池
}
/**
* 演示 Future<T> submit(Callable<T> task) 的使用,Future返回的是 Callable 的 call() 方法执行的结果
*
* @param threadPoolExecutor
* @param futureList
*/
private static void submitCallable(ThreadPoolExecutor threadPoolExecutor, List<Future> futureList) {
for (int i = 0; i < 5; i++) {
// 每个任务启动一条线程去处理
int index = i;
// Future<T> submit(Callable<T> task) 给线程池提交任务
Future<String> future = threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + "执行任务..." + index);
String result = "result " + index;
return result;
}
});
futureList.add(future);
}
}
}
运行结果,Future返回的是 Callable 的 call() 方法执行的结果:
pool-1-thread-1执行任务...0
pool-1-thread-3执行任务...2
pool-1-thread-1执行任务...3
pool-1-thread-2执行任务...1
pool-1-thread-3执行任务...4
Future获取任务执行结果:result 0
Future获取任务执行结果:result 1
Future获取任务执行结果:result 2
Future获取任务执行结果:result 3
Future获取任务执行结果:result 4
3.3 shutdown
这个方法只是初始化整个线程池的关闭过程,执行这个方法之后,拒绝加入新任务,执行完当前线程中正在执行的任务和队列中的线程任务后,再进行停止。
会把存量的任务都执行完毕,但是不会加入新任务。
演示shutdown使用
/**
* Description: 线程池中的任务
*
* @author Xander
* datetime: 2020-12-01 18:53
*/
public class Task implements Runnable {
private int index;
public Task(int index) {
this.index = index;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "开始执行任务..."+index);
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
// 演示 shutdown 使用
public class ShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 5, 30, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
for (int i = 0; i < 5; i++) {
executorService.execute(new Task(i));
}
// shutdown() 执行后,会把存量的任务都执行完毕,但是不会加入新任务
executorService.shutdown();
// 这个任务会被 拒绝加入
executorService.execute(new Task(5));
}
}
运行结果:
pool-1-thread-1开始执行任务...0
pool-1-thread-3开始执行任务...2
pool-1-thread-2开始执行任务...1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.xander.juc._11threadPool.executorService.Task@45ee12a7 rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Shutting down, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.xander.juc._11threadPool.executorService.ShutDownDemo.main(ShutDownDemo.java:22)
pool-1-thread-3开始执行任务...3
pool-1-thread-2开始执行任务...4
3.4 shutdownNow
通过抛出InterruptedException,中断所有正在执行的任务,并返回还未开始执行的任务列表
演示 shutdownNow 使用
/**
* Description: 线程池中的任务
*
* @author Xander
* datetime: 2020-12-01 18:53
*/
public class Task implements Runnable {
private int index;
public Task(int index) {
this.index = index;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "开始执行任务..."+index);
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
// 演示 shutdownNow 使用
public class ShutDownNowDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 5, 30, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
for (int i = 0; i < 8; i++) {
executorService.execute(new Task(i));
}
// sleep 100ms 后,前3个任务还在工作中,会直接被中断
Thread.sleep(100);
// 线程池立即停止,正在执行任务的线程会被interrupt,并返回还未开始执行的任务列表
List<Runnable> runnables = executorService.shutdownNow();
System.out.println("还未开始执行的任务数:"+runnables.size());
}
}
运行结果:
pool-1-thread-1开始执行任务...0
pool-1-thread-3开始执行任务...2
pool-1-thread-2开始执行任务...1
pool-1-thread-2被中断了
还未开始执行的任务数:5
pool-1-thread-3被中断了
pool-1-thread-1被中断了
3.5. isShutdown
如果 ExecutorService 已经 shut down,也就是已经执行了shutdown() 或 shutdownNow(),返回true
演示 isShutdown 使用
/**
* Description: 线程池中的任务
*
* @author Xander
* datetime: 2020-12-01 18:53
*/
public class Task implements Runnable {
private int index;
public Task(int index) {
this.index = index;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "开始执行任务..."+index);
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
// 演示 isShutdown 使用
public class IsShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 5, 30, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
for (int i = 0; i < 5; i++) {
executorService.execute(new Task(i));
}
// isShutdown 如果 ExecutorService 已经 shut down,返回true
System.out.println("isShutdown:"+executorService.isShutdown());
executorService.shutdown();
System.out.println("isShutdown:"+executorService.isShutdown());
}
}
运行结果:
isShutdown:false
pool-1-thread-3开始执行任务...2
pool-1-thread-2开始执行任务...1
pool-1-thread-1开始执行任务...0
isShutdown:true
pool-1-thread-2开始执行任务...3
pool-1-thread-3开始执行任务...4
3.6 isTerminated
如果 shut down 后所有任务都已完成,则返回true,
注意,isTerminated() 永远不是 true ,除非先调用 shutdown() 或 shutdownNow()
演示 isTerminated 使用
public class IsTerminatedDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 5, 30, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
for (int i = 0; i < 5; i++) {
executorService.execute(new Task(i));
}
// isShutdown 如果 ExecutorService 已经 shut down,返回true
System.out.println("isShutdown:" + executorService.isShutdown());
executorService.shutdown();
System.out.println("isShutdown:" + executorService.isShutdown());
Thread.sleep(100);
// 如果 shut down 后所有任务都已完成,则返回true,<br>注意,isTerminated() 永远不是 true ,除非先调用 shutdown() 或 shutdownNow()
System.out.println("after 100ms isTerminated:" + executorService.isTerminated());
Thread.sleep(1000);
System.out.println("after 1000ms isTerminated:" + executorService.isTerminated());
}
}
运行结果:
pool-1-thread-2开始执行任务...1
pool-1-thread-3开始执行任务...2
isShutdown:false
pool-1-thread-1开始执行任务...0
isShutdown:true
after 100ms isTerminated:false
pool-1-thread-1开始执行任务...3
pool-1-thread-3开始执行任务...4
after 1000ms isTerminated:true
3.7 awaitTermination
阻塞直到 shutdown()方法执行后,所有任务都已执行完成,或直到发生超时,或直到当前线程中断(interrupted)(以先发生者为准)。
如果 executor 终止(terminated) 返回true
,而如果在终止之前超时返回false
public class AwaitTerminationDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 5, 30, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
for (int i = 0; i < 5; i++) {
executorService.execute(new Task(i));
}
// isShutdown 如果 ExecutorService 已经 shut down,返回true
System.out.println("isShutdown:" + executorService.isShutdown());
executorService.shutdown();
System.out.println("isShutdown:" + executorService.isShutdown());
// 阻塞直到 shutdown()方法执行后,所有任务都已执行完成,或直到发生超时,或直到当前线程中断(interrupted)(以先发生者为准)。
boolean b = executorService.awaitTermination(300, TimeUnit.MILLISECONDS);
System.out.println("awaitTermination 300ms : " + b);
boolean termination = executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("awaitTermination 3s : " + termination);
// 如果 shut down 后所有任务都已完成,则返回true,注意,isTerminated() 永远不是 true ,除非先调用 shutdown() 或 shutdownNow()
System.out.println("isTerminated:" + executorService.isTerminated());
}
}
运行结果:
pool-1-thread-1开始执行任务...0
pool-1-thread-3开始执行任务...2
isShutdown:false
pool-1-thread-2开始执行任务...1
isShutdown:true
awaitTermination 300ms : false
pool-1-thread-1开始执行任务...3
pool-1-thread-2开始执行任务...4
awaitTermination 3s : true
isTerminated:true
小结:
这篇文章用代码演示了如何通过 ThreadPoolExecutor 和 Executors的静态工厂方法创建线程池,介绍了 ExecutorService 中的核心API,并用Demo演示如何通过这些API来管理线程池。
代码:
github.com/wengxingxia/002juc.git