Delta Live Tables SQL 語言參考
本文提供 Delta Live Tables SQL 程式設計介面的詳細數據。
- 如需 Python API 的相關信息,請參閱 Delta Live Tables Python 語言參考。
- 如需 SQL 命令的詳細資訊,請參閱 SQL 語言參考。
您可以在 SQL 查詢中使用 Python 使用者定義函數 (UDF),但是您必須在 Python 檔案中定義這些 UDF,才能在 SQL 來源檔案中呼叫它們。 請參閱 使用者定義的純量函式 - Python。
限制
不支援 子 PIVOT
句。 pivot
Spark 中的作業需要急切載入輸入數據,才能計算輸出的架構。 Delta Live Tables 不支援此功能。
建立差異實時數據表具體化檢視或串流數據表
宣告串流數據表或具體化檢視時,您可以使用相同的基本 SQL 語法(也稱為 LIVE TABLE
)。
您只能使用針對串流來源讀取的查詢來宣告串流數據表。 Databricks 建議使用自動載入器從雲端物件記憶體串流擷取檔案。 請參閱 自動載入器 SQL 語法。
當您將管線中的其他資料表或檢視指定為串流來源時,您必須在資料集名稱周圍包含 STREAM()
函式。
下列描述使用 SQL 宣告具體化檢視和串流數據表的語法:
CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
AS select_statement
建立 Delta Live Tables 檢視
下列描述使用 SQL 宣告檢視的語法:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
自動載入器 SQL 語法
下列描述在 SQL 中使用自動載入器的語法:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
您可以搭配自動載入器使用支援的格式選項。 使用 函式 map()
,您可以將任意數目的選項傳遞至 cloud_files()
方法。 選項是索引鍵/值組,其中索引鍵和值是字串。 如需支援格式和選項的詳細資訊,請參閱 檔格式選項。
範例:定義數據表
您可以從外部數據來源或從管線中定義的數據集讀取來建立資料集。 若要從內部數據集讀取,請將 LIVE
關鍵詞前面加上數據集名稱。 下列範例會定義兩個不同的數據集:名為 taxi_raw
的數據表,其接受 JSON 檔案做為輸入來源,而名為 filtered_data
的數據表會 taxi_raw
接受數據表做為輸入:
CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
範例:從串流來源讀取
若要從串流來源讀取資料,例如自動載入器或內部數據集,請定義 STREAMING
資料表:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
如需串流數據的詳細資訊,請參閱 使用差異即時數據表轉換數據。
控制數據表具體化的方式
資料表也提供其具體化的額外控制:
- 指定如何使用 來分割
PARTITIONED BY
數據表。 您可以使用資料分割來加速查詢。 - 您可以使用 來設定資料表屬性
TBLPROPERTIES
。 請參閱 Delta Live Tables 資料表屬性。 - 使用
LOCATION
設定來設定儲存位置。 如果未設定,數據表數據預設會儲存在管線儲存位置LOCATION
。 - 您可以在架構定義中使用 產生的數據行 。 請參閱 範例:指定架構和數據分割數據行。
注意
對於大小小於 1 TB 的數據表,Databricks 建議讓 Delta Live Tables 控制數據組織。 除非您預期數據表成長超過 TB,否則通常不應該指定資料分割數據行。
範例:指定架構和數據分割數據行
當您定義資料表時,您可以選擇性地指定架構。 下列範例會指定目標數據表的架構,包括使用 Delta Lake 產生的 資料行,以及定義資料表的數據分割數據行:
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
根據預設,如果您未指定架構,Delta Live Tables 會從 table
定義推斷架構。
範例:定義數據表條件約束
注意
數據表條件約束支援處於 公開預覽狀態。 若要定義數據表條件約束,您的管線必須是已啟用 Unity 目錄的管線,並設定為使用 preview
通道。
指定架構時,您可以定義主鍵和外鍵。 條件約束是參考性的,不會強制執行。 下列範例會定義具有主鍵和外鍵條件約束的數據表:
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
設定數據表或檢視的組態值
使用 SET
來指定數據表或檢視的組態值,包括 Spark 組態。 您在語句可存取已定義值之後 SET
,於筆記本中定義的任何數據表或檢視表。 針對 SET 語句之後的任何數據表或檢視執行 Spark 查詢時,會使用使用 SET
語句所指定的任何 Spark 組態。 若要讀取查詢中的組態值,請使用字串插補語法 ${}
。 下列範例會設定名為 startDate
的Spark組態值,並在查詢中使用該值:
SET startDate='2020-01-01';
CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}
若要指定多個組態值,請針對每個值使用不同的 SET
語句。
SQL 屬性
CREATE TABLE 或 VIEW |
---|
TEMPORARY 建立數據表,但不會發佈數據表的元數據。 子 TEMPORARY 句會指示 Delta Live Tables 建立可供管線使用的數據表,但不應該在管線外部存取。 為了縮短處理時間,臨時表會保存管線的存留期,而不只是單一更新。 |
STREAMING 建立數據表,以數據流的形式讀取輸入數據集。 輸入數據集必須是串流數據源,例如自動載入器或 STREAMING 數據表。 |
PARTITIONED BY 用於分割數據表的一個或多個數據行的選擇性清單。 |
LOCATION 數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。 |
COMMENT 數據表的選擇性描述。 |
column_constraint 數據行上的選擇性參考主鍵或外鍵 條件約束 。 |
table_constraint 數據表上的選擇性參考主鍵或外鍵 條件約束 。 |
TBLPROPERTIES 數據表屬性的選擇性清單。 |
select_statement 定義數據表數據集的 Delta Live Tables 查詢。 |
CONSTRAINT 子句 |
---|
EXPECT expectation_name 定義資料品質條件約束 expectation_name 。 如果未 ON VIOLATION 定義條件約束,請將違反條件約束的數據列新增至目標數據集。 |
ON VIOLATION 要針對失敗的資料列採取選擇性動作: * FAIL UPDATE :立即停止管線執行。* DROP ROW :卸除記錄並繼續處理。 |
使用差異實時數據表中的 SQL 變更數據擷取
APPLY CHANGES INTO
使用 語句來使用 Delta Live Tables CDC 功能,如下所述:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
您可以使用與非APPLY CHANGES
查詢相同的 CONSTRAINT
子句來APPLY CHANGES
定義目標的數據質量條件約束。 請參閱 使用 Delta Live Tables 管理數據品質。
注意
和 UPDATE
事件的預設行為INSERT
是從來源向上插入 CDC 事件:更新目標數據表中符合指定索引鍵的任何數據列,或在目標數據表中不存在相符記錄時插入新數據列。 DELETE
您可以使用條件來指定APPLY AS DELETE WHEN
事件的處理。
重要
您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定目標資料表的APPLY CHANGES
架構時,您也必須包含 __START_AT
與欄位具有相同資料類型sequence_by
的 和 __END_AT
資料行。
請參閱 套用變更 API:簡化差異實時數據表中的異動數據擷取。
子句 |
---|
KEYS 可唯一識別源數據中數據列的數據行或數據行組合。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。 這是必要子句。 |
IGNORE NULL UPDATES 允許內嵌包含目標數據行子集的更新。 當 CDC 事件符合現有的數據列並指定 IGNORE NULL UPDATES 時,具有 null 的數據行會保留其目標中的現有值。 這也適用於值為的 null 巢狀數據行。這個子句是選擇性的。 預設值是使用值覆寫 null 現有的數據行。 |
APPLY AS DELETE WHEN 指定應將 CDC 事件視為 DELETE 而非 upsert 的時機。 為了處理順序錯誤的數據,已刪除的數據列會暫時保留為基礎 Delta 數據表中的墓碑,而檢視會在中繼存放區中建立,以篩選掉這些墓碑。 您可以使用 來設定保留間隔pipelines.cdc.tombstoneGCThresholdInSeconds table 屬性。這個子句是選擇性的。 |
APPLY AS TRUNCATE WHEN 指定應將 CDC 事件視為完整數據表 TRUNCATE 的時機。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。APPLY AS TRUNCATE WHEN 子句僅支援 SCD 類型 1。 SCD 類型 2 不支援截斷。這個子句是選擇性的。 |
SEQUENCE BY 指定源數據中 CDC 事件邏輯順序的數據行名稱。 Delta Live Tables 會使用此排序來處理依序抵達的變更事件。 這是必要子句。 |
COLUMNS 指定要包含在目標數據表中的數據行子集。 您可以: * 指定要包含之資料列的完整清單: COLUMNS (userId, name, city) 。* 指定要排除的資料列清單: COLUMNS * EXCEPT (operation, sequenceNum) 這個子句是選擇性的。 預設值是在未指定 子句時 COLUMNS ,在目標數據表中包含所有數據行。 |
STORED AS 是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。 這個子句是選擇性的。 預設值為 SCD 類型 1。 |
TRACK HISTORY ON 指定輸出數據行的子集,以在那些指定的數據行有任何變更時產生記錄記錄。 您可以: * 指定要追蹤之資料列的完整清單: COLUMNS (userId, name, city) 。* 指定要從追蹤中排除的資料列清單: COLUMNS * EXCEPT (operation, sequenceNum) 這個子句是選擇性的。 當有任何變更時,預設會追蹤所有輸出資料列的歷程記錄,相當於 TRACK HISTORY ON * 。 |