建立 Delta Lake 資料表

已完成

Delta Lake 建立在資料表上,可提供資料湖中檔案的關聯式儲存體抽象概念。

從資料框架建立 Delta Lake 資料表

建立 Delta Lake 資料表最簡單的其中一種方式,就是以「差異」格式儲存資料框架,並指定應該儲存資料表之資料檔案和相關中繼資料資訊的路徑。

例如,下列 PySpark 程式碼會載入具有現有檔案資料的資料框架,然後將該資料框架儲存到「差異」格式的新資料夾位置:

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

儲存差異資料表之後,您指定的路徑位置會包含資料的 parquet 檔案 (不論載入資料框架的來源檔案格式為何),以及包含資料表交易記錄的 _delta_log 資料夾。

注意

交易記錄會將所有資料修改記錄至資料表。 藉由記錄每個修改,可以強制執行交易一致性,並保留資料表的版本設定資訊。

您可以使用覆寫模式,將現有 Delta Lake 資料表取代為資料框架的內容,如下所示:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

您也可以使用附加模式,將資料列從資料框架新增至現有的資料表:

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

進行條件式更新

雖然您可以在資料框架中修改資料,然後藉由覆寫該資料來取代 Delta Lake 資料表,但資料庫中的較常見模式是插入、更新或刪除現有資料表中的資料列作為離散交易作業。 若要向 Delta Lake 資料表進行此類修改,您可以使用 Delta Lake API 中的 DeltaTable 物件,其支援「更新」、「刪除」和「合併」作業。 例如,您可以使用下列程式碼,針對 [類別] 資料行值為「配件」的所有資料列,更新 [價格] 資料行:

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

資料修改會記錄在交易記錄中,並視需要在資料表資料夾中建立新的 parquet 檔案。

提示

如需使用 Data Lake API 的詳細資訊,請參閱 Delta Lake API 文件

查詢舊版資料表

Delta Lake 資料表支援透過交易記錄進行版本設定。 交易記錄會記錄對資料表所做的修改,並記下每個交易的時間戳記和版本號碼。 您可以使用此記錄的版本資料以檢視舊版資料表 - 此功能稱為「時間移動」

您可以藉由將差異資料表位置的資料讀取至資料框架,使用 versionAsOf 選項指定所需版本,以從特定版本的 Delta Lake 資料表擷取資料:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

或者,您可以藉由使用 timestampAsOf 選項來指定時間戳記:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)