手记

使用线程池的多种姿势

线程池大家肯定再熟悉不过了,什么核心参数,源码分析等等。所以这里就不介绍这些内容了。今天主要来看看在
一些开源框架中,都是怎么玩转线程池的呢。


1-RPC框架中的线程池

在Motan这个RPC框架中,netty服务端处理业务代码的地方就使用了如下线程池:
//server 处理任务的线程池创建和预先启动核心线程
standardThreadExecutor = (standardThreadExecutor != null && 
!standardThreadExecutor.isShutdown()) ? standardThreadExecutor
        : new StandardThreadExecutor(minWorkerThread, maxWorkerThread, 
        workerQueueSize, new DefaultThreadFactory("NettyServer-" + 
        url.getServerPortStr(), true));
standardThreadExecutor.prestartAllCoreThreads();
在这个线程池的说明中,已经体现了这个线程池和我们常见的普通线程池不同之处:
threadPoolExecutor execute执行策略:优先offer到queue,queue满后再扩充线程到maxThread,如果已经到了
maxThread就reject 比较适合于CPU密集型应用(比如runnable内部执行的操作都在JVM内部,memory copy,
or compute等等)
 
StandardThreadExecutor execute执行策略: 优先扩充线程到maxThread,再offer到queue,如果满了就
reject比较适合于业务处理需要远程资源的场景。
StandardThreadExecutor主要是利用了自己实现的ExecutorQueue队列来实现上述功能的。
super(coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue(), threadFactory,
 handler);
((ExecutorQueue) getQueue()).setStandardThreadExecutor(this);

ExecutorQueue队列中的核心方法如下:
// 注:tomcat的代码进行一些小变更 
public boolean offer(Runnable o) {
   int poolSize = threadPoolExecutor.getPoolSize();

   // we are maxed out on threads, simply queue the object
   //这里判断线程池工作线程的大小是否等于线程池最大线程数,如果等于则放入队列中
   if (poolSize == threadPoolExecutor.getMaximumPoolSize()) {
      return super.offer(o);
   }
   // we have idle threads, just add it to the queue
   // note that we don't use getActiveCount(), see BZ 49730
   //这里判断正在处理的任务数,是否小于工作线程数,如果小于则加入队列。说明还有空闲的线程。
   if (threadPoolExecutor.getSubmittedTasksCount() <= poolSize) {
      return super.offer(o);
   }
   //如果上面都不满足,说明已经没有空闲线程了。
   //这里会判断当前线程池中工作线程是否小于线程池的最大线程数,
   //如果小于则会继续给任务创建新的线程进行执行。这里返回false,说明加入队列失败,
   //线程池就会去进行addWorker操作,继续创建新的线程来执行任务。
   // if we have less threads than maximum force creation of a new
   // thread
   if (poolSize < threadPoolExecutor.getMaximumPoolSize()) {
      return false;
   }
   //否则加入队列
   // if we reached here, we need to add it to the queue
   return super.offer(o);
}
注释已经说的很明白了。大家可以去找一下完整的代码去学习一下。
到这里我们发现我们完全可以根据自己的需求来改变对线程池使用的方式。比如,我们还可以在线程池的拒绝策
略上做一些改变。
自己实现一个拒绝策略如下:
private class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        try {
            //log.info("current thread task exceed thread pool define size :
             {}", maxThread);
            // 改造点,由blockingqueue的offer改成put阻塞方法
            executor.getQueue().put(runnable);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
这段代码在有界队列的线程池队列已经满了的情况下,可以起到什么作用大家可以看代码分析一下。


2-netty中的线程池

最近在学习一个netty的专栏,迷迷糊糊,发现netty中的线程池也很有意思。使用起来和普通线程池差不多。
DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(5);
eventLoopGroup.execute(() -> {
    System.out.println("thread: " + Thread.currentThread().getName());
});

DefaultEventLoopGroup主要是继承MultithreadEventExecutorGroup这个类。

1-根据传入的线程数构造一个EventExecutor数组children = new EventExecutor[nThreads];
EventExecutor代表具体执行任务的类,等会来看看其具体实现

2-将EventExecutor数组传入到chooserFactory中: chooser = chooserFactory.newChooser(children);
chooserFactory默认有两种实现,作用是选择哪个EventExecutor去执行我们的任务。

3-eventLoopGroup.execute方法
public void execute(Runnable command) {
    next().execute(command);
}
这里的next()实际调用的是chooser.next().也就是chooserFactory的实现类中的方法.比如轮训获取
EventExecutor执行器来执行任务:
public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}


接着看execute提交任务,选择了一个EventExecutor进行提交任务
最终在SingleThreadEventExecutor的execute方法中,将任务加入了队列中
taskQueue.offer(task);

那任务是如何运行的呢?
SingleThreadEventExecutor的execute方法中
在第一次加入队列的时候会启动一个线程startThread();
这个线程会执行:
SingleThreadEventExecutor.this.run();方法调用DefaultEventLoop中的run方法。

最后来看一下run方法怎么运行任务的,就是不断从队列中获取任务然后执行:
protected void run() {
    for (;;) {
        Runnable task = takeTask();
        if (task != null) {
            task.run();
            updateLastExecutionTime();
        }

        if (confirmShutdown()) {
            break;
        }
    }
}

总体来看netty的线程池有如下特点:
首先初始化多个EventExecutor,每个EventExecutor中维护了一个队列,任务会加入到队列中
,并且每个EventExecutor会启动一个线程,从队列中获取任务,来真正的运行任务,由此可见,这里
不能执行耗时的操作,否则会造成任务的等待。
在提交任务到EventExecutor的时候可以通过自己实现chooserFactory来决定任务由哪个EventExecutor来执行。

有了上面的理解,我想在某些场景是可以考虑的。


3-线程隔离

对于线程隔离,我们来看一种比较简单的实现,在调度系统中,收到任务执行的请求时,通常会将请求丢进线程池。
我们来看看在xxl-job中是怎么做的:
调度线程池隔离,拆分为"Fast"和"Slow"两个线程池,1分钟窗口期内任务耗时达500ms超过10次,
该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;
我们来看看代码,主要实现类是:JobTriggerPoolHelper
首先初始化了两个线程池:
fastTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, 
                admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
            }
            
        });

slowTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, 
                admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                
            }
        });
        
通过map来维护任务和次数
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = 
new ConcurrentHashMap<>();   

当任务的次数达到一定数量则会自动切换线程池:
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
    triggerPool_ = slowTriggerPool;
}   
   
具体实现可以去直接阅读JobTriggerPoolHelper的源码,在xxl-job中。 xxl-job框架比较活跃,有兴趣
可以对其深入学习一下。




0人推荐
随时随地看视频
慕课网APP