分享方式:


教學課程:執行您的第一個差異即時資料表管線(機器翻譯)

本教學課程說明如何透過 Databricks 筆記本中的程式碼設定 Delta Live Tables 管線,並藉由觸發管線更新來執行管線。 本教學課程包含範例管線,可透過 PythonSQL 介面,使用範例程式碼擷取及處理範例資料集。 您也可以使用本教學課程中的指示,使用任何具有正確定義之 Delta Live Tables 語法的筆記本來建立管線。

您可以使用 Azure Databricks 工作區 UI 或自動化工具選項 (例如 API、CLI、Databricks 資產套件,或作為 Databricks 工作流程中的工作) 來設定 Delta Live Tables 管線和觸發更新。 若要熟悉 Delta Live Tables 的功能,Databricks 建議先使用 UI 來建立和執行管線。 此外,當您在 UI 中設定管線時,Delta Live Tables 會產生管線的 JSON 組態,可用來實作程序設計工作流程。

為了示範 Delta Live Tables 功能,本教學課程中的範例會下載公開可用的資料集。 不過,Databricks 有數種方式可連線到資料來源,並擷取管線在實作真實世界使用案例時會使用的資料。 請參閱將資料擷取至 Delta Live Tables

需求

  • 若要啟動管道,則您必須擁有叢集建立權限或定義 Delta Live Tables 叢集之叢集原則的存取權限。 差異即時資料表執行階段會在執行管道之前建立叢集,且如果您沒有正確的權限,則會建立失敗。

  • 若要使用本教學課程中的範例,您的工作區必須啟用 Unity 目錄

  • 您必須在 Unity 目錄中具有下列權限:

    • 適用於 my-volume 磁碟區的 READ VOLUMEWRITE VOLUMEALL PRIVILEGES
    • default 結構描述的 USE SCHEMAALL PRIVILEGES
    • main 目錄的 USE CATALOGALL PRIVILEGES

    若要設定這些權限,請參閱 Databricks 系統管理員或 Unity 目錄權限和安全物件

  • 本教學課程中的範例會使用 Unity 目錄磁碟區來儲存範例資料。 若要使用這些範例,請建立磁碟區,並使用該磁碟區的目錄、結構描述和磁碟區名稱來設定範例所使用的磁碟區路徑。

注意

如果您的工作區未啟用 Unity 目錄,則系統會將不需要 Unity 目錄且具有範例的筆記本附加至本文章。 若要使用這些範例,請在建立管線時選取 Hive metastore 作為儲存體選項。

您要在何處執行 Delta Live Tables 查詢?

Delta Live Tables 查詢主要是在 Databricks 筆記本中實作,但 Delta Live Tables 並非設計為以互動方式在筆記本資料格中執行。 在 Databricks 筆記本中執行包含 Delta Live Tables 語法的儲存格會產生錯誤訊息。 若要執行查詢,您必須將筆記本設定為管線的一部分。

重要

  • 撰寫 Delta Live Tables 的查詢時,您無法依賴筆記本的逐一資料格執行順序。 Delta Live Tables 會評估並執行筆記本中定義的所有程式碼,但執行模型與筆記本 Run all 命令不同。
  • 您無法在單一 Delta Live Tables 原始程式碼檔案中混用語言。 例如,筆記本只能包含 Python 查詢或 SQL 查詢。 如果您必須在管線中使用多種語言,請在管線中使用特定語言專用的筆記本或檔案。

您也可以使用儲存在檔案中的 Python 程式碼。 例如,您可以建立可匯入至 Python 管線的 Python 模組,或定義 Python 使用者定義函式 (UDF) 以用於 SQL 查詢。 若要瞭解如何匯入 Python 模組,請參閱從 Git 資料夾或工作區檔案匯入 Python 模組 (機器翻譯)。 若要瞭解如何使用 Python UDF,請參閱 使用者定義的純量函式 - Python (機器翻譯)。

範例:擷取和處理紐約嬰兒姓名資料

本文中的範例使用公開可用的資料集,其中包含紐約州嬰兒姓名的記錄。 這些範例會示範如何使用 Delta Live Tables 管線執行下列動作:

  • 將公開可用資料集的原始 CSV 資料讀取至資料表。
  • 從原始資料表讀取記錄,並使用 Delta Live Tables 預期來建立包含已清理資料的新資料表。
  • 使用已清理的記錄作為建立衍生資料集之 Delta Live Tables 查詢的輸入。

此程式碼會示範獎牌結構的簡化範例。 請參閱什麼是獎牌 Lakehouse 結構?(機器翻譯)。

我們會針對 PythonSQL 介面提供此範例的實作。 您可以遵循步驟來建立包含範例程式碼的新筆記本,也可以直接跳到建立管線,並使用此頁面提供的其中一個筆記本

使用 Python 實作 Delta Live Tables 管線

建立 Delta Live Tables 資料集的 Python 程式碼必須傳回 DataFrame。 對於不熟悉 Python 和 DataFrame 的使用者,Databricks 建議使用 SQL 介面。 請參閱使用 SQL 實作 Delta Live Tables 管線

所有 Delta Live Tables Python API 都會在 dlt 模組中實作。 使用 Python 實作的 Delta Live Tables 管線程式碼必須明確地匯 dlt 入 Python 筆記本和檔案頂端的模組。 Delta Live Tables 與許多 Python 指令碼有著下列主要不同之處:您不會呼叫執行資料擷取和轉換的函式來建立 Delta Live Tables 資料集。 相反地,Delta Live Tables 會解譯 dlt 模組 (位於載入至管線的所有檔案內) 的裝飾項目函式,並建置資料流程圖表。

若要實作本教學課程中的範例,請將下列 Python 程式碼複製並貼到新的 Python 筆記本中。 依所述的順序,將每個範例程式碼片段新增至筆記本中的獨自資料格。 若要檢閱建立筆記本的選項,請參閱建立筆記本

在使用 Python 介面建立管線時,資料表名稱預設會由函式名稱所定義。 例如,下列 Python 範例會建立三個名為 baby_names_rawbaby_names_preparedtop_baby_names_2021 的資料表。 您可以使用 name 參數覆寫資料表名稱。 請參閱建立 Delta Live Tables 具體化檢視或串流資料表

重要

若要避免管線執行時發生非預期的行為,請勿在定義資料集的函式中包含可能有副作用的程式碼。 若要深入了解,請參閱 Python 參考

匯入 Delta Live Tables 模組

所有 Delta Live Tables Python API 都會在 dlt 模組中實作。 在 Python 筆記本和檔案頂端明確匯入 dlt 模組。

下列範例顯示此匯入作業,以及 pyspark.sql.functions 的匯入陳述式。

import dlt
from pyspark.sql.functions import *

下載資料

若要取得此範例的資料,請下載 CSV 檔案,並儲存在磁碟區中,如下所示:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

使用 Unity 目錄磁碟區的目錄、結構描述和磁碟區名稱取代 <catalog-name><schema-name><volume-name>

透過物件儲存體中的檔案建立資料表

Delta Live Tables 支援從 Azure Databricks 支援的所有格式載入資料。 請參閱資料格式選項 (機器翻譯)。

@dlt.table 裝飾項目會指示 Delta Live Tables 建立一個資料表,其中包含函式所傳回的 DataFrame 結果。 在傳回 Spark DataFrame 的任何 Python 函式定義之前新增 @dlt.table 裝飾項目,即可在 Delta Live Tables 中註冊新的資料表。 下列範例示範使用函式名稱作為資料表名稱,並將描述性註解新增至資料表的方法:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

從管線中的上游資料集新增資料表

您可以使用 dlt.read(),從目前 Delta Live Tables 管線中宣告的其他資料集讀取資料。 以這種方式宣告新的資料表,會建立 Delta Live Tables 在執行更新之前自動解析的相依性。 下列程式碼也包含監視和強制執行預期之資料品質的範例。 請參閱使用 Delta Live Tables 管理資料品質 (機器翻譯)。

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

建立具有擴充資料檢視的資料表

因為 Delta Live Tables 會以一系列相依性關係圖的形式處理管線的更新,所以您可以透過宣告具有特定商務邏輯的資料表,以宣告可驅動儀表板、BI 和分析的高度擴充檢視。

Delta Live Tables 中的資料表在概念上相當於具體化檢視。 與每次查詢檢視時執行邏輯的 Spark 傳統檢視不同,Delta Live Tables 資料表會將最新版的查詢結果儲存在資料檔案中。 因為 Delta Live Tables 會管理管線中所有資料集的更新,所以您可以排程管線更新,以符合具體化檢視的延遲需求,並獲知針對這些資料表的查詢包含最新版本的可用資料。

下列程式碼所定義的資料表,會示範其概念與從管線內上游資料衍生的具體化檢視有何相似之處:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

若要設定使用筆記本的管線,請參閱建立管線

實作具有 SQL 的 Delta Live Tables 管線

Databricks 建議您使用具有 SQL 的 Delta Live Tables 作為 SQL 使用者在 Azure Databricks 上建置新 ETL、擷取和轉換管線的慣用方式。 Delta Live Tables 的 SQL 介面會使用許多新的關鍵字、建構和資料表值函式來擴充標準 Spark SQL。 這些標準 SQL 新增項目可讓使用者宣告資料集之間的相依性,以及部署生產等級基礎結構,而不需要學習新的工具或其他概念。

對於熟悉 Spark DataFrame 的使用者,以及需要支援難以使用 SQL 實作之更廣泛測試和作業 (例如中繼程式設計作業) 的人員,Databricks 建議使用 Python 介面。 請參閱使用 Python 實作 Delta Live Tables 管線

下載資料

若要取得用於此範例的資料,請複製下列程式碼,貼到新的筆記本,然後執行筆記本。 若要檢閱建立筆記本的選項,請參閱建立筆記本

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

使用 Unity 目錄磁碟區的目錄、結構描述和磁碟區名稱取代 <catalog-name><schema-name><volume-name>

透過 Unity 目錄中的檔案建立資料表

請針對此範例的其餘部分複製下列 SQL 程式碼片段,並貼到新的 SQL 筆記本中,與上一節中的筆記本分開。 依所述的順序,將每個範例 SQL 程式碼片段新增至筆記本中的獨自資料格。

Delta Live Tables 支援從 Azure Databricks 支援的所有格式載入資料。 請參閱資料格式選項 (機器翻譯)。

所有 Delta Live Tables SQL 陳述式都會使用 CREATE OR REFRESH 語法和語意。 當您更新管線時,Delta Live Tables 會判斷資料表的邏輯正確結果是否可透過累加處理完成,或是否需要完整重新計算。

下列範例會從儲存在 Unity 目錄磁碟區中的 CSV 檔案載入資料,以建立資料表:

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

使用 Unity 目錄磁碟區的目錄、結構描述和磁碟區名稱取代 <catalog-name><schema-name><volume-name>

將資料表從上游資料集新增至管線

您可以使用 live 虛擬結構描述,以查詢目前 Delta Live Tables 管線中所宣告之其他資料集的資料。 以這種方式宣告新的資料表,會建立 Delta Live Tables 在執行更新之前自動解析的相依性。 live 結構描述是在 Delta Live Tables 中實作的自訂關鍵字,如果您想要發佈資料集,則可以用其替代目標結構描述。 請參閱 搭配您的 Delta Live Tables 管線 使用 Unity 目錄和 搭配舊版 Hive 中繼存放區使用 Delta Live Tables 管線。

下列程式碼也包含監視和強制執行預期之資料品質的範例。 請參閱使用 Delta Live Tables 管理資料品質 (機器翻譯)。

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

建立擴充的資料檢視

因為 Delta Live Tables 會以一系列相依性關係圖的形式處理管線的更新,所以您可以透過宣告具有特定商務邏輯的資料表,以宣告可驅動儀表板、BI 和分析的高度擴充檢視。

下列查詢會使用具體化檢視,從上游資料建立擴充檢視。 與每次查詢檢視時執行邏輯的 Spark 傳統檢視不同,具體化檢視會將最新版的查詢結果儲存在資料檔案中。 因為 Delta Live Tables 會管理管線中所有資料集的更新,所以您可以排程管線更新,以符合具體化檢視的延遲需求,並獲知針對這些資料表的查詢包含最新版本的可用資料。

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

若要設定使用筆記本的管線,請繼續參閱建立管線

建立管線

注意

  • 因為計算資源完全受控於無伺服器 DLT 管線,所以在為管線選取無伺服器時,無法使用計算設定。
  • 如需無伺服器 DLT 管線資格和啟用的相關資訊,請參閱啟用無伺服器計算 (機器翻譯)。

Delta Live Tables 會藉由使用 Delta Live Tables 語法解析筆記本或檔案中定義的相依性來建立管線(稱為 原始程式碼)。 每個原始碼檔案只能包含一種語言,但您可以在管線中混合不同語言的原始程式碼。

  1. 按一下側邊欄中的 [Delta Live Tables],然後按一下 [建立管線]
  2. 為管線取一個名稱。
  3. (選用) 若要使用無伺服器 DLT 管線執行管線,請選取 [無伺服器] 核取方塊。 當您選取 [無伺服器] 時,系統會從UI 移除 [計算] 設定。 請參閱 設定無伺服器差異實時數據表管線
  4. (選用) 選取一個產品版本
  5. 針對 [管線模式] 選取 [觸發]
  6. 設定一或多個包含管線原始程式碼的筆記本。 在 [路徑] 文字方塊中,輸入筆記本的路徑,或按一下 檔案選擇器圖示 以選取筆記本。
  7. 選取管線所發行資料集的目的地,即 Hive中繼存放區或 Unity 目錄。 請參閱發佈資料集
    • Hive 中繼存放區
      • (選用) 輸入用於從管線輸出資料的儲存位置。 如果您將儲存位置保留空白,系統就會使用預設位置 。
      • (選用) 指定要將資料集發佈至 Hive中繼存放區的目標結構描述
    • Unity 目錄:指定要將資料集發佈至 Unity 目錄的目錄目標結構描述
  8. (選用) 如果您尚未選取 [無伺服器],您可以設定管線的計算設定。 若要了解計算設定的選項,請參閱 設定 Delta Live Tables 管線的計算。
  9. (選用) 按一下 [新增通知] 來設定一或多個電子郵件位址,以接收管線事件的通知。 請參閱新增管線事件的電子郵件通知
  10. (選用) 設定管線的進階設定。 若要瞭解進階設定的選項,請參閱 設定 Delta Live Tables 管線
  11. 按一下 [建立]。

在您按一下 [建立] 之後,[管線詳細資料] 頁面隨即出現。 您也可以按一下 [Delta Live Tables] 索引標籤中的管線名稱來存取管線。

啟動管線更新

若要啟動管線更新,請按一下頂端面板中的 Delta Live Tables 啟動圖示 按鈕。 系統會傳回訊息,確認管線正在啟動。

成功啟動更新之後,Delta Live Tables 系統會:

  1. 使用 Delta Live Tables 系統所建立的叢集組態啟動叢集。 您也可以指定自訂叢集組態
  2. 建立目前不存在的任何資料表,並確保任何現有資料表的結構描述都正確無誤。
  3. 使用可獲得的最新資料來更新資料表。
  4. 在更新完成時關閉叢集。

注意

執行模式預設會設定為 [生產],這會為每個更新部署暫時計算資源。 您可以使用 [開發] 模式來變更此行為,在開發和測試期間將相同的計算資源用於多個管線更新。 請參閱開發和生產模式

發佈資料集

您可以將資料表發佈至 Hive中繼存放區或 Unity 目錄,進而在查詢中使用 Delta Live Tables 資料集。 如果您未指定發佈資料的目標,在 Delta Live Tables 管線中建立的資料表只能由該相同管線中的其他作業存取。 請參閱 搭配舊版 Hive 中繼存放 區使用 Delta Live Tables 管線和使用 Unity 目錄搭配您的 Delta Live Tables 管線

範例原始程式碼筆記本

您可以將這些筆記本匯入 Azure Databricks 工作區,並使用這些筆記本來部署 Delta Live Tables 管線。 請參閱建立管線

開始使用 Delta Live Tables Python 筆記本

取得筆記本

開始使用 Delta Live Tables SQL 筆記本

取得筆記本

沒有 Unity 目錄之工作區的範例原始程式碼筆記本

您可以在未啟用 Unity 目錄的情況下,將這些筆記本匯入 Azure Databricks 工作區,並使用這些筆記本來部署 Delta Live Tables 管線。 請參閱建立管線

開始使用 Delta Live Tables Python 筆記本

取得筆記本

開始使用 Delta Live Tables SQL 筆記本

取得筆記本