猿问

每个异步线程都运行一个无限循环

我正在实施一个包含不同任务的程序,并且都实现了 Runnable。例如,有一个任务在数据库上工作并将一些元组发送到同步共享内存,随后,有另一个线程检查共享内存并将消息发送到队列。此外,这两个线程在无限 while 循环中进行迭代。


我已经使用了 fixedThreadPool 来执行这些线程。


问题在于,有时程序控制权仍保留在第一个正在运行的线程中,而第二个线程永远没有机会进入其运行状态。


这是我的类似示例代码:


public class A implements Runnable {

    @Override

    public void run() {

        while(true) {

        //do something

        }

    }

}

public class B implements Runnable {

    @Override

    public void run() {

        while(true) {

        //do something

        }

    }

}

public class Driver {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        A a = new A();

        executorService.execute(a);

        B b = new B();

        executorService.execute(b);

    }

}

我还做了一些棘手的事情,让第一个线程在运行一小段时间后休眠一次。结果,它使第二个线程找到了运行的机会。但是对于这个问题,有没有什么格式良好的解决方案呢?您认为问题出在哪里?


交互式爱情
浏览 195回答 2
2回答

汪汪一只猫

这是生产者/消费者模式的一个很好的例子。有很多方法可以实现这一点。这是使用wait/notify模式的一个简单实现。public class A implements Runnable {&nbsp; &nbsp; private Queue<Integer> queue;&nbsp; &nbsp; private int maxSize;&nbsp; &nbsp; public A(Queue<Integer> queue, int maxSize) {&nbsp; &nbsp; &nbsp; &nbsp; super();&nbsp; &nbsp; &nbsp; &nbsp; this.queue = queue;&nbsp; &nbsp; &nbsp; &nbsp; this.maxSize = maxSize;&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void run() {&nbsp; &nbsp; &nbsp; &nbsp; while (true) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; synchronized (queue) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (queue.size() == maxSize) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Queue is full, " + "Producer thread waiting for "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + "consumer to take something from queue");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; queue.wait();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception ex) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ex.printStackTrace();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Random random = new Random();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int i = random.nextInt();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Producing value : " + i);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; queue.add(i);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; queue.notifyAll();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}public class B implements Runnable {&nbsp; &nbsp; private Queue<Integer> queue;&nbsp; &nbsp; public B(Queue<Integer> queue) {&nbsp; &nbsp; &nbsp; &nbsp; super();&nbsp; &nbsp; &nbsp; &nbsp; this.queue = queue;&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void run() {&nbsp; &nbsp; &nbsp; &nbsp; while (true) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; synchronized (queue) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (queue.isEmpty()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Queue is empty," + "Consumer thread is waiting"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + " for producer thread to put something in queue");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; queue.wait();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception ex) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ex.printStackTrace();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Consuming value : " + queue.remove());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; queue.notifyAll();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}这里很热,我们设置了一些东西。public class ProducerConsumerTest {&nbsp; &nbsp; public static void main(String[] args) {&nbsp; &nbsp; &nbsp; &nbsp; Queue<Integer> buffer = new LinkedList<>();&nbsp; &nbsp; &nbsp; &nbsp; int maxSize = 10;&nbsp; &nbsp; &nbsp; &nbsp; Thread producer = new Thread(new A(buffer, maxSize));&nbsp; &nbsp; &nbsp; &nbsp; Thread consumer = new Thread(new B(buffer));&nbsp; &nbsp; &nbsp; &nbsp; ExecutorService executorService = Executors.newFixedThreadPool(2);&nbsp; &nbsp; &nbsp; &nbsp; executorService.submit(producer);&nbsp; &nbsp; &nbsp; &nbsp; executorService.submit(consumer);&nbsp; &nbsp; }}在这种情况下,它Queue充当共享内存。您可以将其替换为适合您需要的任何其他数据结构。这里的诀窍是你必须仔细地在线程之间进行协调。这就是你上面的实现所缺乏的。

天涯尽头无女友

我知道这听起来可能很激进,但是异步代码库的非框架部分应该尽量避免while(true)手动编码循环,而是将其建模为(可能是自我重新调度的)回调到执行程序中这允许更公平的资源利用和最重要的每次迭代监控工具。当代码不等待临界(或只是原型时),最简单的方法是做到这一点的Executors,可能CompletableFuture秒。class Participant implements Runnable {&nbsp; &nbsp; final Executor context;&nbsp; &nbsp; @Override&nbsp; &nbsp; public void run() {&nbsp; &nbsp; &nbsp; &nbsp; final Item work = workSource.next();&nbsp; &nbsp; &nbsp; &nbsp; if (workSource.hasNext()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; context.execute(this::run);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}
随时随地看视频慕课网APP

相关分类

Java
我要回答