跃然一笑
我已经根据您的情况调整了这个问题的答案。自定义 Spliterator 会将流“拆分”为多个由不同属性收集的流:@SafeVarargspublic static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers){ return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();}public static class ForkingSpliterator<T> extends AbstractSpliterator<T>{ private Spliterator<T> sourceSpliterator; private List<BlockingQueue<T>> queues = new ArrayList<>(); private boolean sourceDone; @SafeVarargs private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers) { super(Long.MAX_VALUE, 0); sourceSpliterator = source.spliterator(); for (Consumer<Stream<T>> fork : consumers) { LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(); queues.add(queue); new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start(); } } @Override public boolean tryAdvance(Consumer<? super T> action) { sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t))); return !sourceDone; } private class ForkedConsumer extends AbstractSpliterator<T> { private BlockingQueue<T> queue; private ForkedConsumer(BlockingQueue<T> queue) { super(Long.MAX_VALUE, 0); this.queue = queue; } @Override public boolean tryAdvance(Consumer<? super T> action) { while (queue.peek() == null) { if (sourceDone) { // element is null, and there won't be no more, so "terminate" this sub stream return false; } } // push to consumer pipeline action.accept(queue.poll()); return true; } }}您可以按如下方式使用它:streamForked(Stream.of(new Row("content1", "client1", "location1", 1), new Row("content2", "client1", "location1", 2), new Row("content1", "client1", "location2", 3), new Row("content2", "client2", "location2", 4), new Row("content1", "client2", "location2", 5)), rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient, Collectors.groupingBy(Row::getContent, Collectors.summingInt(Row::getConsumption))))), rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient, Collectors.groupingBy(Row::getLocation, Collectors.summingInt(Row::getConsumption))))), rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent, Collectors.groupingBy(Row::getLocation, Collectors.summingInt(Row::getConsumption))))));// Output// {client2={location2=9}, client1={location1=3, location2=3}}// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}请注意,您几乎可以对流的副本做任何想做的事情。根据您的示例,我使用堆叠groupingBy收集器按两个属性对行进行分组,然后总结 int 属性。所以结果将是一个Map<String, Map<String, Integer>>. 但您也可以将其用于其他场景:rows -> System.out.println(rows.count())rows -> rows.forEach(row -> System.out.println(row))rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))
慕桂英546537
通常收集到标准 API 以外的任何东西都可以通过自定义Collector. 在您的情况下,一次收集 3 个列表(只是一个编译的小例子,因为您也无法共享您的代码):private static <T> Collector<T, ?, List<List<T>>> to3Lists() { class Acc { List<T> left = new ArrayList<>(); List<T> middle = new ArrayList<>(); List<T> right = new ArrayList<>(); List<List<T>> list = Arrays.asList(left, middle, right); void add(T elem) { // obviously do whatever you want here left.add(elem); middle.add(elem); right.add(elem); } Acc merge(Acc other) { left.addAll(other.left); middle.addAll(other.middle); right.addAll(other.right); return this; } public List<List<T>> finisher() { return list; } } return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);}并通过以下方式使用它:Stream.of(1, 2, 3) .collect(to3Lists());显然,这个自定义收集器没有做任何有用的事情,而只是一个如何使用它的示例。