Caused by: java.lang … at org.apache.spark.sql.Dataset

Below is my code. I iterate over the DataFrame prodRows and, for each product_PK, look up a matching sublist of product_PKs from prodRows itself.


  numRecProducts = 10

  var listOfProducts: Map[Long, Array[(Long, Int)]] = Map()

  prodRows.foreach { row: Row =>
    val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
    val gender = row.get(row.fieldIndex("gender_PK")).toString

    val selection = prodRows
      .filter($"gender_PK" === gender || $"gender_PK" === "UNISEX")
      .limit(numRecProducts)
      .select($"product_PK")

    var productList: Array[(Long, Int)] = Array()
    if (!selection.rdd.isEmpty()) {
      productList = selection.rdd.map(x => (x(0).toString.toLong, 1)).collect()
    }

    listOfProducts = listOfProducts + (product_PK -> productList)
  }

But when I execute it, it gives me the error below. It looks as if selection is empty in some iterations, yet I don't understand how to handle this error:


Driver stacktrace:

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)

    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)

    at scala.Option.foreach(Option.scala:257)

    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)

    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)


What does this mean, and how can I handle it?


陪伴而非守候
2 Answers

蓝山帝景

The problem is that you try to access prodRows from inside prodRows.foreach. You cannot use a DataFrame within a transformation: DataFrames exist only on the driver, so the closure running on the executors sees an invalid reference.
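The usual fix is to express the per-row lookup as a single self-join instead of touching a DataFrame inside foreach. A minimal sketch, assuming `spark` is the active SparkSession, `prodRows` has the columns `product_PK` and `gender_PK`, and `product_PK` is a Long; the names introduced here (`recs`, `candidates`, `rec_product_PK`, `rec_gender_PK`) are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val numRecProducts = 10

// A second copy of prodRows with renamed columns, so the join
// condition can compare the two sides unambiguously.
val recs = prodRows.select(
  $"product_PK".as("rec_product_PK"),
  $"gender_PK".as("rec_gender_PK"))

// One self-join replaces the per-row filter; no DataFrame is
// accessed from inside an executor-side closure.
val candidates = prodRows
  .select($"product_PK", $"gender_PK")
  .join(recs, $"rec_gender_PK" === $"gender_PK" || $"rec_gender_PK" === "UNISEX")

// Group per product and collect to the driver, keeping at most
// numRecProducts matches per product, mirroring the original limit().
val listOfProducts: Map[Long, Array[(Long, Int)]] = candidates
  .groupBy($"product_PK")
  .agg(collect_list($"rec_product_PK").as("recs"))
  .as[(Long, Seq[Long])]
  .collect()
  .map { case (pk, rs) => pk -> rs.take(numRecProducts).map((_, 1)).toArray }
  .toMap
```

Note that, like the original limit(numRecProducts), the take(numRecProducts) here keeps an arbitrary subset of the matches; add an orderBy before collecting if a specific ranking is needed.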