如何检查我使用 Java 在 Spark-structured-streaming 中从 Kafka

我正在尝试从 kafka 获取数据到 spark-structured-streaming,但我无法检查我是否做得很好。我想在控制台上打印来自 kafka 的数据,但控制台上什么也没有。可能是因为来自卡夫卡的数据量很大,但我不知道。


我正在使用 Windows 10。我检查了 kafka 的端口是由“netstat -an | findstr TARGET_IP”建立的。TARGET_IP 表示kafka生产者的IP。根据以上结果的 PID,我检查了“任务列表/FI“PID eq 5406””。5406是java.exe的PID,PID 5406占用的内存在不断增加。


public static void main( String[] args ) {

    SparkSession spark = SparkSession.builder()

            .master("local")

            .appName("App").getOrCreate();

    Dataset<Row> df = spark

            .readStream()

            .format("kafka")

            .option("kafka.bootstrap.servers", "TARGET_IP:TARGET_PORT")

            .option("subscribe", "TARGET_TOPIC")

            .option("startingOffsets", "earliest")

            .load();

    df.printSchema();

    StreamingQuery queryone = df.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();

    try {

        queryone.awaitTermination();

    } catch (StreamingQueryException e) {

        e.printStackTrace();

    }

}


森林海
浏览 81回答 1
1回答

德玛西亚99

我测试了你的代码,它可以打印。首先,您应该检查您的kafka 主题,确保其中有消息。然后检查你的 spark 应用程序,确保它可以连接你的 kafka 代理。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java