KStream<String, Long> longs = builder.stream( Serdes.String(), Serdes.Long(), "longs"); // In one ktable, count by key, on a five second tumbling window. KTable<Windowed<String>, Long> longCounts = longs.countByKey(TimeWindows.of("longCounts", 5000L)); // Finally, sink to the long-avgs topic. longCounts.toStream((wk, v) -> wk.key()) .to("long-counts");
慕哥9229398