这是我的 Kafka 流代码,它使用滑动窗口对时间窗口中的所有整数数据求和。
public class KafkaWindowingLIS {
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Integer uid = 1;
long tenSeconds = 1000 * 10;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> dataStream = builder.stream("kafka-windowing-lis");
KStream<Integer, Integer> integerKStream = dataStream
.filter((key, val) -> { //Filter only numbers from Stream
try {
Integer.parseInt(val);
return true;
} catch (NumberFormatException e) {
return false;
}
})
.map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val)));
TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream
.groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer())) //Killed my time
.windowedBy(TimeWindows.of(tenSeconds));
timeWindowedKStream.aggregate(
() -> 0,
(key, value, aggregate) -> value + aggregate)
.toStream().print(Printed.toSysOut());
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
// kafkaStreams.cleanUp();
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
}
凤凰求蛊
相关分类