在组归约上按键触发嵌套结构的错误序列化

我想按键减少数据帧。reduce 逻辑非常复杂,需要更新大约 10-15 个字段。这就是为什么我想将 DataFrame 转换为 DataSet 并减少 Java POJO。

问题

问题是,在groupByKey-reduceByKey我得到一些非常奇怪的值之后。Encoders.bean(Entity.class)读取正确的数据。请参阅代码示例部分

变通方法

替换Encoders.beanEncoders.kryo不起作用,异常:

Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.

我也看到了这个 workarround,但Encoders.product需要TypeTag. 我不知道如何TypeTag在 Java 代码中创建。

慕标5832272
浏览 107回答 1
1回答

泛舟湖上清波郎朗

这是因为反序列化使用由 推断出的架构上的结构匹配Encoder,并且由于 bean 类没有自然结构,架构的字段按名称排序。所以如果你定义一个像你的 bean 类Entity,从 bean 推断的模式Encoder将是Encoders.bean(Storage.class).schema().printTreeString();root&nbsp;|-- storage: double (nullable = true)&nbsp;|-- timestamp: timestamp (nullable = true)不是root&nbsp;|-- timestamp: timestamp (nullable = true)&nbsp;|-- storage: double (nullable = true)这是应该使用的架构Dataset。换句话说,架构定义为:StructType schema = Encoders.bean(Entity.class).schema();或者StructType schema = StructType.fromDDL(&nbsp; "broker_name string, order integer, server_name string, " +&nbsp;&nbsp; "storages array<struct<storage: double, timestamp: timestamp>>"&nbsp;);将是有效的,并且可以用于testData直接加载:Dataset<Entity> ds = spark.read()&nbsp; .option("multiline", "true")&nbsp; .schema(schema)&nbsp; .json("testData.json")&nbsp; .as(Encoders.bean(Entity.class));而您当前的架构,相当于:StructType valid = StructType.fromDDL(&nbsp; "broker_name string, order integer, server_name string, " +&nbsp;&nbsp; "storages array<struct<timestamp: timestamp, storage: double>>"&nbsp;);不是,尽管它可以与 JSON 阅读器一起使用,它(与 相比Encoders)按名称匹配数据。可以说,这种行为应该被报告为一个错误——直观地说,不应该有Encoder转储与其自己的加载逻辑不兼容的数据的情况。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java