如何在数据集中存储自定义对象?

如何在数据集中存储自定义对象?

根据介绍星火数据集:

在我们期待Spark2.0的同时,我们计划对数据集进行一些令人兴奋的改进,特别是:.自定义编码器-尽管我们目前为各种各样的类型自动生成编码器,但我们希望为自定义对象打开一个API。

并尝试将自定义类型存储在Dataset导致以下错误:

无法找到存储在数据集中的类型的编码器。导入sqlContext.Inductions支持原始类型(Int、String等)和Producttype(CASE类)。_对序列化其他类型的支持将在以后的版本中添加。

或:

异常:未找到编码器用于.

有什么解决办法吗?


注意,这个问题仅作为CommunityWiki答案的入口点存在。随时更新/改进问题和答案。


婷婷同学_
浏览 660回答 3
3回答

饮歌长啸

更新这个答案仍然是有效和信息丰富的,尽管现在情况更好,自从2.2/2.3,这增加了内置编码器的支持Set, Seq, Map, Date, Timestamp,和BigDecimal..如果您坚持只使用case类和通常的Scala类型来创建类型,那么应该可以只使用SQLImplicits.不幸的是,在这方面几乎没有增加任何帮助。寻觅@since 2.0.0在……里面Encoders.scala或SQLImplicits.scala查找主要与原始类型有关的内容(以及对Case类的一些调整)。所以,首先要说的是:目前对自定义类编码器没有真正好的支持。..这样的话,下面是一些我们可以期望做得很好的技巧,考虑到我们目前所拥有的一切。作为一个预先的免责声明:这不会完美的工作,我会尽我最大的努力使所有的限制清楚和预先。到底是什么问题?当您想创建数据集时,SPark“需要一个编码器(将T类型的JVM对象与内部SparkSQL表示形式相互转换),该编码器通常是通过从SparkSession,也可以通过调用Encoders“(摘自医生createDataset)。编码器将采用以下形式Encoder[T]哪里T是您正在编码的类型。第一个建议是增加import spark.implicits._(这给了你这些(第二个建议是显式地传递隐式编码器,使用这,这个一组编码器相关功能。没有普通类可用的编码器,所以import spark.implicits._class MyObj(val i: Int)// ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))将给出以下隐式相关编译时错误:无法找到存储在数据集中的类型的编码器。导入sqlContext.Inductions支持原始类型(Int、String等)和Producttype(CASE类)。_对序列化其他类型的支持将在以后的版本中添加。但是,如果将刚才用于在某个类中获取上述错误的任何类型包装,则Product,错误会被延迟到运行时,所以import spark.implicits._case class Wrap[T](unwrap: T)class MyObj(val i: Int) // ...val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))编译很好,但是在运行时会失败。未支持的OperationException:没有为MyObj找到编码器这样做的原因是,实施者SPark创建的Induces实际上只在运行时(通过Scala关系)生成。在本例中,在编译时所有的SPark检查都是最外层的类扩展Product(所有的CASE类都这样做),并且只在运行时才意识到它仍然不知道如何处理MyObj(如果我试图创建一个Dataset[(Int,MyObj)]-星火等待运行时继续运行MyObj)。这些是迫切需要解决的核心问题:一些扩展的类Product编译,尽管在运行时总是崩溃,而且没有办法传递嵌套类型的自定义编码器(我无法仅为MyObj使它知道如何编码Wrap[MyObj]或(Int,MyObj)).只管用kryo每个人建议的解决方案是使用kryo编码器。import spark.implicits._class MyObj(val i: Int)implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))不过,这很快就会变得很乏味。特别是当您的代码正在操作各种数据集、加入、分组等时。那么,为什么不直接默示这一切都是自动完成的呢?import scala.reflect.ClassTagimplicit def kryoEncoder[A](implicit ct: ClassTag[A]) =    org.apache.spark.sql.Encoders.kryo[A](ct)现在,我似乎可以做任何我想做的事情(下面的示例在spark-shell哪里spark.implicits._自动导入)class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d)).alias("d2")  // mapping works fine and ..val d3 = d1.map(d => (d.i,  d)).alias("d3")   // .. deals with the new typeval d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!或者几乎。问题是使用kryo导致SPark只将DataSet中的每一行存储为平面二进制对象。为map, filter, foreach这就足够了,但是对于像join,星火确实需要将它们分隔成列。检查架构d2或d3,您可以看到只有一个二进制列:d2.printSchema// root//  |-- value: binary (nullable = true)元组的部分解因此,在Scala中使用InstitucesinScala的魔力(更多在6.26.3过载分辨率),我可以为自己做一系列能做得尽可能好的事情,至少对于元组来说是这样,并且可以很好地与现有的Institutions一起工作:import org.apache.spark.sql.{Encoder,Encoders}import scala.reflect.ClassTagimport spark.implicits._   // we can still take advantage of all the old implicitsimplicit def single[A](implicit c: ClassTag[A]):  Encoder[A] = Encoders.kryo[A](c)implicit def tuple2[A1, A2](   implicit e1: Encoder[A1],            e2: Encoder[A2]): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)implicit def tuple3[A1, A2, A3](   implicit e1: Encoder[A1],            e2: Encoder[A2],            e3: Encoder[A3]): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)// ... you can keep making these然后,带着这些请求,我可以让上面的例子起作用,尽管用了一些列重命名。class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d)) .toDF("_1","_2").as[(Int,MyObj)].alias("d2")val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")我还没有弄清楚如何获得预期的元组名称(_1, _2.)默认情况下不用重命名-如果有人想玩这个游戏,这,这个名字"value"被介绍和这,这个通常添加元组名称的位置。但是,关键是我现在有了一个很好的结构化模式:d4.printSchema// root//  |-- _1: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  |     |-- _2: binary (nullable = true)//  |-- _2: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  |      |-- _2: binary (nullable = true)总之,这个解决办法是:允许我们为元组获得单独的列(因此我们可以再次加入元组,耶!)我们可以再一次依赖于请求(所以不需要经过。)kryo(到处都是)几乎完全向后兼容import spark.implicits._(涉及一些重命名)是吗?不让我们加入kyro序列化二进制列,更不用说那些可能具有将某些元组列重命名为“value”(如果有必要的话,可以通过转换将其撤消),会产生令人不快的副作用。.toDF,指定新的列名,并将其转换回DataSet-模式名称似乎通过联接(最需要它们的地方)被保留。一般类的部分解这个不太愉快,也没有很好的解决办法。但是,现在我们有了上面的元组解决方案,我有一个预感-来自另一个答案的隐式转换解决方案也不会那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建DataSet之后,您可能会使用dataframe方法重命名这些列。如果一切顺利,这是真的一个改进,因为我现在可以在类的字段上执行联接。如果我只使用了一个平面二进制kryo序列化程序是不可能的。下面是一个做了一些事情的例子:我有一个类MyObj其中有类型的字段。Int, java.util.UUID,和Set[String]..第一个照顾好自己。第二个,虽然我可以使用kryo如果存储为String(自UUID这通常是我想反对的事情。第三个真正属于二进制列。class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])// alias for the type to convert to and fromtype MyObjEncoded =  (Int, String, Set[String])// implicit conversionsimplicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)implicit  def fromEncoded(e: MyObjEncoded): MyObj =   new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)现在,我可以使用这个机器创建一个具有良好模式的数据集:val d = spark.createDataset(Seq[MyObjEncoded](   new MyObj(1, java.util.UUID.randomUUID, Set("foo")),   new MyObj(2, java.util.UUID.randomUUID, Set("bar")))).toDF("i","u","s").as[MyObjEncoded]模式向我展示了I列的正确名称和前两种情况,这两种情况我都可以使用。d.printSchema// root//  |-- i: integer (nullable = false)//  |-- u: string (nullable = true)//  |-- s: binary (nullable = true)

繁花如伊

您可以使用UDT注册,然后使用案例类、元组等.所有正确的工作与您的用户定义的类型!假设您想使用自定义Enum:trait CustomEnum { def value:String }case object Foo extends CustomEnum  { val value = "F" }case object Bar extends CustomEnum   { val value = "B" }object CustomEnum {   def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get}登记如下:// First define a UDT class for it:class CustomEnumUDT extends UserDefinedType[CustomEnum] {   override def sqlType: DataType = org.apache.spark.sql.types.StringType   override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)   // Note that this will be a UTF8String type   override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)   override def userClass: Class[CustomEnum] = classOf[CustomEnum]}// Then Register the UDT Class!   // NOTE: you have to put this file into the org.apache.spark package!UDTRegistration.register(classOf[CustomEnum].   getName, classOf[CustomEnumUDT].getName)那就用它!case class UsingCustomEnum(id:Int, en:CustomEnum)val seq = Seq(   UsingCustomEnum(1, Foo),   UsingCustomEnum(2, Bar),   UsingCustomEnum(3, Foo)).toDS()seq.filter(_.en == Foo).show()println(seq.collect())假设您想使用多态记录:trait CustomPolycase class FooPoly(id:Int) extends CustomPolycase class BarPoly(value:String, secondValue:Long) extends CustomPoly..它的用法是这样的:case class UsingPoly(id:Int, poly:CustomPoly)Seq(   UsingPoly(1, new FooPoly(1)),   UsingPoly(2, new BarPoly("Blah", 123)),   UsingPoly(3, new FooPoly(1))).toDS polySeq.filter(_.poly match {   case FooPoly(value) => value == 1   case _ => false}).show()您可以编写一个自定义的UDT,它将所有内容编码为字节(我在这里使用java序列化,但更好的方法可能是检测SPark的Kryo上下文)。首先定义UDT类:class CustomPolyUDT extends UserDefinedType[CustomPoly] {   val kryo = new Kryo()   override def sqlType: DataType = org.apache.spark.sql.types.BinaryType   override def serialize(obj: CustomPoly): Any = {     val bos = new ByteArrayOutputStream()     val oos = new ObjectOutputStream(bos)     oos.writeObject(obj)     bos.toByteArray  }   override def deserialize(datum: Any): CustomPoly = {     val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])     val ois = new ObjectInputStream(bis)     val obj = ois.readObject()     obj.asInstanceOf[CustomPoly]   }   override def userClass: Class[CustomPoly] = classOf[CustomPoly]}然后注册:// NOTE: The file you do this in has to be inside of the org.apache.spark package!UDTRegistration.register(classOf[CustomPoly]. getName, classOf[CustomPolyUDT].getName)那你就可以用它了!// As shown above:case class UsingPoly(id:Int, poly:CustomPoly)Seq(   UsingPoly(1, new FooPoly(1)),   UsingPoly(2, new BarPoly("Blah", 123)),   UsingPoly(3, new FooPoly(1))).toDS polySeq.filter(_.poly match {   case FooPoly(value) => value == 1   case _ => false}).show()
打开App,查看更多内容
随时随地看视频慕课网APP