我从 kafka 来源获取日志,并将其放入 spark 中。
保存在我的 hadoop_path 中的日志格式如下所示
{"value":"{\"Name\":\"Amy\",\"Age\":\"22\"}"}
{"value":"{\"Name\":\"Jin\",\"Age\":\"26\"}"}
但是,我想让它像
{\"Name\":\"Amy\",\"Age\":\"22\"}
{\"Name\":\"Jin\",\"Age\":\"26\"}
任何一种解决方案都会很棒。(使用纯 Java 代码、Spark SQL 或 Kafka)
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MYApp").getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", Kafka_source)
.option("subscribe", Kafka_topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss",false)
.load();
Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");
StreamingQuery queryone = dg.writeStream()
.format("json")
.outputMode("append")
.option("checkpointLocation",Hadoop_path)
.option("path",Hadoop_path)
.start();
哔哔one
白猪掌柜的
拉丁的传说
相关分类