猿问

Java ForkJoinPool - 队列中的任务顺序

我想了解在 Java fork-join 池中处理任务的顺序。

到目前为止,我在文档中找到的唯一相关信息是关于名为“asyncMode”的参数,“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真” .

我对这句话的理解是每个worker都有自己的任务队列;工作人员从自己队列的前面获取任务,如果自己的队列为空,则从其他工作人员的队列后面窃取任务;如果 asyncMode 为真(相应的假),工作人员会将新分叉的任务添加到他们自己队列的后面(相应的前面)。

如果我的解释有误,请纠正我!

现在,这提出了几个问题:

1)加入的分叉任务的顺序什么?

我的猜测是,当一个任务被分叉时,它会被添加到工作人员的队列中,如我上面的解释中所述。现在,假设任务已加入...

  • 如果在调用 join 时任务尚未启动,则调用 join 的 worker 会将任务从队列中拉出并立即开始处理它。

  • 如果在调用 join 时该任务已被另一个 worker 窃取,则调用 join 的 worker 将同时处理其他任务(按照我上面解释中描述的获取任务的顺序),直到它被加入已经被偷走它的工人完成了。

这个猜测是基于用打印语句编写简单的测试代码,并观察改变连接调用顺序影响任务处理顺序的方式。有人可以告诉我我的猜测是否正确?

2) 外部提交的任务排序是什么?

根据这个问题的答案,fork-join 池不使用外部队列。(顺便说一下,我正在使用 Java 8。)

那么我是否理解在外部提交任务时,该任务会被添加到随机选择的工作队列中?

如果是,外部提交的任务是加在队列的后面还是前面?

最后,这取决于任务是通过调用 pool.execute(task) 还是通过调用 pool.invoke(task) 提交?这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此 fork-join 池中的线程?


呼啦一阵风
浏览 286回答 1
1回答

皈依舞

你的猜测是正确的,你完全正确。正如您在“实施概述”中所读到的那样。&nbsp;* Joining Tasks&nbsp;* =============&nbsp;*&nbsp;* Any of several actions may be taken when one worker is waiting&nbsp;* to join a task stolen (or always held) by another.&nbsp; Because we&nbsp;* are multiplexing many tasks on to a pool of workers, we can't&nbsp;* just let them block (as in Thread.join).&nbsp; We also cannot just&nbsp;* reassign the joiner's run-time stack with another and replace&nbsp;* it later, which would be a form of "continuation", that even if&nbsp;* possible is not necessarily a good idea since we may need both&nbsp;* an unblocked task and its continuation to progress.&nbsp; Instead we&nbsp;* combine two tactics:&nbsp;*&nbsp;*&nbsp; &nbsp;Helping: Arranging for the joiner to execute some task that it&nbsp;*&nbsp; &nbsp; &nbsp; would be running if the steal had not occurred.&nbsp;*&nbsp;*&nbsp; &nbsp;Compensating: Unless there are already enough live threads,&nbsp;*&nbsp; &nbsp; &nbsp; method tryCompensate() may create or re-activate a spare&nbsp;*&nbsp; &nbsp; &nbsp; thread to compensate for blocked joiners until they unblock.2.ForkJoinPool.invoke和ForkJoinPool.join在提交任务的方式上是完全一样的。你可以在代码中看到&nbsp; &nbsp; public <T> T invoke(ForkJoinTask<T> task) {&nbsp; &nbsp; &nbsp; &nbsp; if (task == null)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NullPointerException();&nbsp; &nbsp; &nbsp; &nbsp; externalPush(task);&nbsp; &nbsp; &nbsp; &nbsp; return task.join();&nbsp; &nbsp; }&nbsp; &nbsp; public void execute(ForkJoinTask<?> task) {&nbsp; &nbsp; &nbsp; &nbsp; if (task == null)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NullPointerException();&nbsp; &nbsp; &nbsp; &nbsp; externalPush(task);&nbsp; &nbsp; }在 externalPush 中,您可以看到使用 ThreadLocalRandom 将任务添加到随机选择的工作队列中。此外,它使用推送方法进入队列的头部。&nbsp; &nbsp; final void externalPush(ForkJoinTask<?> task) {&nbsp; &nbsp; &nbsp; &nbsp; WorkQueue[] ws; WorkQueue q; int m;&nbsp; &nbsp; &nbsp; &nbsp; int r = ThreadLocalRandom.getProbe();&nbsp; &nbsp; &nbsp; &nbsp; int rs = runState;&nbsp; &nbsp; &nbsp; &nbsp; if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; U.compareAndSwapInt(q, QLOCK, 0, 1)) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ForkJoinTask<?>[] a; int am, n, s;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ((a = q.array) != null &&&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (am = a.length - 1) > (n = (s = q.top) - q.base)) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int j = ((am & s) << ASHIFT) + ABASE;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; U.putOrderedObject(a, j, task);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; U.putOrderedInt(q, QTOP, s + 1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; U.putIntVolatile(q, QLOCK, 0);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (n <= 1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; signalWork(ws, q);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; U.compareAndSwapInt(q, QLOCK, 1, 0);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; externalSubmit(task);&nbsp; &nbsp; }我不确定你的意思是什么:这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此 fork-join 池中的线程?
随时随地看视频慕课网APP

相关分类

Java
我要回答