猿问

如何仅从 kafka 源获取值以激发?

我从 kafka 来源获取日志,并将其放入 spark 中。

保存在我的 hadoop_path 中的日志格式如下所示

{"value":"{\"Name\":\"Amy\",\"Age\":\"22\"}"}

{"value":"{\"Name\":\"Jin\",\"Age\":\"26\"}"}


但是,我想让它像

{\"Name\":\"Amy\",\"Age\":\"22\"}

{\"Name\":\"Jin\",\"Age\":\"26\"}


任何一种解决方案都会很棒。(使用纯 Java 代码、Spark SQL 或 Kafka)


        SparkSession spark = SparkSession.builder()

                .master("local")

                .appName("MYApp").getOrCreate();

        Dataset<Row> df = spark

                .readStream()

                .format("kafka")

                .option("kafka.bootstrap.servers", Kafka_source)

                .option("subscribe", Kafka_topic)

                .option("startingOffsets", "earliest")

                .option("failOnDataLoss",false)

                .load();

        Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");

        StreamingQuery queryone = dg.writeStream()

                .format("json")

                .outputMode("append")

                .option("checkpointLocation",Hadoop_path)

                .option("path",Hadoop_path)

                .start();


慕的地10843
浏览 76回答 3
3回答

哔哔one

我已经用 from_json 函数完成了!!&nbsp; &nbsp; &nbsp; &nbsp; SparkSession spark = SparkSession.builder()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .master("local")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .appName("MYApp").getOrCreate();&nbsp; &nbsp; &nbsp; &nbsp; Dataset<Row> df = spark&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .readStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format("kafka")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("kafka.bootstrap.servers", Kafka_source)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("subscribe", Kafka_topic)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("startingOffsets", "earliest")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("failOnDataLoss",false)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .load();&nbsp; &nbsp; &nbsp; &nbsp; Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");&nbsp; &nbsp; &nbsp; &nbsp; Dataset<Row> dz = dg.select(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; from_json(dg.col("value"), DataTypes.createStructType(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new StructField[] {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DataTypes.createStructField("Name", StringType,true)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })).getField("Name").alias("Name")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ,from_json(dg.col("value"), DataTypes.createStructType(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new StructField[] {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DataTypes.createStructField("Age", IntegerType,true)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })).getField("Age").alias("Age")&nbsp; &nbsp; &nbsp; &nbsp; StreamingQuery queryone = dg.writeStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format("json")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .outputMode("append")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("checkpointLocation",Hadoop_path)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("path",Hadoop_path)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .start();

白猪掌柜的

您可以使用 Spark 获得预期的结果,如下所示:SparkSession spark = SparkSession.builder()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .master("local")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .appName("MYApp").getOrCreate();Dataset<Row> df = spark&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .readStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format("kafka")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("kafka.bootstrap.servers", Kafka_source)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("subscribe", Kafka_topic)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("startingOffsets", "earliest")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("failOnDataLoss",false)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .load();Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)")&nbsp; &nbsp; &nbsp; &nbsp; .withColumn("Name", functions.json_tuple(functions.col("value"),"Name"))&nbsp; &nbsp; &nbsp; &nbsp; .withColumn("Age", functions.json_tuple(functions.col("value"),"Age"));StreamingQuery queryone = dg.writeStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format("json")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .outputMode("append")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("checkpointLocation",Hadoop_path)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("path",Hadoop_path)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .start();基本上,您必须为值列中 json 字符串中的每个字段创建单独的列。

拉丁的传说

使用以下内容:Dataframe<Row> df = spark&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .readStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format("kafka")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("kafka.bootstrap.servers", Kafka_source)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("subscribe", Kafka_topic)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("startingOffsets", "earliest")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("failOnDataLoss",false)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .load();df.printSchema();StreamingQuery queryone = df.selectExpr("CAST(value AS STRING)")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .writeStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format("json")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .outputMode("append")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("checkpointLocation",Hadoop_path)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option("path",Hadoop_path)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .start();确保架构包含value作为列。
随时随地看视频慕课网APP

相关分类

Java
我要回答