
How to read streaming nested JSON from Kafka in Spark using Java

I am trying to read complex nested JSON data from Kafka using Java, and I am having trouble forming the Dataset.


The actual JSON sent to Kafka:


{"sample_title": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}

{"sample_title2": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}

{"sample_title3": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}

Dataset<Row> df = spark.readStream().format("kafka")
                    .option("spark.local.dir", config.getString(PropertyKeys.SPARK_APPLICATION_TEMP_LOCATION.getCode()))
                    .option("kafka.bootstrap.servers",
                            config.getString(PropertyKeys.KAFKA_BOORTSTRAP_SERVERS.getCode()))
                    .option("subscribe", config.getString(PropertyKeys.KAFKA_TOPIC_IPE_STP.getCode()))
                    .option("startingOffsets", "earliest")
                    .option("spark.default.parallelism",
                            config.getInt(PropertyKeys.SPARK_APPLICATION_DEFAULT_PARALLELISM_VALUE.getCode()))
                    .option("spark.sql.shuffle.partitions",
                            config.getInt(PropertyKeys.SPARK_APPLICATION_SHUFFLE_PARTITIONS_COUNT.getCode()))
                    .option("kafka.security.protocol", config.getString(PropertyKeys.SECURITY_PROTOCOL.getCode()))
                    .load();

// The explicit FilterFunction cast avoids lambda overload ambiguity in Java.
Dataset<String> output = df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING())
                    .filter((FilterFunction<String>) x -> x.contains("sample_title"));

Since the input can contain multiple schemas, the code should be able to handle that, filter by title, and map the result to a Dataset of type Title.


湖上湖
1 answer

杨__羊羊

First, make the Title class a Java bean class, i.e. write getters and setters for its fields.

    public class Title implements Serializable {
        String txn_date;
        Timestamp timestamp;
        String txn_type;
        String txn_rcvd_time;
        String txn_ref;
        String txn_status;

        public Title() {} // no-argument constructor, expected of a Java bean

        public Title(String data) {
            ... // set values for fields with the data
        }

        // add all getters and setters for fields
    }

Then map each Kafka value to a Title. The value column has to be turned into a Dataset<String> first, otherwise the lambda receives a Row rather than a String:

    Dataset<Title> resultdf = df.selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING())
            .map((MapFunction<String, Title>) value -> new Title(value), Encoders.bean(Title.class));

    resultdf.filter((FilterFunction<Title>) title -> /* apply any predicate on title */);

If you want to filter the data first and then apply the encoder:

    df.selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING())
            .filter(get_json_object(col("value"), "$.sample_title").isNotNull())
            // for a simple filter, use .filter((FilterFunction<String>) t -> t.contains("sample_title"))
            .map((MapFunction<String, Title>) value -> new Title(value), Encoders.bean(Title.class));
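The answer leaves the body of the Title(String data) constructor elided. One hypothetical way to fill it in is sketched below in plain Java, with no Spark dependency. The regex helper `field` is an assumption made only to keep the example dependency-free: it does not handle escaped quotes, and a real job would more likely use a JSON library. The class is package-private here only so the sketch compiles as a single file; Encoders.bean expects a publicly accessible class.

```java
import java.io.Serializable;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical completion of the Title bean from the answer above.
class Title implements Serializable {
    private String txn_date;
    private Timestamp timestamp;
    private String txn_type;
    private String txn_rcvd_time;
    private String txn_ref;
    private String txn_status;

    // No-argument constructor, expected of a Java bean.
    public Title() { }

    // Assumption: data is one Kafka message such as {"sample_title": {...}}.
    public Title(String data) {
        this.txn_date = field(data, "txn_date");
        String ts = field(data, "timestamp");
        // The sample timestamps are ISO-8601 instants, e.g. 2019-02-01T08:57:18.100Z.
        this.timestamp = ts == null ? null : Timestamp.from(Instant.parse(ts));
        this.txn_type = field(data, "txn_type");
        this.txn_rcvd_time = field(data, "txn_rcvd_time");
        this.txn_ref = field(data, "txn_ref");
        this.txn_status = field(data, "txn_status");
    }

    // Returns the string value stored under key, or null if the key is absent.
    // Regex stand-in for a JSON parser; it does not handle escaped quotes.
    static String field(String json, String key) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public String getTxn_date() { return txn_date; }
    public void setTxn_date(String v) { this.txn_date = v; }
    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp v) { this.timestamp = v; }
    public String getTxn_type() { return txn_type; }
    public void setTxn_type(String v) { this.txn_type = v; }
    public String getTxn_rcvd_time() { return txn_rcvd_time; }
    public void setTxn_rcvd_time(String v) { this.txn_rcvd_time = v; }
    public String getTxn_ref() { return txn_ref; }
    public void setTxn_ref(String v) { this.txn_ref = v; }
    public String getTxn_status() { return txn_status; }
    public void setTxn_status(String v) { this.txn_status = v; }
}
```

With the first sample message from the question, new Title(message).getTxn_status() returns "TEST". Because the fields are looked up by key wherever they occur, the same constructor works for sample_title, sample_title2, and sample_title3; filtering by title would still happen before the map step, as in the answer.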