为什么使用的线程数高于要求?

我有一个 SpringBoot 应用程序,我最多允许 45 个并发请求。 现在,旅程中的1 个请求使用.parallel并行调用16 个外部服务threadPool A。因此,请牢记平均情况和最坏情况,我一直遵循以下配置:


ThreadPoolTaskExecutor A = new ThreadPoolTaskExecutor();

A.setCorePoolSize(400);

A.setMaxPoolSize(1000);

A.setQueueCapacity(10);

A.setThreadNamePrefix("async-executor");

A.initialize();

我的期望是最多使用 45*16 = 720 个线程。但是在运行负载测试时,我观察到线程不断打开(在线程转储中检查),几分钟后它开始给出 RejectedExecutionException。


RejectedExecutionException

Task ServiceX rejected from org.springframework.scheduling.concurrent.

ThreadPoolTaskExecutor$1@4221a19e[Running, pool

size = 1000, active threads = 2, queued tasks = 10, completed tasks = 625216]

线程转储中显示的大多数线程


"executor-A-57" #579 prio=5 os_prio=0 tid=0x000000000193f800 nid=0x2e95 waiting on condition [0x00007fa9e820c000]

   java.lang.Thread.State: TIMED_WAITING (parking)

    at sun.misc.Unsafe.park(Native Method)

    - parking to wait for  <0x0000000582dadf90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)

    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)

    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)

    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

    at java.lang.Thread.run(Thread.java:748)


   Locked ownable synchronizers:

    - None

我想知道我在这里缺少什么?为什么我会被拒绝?


慕姐4208626
浏览 106回答 2
2回答

富国沪深

这是一个有趣的。您列出的代码失败的原因是因为将元素从工作队列传输到工作线程所花费的时间比主线程将项目放入队列所花费的时间慢。流程是这样的:if(there are active threads and is there availability on the queue){&nbsp; &nbsp; submit to the work queue for the worker threads to pick up // 1} else {&nbsp; &nbsp;if(max pool size is not met){&nbsp; &nbsp; &nbsp; create a new thread with this task being its first task // 2&nbsp; &nbsp;} else {&nbsp;&nbsp; &nbsp; &nbsp; reject // 3&nbsp; &nbsp;}}&nbsp;您看到的是代码命中// 3.首次提交任务时,线程数将小于最大池大小。第一轮提交的任务将到达// 2。第一次迭代后,活动线程的数量将是最大池大小,代码将尝试提交到// 1.假设主线程非常非常快地将 3 个项目放入队列,因此 ThreadPool 中的 4 个线程无法足够快地取出一个。如果发生这种情况,我们将传递第一个 if 语句(因为队列中没有可用性)并转到 else。由于已经满足最大池大小,因此除了reject.这可以通过检查ThreadPoolExecutor Javadocs进一步解释。如果请求无法排队,则会创建一个新线程,除非这会超过 maximumPoolSize,在这种情况下,任务将被拒绝。然后直接切换通常需要无限的 maximumPoolSizes 以避免拒绝新提交的任务。当命令的平均到达速度继续快于它们的处理速度时,这反过来又承认了无限线程增长的可能性。要解决您的问题,您有两个合理的选择:使用同步队列。提供给 SynchronousQueue 的线程将无限期地等待,直到另一个线程获取该项目(如果它知道另一个线程正在等待接收它)。您定义的固定队列大小将导致主线程在放置不成功时返回(不阻塞)(即,另一个线程不会立即将其取消)。要使用 Spring 使用 SynchronousQueue,请将队列容量设置为零。setQueueCapacity(0).&nbsp;也来自 Javadocs工作队列的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不用其他方式持有它们。将队列大小设置为大于或等于您希望提交的并发任务数。队列的大小一般不会达到那个大小,但它会在未来保护你。

PIPIONE

我建议通过添加 1 个输出任务执行程序的记录器行来对此进行测试,然后对不同的 16 次调用和 45 次请求进行计数。可能会发生很多事情。也许 ThreadPoolTaskExecutor 不是一个 bean,而 spring 会选择另一个在您的应用程序中配置的 bean。也许应用程序的其他部分也在使用异步调用永远循环的代码中可能存在一些错误ETC...但是,如果您没有单元测试,一个好的开始是简单地记录正在发生的事情并分析您的日志。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java