手记

深入剖析Java线程池与Executor框架(一) : 任务的抽象

摘要:

  在《Java 并发:并发背景》一文中,从操作系统演进的角度解释了进程与线程出现的背景与原因。简单地说,进程实现了操作系统级别的并发,线程实现了进程级别的并发。通过使用多个线程来执行任务,不仅可以极大提高资源的利用率,而且可以满足很多场景下的实时性要求,实现任务并发、异步执行的效果。
  事实上,线程扮演的是Worker的角色,是Task的执行者。从Java的角度来看,Thread实现了Worker的角色,Runable、Callable以及FutureTask都可以扮演Task的角色。其中,Runable应该是我们最为熟悉的封装任务的方式,但其天生就存在一个显著的不足:在执行完任务之后,无法获取执行结果。也就是说,若程序需要获取另一个线程的执行结果,就得通过共享变量或者线程间通信的方式来实现,这是相当麻烦的。但在Java 1.5之后,Executor框架的推出使得使用Java进行并发编程变得简单而有效。Callable与FutureTask作为Executor框架的一部分,可以更好的完成对任务的封装,使得我们可以轻松实现对任务执行过程的控制和对任务执行结果的获取。
  下面我们就围绕Runable、Callable和FutureTask展开对Java并发编程中的任务模型的探讨,并以此奠定学习线程池、Executor框架等后续并发知识的基础。


一. 并发的真相

  并发,见文知意,即多个进程或线程同时执行,其需要操作系统的支持。广为大家所使用的Windows、Linux和Uinux都是支持多线程、多进程的操作系统。DOS就不支持多进程、多线程模式了,它是一个单进程、单线程操作系统,所有的程序都是串行执行的,即在同一个时间点只能有一个进程在执行。

  支持多进程、多线程的操作系统的CPU真的那么神通广大,能够同时执行多个程序吗?当然不是了。事实是这样的,CPU的运算速度很快,一秒钟至少可以进行数亿次运算,因此,CPU把自己的时间分成一个个很小的时间片,时间片a执行程序A一小段时间,下一个时间片b执行程序B,再下一个时间片C又执行程序。这样,虽然有数十个线程,但一样可以在很短的时间内把它们通通都执行一遍。由于CPU的执行速度太快了,我们人类根本感受不到这种上下文切换带来的停顿,因此看起来就像是在同一时间执行所有的线程一样。但事实上,在一个时间点上,CPU只能被一个线程所使用。当然,如果一个机器是多CPU或者是单CPU多核的,那么就可以实现真正意义上的并发。

  线程是CPU执行的基本单位,是最小粒度的并发。在实践中,我们常常把耗时的操作单独封装成一个任务交给所创建的线程去执行,这样就不用一直阻塞在主线程上,从而提高资源利用率和程序执行效率。由此可见,对任务的封装是并发编程的第一步。常见的任务封装方式有Runnable、Callable和FutureTask三种,下面一一进行介绍。


二. Runnable与Callable

  Runnable是一个功能性接口,其只声明了一个run()方法,如下:

@FunctionalInterfacepublic interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

  由于run()方法返回值为void类型,所以在任务执行完成后,我们无法直接获取到任务的执行结果。Callable的call()方法类似于Runnable的run()方法,都是用来定义任务所要完成的工作,通过实现这两个接口并重写这两个方法实现对任务的封装。不同之处在于,call()方法不但是有返回值的(该方法的返回类型就是泛型接口传递进来的泛型参数V),还可以抛出异常。

@FunctionalInterfacepublic interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

  Callable任务运行之后可以拿到一个表示异步计算结果的Future对象,通过它可以控制任务的执行状态、获取任务的执行结果。接下来,我们将对Future和FutureTask进行介绍,然后给出Callable的一般使用方式。


三. Future与FutureTask

1、Future接口

  Future类是J.U.C包下的一个接口,完成了对任务执行状态和执行结果的封装。通过Future可以实现取消任务、查询任务和获取任务结果等操作。Future接口的定义为:

public interface Future<V> {
    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.  If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}.  Subsequent calls to {@link #isCancelled}
     * will always return {@code true} if this method returned {@code true}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return {@code false} if the task could not be cancelled,
     * typically because it has already completed normally;
     * {@code true} otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);    /**
     * Returns {@code true} if this task was cancelled before it completed
     * normally.
     *
     * @return {@code true} if this task was cancelled before it completed
     */
    boolean isCancelled();    /**
     * Returns {@code true} if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * {@code true}.
     *
     * @return {@code true} if this task completed
     */
    boolean isDone();    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException;
}

  Future接口中声明了以下5个方法,通过这5个方法我们可以方便地控制任务的执行并获取任务的执行状态和结果:

  • cancel方法用于任务的取消。

  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  • isDone方法表示任务是否已经完成,若任务完成,则返回true;

  • get()方法用来获取异步任务的执行结果,若任务未完成,则该方法会阻塞直到任务执行完毕才返回;

  • get(long timeout, TimeUnit unit)也用来获取异步任务的执行结果,若在指定时间内还未获取到结果,就抛出java.util.concurrent.TimeoutException。


  总的来说,我们通过Future可以干三件事:控制异步任务的执行、获取异步任务的执行状态以及获取异步任务的执行结果。但是,Future只是对异步任务执行情况的抽象,是一个接口,不能直接使用,因此只能通过其实现类FutureTask完成以上操作。


2、FutureTask实现类

  Future只是一个接口,不能直接用来创建对象,FutureTask是Future唯一的实现类。从下图可以看到,FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口,因此,FutureTask既能当做一个Runnable任务直接被Thread执行,也能作为Future用来获取Callable任务的计算结果。FutureTask的继承结构和部分实现如下所示:

        

public class FutureTask<V> implements RunnableFuture<V> {.../**  构造方法一
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {        if (callable == null)            throw new NullPointerException();        this.callable = callable;        this.state = NEW;       // ensure visibility of callable
    }/**  构造方法二
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {        this.callable = Executors.callable(runnable, result);        this.state = NEW;       // ensure visibility of callable
    }

    ...

}

  RunnableFuture接口的定义如下:

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

四. 最佳使用组合

  那么,什么情况下使用以及如何使用Callable、Future以及FutureTask呢?想必最为典型的使用场景就是线程池了。在ExecutorService接口中声明了若干个submit方法的重载版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

  Callable一般是和ExecutorService配合来使用的。一般情况下,我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。此外,我们可以使用FutureTask来抽象任务并获取任务的执行情况。


 1、使用Callable+Future获取执行结果

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        System.out.println("主线程在执行任务");        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);        int sum = 0;        for(int i=0;i<100;i++)
            sum += i;        return sum;
    }
}/* Output: 子线程在进行计算
            主线程在执行任务
            task运行结果4950
            所有任务执行完毕
 *///:~

 2、使用Callable+FutureTask获取执行结果

public class Test {
    public static void main(String[] args) {        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过第一种方式的背景是线程池,第二中方式背景是单个Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        System.out.println("主线程在执行任务");        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);        int sum = 0;        for(int i=0;i<100;i++)
            sum += i;        return sum;
    }
}/* Output: 子线程在进行计算
            主线程在执行任务
            task运行结果4950
            所有任务执行完毕
 *///:~

  特别地,如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。


引用

《Java并发编程:Callable、Future和FutureTask》

原文出处

0人推荐
随时随地看视频
慕课网APP