在 Scala 中建立 DataFrame

有許多方法可以建立 DataFrame。它們可以從本地列表,分散式 RDD 或從資料來源讀取建立。

使用 toDF

通過匯入 spark sql implicits,可以從本地 Seq,Array 或 RDD 建立 DataFrame,只要內容屬於 Product 子型別(元組和 case 類是 Product 子型別的眾所周知的示例)。例如:

import sqlContext.implicits._
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")

使用 createDataFrame

另一種選擇是使用 SQLcontext 中的 createDataFrame 方法。此選項還允許使用 toDF 建立 Product 子型別的本地列表或 RDD,但不會在同一步驟中設定列的名稱。例如:

val df1 = sqlContext.createDataFrame(Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))

此外,這種方法允許從 Row 例項的 RDD 建立,只要傳遞 schema 引數來定義生成的 DataFrame 模式。例:

import org.apache.spark.sql.types._
val schema = StructType(List(
    StructField("integer_column", IntegerType, nullable = false),
    StructField("string_column", StringType, nullable = true),
    StructField("date_column", DateType, nullable = true)
))

val rdd = sc.parallelize(Seq(
  Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))

val df = sqlContext.createDataFrame(rdd, schema)

從訊息來源閱讀

也許最常見的建立 DataFrame 的方法來自資料來源。可以從 hdfs 中的鑲木地板檔案建立它,例如:

val df = sqlContext.read.parquet("hdfs:/path/to/file")