Java 8: one stream to multiple maps

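Suppose I have huge web server log files that don't fit in memory. I need to stream each file through a map-reduce step and save the results to a database, and I do this with the Java 8 Stream API. For example, after the map-reduce step I get lists such as consumption by client, consumption by IP, and consumption by content. My real requirements differ from this example, but since I can't share my code, I'll stick to a basic one.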

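With the Java 8 Stream API, I want to read the file only once and build all 3 lists at the same time while streaming it, either sequentially or in parallel (parallel would be nice). Is there a way to do this?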


波斯汪
2 Answers

跃然一笑

I have adapted the answer to this question to your case. A custom Spliterator "splits" the stream into several streams, each of which can be collected by different properties:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators.AbstractSpliterator;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Consumer;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    // Both members below live inside an enclosing class of your choice.
    @SafeVarargs
    public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        ForkingSpliterator<T> forking = new ForkingSpliterator<>(source, consumers);
        long count = StreamSupport.stream(forking, false).count();
        forking.awaitForks(); // return only after every fork has processed all elements
        return count;
    }

    public static class ForkingSpliterator<T>
        extends AbstractSpliterator<T>
    {
        private Spliterator<T>         sourceSpliterator;
        private List<BlockingQueue<T>> queues = new ArrayList<>();
        private List<Thread>           forks  = new ArrayList<>();
        // volatile: written by the reading thread, read by every fork thread
        private volatile boolean       sourceDone;

        @SafeVarargs
        private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
        {
            super(Long.MAX_VALUE, 0);

            sourceSpliterator = source.spliterator();

            for (Consumer<Stream<T>> fork : consumers)
            {
                // Unbounded queue: a fork that is slower than the reader keeps its backlog in memory
                LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
                queues.add(queue);
                Thread thread = new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false)));
                forks.add(thread);
                thread.start();
            }
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            // Copy the next source element into every fork's queue, then pass it on
            // to this stream's own pipeline so that count() sees it
            sourceDone = !sourceSpliterator.tryAdvance(t -> {
                queues.forEach(queue -> queue.offer(t));
                action.accept(t);
            });
            return !sourceDone;
        }

        private void awaitForks()
        {
            for (Thread thread : forks)
            {
                try
                {
                    thread.join();
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        private class ForkedConsumer
            extends AbstractSpliterator<T>
        {
            private BlockingQueue<T> queue;

            private ForkedConsumer(BlockingQueue<T> queue)
            {
                super(Long.MAX_VALUE, 0);
                this.queue = queue;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action)
            {
                T next;
                // Busy-wait until an element arrives or the source is exhausted
                while ((next = queue.poll()) == null)
                {
                    if (sourceDone)
                    {
                        // The last elements may have been queued just before sourceDone was set,
                        // so poll once more before "terminating" this sub-stream
                        next = queue.poll();
                        if (next == null)
                        {
                            return false;
                        }
                        break;
                    }
                }
                // push to consumer pipeline
                action.accept(next);
                return true;
            }
        }
    }

You can use it as follows:

    streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
                           new Row("content2", "client1", "location1", 2),
                           new Row("content1", "client1", "location2", 3),
                           new Row("content2", "client2", "location2", 4),
                           new Row("content1", "client2", "location2", 5)),
                 rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                         Collectors.groupingBy(Row::getContent,
                                                                               Collectors.summingInt(Row::getConsumption))))),
                 rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                         Collectors.groupingBy(Row::getLocation,
                                                                               Collectors.summingInt(Row::getConsumption))))),
                 rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
                                                         Collectors.groupingBy(Row::getLocation,
                                                                               Collectors.summingInt(Row::getConsumption))))));

    // Output (the order of the lines can vary, since each fork prints from its own thread)
    // {client2={location2=9}, client1={location1=3, location2=3}}
    // {client2={content2=4, content1=5}, client1={content2=2, content1=4}}
    // {content2={location1=2, location2=4}, content1={location1=1, location2=8}}

Note that you can do almost anything you like with the copies of the stream. Following your example, I used stacked groupingBy collectors to group the rows by two properties and then sum an int property, so each result is a Map<String, Map<String, Integer>>. But you can also use it for other scenarios:

    rows -> System.out.println(rows.count())
    rows -> rows.forEach(row -> System.out.println(row))
    rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))

慕桂英546537

In general, collecting into anything the standard API doesn't provide can be done with a custom Collector. In your case, collecting 3 lists at once (just a small example that compiles, since you can't share your code anyway):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collector;

    private static <T> Collector<T, ?, List<List<T>>> to3Lists() {
        class Acc {
            List<T> left = new ArrayList<>();
            List<T> middle = new ArrayList<>();
            List<T> right = new ArrayList<>();
            List<List<T>> list = Arrays.asList(left, middle, right);

            void add(T elem) {
                // obviously do whatever you want here
                left.add(elem);
                middle.add(elem);
                right.add(elem);
            }

            // Combines partial results, which is what makes the collector usable on parallel streams
            Acc merge(Acc other) {
                left.addAll(other.left);
                middle.addAll(other.middle);
                right.addAll(other.right);
                return this;
            }

            public List<List<T>> finisher() {
                return list;
            }
        }
        return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
    }

And use it like this:

    Stream.of(1, 2, 3)
          .collect(to3Lists());

Obviously this custom collector doesn't do anything useful; it's just an example of how to write one.
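To connect this answer to the per-client, per-location and per-content totals from the question, the same Collector.of approach can wrap three existing collectors instead of three lists. This is a sketch under stated assumptions: tee3, Triple, Row and consumptionTotals are hypothetical names (Row mirrors the getters used in the first answer), and tee3 is a Java 8 stand-in for the idea behind Java 12's Collectors.teeing, which combines only two collectors.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class TeeCollectorDemo {
    // Hypothetical holder for three values (Java 8 has no built-in tuple type)
    public static final class Triple<A, B, C> {
        public final A first;
        public final B second;
        public final C third;

        Triple(A first, B second, C third) {
            this.first = first;
            this.second = second;
            this.third = third;
        }
    }

    // Hypothetical row type mirroring the getters used in the first answer
    public static final class Row {
        private final String content;
        private final String client;
        private final String location;
        private final int consumption;

        public Row(String content, String client, String location, int consumption) {
            this.content = content;
            this.client = client;
            this.location = location;
            this.consumption = consumption;
        }

        public String getContent() { return content; }
        public String getClient() { return client; }
        public String getLocation() { return location; }
        public int getConsumption() { return consumption; }
    }

    // Feeds every element to three downstream collectors in a single pass
    public static <T, A1, A2, A3, R1, R2, R3> Collector<T, ?, Triple<R1, R2, R3>> tee3(
            Collector<T, A1, R1> c1, Collector<T, A2, R2> c2, Collector<T, A3, R3> c3) {
        return Collector.<T, Triple<A1, A2, A3>, Triple<R1, R2, R3>>of(
                () -> new Triple<>(c1.supplier().get(), c2.supplier().get(), c3.supplier().get()),
                (acc, t) -> {
                    c1.accumulator().accept(acc.first, t);
                    c2.accumulator().accept(acc.second, t);
                    c3.accumulator().accept(acc.third, t);
                },
                // Merging partial results per downstream collector is what allows parallel streams
                (x, y) -> new Triple<>(
                        c1.combiner().apply(x.first, y.first),
                        c2.combiner().apply(x.second, y.second),
                        c3.combiner().apply(x.third, y.third)),
                acc -> new Triple<>(
                        c1.finisher().apply(acc.first),
                        c2.finisher().apply(acc.second),
                        c3.finisher().apply(acc.third)));
    }

    // Consumption summed by client, by location and by content, computed in one pass
    public static Triple<Map<String, Integer>, Map<String, Integer>, Map<String, Integer>> consumptionTotals(
            List<Row> rows, boolean parallel) {
        Collector<Row, ?, Map<String, Integer>> byClient =
                Collectors.groupingBy(Row::getClient, Collectors.summingInt(Row::getConsumption));
        Collector<Row, ?, Map<String, Integer>> byLocation =
                Collectors.groupingBy(Row::getLocation, Collectors.summingInt(Row::getConsumption));
        Collector<Row, ?, Map<String, Integer>> byContent =
                Collectors.groupingBy(Row::getContent, Collectors.summingInt(Row::getConsumption));
        return (parallel ? rows.parallelStream() : rows.stream())
                .collect(tee3(byClient, byLocation, byContent));
    }
}
```

With the five sample rows from the first answer, both the sequential and the parallel variant yield client1=6 and client2=9, location1=3 and location2=12, content1=9 and content2=6. For a log file that doesn't fit in memory, the rows would come from a lazily read source such as Files.lines(...) mapped through your own parser rather than from a List.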