Delta Live Tables SQL 語言參考

本文提供 Delta Live Tables SQL 程式設計介面的詳細數據。

您可以在 SQL 查詢中使用 Python 使用者定義函數 (UDF),但是您必須在 Python 檔案中定義這些 UDF,才能在 SQL 來源檔案中呼叫它們。 請參閱 使用者定義的純量函式 - Python

限制

不支援 子 PIVOT 句。 pivot Spark 中的作業需要急切載入輸入數據,才能計算輸出的架構。 Delta Live Tables 不支援此功能。

建立差異實時數據表具體化檢視或串流數據表

宣告串流數據表或具體化檢視時,您可以使用相同的基本 SQL 語法(也稱為 LIVE TABLE)。

您只能使用針對串流來源讀取的查詢來宣告串流數據表。 Databricks 建議使用自動載入器從雲端物件記憶體串流擷取檔案。 請參閱 自動載入器 SQL 語法

當您將管線中的其他資料表或檢視指定為串流來源時,您必須在資料集名稱周圍包含 STREAM() 函式。

下列描述使用 SQL 宣告具體化檢視和串流數據表的語法:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

建立 Delta Live Tables 檢視

下列描述使用 SQL 宣告檢視的語法:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

自動載入器 SQL 語法

下列描述在 SQL 中使用自動載入器的語法:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

您可以搭配自動載入器使用支援的格式選項。 使用 函式 map() ,您可以將任意數目的選項傳遞至 cloud_files() 方法。 選項是索引鍵/值組,其中索引鍵和值是字串。 如需支援格式和選項的詳細資訊,請參閱 檔格式選項

範例:定義數據表

您可以從外部數據來源或從管線中定義的數據集讀取來建立資料集。 若要從內部數據集讀取,請將 LIVE 關鍵詞前面加上數據集名稱。 下列範例會定義兩個不同的數據集:名為 taxi_raw 的數據表,其接受 JSON 檔案做為輸入來源,而名為 filtered_data 的數據表會 taxi_raw 接受數據表做為輸入:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

範例:從串流來源讀取

若要從串流來源讀取資料,例如自動載入器或內部數據集,請定義 STREAMING 資料表:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

如需串流數據的詳細資訊,請參閱 使用差異即時數據表轉換數據。

控制數據表具體化的方式

資料表也提供其具體化的額外控制:

注意

對於大小小於 1 TB 的數據表,Databricks 建議讓 Delta Live Tables 控制數據組織。 除非您預期數據表成長超過 TB,否則通常不應該指定資料分割數據行。

範例:指定架構和數據分割數據行

當您定義資料表時,您可以選擇性地指定架構。 下列範例會指定目標數據表的架構,包括使用 Delta Lake 產生的 資料行,以及定義資料表的數據分割數據行:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

根據預設,如果您未指定架構,Delta Live Tables 會從 table 定義推斷架構。

範例:定義數據表條件約束

注意

數據表條件約束支援處於 公開預覽狀態。 若要定義數據表條件約束,您的管線必須是已啟用 Unity 目錄的管線,並設定為使用 preview 通道。

指定架構時,您可以定義主鍵和外鍵。 條件約束是參考性的,不會強制執行。 下列範例會定義具有主鍵和外鍵條件約束的數據表:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

設定數據表或檢視的組態值

使用 SET 來指定數據表或檢視的組態值,包括 Spark 組態。 您在語句可存取已定義值之後 SET ,於筆記本中定義的任何數據表或檢視表。 針對 SET 語句之後的任何數據表或檢視執行 Spark 查詢時,會使用使用 SET 語句所指定的任何 Spark 組態。 若要讀取查詢中的組態值,請使用字串插補語法 ${}。 下列範例會設定名為 startDate 的Spark組態值,並在查詢中使用該值:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

若要指定多個組態值,請針對每個值使用不同的 SET 語句。

SQL 屬性

CREATE TABLE 或 VIEW
TEMPORARY

建立數據表,但不會發佈數據表的元數據。 子 TEMPORARY 句會指示 Delta Live Tables 建立可供管線使用的數據表,但不應該在管線外部存取。 為了縮短處理時間,臨時表會保存管線的存留期,而不只是單一更新。
STREAMING

建立數據表,以數據流的形式讀取輸入數據集。 輸入數據集必須是串流數據源,例如自動載入器或 STREAMING 數據表。
PARTITIONED BY

用於分割數據表的一個或多個數據行的選擇性清單。
LOCATION

數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。
COMMENT

數據表的選擇性描述。
column_constraint

數據行上的選擇性參考主鍵或外鍵 條件約束
table_constraint

數據表上的選擇性參考主鍵或外鍵 條件約束
TBLPROPERTIES

數據表屬性選擇性清單。
select_statement

定義數據表數據集的 Delta Live Tables 查詢。
CONSTRAINT 子句
EXPECT expectation_name

定義資料品質條件約束 expectation_name。 如果未 ON VIOLATION 定義條件約束,請將違反條件約束的數據列新增至目標數據集。
ON VIOLATION

要針對失敗的資料列採取選擇性動作:

* FAIL UPDATE:立即停止管線執行。
* DROP ROW:卸除記錄並繼續處理。

使用差異實時數據表中的 SQL 變更數據擷取

APPLY CHANGES INTO使用 語句來使用 Delta Live Tables CDC 功能,如下所述:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

您可以使用與非APPLY CHANGES查詢相同的 CONSTRAINT 子句來APPLY CHANGES定義目標的數據質量條件約束。 請參閱 使用 Delta Live Tables 管理數據品質。

注意

UPDATE 事件的預設行為INSERT是從來源向上插入 CDC 事件:更新目標數據表中符合指定索引鍵的任何數據列,或在目標數據表中不存在相符記錄時插入新數據列。 DELETE您可以使用條件來指定APPLY AS DELETE WHEN事件的處理。

重要

您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定目標資料表的APPLY CHANGES架構時,您也必須包含 __START_AT 與欄位具有相同資料類型sequence_by的 和 __END_AT 資料行。

請參閱 套用變更 API:簡化差異實時數據表中的異動數據擷取。

子句
KEYS

可唯一識別源數據中數據列的數據行或數據行組合。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。

這是必要子句。
IGNORE NULL UPDATES

允許內嵌包含目標數據行子集的更新。 當 CDC 事件符合現有的數據列並指定 IGNORE NULL UPDATES 時,具有 null 的數據行會保留其目標中的現有值。 這也適用於值為的 null巢狀數據行。

這個子句是選擇性的。

預設值是使用值覆寫 null 現有的數據行。
APPLY AS DELETE WHEN

指定應將 CDC 事件視為 DELETE 而非 upsert 的時機。 為了處理順序錯誤的數據,已刪除的數據列會暫時保留為基礎 Delta 數據表中的墓碑,而檢視會在中繼存放區中建立,以篩選掉這些墓碑。 您可以使用 來設定保留間隔
pipelines.cdc.tombstoneGCThresholdInSecondstable 屬性

這個子句是選擇性的。
APPLY AS TRUNCATE WHEN

指定應將 CDC 事件視為完整數據表 TRUNCATE的時機。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。

APPLY AS TRUNCATE WHEN子句僅支援 SCD 類型 1。 SCD 類型 2 不支援截斷。

這個子句是選擇性的。
SEQUENCE BY

指定源數據中 CDC 事件邏輯順序的數據行名稱。 Delta Live Tables 會使用此排序來處理依序抵達的變更事件。

這是必要子句。
COLUMNS

指定要包含在目標數據表中的數據行子集。 您可以:

* 指定要包含之資料列的完整清單: COLUMNS (userId, name, city)
* 指定要排除的資料列清單: COLUMNS * EXCEPT (operation, sequenceNum)

這個子句是選擇性的。

預設值是在未指定 子句時 COLUMNS ,在目標數據表中包含所有數據行。
STORED AS

是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。

這個子句是選擇性的。

預設值為 SCD 類型 1。
TRACK HISTORY ON

指定輸出數據行的子集,以在那些指定的數據行有任何變更時產生記錄記錄。 您可以:

* 指定要追蹤之資料列的完整清單: COLUMNS (userId, name, city)
* 指定要從追蹤中排除的資料列清單: COLUMNS * EXCEPT (operation, sequenceNum)

這個子句是選擇性的。 當有任何變更時,預設會追蹤所有輸出資料列的歷程記錄,相當於 TRACK HISTORY ON *