转型与行动

Spark 使用懒惰评估 ; 这意味着它不会做任何工作,除非它真的必须这样做。这种方法允许我们避免不必要的内存使用,从而使我们能够处理大数据。

一个转变是懒惰的评价,并在实际工作情况,当一个动作发生。

例:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

因此,在 [1] 中,我们告诉 Spark 将文件读入 RDD,名为 lines。星火听到我们,对我们说:“是的,我做到这一点”,但实际上它并没有尚未读取文件。

在[2]中,我们过滤文件的行,假设其内容包含错误的行,这些行在其开头标有 error。因此,我们告诉 Spark 创建一个名为 errors 的新 RDD,它将具有 RDD lines 的元素,在其开始时有 error 这个词。

现在,在 [3],我们要求火花计数错误,即算 RDD 称为元素的数量 errors 了。count() 是一个动作,它不会选择 Spark,而是实际进行操作,这样就可以找到 count() 的结果,这将是一个整数。

结果,当到达 [3] 时,实际上将执行 [1][2],即当我们到达 [3] 时,然后只有这样:

  1. 该文件将在 textFile() 中读取(因为 [1]

  2. lines 将是 filter()‘ed(因为 [2]

  3. count() 将执行,因为 [3]

调试提示:由于 Spark 在到达 [3] 之前不会做任何实际的工作,所以重要的是要理解如果 [1] 和/或 [2] 中存在错误,它将不会出现,直到 [3] 中的动作触发 Spark 实际执行工作。例如,如果文件中的数据不支持我使用的 startsWith(),那么 [2] 将被 Spark 正确接受并且不会引发任何错误,但是当提交 [3] 时,Spark 实际上会评估 [1][2],那么只有这样才能理解 [2] 的某些东西是不正确的并产生描述性的错误。

因此,执行 [3] 时可能会触发错误,但这并不意味着错误必须位于 [3] 的语句中!

注意,在 [3] 之后,lineserrors 都不会存储在内存中。它们将继续仅作为一组处理指令存在。如果对这些 RDD 中的任何一个执行多个操作,spark 将多次读取并过滤数据。为避免在单个 RDD 上执行多个操作时重复操作,使用 cache 将数据存储到内存中通常很有用。

你可以在 Spark 文档中看到更多转换/操作。