猿问

Kafka 流聚合抛出异常

这是我的 Kafka 流代码,它使用滑动窗口对时间窗口中的所有整数数据求和。


public class KafkaWindowingLIS {

    public static void main(String[] args) {

        Properties config = new Properties();

        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");

        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


        Integer uid = 1;

        long tenSeconds = 1000 * 10;


        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> dataStream = builder.stream("kafka-windowing-lis");

        KStream<Integer, Integer> integerKStream = dataStream

                .filter((key, val) -> {               //Filter only numbers from Stream

                    try {

                        Integer.parseInt(val);

                        return true;

                    } catch (NumberFormatException e) {

                        return false;

                    }

                })

                .map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val)));


        TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream

                .groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer()))     //Killed my time

                .windowedBy(TimeWindows.of(tenSeconds));


        timeWindowedKStream.aggregate(

                () -> 0,

                (key, value, aggregate) -> value + aggregate)

                .toStream().print(Printed.toSysOut());



        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);

//        kafkaStreams.cleanUp();

        kafkaStreams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

    }

}


天涯尽头无女友
浏览 105回答 1
1回答

凤凰求蛊

因为您使用aggregate(),所以您需要通过显式设置输出值 serde&nbsp;aggregate(..., Materialized.with(...))。输出值类型可能与输入值类型不同,因此输入值 serde无法重用。(由于 Java 类型擦除,Kafka Streams 不知道类型实际上没有改变......)因此,Kafka Streams 从配置中回退到默认的 serde。作为替代方案,您可以使用reduce()而不是aggregate解决问题。输出类型reduce()与输入类型相同,因此输入值 serde 可以用作输出值。
随时随地看视频慕课网APP

相关分类

Java
我要回答