共用方式為


教學:建立並管理三角洲湖資料表

本教學將使用範例資料示範常見的 Delta 表格操作。 Delta Lake 是優化儲存層,為 Databricks 上的資料表提供基礎。 除非另有說明,Databricks 上的所有資料表皆為 Delta 資料表。

開始之前

若要完成本教學課程,您需要:

  • 允許使用現有的運算資源或建立新的運算資源。 請參閱 計算
  • Unity 目錄權限: USE CATALOGUSE SCHEMA,以及 CREATE TABLE 目錄上的 workspace 權限。 若要設定這些權限,請聯絡 Databricks 管理員或 Unity Catalog 權限與可保護物件

這些範例依賴一個名為 合成人記錄的資料集:10K 到 10M 紀錄。 此資料集包含虛構的人物紀錄,包括姓名、姓氏、性別及年齡。

首先,下載本教學的資料集。

  1. 請造訪 Kaggle 上的 Synthetic Person Records: 10K 到 10M Records 頁面。
  2. 點選 下載 ,然後以 壓縮檔下載資料集。 這會將一個名為 archive.zip 的檔案下載到你的本機。
  3. archive檔案中提取archive.zip資料夾。

接著,將 person_10000.csv 資料集上傳到 Azure Databricks 工作區中的 Unity Catalog 磁碟區。 Azure Databricks 建議將資料上傳到 Unity 目錄卷,因為卷提供了存取、儲存、管理及組織檔案的功能。

  1. 點擊資料圖示開啟目錄總管。目錄請見側邊欄。
  2. 在目錄總管中,點選新增或加碼圖示新增資料建立磁碟區
  3. 為磁碟 my-volume 區命名,並選擇 「管理磁碟區 」作為磁碟類型。
  4. 選擇 workspace 目錄和 default 結構,然後點擊 建立
  5. 打開 my-volume 並點擊 「上傳到此卷」。
  6. 在你本機的 person_10000.csv 資料夾中,拖放或瀏覽並選取 archive 檔案。
  7. 按一下 [上傳] 。

最後,建立一個筆記本來執行範例程式碼。

  1. 在側邊欄點選新增或加號圖示
  2. 點擊筆記本圖示。筆記本建立新的筆記本。
  3. 為筆記本選擇一種語言。

建立數據表

建立一個新的 Unity Catalog 管理資料表,名稱來自 workspace.default.people_10kperson_10000.csv。 Delta Lake 是 Azure Databricks 中所有資料表建立、讀取與寫入指令的預設。

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

程式語言 Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

val df = spark.read
  .format("csv")
  .option("header", true)
  .schema(schema)
  .load("/Volumes/workspace/default/my-volume/person_10000.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")

// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)

SQL

-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
  person_id AS id,
  firstname,
  lastname,
  gender,
  age
FROM read_files(
  '/Volumes/workspace/default/my-volume/person_10000.csv',
  format => 'csv',
  header => true
);

-- View the new table.
SELECT * FROM workspace.default.people_10k;

有幾種不同的方法可以建立或複製資料表。 如需詳細資訊,請參閱CREATE TABLE

在 Databricks Runtime 13.3 LTS 及以上版本中,你可以用 CREATE TABLE LIKE 來建立一個新的空 Delta 資料表,複製來源 Delta 資料表的結構與資料表屬性。 這在將資料表從開發環境提升到生產環境時非常有用。

CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k

重要

這項功能處於公開預覽狀態

使用 DeltaTableBuilderPythonScala 的 API 來建立一個空資料表。 與 和 DataFrameWriter相比DataFrameWriterV2,API DeltaTableBuilder 讓指定額外資訊(如欄位註解、資料表屬性及產生欄位)變得更容易。

Python

from delta.tables import DeltaTable

(
  DeltaTable.createIfNotExists(spark)
    .tableName("workspace.default.people_10k_2")
    .addColumn("id", "INT")
    .addColumn("firstName", "STRING")
    .addColumn("lastName", "STRING", comment="surname")
    .addColumn("gender", "STRING")
    .addColumn("age", "INT")
    .execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

程式語言 Scala

import io.delta.tables.DeltaTable

DeltaTable.createOrReplace(spark)
  .tableName("workspace.default.people_10k")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build()
  )
  .addColumn("gender", "STRING")
  .addColumn("age", "INT")
  .execute()

display(spark.read.table("workspace.default.people_10k"))

向上插入數據表

修改表格中現有的紀錄或新增紀錄,使用稱為 upsert 的操作。 若要將一組更新與插入合併到現有的 Delta 資料表中,請使用 DeltaTable.mergePythonScala 的方法,以及 MERGE INTO SQL 中的語句。

例如,將來源資料表 people_10k_updates 的資料合併到目標 Delta 資料表 workspace.default.people_10k。 當這兩個數據表中有相符的數據列時,Delta Lake 會使用指定的表達式來更新數據行。 當沒有相符的數據列時,Delta Lake 會加入新的數據列。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10001, 'Billy', 'Luppitt', 'M', 55),
  (10002, 'Mary', 'Smith', 'F', 98),
  (10003, 'Elias', 'Leadbetter', 'M', 48),
  (10004, 'Jane', 'Doe', 'F', 30),
  (10005, 'Joshua', '', 'M', 90),
  (10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
  .merge(
    people_10k_updates.alias("people_10k_updates"),
    "people_10k.id = people_10k_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

程式語言 Scala

import org.apache.spark.sql.types._
import io.delta.tables._

// Define schema
val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

// Create data as Seq of Tuples
val data = Seq(
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16)
)

// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
  "id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

deltaTable.as("people_10k")
  .merge(
    people_10k_updates.as("people_10k_updates"),
    "people_10k.id = people_10k_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)

SQL

-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16);

-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;

-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001

在 SQL 中, * 運算子會更新或插入目標資料表中的所有欄位,假設來源資料表與目標資料表的欄位相同。 如果目標資料表的欄位不相同,查詢會拋出分析錯誤。 此外,當你執行插入操作時,必須為表格中的每一欄指定一個值。 欄位值可以為空,例如。 '' 執行插入操作時,不需要更新所有值。

讀取數據表

使用資料表名稱或路徑來存取 Delta 資料表中的資料。 要存取 Unity Catalog 管理的資料表,請使用完全限定的資料表名稱。 基於路徑的存取僅支援磁碟區與外部資料表,不支援受管理的資料表。 欲了解更多資訊,請參閱 Unity 目錄卷中的路徑規則與存取

Python

people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

程式語言 Scala

val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

SQL

SELECT * FROM workspace.default.people_10k;

寫入數據表

Delta Lake 使用標準語法將資料寫入資料表。 若要將新資料加入現有的 Delta 資料表,請使用附加模式。 與上傳不同,寫入資料表不會檢查重複記錄。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df  = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

程式語言 Scala

// Create the new data.
val data = Seq(
  (10007, "Miku", "Hatsune", "F", 25)
)

val df = spark.createDataFrame(data)
  .toDF("id", "firstName", "lastName", "gender", "age")

// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")

// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)

SQL

CREATE OR REPLACE TABLE workspace.default.people_10k_new (
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);

-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
  (10007, 'Miku', 'Hatsune', 'F', 25);

-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;

-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;

Databricks 筆記型電腦的儲存格輸出最多顯示 10,000 列或 2 MB,以較低者為準。 由於 workspace.default.people_10k 包含超過 10,000 列,筆記本輸出 display(df)中僅出現前 10,000 列。 額外的列存在於表格中,但因限制未被渲染在筆記本輸出中。 你可以透過專門針對它們篩選來查看額外的行。

若要替換資料表中的所有資料,請使用覆寫模式。

Python

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

程式語言 Scala

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

SQL

INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2

更新數據表

根據謂詞更新 Delta 表格中的資料。 例如,將gender欄中的值從Female變更為F,從Male變更為M,以及從Other變更為O

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'Female'",
  set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'Male',
  set = { 'gender': lit('M') }
)

deltaTable.update(
  condition = col('gender') == 'Other',
  set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

程式語言 Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'Female'",
  Map("gender" -> "'F'")
)

// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  col("gender") === "Male",
  Map("gender" -> lit("M")));

deltaTable.update(
  col("gender") === "Other",
  Map("gender" -> lit("O")));

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

從資料表中刪除

從 Delta 表格中移除與謂詞相符的資料。 例如,以下程式碼示範了兩種刪除操作:先刪除年齡小於 18 的列,接著刪除年齡低於 21 的列。

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

程式語言 Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

重要

刪除會從最新版本的 Delta 資料表中移除資料,但直到舊版本被明確抽空後,才會從實體儲存中移除資料。 更多資訊請參見 真空

顯示數據表歷程記錄

使用 DeltaTable.historyPythonScala 的方法,以及 DESCRIBE HISTORY SQL 中的陳述式,來查看每次寫入資料表的來源資訊。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

程式語言 Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

SQL

DESCRIBE HISTORY workspace.default.people_10k

利用時間旅行查詢表格的早期版本

使用 Delta Lake 的時間旅行功能查詢較舊的 Delta 表快照。 若要查詢特定版本,請使用該資料表的版本號或時間戳記。 例如,查詢資料表歷史中的版本 0 或時間戳 2026-01-05T23:09:47.000+00:00

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

程式語言 Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()

// Query using the version number.
display(deltaHistory.where("version == 0"))

// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

SQL

-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

對於時間戳記,僅接受日期或時間戳記字串。 例如,字串必須格式化為 "2026-01-05T22:43:15.000+00:00""2026-01-05 22:43:15"

使用 DataFrameReader 選項來從針對特定版本或時間戳固定的 Delta 表建立 DataFrame。

Python

# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

程式語言 Scala

// Query using the version number.
val dfVersion = spark.read
  .option("versionAsOf", 0)
  .table("workspace.default.people_10k")

// Query using the timestamp.
val dfTimestamp = spark.read
  .option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
  .table("workspace.default.people_10k")

display(dfVersion)
display(dfTimestamp)

SQL

-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;

欲了解更多資訊,請參閱 「與表格歷史共事」。

優化數據表

對資料表進行多次變更可能會產生多個小檔案,進而降低讀取查詢效能。 利用優化操作來提升速度,將小檔案合併成較大檔案。 請參閱 OPTIMIZE

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

程式語言 Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE workspace.default.people_10k

注意

如果啟用預測優化,你就不需要手動優化。 預測性最佳化會自動管理維護任務。 如需詳細資訊,請參閱 Unity 目錄受控資料表的預測優化

依數據行排序 Z 順序

為了對資料進行z排序並進一步提升讀取效能,請在操作中指定要排序的欄位。 例如,以高基數資料欄firstName進行排列。 欲了解更多關於 z 排序的資訊,請參見 資料跳躍

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

程式語言 Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

SQL

OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)

用真空操作清理快照

Delta Lake 對讀取有快照隔離,這表示在其他使用者或工作查詢資料表時,執行優化操作是安全的。 不過,你最終還是應該清理舊快照,因為這樣可以降低儲存成本、提升查詢效能,並確保資料合規性。 執行 VACUUM 這個操作來清理舊快照。 請參閱 VACUUM

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

程式語言 Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

SQL

VACUUM workspace.default.people_10k

欲了解更多有效使用真空操作的資訊,請參閱 「用真空移除未使用的資料檔案」。

後續步驟