在 Spark 中使用 Delta 表格
您可以使用 Delta 表格 (或差異格式檔案) 以多種方式來擷取和修改資料。
使用 Spark SQL
在 Spark 中處理 Delta 表格中的資料最常見的方式是使用 Spark SQL。 您可以使用 spark.sql 程式庫,將 SQL 陳述式內嵌在其他語言 (例如 PySpark 或 Scala)。 例如,下列程式碼會將資料列插入 products 表格中。
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
或者,您也可以在 Notebook 中使用 %%sql
magic 來執行 SQL 陳述式。
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
使用 Delta API
當您想要使用差異檔案而不是目錄表格時,使用 Delta Lake API 可能會更簡單。 您可以從包含差異格式檔的資料夾位置建立 DeltaTable 的實例,然後使用 API 來修改表格中的資料。
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
使用「時間移動」來處理表格版本設定
對 Delta 表格所做的修改會記錄在表格的交易記錄中。 您可以使用記錄的交易來檢視表格變更的歷程記錄,以及擷取舊版的資料 (稱為「時間移動」)
若要查看表格的歷程記錄,您可以使用 DESCRIBE
SQL 命令,如下所示。
%%sql
DESCRIBE HISTORY products
此陳述式的結果顯示已套用至表格的交易,如下所示 (某些資料行已被省略):
version | timestamp | 作業 | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | UPDATE | {"predicate":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | WRITE | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | 建立資料表 | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
若要查看外部表格的歷程記錄,您可以指定資料夾位置,而不是表格名稱。
%%sql
DESCRIBE HISTORY 'Files/mytable'
您可以將差異檔位置讀入資料框架中,並以 versionAsOf
選項指定所需的版本,以擷取特定版本的資料:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
或者,您可以藉由使用 timestampAsOf
選項來指定時間戳記:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)