Java/Scala Kafka Producer 不向主题发送消息

我在将序列化的 XML 发送到我的 Kafka 主题时遇到问题。每当我运行我的代码时,我都没有收到任何异常或错误消息,但我仍然看不到 Kafka 主题中的任何消息。


我的 Kafka-Producer 设置是:


def WartungsdbKafkaConnector(args: Array[String]): Unit = {  

    val xmlFile = args(0)

    val record = getRecord(xmlFile)

    val kafkaProducer = getKafkaProducer

    kafkaProducer.send(record)

}


protected def getRecord(xmlFile: String): ProducerRecord[String, String] = {

    val lines = scala.io.Source.fromFile(xmlFile).mkString

    val xml = scala.xml.XML.loadString(lines)

    val paramPress = xml \ "PARAMETER" \ "PRESS"

    val databaseId = allCatch.opt {paramPress.\@("NUMBER")}

    val key = databaseId.get

    val topic = args(1)

    new ProducerRecord(topic, key, lines)

}



protected def getKafkaProducer: KafkaProducer[String, String] = {  

    val props = new Properties

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 

    "ec-x.eu-west-1.compute.amazonaws.com:9092," +

    "ec2-x.eu-west-1.compute.amazonaws.com:9092," +

    "ec2-x.eu-west-1.compute.amazonaws.com:9092")

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    props.put(ProducerConfig.LINGER_MS_CONFIG, "100")

    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")

    props.put(ProducerConfig.RETRIES_CONFIG, "20")

    props.put(ProducerConfig.ACKS_CONFIG, "all")


    new KafkaProducer[String, String](props)

}


GCT1015
浏览 182回答 1
1回答

紫衣仙女

您不会刷新、等待或关闭生产者,因此应用程序只是停止而不发送数据。生产者在可配置的时间内批量处理数据和消息,以减少实际到达代理的发送请求数量。尝试kafkaProducer.send(record)  // optionally call get() on this to capture the result and potential errors kafkaProducer.flush() kafkaProducer.close()最重要的是,永远不要忘记关闭生产者(或消费者)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java