手记

Spark Programming Guide(二)

Basics

To illustrate RDD basics, consider the simple program below:
仔细观察下面的程序,介绍了RDD的基本功能:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

第一行代码从外部文件定义了一个基本的RDD,这里数据集并没有被真正的加载进内存,这里仅仅是建立了一个指向文件的一个指针。第二行定义了一个lineLengths的数据集由第一个数据集转换操作产生。同样由于RDD是惰性加载的,这里lineLengths并不会立即进行计算。最后,我们执行了action进行reduce计算。到此时Spark将计算任务分配到不同的机器上,每台机器同时对分区数据进行map和本地的reduce操作,最后把结果数据返回到驱动程序。

If we also wanted to use lineLengths again later, we could add:

如果我们在后面需要继续使用lineLengths,我们可以调用如下方法:

lineLengths.persist()

before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.

当执行第一个reduce计算操作之后lineLengths将会被持久化到内存中

Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

Spark的API很大程度上依赖于从驱动程序传递函数到集群之上,这里有两种推荐的方法来做这件事:

  1. Anonymous function syntax, which can be used for short pieces of code.

使用简短的匿名函数语法

  1. Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

使用全集单例对象的静态方法。比如,你可以定义一个对象MyFunctions并且调用MyFunctions.func1方法,就像下面:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

注意,虽然还可以传递类实例中的方法的引用(而不是单例对象),但这需要将包含该类的对象连同方法一起发送出去。例如,考虑:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).

这里,如果我们创建一个MyClass的实例并调用了它的doStuff方法,这里的map操作中引用了MyClass实例的func1方法,所以整个MyClass对象都需要被发送的集群上,等同于rdd.map(x => this.func1(x)).

In a similar way, accessing fields of the outer object will reference the whole object:

以类似的方式,访问外部对象的字段将引用整个对象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

这等同于rdd.map(x => this.field + x),引用了整个对象。最简单的方法是将字段复制到本地变量,而不是外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
Understanding closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

想要理解Spark的方法和变量在集群中的生命周期是比较困难的。RDD操作在其范围之外修改变量可能是造成混淆的常见原因。在下面的示例中,我们将查看使用foreach()来增加计数器的代码,但是类似的问题也可能发生在其他操作中。

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):

看下面一个简单的 RDD 元素求和,执行结果可能不同,具体取决于是否在同一个 JVM 中执行. 一个常见的例子是当 Spark 运行在 local 本地模式(--master = local[n])时,与部署 Spark 应用到群集(例如,通过 spark-submit 到 YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
Local vs. cluster modes

The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

上述代码的行为未定义,可能不会按预期的方式工作。执行作业时,Spark将会分解RDD操作并加入到任务队列中,每个任务都运行在一个单独的执行器中。在执行之前,Spark将确认任务的闭包,而闭包是在RDD上的executor必须能够访问的变量和方法(例中的 foreach())。闭包被序列化并被发送到每个执行器。

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.

闭包的变量副本发给每个 counter ,当 counter 被 foreach 函数引用的时候,它已经不再是 driver node 的 counter 了。虽然在 driver node 仍然有一个 counter 在内存中,但是对 executors 已经不可见。executor 看到的只是序列化的闭包一个副本。所以 counter 最终的值还是 0,因为对 counter 所有的操作均引用序列化的 closure 内的值。

In local mode, in some circumstances the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.

在 local 本地模式,在某些情况下的foreach功能实际上是同一JVM上的驱动程序中执行,并会引用同一个原始的counter计数器,实际上可能进行更新操作.

To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.

为了确保类似的场景下行为明确应用使用累加器(Accumulator)。Spark的累加器使用了一种特殊的方式提供了一种安全的机制去更新集群中各个worker节点的变量。本指南的累加器部分将更详细地讨论这些问题。

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

通常情况下,closures - constructs类似于循环或局部定义的方法,不应该用户去改变一些全局的状态。Spark不能保证类似的行为能够正常的执行。有些代码可以在本地模式下工作,但这只是偶然的,这样的代码在分布式模式下不会像预期的那样运行。如果需要一些全局的聚合功能,应使用 Accumulator(累加器)。

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).

另一种常见的语法用于打印 RDD 的所有元素使用rdd.foreach(println)或rdd.map(println)。在单台机器上,这将预期输出和打印 RDD 的所有元素。在集群模式下,标准输出将会被集群节点中的executor所替代,而不会再驱动程序中输出,所以驱动程序中不会输出元素信息!要打印 driver 程序的所有元素,可以使用的 collect() 方法首先把 RDD 放到 driver 程序节点上,像这样: rdd.collect().foreach(println)。但是这可能会导致驱动节点内存耗尽,由于collect()会获取RDD所有元素到一台机器上,如果你仅仅需要打印几个元素,一种更安全的方式是使用take()方法:rdd.take(100).foreach(println).

2人推荐
随时随地看视频
慕课网APP