教學課程:Delta Lake

本教學課程介紹 Azure Databricks 上的常見 Delta Lake 作業,包括下列各項:

您可以從連結至 Azure Databricks 叢集的筆記本,執行本文中的範例 Python、R、Scala 和 SQL 程式代碼。 您也可以從 Databricks SQL 中與 SQL 倉儲相關聯的查詢執行本文中的 SQL 程式代碼。

注意

下列一些程式代碼範例使用由架構(也稱為資料庫)和數據表或檢視所組成的兩層命名空間表示法(例如 , default.people10m。 若要搭配 Unity 目錄使用這些範例,請將兩層命名空間取代為 Unity Catalog 三層命名空間表示法,其中包含目錄、架構和數據表或檢視表(例如 , main.default.people10m

建立數據表

根據預設,在 Azure Databricks 上建立的所有數據表都會使用 Delta Lake。

注意

Delta Lake 是 Azure Databricks 的所有讀取、寫入和數據表建立命令的預設值。

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

上述作業會使用從數據推斷的架構來建立新的 Managed 數據表 。 如需建立 Delta 數據表時可用選項的相關信息,請參閱 CREATE TABLE

針對受控數據表,Azure Databricks 會決定數據的位置。 若要取得位置,您可以使用 DESCRIBE DETAIL 語句,例如:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

有時候,您可能想要先指定架構再插入數據,以建立資料表。 您可以使用下列 SQL 命令來完成此作業:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 CREATE TABLE LIKE 來建立新的空白 Delta 數據表,以複製來源 Delta 數據表的架構和數據表屬性。 當將數據表從開發環境升階到生產環境時,這特別有用,例如下列程式代碼範例:

CREATE TABLE prod.people10m LIKE dev.people10m

您也可以使用 DeltaTableBuilder Delta Lake 中的 API 來建立資料表。 相較於 DataFrameWriter API,此 API 可讓您更輕鬆地指定其他資訊,例如數據行批注、數據表屬性和 產生的數據行

重要

這項功能處於公開預覽狀態

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

向上插入數據表

若要將一組更新和插入合併至現有的 Delta 數據表,您可以使用 MERGE INTO 語句。 例如,下列語句會從源數據表取得數據,並將它合併至目標 Delta 數據表。 當這兩個數據表中有相符的數據列時,Delta Lake 會使用指定的表達式來更新數據行。 當沒有相符的數據列時,Delta Lake 會加入新的數據列。 這項作業稱為 upsert

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

如果您指定 *,這會更新或插入目標數據表中的所有數據行。 這假設源數據表的數據行與目標數據表中的數據行相同,否則查詢會擲回分析錯誤。

當您執行 INSERT 作業時,您必須為資料表中的每個資料行指定值(例如,當現有數據集中沒有相符的數據列時)。 不過,您不需要更新所有值。

若要查看結果,請查詢數據表。

SELECT * FROM people_10m WHERE id >= 9999998

讀取數據表

您可以依資料表名稱或資料表路徑存取 Delta 資料表中的數據,如下列範例所示:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

寫入數據表

Delta Lake 使用標準語法將數據寫入數據表。

若要以不可部分完成的方式將新數據新增至現有的 Delta 數據表,請使用 append 模式,如下列範例所示:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

若要以不可部分完成的方式取代數據表中的所有數據,請使用 overwrite 模式,如下列範例所示:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

更新數據表

您可以更新符合 Delta 數據表中述詞的數據。 例如,在名為 people10m 的數據表或 位於/tmp/delta/people-10m的路徑中,若要從 或 MaleFFemale變更 或 數據行M中的gender縮寫,您可以執行下列命令:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

從資料表中刪除

您可以從 Delta 數據表移除符合述詞的數據。 例如,在名為 people10m 的數據表或路徑中 /tmp/delta/people-10m,若要刪除與數據行中 birthDate 具有值之人員對應的所有數據列 1955,您可以執行下列動作:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

重要

delete 從最新版的 Delta 數據表中移除數據,但在明確清除舊版之前,不會從實體記憶體中移除數據。 如需詳細資訊,請參閱 真空

顯示數據表歷程記錄

若要檢視數據表的歷程記錄,請使用 DESCRIBE HISTORY 語句,其會提供數據表版本、作業、使用者等資訊,讓每個寫入數據表。

DESCRIBE HISTORY people_10m

查詢舊版的資料表 (時程移動)

Delta Lake 時間移動可讓您查詢差異數據表的較舊快照集。

若要查詢舊版的數據表,請在 語句中 SELECT 指定版本或時間戳。 例如,若要從上述歷程記錄查詢第0版,請使用:

SELECT * FROM people_10m VERSION AS OF 0

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

針對時間戳,只接受日期或時間戳字串,例如 "2019-01-01""2019-01-01'T'00:00:00.000Z"

DataFrameReader 選項可讓您從固定為特定數據表版本的 Delta 數據表建立 DataFrame,例如在 Python 中:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

或者,或者:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

如需詳細資訊,請參閱 使用 Delta Lake 數據表歷程記錄

優化數據表

對數據表執行多個變更之後,您可能有許多小型檔案。 若要改善讀取查詢的速度,您可以使用 OPTIMIZE 將小型檔案折疊成較大的檔案:

OPTIMIZE people_10m

依數據行排序 Z 順序

若要進一步改善讀取效能,您可以藉由 Z-Ordering 在相同檔案集中共置相關信息。 Delta Lake 數據略過演算法會自動使用此共同位置,大幅減少需要讀取的數據量。 若要使用 Z 順序數據,您可以指定要在 ZORDER BY 子句中排序的數據行。 例如,若要藉由 gender共同尋找 ,請執行:

OPTIMIZE people_10m
ZORDER BY (gender)

如需執行 OPTIMIZE時可用的一組完整選項,請參閱 在 Delta Lake 上使用優化的壓縮數據檔。

使用清除快照集 VACUUM

Delta Lake 提供讀取的快照集隔離,這表示即使其他使用者或作業正在查詢數據表,也能安全地執行 OPTIMIZE 。 不過,您最終應該清除舊的快照集。 您可以執行 VACUUM 命令來執行此動作:

VACUUM people_10m

如需有效使用 VACUUM 的詳細數據,請參閱 使用真空移除未使用的數據檔。