教程:Delta Lake
本教程介绍 Azure Databricks 上的常见 Delta Lake 操作,包括:
可以从附加到 Azure Databricks 计算资源(例如群集)的笔记本内部运行本文中的示例 Python、Scala 和 SQL 代码。 还可以从与 Databricks SQL 中的 SQL 仓库关联的查询中运行本文中的 SQL 代码。
准备源数据
本教程依赖于名为 People 10 M 的数据集。其中包含 1000 万条虚构记录,这些记录保存了有关人员的事实,例如名字和姓氏、出生日期和工资。 本教程假设此数据集位于与目标 Azure Databricks 工作区关联的 Unity Catalog 卷中。
若要获取本教程的 People 10 M 数据集,请执行以下操作:
- 转到 Kaggle 中的 People 10 M 页面。
- 单击“下载”,将名为
archive.zip
的文件下载到本地计算机。 - 提取
archive.zip
文件中名为export.csv
的文件。export.csv
文件包含本教程的数据。
若要将 export.csv
文件上传到卷,请执行以下操作:
- 在边栏上,单击“目录”。
- 在“目录资源管理器”中,浏览并打开要将
export.csv
文件上传到的卷。 - 单击“上传到此卷”。
- 在本地计算机上拖放或者浏览并选择
export.csv
文件。 - 单击“上载” 。
在以下代码示例中,请将 /Volumes/main/default/my-volume/export.csv
替换为目标卷中 export.csv
文件的路径。
创建表
默认情况下,在 Azure Databricks 上创建的所有表都使用 Delta Lake。 Databricks 建议使用 Unity Catalog 托管表。
在前面的代码示例和以下代码示例中,请将表名 main.default.people_10m
替换为 Unity Catalog 中的目标三部分目录、架构和表名。
注意
Delta Lake 是 Azure Databricks 所有读取、写入和表创建命令的默认值。
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
上述操作将创建新的托管表。 有关创建 Delta 表时的可用选项的信息,请参阅 CREATE TABLE。
在 Databricks Runtime 13.3 LTS 及更高版本中,可以使用 CREATE TABLE LIKE 创建一个新的空 Delta 表,该表会复制源 Delta 表的架构和表属性。 这在将表从开发环境提升到生产环境时特别有用,如以下代码示例所示:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
若要创建空表,还可以使用 Delta Lake 中适用于 Python 和 Scala 的 DeltaTableBuilder
API。 与等效的 DataFrameWriter API 相比,这些 API 可以更轻松地指定其他信息,例如列注释、表属性和生成的列。
重要
此功能目前以公共预览版提供。
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
在表中更新插入
若要将一组更新和插入操作合并到现有的 Delta 表中,请使用适用于 Python 和 Scala 的 DeltaTable.merge
方法,以及适用于 SQL 的 MERGE INTO 语句。 例如,以下示例从源表中获取数据并将其合并到目标 Delta 表中。 如果两个表中有一个匹配行,Delta Lake 会使用给定的表达式更新数据列。 如果没有匹配行,Delta Lake 会添加一个新行。 此操作称为“upsert”。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
在 SQL 中,如果指定 *
,则会在目标表中更新或插入所有列,并假设源表具有与目标表相同的列。 如果目标表没有相同的列,查询将引发分析错误。
执行插入操作时必须为表中的每个列指定一个值(例如,当现有数据集中没有匹配行时,必须这样做)。 但是,你不需要更新所有值。
若要查看结果,请查询表。
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
读取表
可以按表名或表路径访问 Delta 表中的数据,如以下示例中所示:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
写入到表
Delta Lake 使用标准语法将数据写入表中。
若要以原子方式将新数据添加到现有 Delta 表,请使用追加模式,如以下示例中所示:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
若要替换表中的所有数据,请使用覆盖模式,如以下示例中所示:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
更新表
可以在 Delta 表中更新与谓词匹配的数据。 例如,在示例 people_10m
表中,若要将 gender
列中的缩写从 M
或 F
更改为 Male
或 Female
,可运行以下命令:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
从表中删除
可以从 Delta 表中删除与谓词匹配的数据。 例如,在示例 people_10m
表中,若要删除与 1955
之前的 birthDate
列中具有某个值的人员对应的所有行,可运行以下命令:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
重要
删除操作会从最新版本的 Delta 表中删除数据,但是直到显式删除旧的版本后才能从物理存储中删除数据。 有关详细信息,请参阅清空。
显示表历史记录
若要查看表的历史记录,请使用适用于 Python 和 Scala 的 DeltaTable.history
方法以及适用于 SQL 中的 DESCRIBE HISTORY 语句,该语句提供对表进行的每次写入的出处信息,包括表版本、操作、用户等。
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
查询表的旧版本(按时间顺序查看)
Delta Lake 按时间顺序查看允许你查询 Delta 表的旧快照。
若要查询表的旧版本,请指定表的版本或时间戳。 例如,若要从前面的历史记录中查询版本 0 或时间戳 2024-05-15T22:43:15.000+00:00Z
,请使用以下命令:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
对于时间戳,仅接受日期或时间戳字符串,例如 "2024-05-15T22:43:15.000+00:00"
或 "2024-05-15 22:43:15"
。
使用 DataFrameReader 选项,可以从固定到表的特定版本或时间戳的 Delta 表创建数据帧,例如:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
有关详细信息,请参阅使用 Delta Lake 表历史记录。
优化表
对表执行多个更改后,可能会有很多小文件。 为了提高读取查询的速度,可以使用优化操作将小文件折叠为较大的文件:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
按列进行 Z 排序
为了进一步提高读取性能,你可以通过 Z 排序将相关的信息放置在同一组文件中。 Delta Lake 数据跳过算法使用此并置来大幅减少需要读取的数据量。 若要对数据进行 Z 排序,需要在 Z 排序依据操作中指定要排序的列。 例如,若要按 gender
进行并置,请运行:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
有关运行优化操作时可用的完整选项集,请参阅优化数据文件布局。
使用 VACUUM
清理快照
Delta Lake 为读取提供快照隔离,这意味着即使其他用户或作业正在查询表,也可以安全地运行优化操作。 不过,最终你应该清除旧快照。 可以通过运行 vacuum 操作来实现此目的:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
有关有效使用 vacuum 操作的详细信息,请参阅使用 vacuum 删除未使用的数据文件。