SparkSQL:如何处理用户定义函数中的空值?

给定表1,其中一列“ x”为String类型。我想用“ y”列创建表2,该列是“ x”中给出的日期字符串的整数表示。


重要的是将null值保留在“ y”列中。


表1(数据框df1):


+----------+

|         x|

+----------+

|2015-09-12|

|2015-09-13|

|      null|

|      null|

+----------+

root

 |-- x: string (nullable = true)

表2(资料框df2):


+----------+--------+                                                                  

|         x|       y|

+----------+--------+

|      null|    null|

|      null|    null|

|2015-09-12|20150912|

|2015-09-13|20150913|

+----------+--------+

root

 |-- x: string (nullable = true)

 |-- y: integer (nullable = true)

用户定义的函数(udf)将“ x”列的值转换为“ y”列的值是:


val extractDateAsInt = udf[Int, String] (

  (d:String) => d.substring(0, 10)

      .filterNot( "-".toSet)

      .toInt )

并且有效,无法处理空值。


即使,我可以做类似的事情


val extractDateAsIntWithNull = udf[Int, String] (

  (d:String) => 

    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 

    else 1 )

我发现没有办法null通过udfs “产生” 值(当然,因为Ints不能null)。


我当前用于创建df2的解决方案(表2)如下:


// holds data of table 1  

val df1 = ... 


// filter entries from df1, that are not null

val dfNotNulls = df1.filter(df1("x")

  .isNotNull)

  .withColumn("y", extractDateAsInt(df1("x")))

  .withColumnRenamed("x", "right_x")


// create df2 via a left join on df1 and dfNotNull having 

val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")

问题:


当前的解决方案似乎很麻烦(并且可能无法有效地提高性能)。有没有更好的办法?

@ Spark-developers:是否有NullableInt计划/可用的类型,以便可以使用以下udf(请参见代码摘录)?

代码摘录


val extractDateAsNullableInt = udf[NullableInt, String] (

  (d:String) => 

    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 

    else null )


MYYA
浏览 1189回答 3
3回答

守着星空守着你

这是Option派上用场的地方:val extractDateAsOptionInt = udf((d: String) => d match {  case null => None  case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)})或在一般情况下使其更加安全:import scala.util.Tryval extractDateAsOptionInt = udf((d: String) => Try(  d.substring(0, 10).filterNot("-".toSet).toInt).toOption)一切归功于德米特里Selivanov谁已经指出,这种解决方案为(失踪?)编辑这里。另一种方法是null在UDF之外处理:import org.apache.spark.sql.functions.{lit, when}import org.apache.spark.sql.types.IntegerTypeval extractDateAsInt = udf(   (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt)df.withColumn("y",  when($"x".isNull, lit(null))    .otherwise(extractDateAsInt($"x"))    .cast(IntegerType))

噜噜哒

我创建了以下代码,以使用户定义的函数可用,以处理所述的空值。希望对别人有帮助!/** * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that * handle `null` values. */object NullableFunctions {  import org.apache.spark.sql.functions._  import scala.reflect.runtime.universe.{TypeTag}  import org.apache.spark.sql.UserDefinedFunction  /**   * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that   *   * if fnc input is null, None is returned. This will create a null value in the output Spark column.   *   * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.   * @param f function from A1 => RT   * @tparam RT return type   * @tparam A1 input parameter type   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above   */  def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {    udf[Option[RT],A1]( (i: A1) => i match {      case null => None      case s => Some(f(i))    })  }  /**   * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that   *   * if on of the function input parameters is null, None is returned.   *     This will create a null value in the output Spark column.   *   * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)   *     as value in the output column.   * @param f function from A1 => RT   * @tparam RT return type   * @tparam A1 input parameter type   * @tparam A2 input parameter type   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above   */  def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {    udf[Option[RT], A1, A2]( (i1: A1, i2: A2) =>  (i1, i2) match {      case (null, _) => None      case (_, null) => None      case (s1, s2) => Some((f(s1,s2)))    } )  }}
打开App,查看更多内容
随时随地看视频慕课网APP