引起:java.io.NotSerializedException:org.apache.kafka

我尝试使用 kafka 生产者发送 java 字符串消息。字符串消息是从Java Spark JavaPairDStream中提取的。


JavaPairDStream<String, String> processedJavaPairStream = inputStream.mapToPair

                 (record-> new Tuple2<>(record.key(), record.value())).mapValues(message -> message.replace('>', '#'));


String outTopics = "outputTopic";

String broker = "localhost:9092";


Properties properties = new Properties();

properties.put("bootstrap.servers", broker);

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


Producer<String, String> producer = new KafkaProducer<String, String>(properties);

processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> {


    ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2);


    System.out.println(message.key() + " : " + message.value()); //(1)


    producer.send(message).get(); //(2)

}));

(1) 行正确打印消息字符串。但是当我使用 kafka 生产者发送这些消息(如(2)行)时,它会抛出如下异常,

我无法理解这个异常。我确认 kafaka 生产者消息是<String,String>通过第 (1) 行输入的。但为什么第(2)行会抛出这个异常呢?我是否错过任何流程?



慕沐林林
浏览 99回答 1
1回答

动漫人物

您需要为每个 RDD 创建生产者。RDD 分布在多个执行器上,Producer 对象无法序列化以在它们之间共享或者,查看结构化流的文档,您可以简单地执行此操作以写入主题;无需自己创建和发送记录stream.writeStream().format("kafka")...请注意,如果目标只是将一个主题映射到另一个主题,那么Kafka Streams API比 Spark 更简单且开销更少
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java