Lakeflow 宣告式管線引進數個新的 SQL 關鍵詞和函式,用於定義管線中具體化檢視和串流數據表。 開發管線的 SQL 支援是以 Spark SQL 的基本概念為基礎,並新增對結構化串流功能的支援。
熟悉 PySpark DataFrame 的使用者可能會偏好使用 Python 開發管線程序代碼。 Python 支援更廣泛的測試和作業,這些測試與作業難以搭配 SQL 實作,例如中繼程式設計作業。 請參閱 使用 Python 開發管線程式代碼。
如需 Lakeflow 宣告式管線 SQL 語法的完整參考,請參閱 Lakeflow 宣告式管線 SQL 語言參考。
用於管線開發的 SQL 基本概念
建立 Lakeflow 宣告式管線數據集的 SQL 程式代碼會 CREATE OR REFRESH
使用 語法,針對查詢結果定義具體化檢視和串流數據表。
STREAM
關鍵詞會指出是否應該使用串流語意來讀取 子句中所SELECT
參考的數據源。
讀取和寫入預設為管線設定期間指定的資料目錄和結構。 請參閱 設定目標目錄和架構。
Lakeflow 宣告式管線原始程式碼與 SQL 腳本大相徑庭:Lakeflow 宣告式管線會在管線中設定的所有原始碼檔案評估所有數據集定義,並在執行任何查詢之前建置數據流圖形。 筆記本或文稿中顯示的查詢順序會定義程式代碼評估的順序,但不會定義查詢執行的順序。
使用 SQL 建立具體化檢視
下列程式代碼範例示範使用 SQL 建立具體化檢視的基本語法:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
使用 SQL 建立串流數據表
下列程式代碼範例示範使用 SQL 建立串流數據表的基本語法。 讀取串流數據表的來源時, STREAM
關鍵詞會指出使用來源的串流語意。 建立具體化檢視時,請勿使用 STREAM
關鍵詞:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
備註
使用 STREAM 關鍵詞來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。 若要匯入包含變更提交的數據,您可以使用 Python 和 SkipChangeCommits
選項來處理錯誤。
從物件記憶體載入資料
Lakeflow 宣告式管線支援從 Azure Databricks 支援的所有格式載入數據。 請參閱 數據格式選項。
備註
這些範例使用在 /databricks-datasets
自動掛接至工作區下可用的數據。 Databricks 建議使用磁碟區路徑或雲端 URI 來參考儲存在雲端物件記憶體中的數據。 請參閱什麼是 Unity Catalog 磁碟區?。
Databricks 建議在針對儲存在雲端物件儲存中的數據設定增量擷取工作負載時,使用 Auto Loader 和串流數據表。 請參閱 什麼是自動載入器?。
SQL 會使用函 read_files
式來叫用自動載入器功能。 您還必須使用 STREAM
關鍵詞來配置 read_files
的串流讀取。
下列描述在 SQL 中 read_files
的語法:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
自動載入器的選項是鍵值對。 如需支援的格式和選項的詳細資訊,請參閱 選項。
下列範例會使用自動載入器從 JSON 檔案建立串流資料表:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
read_files
函式也支援批次語意來建立具體化檢視。 下列範例會使用批次語意來讀取 JSON 目錄,並建立具體化檢視:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
用預期驗證數據
您可以使用預期來設定及強制執行資料品質條件約束。 請參閱 使用管線期望來管理資料品質。
下列程式代碼定義名為 valid_data
的預期,會在數據擷取期間卸除 Null 的記錄:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
查詢管線中定義的實體化視圖和串流資料表
下列範例會定義四個資料集:
- 名為
orders
的串流數據表,會載入 JSON 數據。 - 名為
customers
的具體化檢視,可載入 CSV 數據。 - 名為
customer_orders
的具體化檢視,聯結來自orders
和customers
數據集的記錄、將順序時間戳轉換成日期,然後選取customer_id
、order_number
、state
和order_date
字段。 - 名為
daily_orders_by_state
的具體化檢視,會匯總每個州的每日訂單計數。
備註
在查詢管線中的檢視或資料表時,您可以直接指定目錄和架構,也可以使用管線中設定的預設值。 在此範例中,orders
、customers
和 customer_orders
數據表會從您管線設定的預設目錄和架構中寫入和讀取。
舊版發佈模式會使用 LIVE
架構來查詢管線中定義的其他具體化檢視和串流數據表。 在新管線中,LIVE
架構語法會被悄悄忽略。 請參閱 LIVE 架構 (舊版)。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
定義私人數據表
您可以在建立具體化檢視或串流數據表時使用 PRIVATE
子句。 當您建立私人數據表時,您會建立數據表,但不要建立數據表的元數據。 子 PRIVATE
句會指示 Lakeflow 宣告式管線建立一個僅供管線使用的資料表,但不應在管線之外被存取。 為了縮短處理時間,專用資料表會在管線的生命週期內持續存在,而不僅僅是一次更新。
私人數據表的名稱可以和目錄中的數據表相同。 如果您在管線內指定數據表的不限定名稱,如果有私用數據表和具有該名稱的目錄數據表,則會使用私用數據表。
私有資料表先前被稱為臨時表。
從實體化視圖或串流表永久刪除記錄
若要永久刪除具有刪除向量的串流表記錄,例如為了符合GDPR規範,必須在物件的基礎 Delta 資料表上執行其他作業。 若要確保從串流資料表刪除記錄,請參閱 從串流資料表永久刪除記錄。
具體化檢視在重新整理時,始終會反映基礎表中的數據。 若要刪除具體化檢視中的數據,您必須從來源刪除數據並重新整理具體化檢視。
將數值參數化以在使用 SQL 宣告資料表或檢視時使用
使用 SET
在宣告資料表或檢視的查詢中指定組態值,包括 Spark 組態。 在 SET
語句之後,您在筆記本中定義的任何數據表或檢視表都可以存取這些已定義的值。 針對 SET
語句之後的任何數據表或檢視執行 Spark 查詢時,會使用使用 SET 語句指定的任何 Spark 組態。 若要讀取查詢中的組態值,請使用字串插補語法 ${}
。 下列範例會設定名為 startDate
的 Spark 組態值,並在查詢中使用該值:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
若要指定多個組態值,請針對每個值使用不同的 SET
語句。
局限性
此 PIVOT
子句不受支援。 Spark 中的 pivot
作業需要急切載入輸入數據以計算輸出架構。 Lakeflow 宣告式管線不支援此功能。
備註
建立具體化檢視的 CREATE OR REFRESH LIVE TABLE
語法已被取代。 請改用 CREATE OR REFRESH MATERIALIZED VIEW
。