我尝试使用 kafka 生产者发送 java 字符串消息。字符串消息是从Java Spark JavaPairDStream中提取的。
JavaPairDStream<String, String> processedJavaPairStream = inputStream.mapToPair
(record-> new Tuple2<>(record.key(), record.value())).mapValues(message -> message.replace('>', '#'));
String outTopics = "outputTopic";
String broker = "localhost:9092";
Properties properties = new Properties();
properties.put("bootstrap.servers", broker);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> {
ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2);
System.out.println(message.key() + " : " + message.value()); //(1)
producer.send(message).get(); //(2)
}));
(1) 行正确打印消息字符串。但是当我使用 kafka 生产者发送这些消息(如(2)行)时,它会抛出如下异常,
我无法理解这个异常。我确认 kafaka 生产者消息是<String,String>
通过第 (1) 行输入的。但为什么第(2)行会抛出这个异常呢?我是否错过任何流程?
动漫人物
相关分类