教學課程:執行端對端 Lakehouse 分析管線

本教學課程說明如何為 Azure Databricks Lakehouse 設定端對端分析管線。

重要

本教學課程使用互動式筆記本,在已啟用 Unity 目錄的叢集上完成 Python 中的一般 ETL 工作。 如果您未使用 Unity 目錄,請參閱 在 Azure Databricks 上執行您的第一個 ETL 工作負載。

本教學課程中的工作

本文結束時,您會感到自在:

  1. 啟動已啟用 Unity 目錄的計算叢集
  2. 建立 Databricks 筆記本
  3. 從 Unity 目錄外部位置寫入和讀取資料。
  4. 使用自動載入器設定 Unity 目錄數據表的累加式數據擷取。
  5. 執行筆記本數據格來處理、查詢和預覽數據
  6. 將筆記本排程為 Databricks 作業
  7. 從 Databricks SQL 查詢 Unity 目錄數據表

Azure Databricks 提供一套生產就緒的工具,可讓數據專業人員快速開發及部署擷取、轉換和載入 (ETL) 管線。 Unity 目錄可讓數據管理人為整個組織的使用者設定及保護記憶體認證、外部位置和資料庫物件的安全。 Databricks SQL 可讓分析師針對生產 ETL 工作負載中使用的相同數據表執行 SQL 查詢,以大規模執行即時商業智慧。

您也可以使用 Delta Live Tables 來建置 ETL 管線。 Databricks 建立了 Delta 實時數據表,以減少建置、部署和維護生產 ETL 管線的複雜性。 請參閱 教學課程:執行您的第一個 Delta Live Tables 管線

需求

注意

如果您沒有叢集控制許可權,只要您可以 存取叢集,您仍然可以完成下列步驟。

步驟 1:建立叢集

若要進行探勘數據分析和數據工程,請建立叢集以提供執行命令所需的計算資源。

  1. 按兩下 計算圖示提要欄位中的[計算 ]。
  2. 按兩下 新增圖示提要欄位中的 [新增 ],然後選取 [ 叢集]。 這會開啟 [新增叢集/計算] 頁面。
  3. 指定叢集的唯一名稱。
  4. 選取 [單一 節點 ] 單選按鈕。
  5. 從 [存取模式] 下拉式清單中選取 [單一使用者]。
  6. 請確定您的電子郵件地址會顯示在 [ 單一使用者 ] 欄位中。
  7. 選取所需的 Databricks 執行時間版本 11.1 或更新版本,以使用 Unity 目錄。
  8. 按兩下 [ 建立計算 ] 以建立叢集。

若要深入瞭解 Databricks 叢集,請參閱 計算

步驟 2:建立 Databricks 筆記本

若要開始在 Azure Databricks 上撰寫和執行互動式程式碼,請建立筆記本。

  1. 按兩下 新增圖示提要欄位中的 [新增 ],然後按兩下 [ 筆記本]。
  2. 在 [建立筆記本] 頁面上:
    • 指定筆記本的唯一名稱。
    • 請確定預設語言已設定為 Python
    • 使用 [連線] 下拉功能表,從 [叢集] 下拉式列表中選取您在步驟 1 中建立的叢集

筆記本會以一個空白數據格開啟。

若要深入瞭解如何建立和管理筆記本,請參閱 管理筆記本

步驟 3:從 Unity 目錄所管理的外部位置寫入和讀取數據

Databricks 建議使用 自動載入器 進行累加式數據擷取。 自動載入器會在抵達雲端物件記憶體時自動偵測並處理新檔案。

使用 Unity 目錄來管理外部位置的安全存取。 具有 READ FILES 外部位置許可權的用戶或服務主體可以使用自動載入器來內嵌數據。

一般而言,數據會因為從其他系統寫入而抵達外部位置。 在此示範中,您可以將 JSON 檔案寫出至外部位置,以模擬數據抵達。

將下列程式代碼複製到筆記本數據格。 將的字串值catalog取代為 具有 和 USE CATALOG 許可權的目錄CREATE CATALOG名稱。 將的字串值external_location取代為、、 和 CREATE EXTERNAL TABLE 權限的外部位置READ FILESWRITE FILES路徑。

外部位置可以定義為整個記憶體容器,但通常會指向容器中巢狀目錄。

外部位置路徑的正確格式為 "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

執行此儲存格應該列印一行,讀取 12 個字節、列印字串 「Hello world」,並顯示目錄中提供的所有資料庫。 如果您無法執行此儲存格,請確認您位於已啟用 Unity 目錄的工作區中,並要求工作區系統管理員的適當許可權,以完成本教學課程。

下列 Python 程式代碼會使用您的電子郵件位址,在提供的目錄中建立唯一資料庫,以及在提供的外部位置中建立唯一的儲存位置。 執行此儲存格將會移除與本教學課程相關聯的所有數據,讓您能夠以等冪方式執行此範例。 類別已定義並具現化,您將用來模擬從連線系統到來源外部位置的數據批次。

將此程式代碼複製到筆記本中的新數據格,並加以執行以設定您的環境。

注意

此程式代碼中定義的變數應該可讓您安全地執行它,而不會有與現有工作區資產或其他使用者衝突的風險。 執行此程式代碼時,受限的網路或記憶體許可權將會引發錯誤;請連絡您的工作區系統管理員,以針對這些限制進行疑難解答。


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

您現在可以將下列程式代碼複製到數據格中並加以執行,來登陸一批數據。 您可以手動執行此數據格,最多 60 次以觸發新的數據抵達。

RawData.land_batch()

步驟 4:設定自動載入器將數據內嵌至 Unity 目錄

Databricks 建議使用 Delta Lake 儲存數據。 Delta Lake 是 開放原始碼 儲存層,可提供 ACID 交易並啟用 Data Lakehouse。 Delta Lake 是 Databricks 中建立之數據表的預設格式。

若要設定自動載入器將資料內嵌至 Unity 目錄資料表,請將下列程式代碼複製並貼到筆記本中的空白資料格中:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

若要深入了解自動載入器,請參閱 什麼是自動載入器?

若要深入瞭解使用 Unity 目錄進行結構化串流,請參閱 搭配結構化串流使用 Unity 目錄。

步驟 5:處理數據並與其互動

筆記本會逐一執行邏輯數據格。 使用下列步驟在數據格中執行邏輯:

  1. 若要執行您在上一個步驟中完成的儲存格,請選取儲存格,然後按 SHIFT+ENTER

  2. 若要查詢您剛建立的數據表,請將下列程式代碼複製並貼入空白儲存格,然後按 SHIFT+ENTER 以執行儲存格。

    df = spark.read.table(table_name)
    
  3. 若要預覽 DataFrame 中的數據,請將下列程式代碼複製並貼到空白數據格中,然後按 SHIFT+ENTER 以執行數據格。

    display(df)
    

若要深入瞭解可視化數據的互動式選項,請參閱 Databricks 筆記本中的視覺效果。

步驟 6:排程作業

您可以將 Databricks 筆記本新增為 Databricks 作業中的工作,以執行 Databricks 筆記本作為生產腳本。 在此步驟中,您將建立可手動觸發的新作業。

若要將筆記本排程為工作:

  1. 按兩下標題列右側的 [ 排程 ]。
  2. 輸入作業名稱的唯一名稱
  3. 按兩下 [ 手動]。
  4. 在 [ 叢集 ] 下拉式清單中,選取您在步驟 1 中建立的叢集。
  5. 按一下 [建立]
  6. 在出現的視窗中,按兩下 [ 立即執行]。
  7. 若要查看作業執行結果,請按兩下 外部連結 [上次執行時間戳] 旁的圖示。

如需作業的詳細資訊,請參閱 什麼是 Azure Databricks 作業?

步驟 7:從 Databricks SQL 查詢數據表

USE CATALOG具有目前目錄許可權、USE SCHEMA目前架構許可權和SELECT數據表許可權的任何人都可以從慣用的 Databricks API 查詢數據表的內容。

您需要存取執行中的 SQL 倉儲,才能在 Databricks SQL 中執行查詢。

您稍早在本教學課程中建立的數據表名稱為 target_table。 您可以使用您在第一個數據格中提供的目錄,以及具有父系 e2e_lakehouse_<your-username>的資料庫來查詢它。 您可以使用 目錄總 管來尋找您所建立的資料物件。

其他整合

深入瞭解使用 Azure Databricks 進行數據工程的整合和工具: