在 Databricks 中建置端對端數據管線

本文說明如何建立和部署端對端數據處理管線,包括如何內嵌未經處理的數據、轉換數據,以及對已處理的數據執行分析。

注意

雖然本文示範如何使用 Databricks 筆記本和 Azure Databricks 作業來建立完整的數據管線,以協調工作流程,但 Databricks 建議使用 Delta Live Tables,這是一個宣告式介面,可用來建置可靠、可維護且可測試的數據處理管線。

什麼是資料管線?

數據管線會實作從來源系統移動數據、根據需求轉換數據,以及將數據儲存在目標系統中所需的步驟。 數據管線包含將原始數據轉換成用戶可取用之備妥數據所需的所有程式。 例如,數據管線可能會準備數據,讓數據分析師和數據科學家可以透過分析和報告從數據擷取價值。

擷取、轉換和載入 (ETL) 工作流程是數據管線的常見範例。 在 ETL 處理中,數據會從來源系統擷取並寫入暫存區域、根據需求轉換(確保數據品質、重複數據刪除記錄等等),然後寫入至數據倉儲或數據湖等目標系統。

數據管線步驟

為了協助您開始在 Azure Databricks 上建置數據管線,本文中包含的範例會逐步引導您建立數據處理工作流程:

  • 使用 Azure Databricks 功能來探索原始數據集。
  • 建立 Databricks 筆記本以內嵌原始源數據,並將原始數據寫入目標數據表。
  • 建立 Databricks 筆記本來轉換原始源數據,並將轉換的數據寫入目標數據表。
  • 建立 Databricks 筆記本來查詢已轉換的數據。
  • 使用 Azure Databricks 作業將數據管線自動化。

需求

  • 您已登入 Azure Databricks,並在 資料科學 和工程工作區中。
  • 您有權 建立叢集存取叢集
  • (選擇性)若要將數據表發佈至 Unity 目錄,您必須在 Unity 目錄中建立 目錄架構

範例:百萬首歌數據集

此範例中使用的數據集是百萬首歌數據集子集,這是當代音樂曲目的特徵和元數據集合。 此數據集可在 Azure Databricks 工作區中包含的範例數據集 中使用。

步驟 1:建立叢集

若要在此範例中執行數據處理和分析,請建立叢集以提供執行命令所需的計算資源。

注意

由於此範例會使用儲存在 DBFS 中的範例數據集,並建議將數據表保存到 Unity 目錄,因此您會建立以單一使用者存取模式設定的叢集。 單一使用者存取模式提供 DBFS 的完整存取權,同時啟用 Unity 目錄的存取權。 請參閱 DBFS 和 Unity 目錄的最佳做法。

  1. 按兩下 提要欄位中的[計算 ]。
  2. 在 [計算] 頁面上,按兩下 [ 建立叢集]。
  3. 在 [新增叢集] 頁面上,輸入叢集的唯一名稱。
  4. [存取模式] 中,選取 [ 單一使用者]。
  5. 在 [ 單一用戶或服務主體存取] 中,選取您的用戶名稱。
  6. 保留其餘值的默認狀態,然後按兩下 [ 建立叢集]。

若要深入瞭解 Databricks 叢集,請參閱 計算

步驟 2:探索源數據

若要瞭解如何使用 Azure Databricks 介面來探索原始源數據,請參閱 探索數據管線的源數據。 如果您想要直接移至內嵌和準備數據,請繼續進行 步驟 3:內嵌原始數據

步驟 3:內嵌原始數據

在此步驟中,您會將原始數據載入數據表,使其可供進一步處理。 若要管理 Databricks 平臺上的數據資產,例如數據表,Databricks 建議 Unity 目錄。 不過,如果您沒有建立必要目錄和架構以將數據表發行至 Unity 目錄的許可權,您仍然可以將數據表發佈至 Hive 中繼存放區來完成下列步驟。

若要內嵌數據,Databricks 建議使用 自動載入器。 自動載入器會在抵達雲端物件記憶體時自動偵測並處理新檔案。

您可以設定自動載入器來自動偵測已載入數據的架構,讓您不需要明確宣告數據架構並隨著新數據行導入而演進數據表架構,即可初始化數據表。 這樣就不需要在一段時間內手動追蹤和套用架構變更。 Databricks 在使用自動載入器時建議架構推斷。 不過,如數據探索步驟所示,歌曲數據不包含標頭資訊。 因為標頭未與數據一起儲存,因此您必須明確地定義架構,如下一個範例所示。

  1. 在提要欄中,按兩下 New Icon[新增 ],然後從功能表中選取 [筆記本 ]。 [ 建立筆記本 ] 對話框隨即出現。

  2. 輸入筆記本名稱,例如 Ingest songs data。 預設情況:

    • Python 是選取的語言。
    • 筆記本會附加至您所使用的最後一個叢集。 在此情況下,您在步驟 1:建立叢集中 建立的叢集
  3. 在筆記本的第一個數據格中輸入下列內容:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    如果您使用 Unity 目錄,請將 取代<table-name>為目錄、架構和資料表名稱,以包含內嵌的記錄(例如, 。 data_pipelines.songs_data.raw_song_data 否則,請將 取代 <table-name> 為包含內嵌記錄的數據表名稱,例如 raw_song_data

    <checkpoint-path>取代為 DBFS 中目錄的路徑,以維護檢查點檔案, 例如 /tmp/pipeline_get_started/_checkpoint/song_data

  4. 按兩下 Run Menu,然後選取[ 執行儲存格]。 此範例會使用 中的 README資訊定義數據架構,內嵌 中 file_path所含所有檔案的歌曲數據,並將數據寫入 指定的 table_name數據表。

步驟 4:準備原始數據

若要準備原始數據以供分析,下列步驟會篩選掉不必要的數據行,並新增包含新記錄建立時間戳的新字段,以轉換原始歌曲數據。

  1. 在提要欄中,按兩下 New Icon[新增 ],然後從功能表中選取 [筆記本 ]。 [ 建立筆記本 ] 對話框隨即出現。

  2. 輸入筆記本的名稱。 例如: Prepare songs data 。 將預設語言變更為 SQL

  3. 在筆記本的第一個數據格中輸入下列內容:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    如果您使用 Unity 目錄,請將 取代 <table-name> 為目錄、架構和資料表名稱,以包含篩選和轉換的記錄(例如 , data_pipelines.songs_data.prepared_song_data。 否則,請將 取代 <table-name> 為資料表的名稱,以包含已篩選和轉換的記錄(例如 , prepared_song_data

    將取代 <raw-songs-table-name> 為包含上一個步驟中內嵌之原始歌曲記錄的數據表名稱。

  4. 按兩下 Run Menu,然後選取[ 執行儲存格]。

步驟 5:查詢轉換的數據

在此步驟中,您會新增查詢來分析歌曲數據,以擴充處理管線。 這些查詢會使用在上一個步驟中建立的備妥記錄。

  1. 在提要欄中,按兩下 New Icon[新增 ],然後從功能表中選取 [筆記本 ]。 [ 建立筆記本 ] 對話框隨即出現。

  2. 輸入筆記本的名稱。 例如: Analyze songs data 。 將預設語言變更為 SQL

  3. 在筆記本的第一個數據格中輸入下列內容:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    將取代 <prepared-songs-table-name> 為包含備妥數據的數據表名稱。 例如: data_pipelines.songs_data.prepared_song_data

  4. 單擊Down Caret單元格動作功能表中的 [新增儲存格下方],然後在新儲存格中輸入下列內容:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    將取代 <prepared-songs-table-name> 為上一個步驟中建立之備妥數據表的名稱。 例如: data_pipelines.songs_data.prepared_song_data

  5. 若要執行查詢並檢視輸出,請按兩下 [ 全部執行]。

步驟 6:建立 Azure Databricks 作業以執行管線

您可以建立工作流程,以使用 Azure Databricks 作業自動執行數據擷取、處理和分析步驟。

  1. 在您的 資料科學 與工程工作區中,執行下列其中一項作業:
    • 點選 Jobs Icon提要欄位中的 [工作流程 ],然後按下 Create Job Button
    • 在提要欄位中,按兩下 New Icon[新增 ],然後選取 [ 作業]。
  2. 在 [工作] 索引標籤上的 [工作] 對話框中,將 [新增作業的名稱...] 取代為您的作業名稱。 例如,「歌曲工作流程」。
  3. 在 [ 工作名稱] 中,輸入第一個工作的名稱, Ingest_songs_data例如 。
  4. [類型],選取 [筆記本 ] 工作類型。
  5. [來源] 中,選取 [ 工作區]。
  6. 使用檔案瀏覽器來尋找數據擷取筆記本、按兩下筆記本名稱,然後按兩下 [ 確認]。
  7. 在 [ 叢集] 中,選取 [Shared_job_cluster ] 或您在步驟中建立的 Create a cluster 叢集。
  8. 按一下 [建立]
  9. 按兩下 Add Task Button 您剛才建立的工作下方,然後選取 [筆記本]。
  10. 在 [ 工作名稱] 中,輸入工作的名稱,例如 Prepare_songs_data
  11. [類型],選取 [筆記本 ] 工作類型。
  12. [來源] 中,選取 [ 工作區]。
  13. 使用檔案瀏覽器來尋找數據準備筆記本、按下筆記本名稱,然後按兩下 [ 確認]。
  14. 在 [ 叢集] 中,選取 [Shared_job_cluster ] 或您在步驟中建立的 Create a cluster 叢集。
  15. 按一下 [建立]
  16. 按兩下 Add Task Button 您剛才建立的工作下方,然後選取 [筆記本]。
  17. 在 [ 工作名稱] 中,輸入工作的名稱,例如 Analyze_songs_data
  18. [類型],選取 [筆記本 ] 工作類型。
  19. [來源] 中,選取 [ 工作區]。
  20. 使用檔案瀏覽器來尋找數據分析筆記本、按下筆記本名稱,然後按下 [ 確認]。
  21. 在 [ 叢集] 中,選取 [Shared_job_cluster ] 或您在步驟中建立的 Create a cluster 叢集。
  22. 按一下 [建立]
  23. 若要執行工作流程,請按下 Run Now Button。 若要檢視執行的詳細數據,請按下作業執行檢視中執行之 [開始時間] 資料行中的連結。 按兩下每個工作以檢視工作執行的詳細數據。
  24. 若要在工作流程完成時檢視結果,請按下最終數據分析工作。 [ 輸出] 頁面隨即出現,並顯示查詢結果。

步驟 7:排程數據管線作業

注意

為了示範如何使用 Azure Databricks 作業來協調排程的工作流程,本快速入門範例會將擷取、準備和分析步驟分成不同的筆記本,然後每個筆記本會用來在作業中建立工作。 如果所有處理都包含在單一筆記本中,您可以直接從 Azure Databricks Notebook UI 排程筆記本。 請參閱 建立和管理排程的筆記本作業

常見的需求是依排程執行數據管線。 若要定義執行管線之作業的排程:

  1. 按兩下 Jobs Icon提要欄位中的 [工作流程 ]。
  2. 在 [ 名稱] 資料行中,按兩下作業名稱。 側邊面板會顯示 [作業詳細數據]。
  3. 按兩下 [作業詳細資料] 面板中的 [新增觸發程式],然後選取 [觸發程式類型] 中的 [排程]。
  4. 指定期間、開始時間和時區。 選擇性地選取 [顯示 Cron 語法] 複選框,以顯示和編輯在[矽 Cron 語法] 中的排程。
  5. 按一下 [檔案] 。

深入了解