在 Spark 中使用 Delta 表格

已完成

您可以使用 Delta 表格 (或差異格式檔案) 以多種方式來擷取和修改資料。

使用 Spark SQL

在 Spark 中處理 Delta 表格中的資料最常見的方式是使用 Spark SQL。 您可以使用 spark.sql 程式庫,將 SQL 陳述式內嵌在其他語言 (例如 PySpark 或 Scala)。 例如,下列程式碼會將資料列插入 products 表格中。

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

或者,您也可以在 Notebook 中使用 %%sql magic 來執行 SQL 陳述式。

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

使用 Delta API

當您想要使用差異檔案而不是目錄表格時,使用 Delta Lake API 可能會更簡單。 您可以從包含差異格式檔的資料夾位置建立 DeltaTable 的實例,然後使用 API 來修改表格中的資料。

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

使用「時間移動」來處理表格版本設定

對 Delta 表格所做的修改會記錄在表格的交易記錄中。 您可以使用記錄的交易來檢視表格變更的歷程記錄,以及擷取舊版的資料 (稱為「時間移動」)

若要查看表格的歷程記錄,您可以使用 DESCRIBE SQL 命令,如下所示。

%%sql

DESCRIBE HISTORY products

此陳述式的結果顯示已套用至表格的交易,如下所示 (某些資料行已被省略):

version timestamp 作業 operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z 建立資料表 {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

若要查看外部表格的歷程記錄,您可以指定資料夾位置,而不是表格名稱。

%%sql

DESCRIBE HISTORY 'Files/mytable'

您可以將差異檔位置讀入資料框架中,並以 versionAsOf 選項指定所需的版本,以擷取特定版本的資料:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

或者,您可以藉由使用 timestampAsOf 選項來指定時間戳記:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)