手记

多线程之线程池(六)

Java中线程池是运用最多的并发框架,几乎所有并发的程序都可以使用线程池来完成。阿里巴巴Java开发手册中明确指出:

线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。

与文无关

在实际的生产环境中,线程的数量必须得到控制,盲目的大量创建线程对系统性能是有伤害的,合理使用线程好处:

  • 减少在创建和销毁现场上所消耗的时间和系统资源

  • 提高响应速度,无需创建可以直接运行

  • 提高线程的可管理性。使用线程池可以进行统一分配,调优和监控,但是要做到合理利用线程池,必须对其原理了如指掌。

线程池工作原理

当向线程池提交一个任务的时候。

  1. 先看线程池中的核心线程是否有空闲的,如果有创建一个工作线程来执行任务。如果核心线程都在工作,那么进入下一步

  2. 判断任务队列是否满了,如果任务队列未满,则把任务存储到任务队列,执行下一步。如果满了,执行拒绝策略。

  3. 添加到任务队列之后,再判断核心线程是否有空闲的,如果没有空闲的,那么尝试创建新的非核心线程执行任务。

 // 线程池处理步骤
 public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*
         *  总共有是三个步骤
         */
        int c = ctl.get();        // 如果正在运行的线程数小于核心线程数,则创建工作线程
        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;
            c = ctl.get();
        }       // 线程池正在运行,把任务加入任务队列成功
       if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            
            //  如果线程池已经关闭,那么从任务队列移除任务,并且执行拒绝策略。
            if (! isRunning(recheck) && remove(command))
                reject(command);            // 可供使用的工作线程为0,那么创建新的非核心线程    
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        //如果线程池不在运行或者未能加入任务队列,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

image.png

创建线程池

JDK内部已经提供了Executors类,它扮演者线程池工厂的角色,通过它可以取得拥有特定功能的线程池,但是我们最好手动创建线程池。原因如下:

  1. Executors内部也是直接构造线程池对象,没有额外的操作

  2. 手动创建线程池,我们更明白线程池的参数,方便调优。

  3. Executors创建的线程池有可能导致OOM异常。

虽然不建议直接使用Executors直接创建线程池,但是我们可以看一下它给我们提供了那些工厂方法:

     // 返回一个可根据实际情况调整线程数量的线程池
     // 它是大小无界的线程池,适合执行很多短期一步的小程序,或是负载比较轻的服务器。
     public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());
     }     
    // 返回一个固定线程数量的线程池
    // 适合负载比较重的服务器
     public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());
    }    
    // 返回一个固定线程数量的线程池对象,ScheduledThreadPoolExecutor对象可以定时执行某任务
    // 适合于多个后台线程执行周期任务。
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {        return new ScheduledThreadPoolExecutor(corePoolSize);
    }    
    // 返回只有一个线程的线程池。
    // 适合于单个线程顺序的执行任务
    public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>()));
    }    // 返回只有一个线程的ScheduledThreadPoolExecutor对象。
    // 单个线程执行周期任务
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

本质上,我们可以通过ThreadPoolExecutor来创建线程池:

new ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

参数如下:

  1. corePoolSize: 线程池的基本大小。当提交一个任务的时候,线程池就会创建一个新的线程执行任务,即使核心线程池中有空闲线程,也会新建,知道线程池中的数量等于corePoolSize就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有的线程。

  2. maximumPoolSize:线程池允许创建的最大线程数。当使用无界队列的时候,这个参数就没什么效果了。

  3. keepAliveTime:线程池的工作线程空闲以后,保持存活的时间,如果任务多,并且任务执行时间段,可以调大时间,提高线程的利用率。

  4. unit 保活时间的单位

  5. workQueue: 任务队列,用于保持或等待执行的任务阻塞队列。有如下队列可供选择:

  • ArrayBlockingQueue: 基于数组结构的有界队列,此队列按FIFO原则对元素进行排序

  • LinkedBlockingQueue: 基于链表的阻塞队列,FIFO原则,吞吐量通常高于ArrayBlockingQueue.

  • SynchronousQueue: 不存储元素的阻塞队列。每个插入必须要等到另一个线程调用移除操作。

  • PriorityBlockingQueue: 具有优先级的无阻塞队列

threadFactory:用于设置创建线程的工厂。

handler:拒绝策略,当队列线程池都满了,必须采用一种策略来处理还要提交的任务。在实际应用中,我们可以将信息记录到日志,来分析系统的负载和任务丢失情况JDK中提供了4中策略:

  • AbortPolicy: 直接抛出异常

  • CallerRunsPolicy: 只用调用者所在的线程来运行任务

  • DiscardOldestPolicy: 丢弃队列中最老的一个人任务,并执行当前任务。

  • DiscardPolicy: 直接丢弃新进来的任务

知道如上参数,再去分析Executors框架,聪明的你一定知道是怎么回事了。

执行任务

可以使用两个方法:

  • execute() 提交不需要返回值的任务,无法判断是否执行成功,具体步骤上面我们有分析

  • submit() 提交有返回值的任务,该方法返回一个future的对象,通过future对象可以判断任务是否执行成功。future的get方法会阻塞当前线程直到任务完成。

关闭线程池

两个方法:

  • shutdown() 通知线程该结束了,尝试用终端来停止线程,如果线程对中断不响应的话,那么这个方法无法关闭线程池。

  • shutdownNow() 看名字就知道是立刻关闭线程池,类似于线程的stop方法,不等待任务执行完成就关闭线程。

扩展线程池

有时候需要对线程池做一些扩展,比如知道线程池的开始结束时间,线程池的运行统计等信息。这个时候好在ThreadPoolExecutor给我们提供了三个方法进行扩展:

 protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }

可以监控的属性:

  • taskCount: 线程池需要执行的任务数量

  • completedTaskCount: 已经完成的任务数量

  • largestPoolSize: 线程池中曾经创建的最大的线程数量

  • getPoolSize: 线程池的线程数量

  • getActiveCount: 活动的线程数

合理配置线程池

线程池中线程的数量过大和过小都无法使系统的性能发挥到最优,确定线程池的大小可以考虑下面的角度:

  • 任务性质:CPU密集,IO密集,和混合密集

  • 任务执行时间:长,中,低

  • 任务优先级:高,中,低

  • 任务的依赖性:是否依赖其它资源,如数据库连接

建议使用有界队列,防止撑爆内存

在Java中,获取CPU数量:

Runtime.getRuntime().availableProcessors();

线程池计算公式:

N = CPU数量
U = 目标CPU使用率,  0 <= U <= 1
W/C = 等待(wait)时间与计算(compute)时间的比率

线程池数量 =  N * U * (1 + W/C)

线程调度

在多线程竞争的情况下,肯定要涉及到线程调度的问题。线程调度是指系统为线程分配处理器的过程,主要调度方式有两种。

  • 协同步式线程调度(Cooperative Threads-Schedulin) :线程的执行时间由线程本身控制,执行完任务之后通知系统切换到另外一个线程上。 实现简单,但是如果一个线程坚持不让出CPU,那么会导致整个系统崩溃。

  • 抢占式线程调度(Preemptive Threads-Schedulng):由系统分配时间,线程切换不由线程本身决定,这种情况下线程的执行时间是系统可控的,也不会有某线程出现问题导致进程阻塞的问题。

Java使用的是抢占式线程调度。

最后

线程池与普通线程的区别不会太多,只是更好的利用了系统资源,任何使用到线程的地方都可以使用线程池来替代。关于线程池,我们就说到这里。

参考

  • 《Java并发编程的艺术》

  • 《Java高并发程序设计》

0人推荐
随时随地看视频
慕课网APP