
Producer-consumer: the consumer waits for all producers to finish (poison pill)

I have two producers and one consumer:


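import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> integersQueue = new ArrayBlockingQueue<>(20);

        // Each producer puts the values 0..9 into the shared queue.
        final Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    integersQueue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // The single consumer. It stops as soon as the queue looks empty,
        // which is not the same as "all producers have finished".
        final Thread thread = new Thread(() -> {
            while (integersQueue.size() > 0) { // Wait while all producers work
                try {
                    System.out.println("GET: " + integersQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // Thread implements Runnable, so the same task is reused for both producers.
        Thread thread1 = new Thread(producer);
        Thread thread2 = new Thread(producer);
        thread1.start();
        thread2.start();

        Thread.sleep(5000);

        thread.start();
    }
}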

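I am trying to find a way to stop the consumer once all producers have finished. There are multiple producers but only one consumer. I need some kind of poison pill, but how do I send it when there are several producers? I found this: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html but I don't understand how to apply it.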


POPMUISE
140 views · 2 answers

饮歌长啸

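Someone has to know how many producers there are. If the consumer knows, each producer sends a poison pill when it finishes; the consumer counts the pills and stops when the count equals the number of producers. If the producers know, count with an AtomicInteger and let the last producer to finish send the poison pill. If only main knows, i.e. it is the "controller", then it needs to wait for all producer threads to end using join(), and then main sends the poison pill.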
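To make the third option concrete, here is a minimal sketch of the "controller" approach, assuming Integer.MIN_VALUE can serve as the poison pill because no producer ever emits it. The class name JoinThenPoison, the POISON constant and the run helper are made up for illustration and are not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class JoinThenPoison {

    // Assumption: producers never emit this value, so it can act as the poison pill.
    static final Integer POISON = Integer.MIN_VALUE;

    // Starts `producers` producer threads that each put 0..itemsEach-1,
    // and returns everything the single consumer received before the pill.
    static List<Integer> run(int producers, int itemsEach) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(20);
        List<Integer> consumed = new ArrayList<>();

        Runnable producerTask = () -> {
            try {
                for (int i = 0; i < itemsEach; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // blocks instead of polling size()
                    if (value.equals(POISON)) {
                        break;                    // controller says: no more data
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        List<Thread> producerThreads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(producerTask);
            producerThreads.add(t);
            t.start();
        }

        // The controller waits for every producer, then sends exactly one pill.
        for (Thread t : producerThreads) {
            t.join();
        }
        queue.put(POISON);

        consumer.join(); // after join(), reading `consumed` from this thread is safe
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> consumed = run(2, 10);
        System.out.println("Consumed " + consumed.size() + " items");
    }
}
```

Because the pill is put only after every join() has returned, it is guaranteed to be the last element in the queue, so the consumer cannot stop early. The consumer is started before the producers so that a bounded queue smaller than the total number of items does not stall them.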

胡说叔叔

Here is how I handle a type-safe variant of the poison pill:

// Imports needed across these files: java.util.Objects,
// java.util.concurrent.BlockingQueue, java.util.concurrent.Callable,
// and a @Nonnull annotation (for example javax.annotation.Nonnull).

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;

        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();

        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        // Each producer announces its own completion with one pill.
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;
    private final int maxPoisons;

    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        // Stop once one pill per producer has arrived (maxPoisons == number of producers).
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}
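The answer does not show how the producers and the consumer are wired together, so the following is one possible sketch, assuming Java 17 or later and an ExecutorService. To stay self-contained it uses compact stand-ins (Message, Value, Poison) in place of the answer's BaseMessage classes; those names, CountedPoisonDemo and the run helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class CountedPoisonDemo {

    // Compact stand-ins for the answer's message types (hypothetical names).
    sealed interface Message permits Value, Poison {}
    record Value(int value) implements Message {}
    enum Poison implements Message { INSTANCE }

    // Runs `producerCount` producers and one consumer; returns the consumed values.
    static List<Integer> run(int producerCount) throws Exception {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        // One thread per producer plus one for the consumer.
        ExecutorService pool = Executors.newFixedThreadPool(producerCount + 1);
        try {
            Callable<List<Integer>> consumer = () -> {
                List<Integer> seen = new ArrayList<>();
                int poisons = 0;
                // The consumer knows the producer count and waits for one pill each.
                while (poisons < producerCount) {
                    Message m = queue.take();
                    if (m instanceof Value v) {
                        seen.add(v.value());
                    } else {
                        poisons++; // the only other permitted type is Poison
                    }
                }
                return seen;
            };
            Future<List<Integer>> result = pool.submit(consumer);

            for (int p = 0; p < producerCount; p++) {
                pool.submit(() -> {
                    for (int i = 1; i <= 3; i++) {
                        queue.put(new Value(i));
                    }
                    queue.put(Poison.INSTANCE); // this producer is done
                    return null;
                });
            }
            return result.get(); // blocks until every pill has been seen
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Consumed " + run(2).size() + " values");
    }
}
```

The key point is that the count passed to the consumer must equal the number of producers actually started; if a producer fails before putting its pill, the consumer blocks in take() forever, so real code may want a timeout or a finally block around the pill.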