將資料載入到 DataFrame 中

Spark(scala) 中,我們可以通過幾種不同的方式將資料匯入 DataFrame,每種方式用於不同的用例。

從 CSV 建立 DataFrame

將資料載入到 DataFrame 的最簡單方法是從 CSV 檔案載入資料。這方面的一個例子(摘自官方檔案 )是:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("cars.csv")

隱式地從 RDD 建立 DataFrame

我們在 spark 應用程式中經常使用 RDD 中的資料,但需要將其轉換為 DataFrame。最簡單的方法是使用 .toDF() RDD 函式,它將隱式確定 DataFrame 的資料型別:

val data = List(
   ("John", "Smith", 30), 
   ("Jane", "Doe", 25)
)

val rdd = sc.parallelize(data)

val df = rdd.toDF("firstname", "surname", "age")

從 RDD 顯式建立 DataFrame

在某些情況下,使用 .toDF() 方法不是最好的主意,因為我們需要明確定義 DataFrame 的模式。這可以使用包含 StructField 陣列的 StructType 來實現。

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val data = List(
   Array("John", "Smith", 30), 
   Array("Jane", "Doe", 25)
)

val rdd = sc.parallelize(data)

val schema = StructType(
   Array(
      StructField("firstname", StringType,  true),
      StructField("surname",   StringType,  false),
      StructField("age",       IntegerType, true)
   )
)

val rowRDD = rdd.map(arr => Row(arr : _*))

val df = sqlContext.createDataFrame(rowRDD, schema)