猿问

将 JSON 保存到 HDFS 的结构化流

我的 Structured Spark Streaming 程序是从 Kafka 读取 JSON 数据并以 JSON 格式写入 HDFS。我能够将 JSON 保存到 HDFS,但它保存了 JSON 字符串:


 "jsontostructs(CAST(value AS STRING))"

key as below: {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}.

如何只保存


{"age":42,"name":"John"}?





StructType schema = kafkaPrimerRow.schema();


//Read json from kafka. JSON is: {"age":42,"name":"John"}

Dataset<Row> df = spark

                    .readStream()

                    .format("kafka")

                    .option("kafka.bootstrap.servers", input_bootstrap_server)

                    .option("subscribe", topics[0])

                    .load();





    //Save Stream to HDFS

    StreamingQuery ds = df             

.select(functions.from_json(col("value").cast(DataTypes.StringType),schema)) 

.writeStream()


.format("json")

.outputMode(OutputMode.Append())

.option("path", destPath)

.option("checkpointLocation", checkpoint)

.start();


繁星淼淼
浏览 160回答 1
1回答

BIG阳

以下 .select("data.*") 达到了目的。StreamingQuery&nbsp;ds&nbsp;=&nbsp;df &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.select(functions.from_json(col("value").cast(DataTypes.StringType),schema).as("data")) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.select("data.*") &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.writeStream() &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.format("json") &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.outputMode(OutputMode.Append()) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.option("path",&nbsp;destPath) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.option("checkpointLocation",&nbsp;checkpoint) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.start();
随时随地看视频慕课网APP

相关分类

Java
我要回答