猿问

单生产者多消费者模型中,如果一个消费者出现异常,怎么通知其他消费者停止处理任务?

如题,在处理海量数据的时候,生产者生产数据的速度远大于消费者,故为了平衡处理能力,增加多个消费者进行处理,但是这里有个问题就是,如果这批数据一旦有那个消费者处理出现问题,其他消费者要停止处理,等待下一批数据,有什么办法能做到吗?

MM们
浏览 774回答 3
3回答

小唯快跑啊

生产者类DataProducerimport java.util.Observable; import java.util.Observer; import java.util.concurrent.BlockingQueue; public class DataProducer implements Observer, Runnable {     private BlockingQueue<String> blockingQueue;     private boolean handling = true;     public DataProducer(BlockingQueue<String> blockingQueue) {         this.blockingQueue = blockingQueue;     }     @Override     public void run() {         while (handling) {             try             {                 String str =  handleData();                 blockingQueue.offer(str);             } catch (Exception e) {                 e.printStackTrace();             }         }     }     private String handleData() {         return null;     }     @Override     public void update(Observable o, Object arg) {         if (arg.toString().equals("stopHandling")) {             System.out.println("stopHandling data");             handling = false;             blockingQueue.clear();         }     } }消费者类DataConsumerimport java.util.Observable; import java.util.concurrent.BlockingQueue; public class DataConsumer extends Observable implements Runnable {     private BlockingQueue<String> blockingQueue;     public DataConsumer(BlockingQueue<String> blockingDeque) {         this.blockingQueue = blockingDeque;     }     @Override     public void run() {         while (true) {             try {                 String str = blockingQueue.take();                 handleData(str);             } catch (Exception e) {                 notifyObservers("stopHandling");             }         }     }     private void handleData(String str) {     }     @Override     public void notifyObservers(Object arg) {         super.setChanged();         super.notifyObservers(arg);     } }主线程启动任务,测试程序import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedTransferQueue; public class TestData {     private static final ExecutorService executor = Executors.newFixedThreadPool(5);     public static void main(String[] args) {         BlockingQueue<String> blockingQueue = new LinkedTransferQueue<>();         DataProducer dataProducer = new DataProducer(blockingQueue);         executor.execute(dataProducer);         for (int i = 0; i < 4; i++) {             executor.execute(new DataConsumer(blockingQueue));         }     } }

温温酱

在传递给消费者的参数里加上状态,一但某个消费者处理出问题,就修改其状态标记问题,其它消费者检查到问题标记之后就不继续处理了(但是已经在处理过程中的仍然会继续,除非过程中多次检查状态)

大话西游666

可以在队列里面塞一个‘毒丸’ 元素 ,消费者检测到该对象就停止消费
随时随地看视频慕课网APP

相关分类

Java
我要回答