Apache Spark - 在流式事件上捕获 Kafka 数据以触发工作流

简而言之,我是一名开发人员,试图使用 Spark 将数据从一个系统移动到另一个系统。一个系统中的原始数据以经过处理、汇总的形式进入一个本土的分析系统。


我对 Spark 非常陌生——我的知识仅限于我在过去一两周内能够挖掘和试验的内容。


我想象的是;使用 Spark 监视来自 Kafka 的事件作为触发器。捕获消费者事件上的实体/数据,并使用它来告诉我分析系统中需要更新的内容。然后,我将对原始 Cassandra 数据运行相关的 Spark 查询,并将结果写入分析端的不同表中,仪表板指标将其称为数据源。


我有一个简单的 Kafka 结构化流查询工作。虽然我可以看到消耗的对象被输出到控制台,但当消费者事件发生时,我无法检索 Kafka 记录:


try {

    SparkSession spark = SparkSession

        .builder()

        .master(this.sparkMasterAddress)

        .appName("StreamingTest2")

        .getOrCreate();


    //THIS -> None of these events seem to give me the data consumed?

    //...thinking I'd trigger the Cassandra write from here?

    spark.streams().addListener(new StreamingQueryListener() {

        @Override

        public void onQueryStarted(QueryStartedEvent queryStarted) {

            System.out.println("Query started: " + queryStarted.id());

        }

        @Override

        public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {

            System.out.println("Query terminated: " + queryTerminated.id());

        }

        @Override

        public void onQueryProgress(QueryProgressEvent queryProgress) {

            System.out.println("Query made progress: " + queryProgress.progress());

        }

    });


    Dataset<Row> reader = spark

        .readStream()

        .format("kafka")

        .option("startingOffsets", "latest")

        .option("kafka.bootstrap.servers", "...etc...")

        .option("subscribe", "my_topic")

        .load();


    Dataset<String> lines = reader

        .selectExpr("cast(value as string)")

        .as(Encoders.STRING());


    StreamingQuery query = lines

        .writeStream()

        .format("console")

        .start();

    query.awaitTermination();

} catch (Exception e) {

    e.printStackTrace();

}


我的想法是;使用前者触发后者,将此东西捆绑为 Spark 应用程序/包/任何东西,然后将其部署到 Spark 中。那时,我希望它不断地将更新推送到指标表。


这会是我需要的可行、可扩展、合理的解决方案吗?我在正确的道路上吗?如果以某种方式更容易或更好,我不反对使用 Scala。


泛舟湖上清波郎朗
浏览 138回答 1
1回答

宝慕林4294392

知道了。了解了 ForeachWriter。效果很好:&nbsp; &nbsp; &nbsp; &nbsp; StreamingQuery query = lines&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .writeStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format("foreach")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .foreach(new ForeachWriter<String>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void process(String value) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("process() value = " + value);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void close(Throwable errorOrNull) {}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public boolean open(long partitionId, long version) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return true;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .start();&nbsp;
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java