教學課程:Delta Lake
本教學課程介紹 Azure Databricks 上的常見 Delta Lake 作業,包括下列各項:
您可以從連結至 Azure Databricks 叢集的筆記本內,執行本文中的範例 Python、R、Scala 和 SQL 程式代碼。 您也可以從 Databricks SQL 中與 SQL 倉儲相關聯的查詢中執行本文中的 SQL 程式代碼。
注意
下列一些程式代碼範例使用由架構(也稱為資料庫)和數據表或檢視所組成的兩層命名空間表示法(例如 , default.people10m
。 若要搭配 Unity 目錄使用這些範例,請將兩層命名空間取代為 Unity Catalog 三層命名空間表示法,其中包含目錄、架構和數據表或檢視表(例如 , main.default.people10m
。
建立數據表
根據預設,在 Azure Databricks 上建立的所有數據表都會使用 Delta Lake。
注意
Delta Lake 是 Azure Databricks 的所有讀取、寫入和數據表建立命令的預設值。
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
上述作業會使用從數據推斷的架構來建立新的 Managed 數據表 。 如需建立 Delta 數據表時可用選項的相關信息,請參閱 CREATE TABLE。
針對受控數據表,Azure Databricks 會決定數據的位置。 若要取得位置,您可以使用 DESCRIBE DETAIL 語句,例如:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
有時候,您可能想要先指定架構再插入數據,以建立資料表。 您可以使用下列 SQL 命令來完成此作業:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 CREATE TABLE LIKE
來建立新的空白 Delta 數據表,以複製來源 Delta 數據表的架構和數據表屬性。 當將數據表從開發環境升階到生產環境時,這特別有用,例如下列程式代碼範例:
CREATE TABLE prod.people10m LIKE dev.people10m
您也可以使用 DeltaTableBuilder
Delta Lake 中的 API 來建立資料表。 相較於 DataFrameWriter API,此 API 可讓您更輕鬆地指定其他資訊,例如數據行批注、數據表屬性和 產生的數據行。
重要
這項功能處於公開預覽狀態。
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
向上插入數據表
若要將一組更新和插入合併至現有的 Delta 數據表,您可以使用 MERGE INTO 語句。 例如,下列語句會從源數據表取得數據,並將它合併至目標 Delta 數據表。 當這兩個數據表中有相符的數據列時,Delta Lake 會使用指定的表達式來更新數據行。 當沒有相符的數據列時,Delta Lake 會加入新的數據列。 這項作業稱為 upsert。
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
如果您指定 *
,這會更新或插入目標數據表中的所有數據行。 這假設源數據表的數據行與目標數據表中的數據行相同,否則查詢會擲回分析錯誤。
當您執行 INSERT
作業時,您必須為資料表中的每個資料行指定值(例如,當現有數據集中沒有相符的數據列時)。 不過,您不需要更新所有值。
若要查看結果,請查詢數據表。
SELECT * FROM people_10m WHERE id >= 9999998
讀取數據表
您可以依資料表名稱或資料表路徑存取 Delta 資料表中的數據,如下列範例所示:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
寫入數據表
Delta Lake 使用標準語法將數據寫入數據表。
若要以不可部分完成的方式將新數據新增至現有的 Delta 數據表,請使用 append
模式,如下列範例所示:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
若要以不可部分完成的方式取代數據表中的所有數據,請使用 overwrite
模式,如下列範例所示:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
更新數據表
您可以更新符合 Delta 數據表中述詞的數據。 例如,在名為 people10m
的數據表或 位於/tmp/delta/people-10m
的路徑中,若要從 或 Male
F
Female
變更 或 數據行M
中的gender
縮寫,您可以執行下列命令:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
從資料表中刪除
您可以從 Delta 數據表移除符合述詞的數據。 例如,在名為 people10m
的數據表或路徑中 /tmp/delta/people-10m
,若要刪除與數據行中 birthDate
具有值之人員對應的所有數據列 1955
,您可以執行下列動作:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
重要
delete
從最新版的 Delta 數據表中移除數據,但在明確清除舊版之前,不會從實體記憶體中移除數據。 如需詳細資訊,請參閱 真空 。
顯示數據表歷程記錄
若要檢視數據表的歷程記錄,請使用 DESCRIBE HISTORY 語句,其會提供數據表版本、作業、使用者等資訊,讓每個寫入數據表。
DESCRIBE HISTORY people_10m
查詢舊版的資料表 (時程移動)
Delta Lake 時間移動可讓您查詢差異數據表的較舊快照集。
若要查詢舊版的數據表,請在 語句中 SELECT
指定版本或時間戳。 例如,若要從上述歷程記錄查詢第0版,請使用:
SELECT * FROM people_10m VERSION AS OF 0
或
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
針對時間戳,只接受日期或時間戳字串,例如 "2019-01-01"
和 "2019-01-01'T'00:00:00.000Z"
。
DataFrameReader 選項可讓您從固定為特定數據表版本的 Delta 數據表建立 DataFrame,例如在 Python 中:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
或者,或者:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
如需詳細資訊,請參閱 使用 Delta Lake 數據表歷程記錄。
優化數據表
對數據表執行多個變更之後,您可能有許多小型檔案。 若要改善讀取查詢的速度,您可以使用 OPTIMIZE
將小型檔案折疊成較大的檔案:
OPTIMIZE people_10m
依數據行排序 Z 順序
若要進一步改善讀取效能,您可以藉由 Z-Ordering 在相同檔案集中共置相關信息。 Delta Lake 數據略過演算法會自動使用此共同位置,大幅減少需要讀取的數據量。 若要使用 Z 順序數據,您可以指定要在 ZORDER BY
子句中排序的數據行。 例如,若要藉由 gender
共同尋找 ,請執行:
OPTIMIZE people_10m
ZORDER BY (gender)
如需執行 OPTIMIZE
時可用的一組完整選項,請參閱 在 Delta Lake 上使用優化的壓縮數據檔。
使用清除快照集 VACUUM
Delta Lake 提供讀取的快照集隔離,這表示即使其他使用者或作業正在查詢數據表,也能安全地執行 OPTIMIZE
。 不過,您最終應該清除舊的快照集。 您可以執行 VACUUM
命令來執行此動作:
VACUUM people_10m
如需有效使用 VACUUM
的詳細數據,請參閱 使用真空移除未使用的數據檔。