猿问

如何在流查询中使用 from_json 标准函数(在 select 中)?

我使用以下 JSON 结构处理来自 Kafka 的消息:


{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}

我想打印出我收到的内容。这是我已经完成的代码片段:


JavaSparkContext sc = createJavaSparkContext();

JavaStreamingContext streamingContext =

                new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));


SparkSession sparkSession = SparkSession

        .builder()

        .config(new SparkConf())

        .getOrCreate();


Dataset<Row> df = sparkSession

        .readStream()

        .format("kafka")

        .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)

        .option("subscribe", KAFKA_TOPIC)

        .load();


StreamingQuery query = df.selectExpr("CAST(value AS STRING)")

            .select(from_json(new Column("value"), getSchema())).as("data").

                    select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() {

                @Override

                public void process(Row value) {

                    System.out.println(value);

                }


                @Override

                public void close(Throwable errorOrNull) {


                }


                @Override

                public boolean open(long partitionId, long version) {

                    return true;

                }

            })

            .start();


    query.awaitTermination();

架构方法:


private static StructType getSchema() {

    return new StructType(new StructField[]{

            new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),

            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),

            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),

            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),

    });

}

如何克服这个问题?对此有何建议?


摇曳的蔷薇
浏览 102回答 1
1回答

慕虎7371278

异常的这一部分准确地告诉您在哪里寻找答案:无法解析给定输入列的“data.category_id”:[jsontostruct(value)]换句话说,data.category_id可用列中没有一列只是 1 列jsontostruct(value)。这意味着仅select在流式查询中不起作用。原因相当简单(我可以将其视为拼写错误)——在Column和Datasetas("data")类型上可用的右括号太多。总之,替换查询的以下部分:.select(from_json(new Column("value"), getSchema())).as("data")至以下内容:.select(from_json(new Column("value"), getSchema()).as("data"))请注意,我将一个右括号移到了末尾。
随时随地看视频慕课网APP

相关分类

Java
我要回答