轉型與行動

Spark 使用懶惰評估 ; 這意味著它不會做任何工作,除非它真的必須這樣做。這種方法允許我們避免不必要的記憶體使用,從而使我們能夠處理大資料。

一個轉變是懶惰的評價,並在實際工作情況,當一個動作發生。

例:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

因此,在 [1] 中,我們告訴 Spark 將檔案讀入 RDD,名為 lines。星火聽到我們,對我們說:“是的,我做到這一點”,但實際上它並沒有尚未讀取檔案。

在[2]中,我們過濾檔案的行,假設其內容包含錯誤的行,這些行在其開頭標有 error。因此,我們告訴 Spark 建立一個名為 errors 的新 RDD,它將具有 RDD lines 的元素,在其開始時有 error 這個詞。

現在,在 [3],我們要求火花計數錯誤,即算 RDD 稱為元素的數量 errors 了。count() 是一個動作,它不會選擇 Spark,而是實際進行操作,這樣就可以找到 count() 的結果,這將是一個整數。

結果,當到達 [3] 時,實際上將執行 [1][2],即當我們到達 [3] 時,然後只有這樣:

  1. 該檔案將在 textFile() 中讀取(因為 [1]

  2. lines 將是 filter()‘ed(因為 [2]

  3. count() 將執行,因為 [3]

除錯提示:由於 Spark 在到達 [3] 之前不會做任何實際的工作,所以重要的是要理解如果 [1] 和/或 [2] 中存在錯誤,它將不會出現,直到 [3] 中的動作觸發 Spark 實際執行工作。例如,如果檔案中的資料不支援我使用的 startsWith(),那麼 [2] 將被 Spark 正確接受並且不會引發任何錯誤,但是當提交 [3] 時,Spark 實際上會評估 [1][2],那麼只有這樣才能理解 [2] 的某些東西是不正確的併產生描述性的錯誤。

因此,執行 [3] 時可能會觸發錯誤,但這並不意味著錯誤必須位於 [3] 的語句中!

注意,在 [3] 之後,lineserrors 都不會儲存在記憶體中。它們將繼續僅作為一組處理指令存在。如果對這些 RDD 中的任何一個執行多個操作,spark 將多次讀取並過濾資料。為避免在單個 RDD 上執行多個操作時重複操作,使用 cache 將資料儲存到記憶體中通常很有用。

你可以在 Spark 文件中看到更多轉換/操作。