简而言之,我是一名开发人员,试图使用 Spark 将数据从一个系统移动到另一个系统。一个系统中的原始数据以经过处理、汇总的形式进入一个本土的分析系统。
我对 Spark 非常陌生——我的知识仅限于我在过去一两周内能够挖掘和试验的内容。
我想象的是;使用 Spark 监视来自 Kafka 的事件作为触发器。捕获消费者事件上的实体/数据,并使用它来告诉我分析系统中需要更新的内容。然后,我将对原始 Cassandra 数据运行相关的 Spark 查询,并将结果写入分析端的不同表中,仪表板指标将其称为数据源。
我有一个简单的 Kafka 结构化流查询工作。虽然我可以看到消耗的对象被输出到控制台,但当消费者事件发生时,我无法检索 Kafka 记录:
try {
SparkSession spark = SparkSession
.builder()
.master(this.sparkMasterAddress)
.appName("StreamingTest2")
.getOrCreate();
//THIS -> None of these events seem to give me the data consumed?
//...thinking I'd trigger the Cassandra write from here?
spark.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
@Override
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});
Dataset<Row> reader = spark
.readStream()
.format("kafka")
.option("startingOffsets", "latest")
.option("kafka.bootstrap.servers", "...etc...")
.option("subscribe", "my_topic")
.load();
Dataset<String> lines = reader
.selectExpr("cast(value as string)")
.as(Encoders.STRING());
StreamingQuery query = lines
.writeStream()
.format("console")
.start();
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
我的想法是;使用前者触发后者,将此东西捆绑为 Spark 应用程序/包/任何东西,然后将其部署到 Spark 中。那时,我希望它不断地将更新推送到指标表。
这会是我需要的可行、可扩展、合理的解决方案吗?我在正确的道路上吗?如果以某种方式更容易或更好,我不反对使用 Scala。
宝慕林4294392
相关分类