使用 Spark 來處理資料檔案

已完成

使用 Spark 的優點之一,是您可以使用各種程式設計語言撰寫和執行程式碼,方便運用您既有的程式設計技能,並針對指定的工作使用最適當的語言。 全新 Azure Databricks Spark 筆記本的預設語言是 PySpark (Python 的 Spark 最佳化版本),由於對資料操作和視覺效果提供強式支援,因此資料科學家和分析師常使用此版本。 此外,您也可以使用 Scala (JAVA 衍生語言,能以互動方式使用) 和 SQL (Spark SQL 程式庫中常用的 SQL 語言變體,用於處理關聯式資料結構) 這類語言。 軟體工程師也可以建立使用 JAVA 等架構在 Spark 上執行的編譯解決方案。

使用資料框架探索資料

Spark 會原生使用稱為彈性分散式資料集 (RDD) 的資料結構;但是雖然可以撰寫直接與 RDD 搭配運作的程式碼,但最常用來搭配 Spark 中結構化資料使用的資料結構是資料框架 (隨附於 Spark SQL 程式庫中)。 Spark 中的資料框架類似於通用 Pandas Python 程式庫中的資料框架,但已經過最佳化,可在 Spark 的分散式處理環境中運作。

注意

除了 Dataframe API 外,Spark SQL 還提供 Java 和 Scala 支援的強型別 Dataset API。 本課程模組會聚焦在 Dataframe API。

將資料載入資料框架中

現在來探索假設範例,了解如何使用資料框架來處理資料。 假設您在 Databricks 檔案系統 (DBFS) 儲存體 data 資料夾內名為 products.csv 的逗點分隔文字檔中有下列資料:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

在 Spark 筆記本中,可以使用下列 PySpark 程式碼將資料載入資料框架,並顯示前 10 列:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

開頭的 %pyspark 行稱為 magic,用於告知 Spark 此儲存格中使用的語言是 PySpark。 以下是產品資料範例的對等 Scala 程式碼:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

magic %spark 用來指定 Scala。

提示

您也可以選取要用於筆記本介面中每個儲存格的語言。

先前顯示的這兩個範例會產生如下的輸出:

ProductID ProductName 類別 ListPrice
771 Mountain-100 Silver, 38 Mountain Bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain Bikes 3399.9900
... ... ... ...

指定資料框架結構描述

在上一個範例中,CSV 檔的第一個資料列中包含資料行名稱,而 Spark 可以從包含的資料推斷每個資料行的資料類型。 您也可以為資料指定明確的結構描述,當資料檔案中未包含資料行名稱時很實用,如下列 CSV 範例所示:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

下列 PySpark 範例示範如何使用下列格式,指定要從 product-data.csv 檔案載入之資料框架的結構描述:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

結果會再次類似於:

ProductID ProductName 類別 ListPrice
771 Mountain-100 Silver, 38 Mountain Bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain Bikes 3399.9900
... ... ... ...

篩選和分組資料框架

您可以使用資料框架類別方法來篩選、排序、分組,以及操作它所包含的資料。 例如,下列程式碼範例會使用 select 方法,從上一個範例中包含產品資料的 df 資料框架擷取 ProductName 和 ListPrice 資料行:

pricelist_df = df.select("ProductID", "ListPrice")

此程式碼範例的結果看起來會像這樣:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

大部分資料操作方法的共通點是,select 都會傳回新的資料框架物件。

提示

從資料框架中選取資料行子集是常見的作業,也可以使用下列較短的語法來達成這個目標:

pricelist_df = df["ProductID", "ListPrice"]

您可以將方法「鏈結」在一起,執行一連串的操作,產生轉換的資料框架。 例如,此範例程式碼會鏈結 selectwhere 方法來建立新的資料框架,其中包含類別為 Mountain BikesRoad Bikes 的產品之 ProductNameListPrice 資料行:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

此程式碼範例的結果看起來會像這樣:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

若要分組和彙總資料,可以使用 groupBy 方法和彙總函式。 例如,下列 PySpark 程式碼會計算每個類別的產品數目:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

此程式碼範例的結果看起來會像這樣:

類別 計數
Headsets 3
Wheels 14
Mountain Bikes 32
... ...

在 Spark 中使用 SQL 運算式

Dataframe API 是名為 Spark SQL 的 Spark 程式庫的一部分,可讓資料分析師使用 SQL 運算式來查詢及操作資料。

在 Spark 目錄中建立資料庫物件

Spark 目錄是關聯式資料物件 (例如檢視和資料表) 的中繼存放區。 Spark 執行階段可以使用目錄,順暢整合以任何 Spark 支援的語言撰寫的程式碼,以及對某些資料分析師或開發人員而言可能更自然的 SQL 運算式。

若要讓資料框架中的資料可在 Spark 目錄中查詢,其中一個最簡單方式就是建立暫存檢視,如下列程式碼範例所示:

df.createOrReplaceTempView("products")

檢視是暫時性的,這表示它會自動在目前工作階段結束時刪除。 您也可以建立保存於目錄中的資料表,用於定義可使用 Spark SQL 查詢的資料庫。

注意

我們不會在本課程模組中深入探討 Spark 目錄資料表,但值得花一些時間強調幾個重點:

  • 您可以使用 spark.catalog.createTable 方法來建立空的資料表。 資料表是中繼資料結構,會將其基礎資料儲存在與目錄相關聯的儲存位置。 刪除資料表也會刪除其基礎資料。
  • 您可以使用資料框架的 saveAsTable 方法,將資料框架儲存為資料表。
  • 您可以使用 方法來建立 外部 資料表 spark.catalog.createExternalTable 。 外部資料表會定義目錄中的中繼資料,但會從外部儲存位置取得其基礎資料;通常是資料湖中的資料夾。 刪除外部資料表並不會刪除基礎資料。

使用 Spark SQL API 查詢資料

在以任何語言撰寫的程式碼中,都能使用 Spark SQL API 來查詢目錄中的資料。 例如,下列 PySpark 程式碼會使用 SQL 查詢,以資料框架的形式從 products 檢視傳回資料。

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

此範例程式碼的結果可能看起來類似下表:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

使用 SQL 程式碼

上述範例示範如何使用 Spark SQL API 在 Spark 程式碼中內嵌 SQL 運算式。 在筆記本中,您也可以使用 %sql magic 來執行 SQL 程式碼,以查詢目錄中的物件,如下所示:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

SQL 程式碼範例傳回的結果集會自動在筆記本中顯示為資料表,如下所示:

類別 ProductCount
Bib-Shorts 3
Bike Racks 1
Bike Stands 1
... ...