手记

Spark2.1和2.2 SQL物理执行策略之Join源码分析以及不同类型Join区分

1. object ExtractEquiJoinKeys

一个模式匹配,官方注释是:

A pattern that finds joins with equality conditions that can be evaluated using equi-join. Null-safe equality will be transformed into equality as joining key (replace null with default value).

那什么叫null-safe equality呢?这里有个case class EqualNullSafe,解释是:

expr1 FUNC expr2 - Returns same result as the EQUAL(=) operator for non-null operands, but returns true if both are null, false if one of the them is null.

意思也就是除了正常的值会判断相等之外,当等式左右两边都是null时候也会认为其相等,当有一边为null时候认为其不等。查看源码会发现,当两边都是null时候其实会被当作是0来处理。

源码:

/**
 * A pattern that finds joins with equality conditions that can be evaluated using equi-join.
 *
 * Null-safe equality will be transformed into equality as joining key (replace null with default
 * value).
 */object ExtractEquiJoinKeys extends Logging with PredicateHelper {  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
  type ReturnType =
    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {    case join @ Join(left, right, joinType, condition) =>
      logDebug(s"Considering join on: $condition")      // Find equi-join predicates that can be evaluated before the join, and thus can be used
      // as join keys.
      val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)      val joinKeys = predicates.flatMap {        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
        case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))        case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))        // Replace null with default value for joining key, then those rows with null in it could
        // be joined together
        case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>          Some((Coalesce(Seq(l, Literal.default(l.dataType))),            Coalesce(Seq(r, Literal.default(r.dataType)))))        case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>          Some((Coalesce(Seq(r, Literal.default(r.dataType))),            Coalesce(Seq(l, Literal.default(l.dataType)))))        case other => None
      }      val otherPredicates = predicates.filterNot {        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => false
        case EqualTo(l, r) =>
          canEvaluate(l, left) && canEvaluate(r, right) ||
            canEvaluate(l, right) && canEvaluate(r, left)        case other => false
      }      if (joinKeys.nonEmpty) {        val (leftKeys, rightKeys) = joinKeys.unzip
        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
      } else {        None
      }    case _ => None
  }
}

首先将join的所有conditions收集出来(如果有and则收集and.left和and.right),然后分成两个Sequence,一个是joinKeys,一个是otherPredicates,前者是对于canEvaluate()为true的收集其(left, right),后者是除了前者收集到的之外的其他condition。那么其中的canEvaluate()是什么,源码如下:

/**
 * Returns true if `expr` can be evaluated using only the output of `plan`.  This method
 * can be used to determine when it is acceptable to move expression evaluation within a query
 * plan.
 *
 * For example consider a join between two relations R(a, b) and S(c, d).
 *
 * - `canEvaluate(EqualTo(a,b), R)` returns `true`
 * - `canEvaluate(EqualTo(a,c), R)` returns `false`
 * - `canEvaluate(Literal(1), R)` returns `true` as literals CAN be evaluated on any plan
 */protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
  expr.references.subsetOf(plan.outputSet)

即左边表达式的字段必须是右边的子集。

满足ExtractEquiJoinKeys模式的case,会被应用到Join的物理策略中来。

2. object JoinSelection

源码:

  /**
   * Select the proper physical plan for join based on joining keys and size of logical plan.
   *
   * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
   * predicates can be evaluated by matching join keys. If found,  Join implementations are chosen
   * with the following precedence:
   *
   * - Broadcast: if one side of the join has an estimated physical size that is smaller than the
   *     user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold
   *     or if that side has an explicit broadcast hint (e.g. the user applied the
   *     [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
   *     of the join will be broadcasted and the other side will be streamed, with no shuffling
   *     performed. If both sides of the join are eligible to be broadcasted then the
   * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
   *     table.
   * - Sort merge: if the matching join keys are sortable.
   *
   * If there is no joining keys, Join implementations are chosen with the following precedence:
   * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
   * - CartesianProduct: for Inner join
   * - BroadcastNestedLoopJoin
   */
  object JoinSelection extends Strategy with PredicateHelper {    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {      // --- BroadcastHashJoin --------------------------------------------------------------------
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)        if canBuildRight(joinType) && canBroadcast(right) =>        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)        if canBuildLeft(joinType) && canBroadcast(left) =>        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))      // --- ShuffledHashJoin ---------------------------------------------------------------------
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)         if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys) =>        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)         if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
           && muchSmaller(left, right) ||
           !RowOrdering.isOrderable(leftKeys) =>        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))      // --- SortMergeJoin ------------------------------------------------------------
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)        if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

      // --- Without joining keys ------------------------------------------------------------

      // Pick BroadcastNestedLoopJoin if one side could be broadcasted
      case j @ logical.Join(left, right, joinType, condition)          if canBuildRight(joinType) && canBroadcast(right) =>
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), BuildRight, joinType, condition) :: Nil
      case j @ logical.Join(left, right, joinType, condition)          if canBuildLeft(joinType) && canBroadcast(left) =>
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), BuildLeft, joinType, condition) :: Nil

      // Pick CartesianProduct for InnerJoin
      case logical.Join(left, right, _: InnerLike, condition) =>
        joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

      case logical.Join(left, right, joinType, condition) =>        val buildSide =          if (right.stats(conf).sizeInBytes <= left.stats(conf).sizeInBytes) {            BuildRight
          } else {            BuildLeft
          }        // This join could be very slow or OOM
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      // --- Cases where this strategy does not apply ---------------------------------------------
      case _ => Nil
    }
  }

首先看注释,该策略会首先使用ExtractEquiJoinKeys来确定join至少有一个谓词是可以去估算的,如果有的话,就要根据这些谓词来去计算选择哪种join,这里分三种Join,广播Join,Shuffle Hash Join,还有最常见的Sort Merge Join。如果没有谓词可以估算的话,那么也是有两种方式:BroadcastNestedLoopJoin和CartesianProduct。

接下来分析源码。在满足有谓词可去估算的情况下,是如何判别是哪种Join?

2.1 Broadcast Join

涉及到两个方法,canBroadcast()canBuildX(canBuildLeft或者canBuildRight)。

/**
 * Matches a plan whose output should be small enough to be used in broadcast join.
 */private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats(conf).hints.isBroadcastable.getOrElse(false) ||
    (plan.stats(conf).sizeInBytes >= 0 &&
      plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
}

可以看到canBroadcast()这边会去配置项里查找AUTO_BROADCASTJOIN_THRESHOLD,这个配置为-1是不可用。

private def canBuildRight(joinType: JoinType): Boolean = joinType match {  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
  case j: ExistenceJoin => true
  case _ => false}private def canBuildLeft(joinType: JoinType): Boolean = joinType match {  case _: InnerLike | RightOuter => true
  case _ => false}

可以看到,canBuildRight()canBuildLeft()方法的意思当以另外一边为主时候为true。

2.2 Shuffled Hash Join

涉及到canBuildLocalHashMap()方法、muchSmaller()方法和一个配置项PREFER_SORTMERGEJOIN,这个配置项的解释是:

When true, prefer sort merge join over shuffle hash join.

canBuildLocalHashMap()方法源码是:

/**
 * Matches a plan whose single partition should be small enough to build a hash table.
 *
 * Note: this assume that the number of partition is fixed, requires additional work if it's
 * dynamic.
 */private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

该方法涉及到了两个配置项,一个是AUTO_BROADCASTJOIN_THRESHOLD,这个配置项在广播Join中已经有使用到,是对于查询优化非常有用的配置,另外一个是SHUFFLE_PARTITIONS,是为了join或者aggregate进行shuffle时的分区数,不配置的话,默认200。

muchSmaller()源码:

/**
 * Returns whether plan a is much smaller (3X) than plan b.
 *
 * The cost to build hash map is higher than sorting, we should only build hash map on a table
 * that is much smaller than other one. Since we does not have the statistic for number of rows,
 * use the size of bytes here as estimation.
 */private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats(conf).sizeInBytes * 3 <= b.stats(conf).sizeInBytes
}

也就是所谓的much就是3倍大小。

2.3 Sort Merge Join

收集到的join keys在数据类型上都是可以排序的情况下,可以用Sort Merge Join。

3. BroadcastHashJoinExec

case class BroadcastHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin with CodegenSupport {  protected override def doExecute(): RDD[InternalRow] = {    val numOutputRows = longMetric("numOutputRows")    val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
    streamedPlan.execute().mapPartitions { streamedIter =>      val hashed = broadcastRelation.value.asReadOnlyCopy()      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
      join(streamedIter, hashed, numOutputRows)
    }
  }
}
protected def join(
    streamedIter: Iterator[InternalRow],
    hashed: HashedRelation,
    numOutputRows: SQLMetric): Iterator[InternalRow] = {  val joinedIter = joinType match {    case _: InnerLike =>
      innerJoin(streamedIter, hashed)    case LeftOuter | RightOuter =>
      outerJoin(streamedIter, hashed)    case LeftSemi =>
      semiJoin(streamedIter, hashed)    case LeftAnti =>
      antiJoin(streamedIter, hashed)    case j: ExistenceJoin =>
      existenceJoin(streamedIter, hashed)    case x =>      throw new IllegalArgumentException(        s"BroadcastHashJoin should not take $x as the JoinType")
  }  val resultProj = createResultProjection
  joinedIter.map { r =>
    numOutputRows += 1
    resultProj(r)
  }
}

上述两段代码,第一段是BroadcastHashJoinExec的定义和基本方法,第二段是其继承的HashJoinjoin()方法。在HashJoin中,存在着一些二元对象,命名为(buildXXX, streamedXXX),这里没有打出来可以自行翻源码,比如(buildPlan, streamedPlan),那么在这里,buildXXX是要被Hash或者要被广播的小表,streamedXXX是大表,stream意思就是通过迭代流过去一条条处理的意思(个人理解)。

所以在这边,将buildPlan广播出去以后,将streamedPlan调用execute()过后返回的RDD[InternalRow],调用mapPartitions,根据每个分区和广播的小表进行join操作。

4. ShuffledHashJoinExec

case class ShuffledHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin {  private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {    val buildDataSize = longMetric("buildDataSize")    val buildTime = longMetric("buildTime")    val start = System.nanoTime()    val context = TaskContext.get()    val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
    buildTime += (System.nanoTime() - start) / 1000000
    buildDataSize += relation.estimatedSize    // This relation is usually used until the end of task.
    context.addTaskCompletionListener(_ => relation.close())
    relation
  }  protected override def doExecute(): RDD[InternalRow] = {    val numOutputRows = longMetric("numOutputRows")
    streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>      val hashed = buildHashedRelation(buildIter)
      join(streamIter, hashed, numOutputRows)
    }
  }
}

ShuffledHashJoinBroadcastJoin在构造Hash Table上有不同,后者是依靠广播生成的HashedRelation,前者是调用zipPartitions方法,该方法的作用是将两个有相同分区数的RDD合并,映射参数是两个RDD的迭代器,可以看到在这里是(streamIter, buildIter),然后对buildIter构造HashRelation。这也就说明:BroadcastJoin的HashRelation是小表的全部数据,而ShuffledHashJoin的HashRelation只是小表跟大表在同一分区内的一部分数据

5. SortMergeJoinExec

case class SortMergeJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryExecNode with CodegenSupport {  protected override def doExecute(): RDD[InternalRow] = {    val numOutputRows = longMetric("numOutputRows")    val spillThreshold = getSpillThreshold
    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
      ...
      ...
    }
  }
}

可以看到,同样是将两个RDD做zipPartitions后然后将每个partition迭代做Join。



作者:orisonchan
链接:https://www.jianshu.com/p/751354519ded


0人推荐
随时随地看视频
慕课网APP