共用方式為


教學課程:使用 Lakeflow Spark 宣告式管線建置 ETL 管線

本教學課程說明如何使用 Lakeflow Spark 宣告式管線和自動載入器來建立和部署 ETL (擷取、轉換和載入) 管線,以進行資料協調流程。 ETL 管線會實作從來源系統讀取數據的步驟、根據需求轉換該數據,例如數據品質檢查和記錄重複數據,以及將數據寫入目標系統,例如數據倉儲或數據湖。

在本教學課程中,您將使用管線和自動載入器來:

  • 將原始源數據內嵌至目標數據表。
  • 轉換原始源數據,並將轉換的數據寫入兩個目標具體化檢視。
  • 查詢已轉換的數據。
  • 使用 Databricks 作業將 ETL 管線自動化。

如需管線和自動載入器的詳細資訊,請參閱 Lakeflow Spark 宣告式管線什麼是自動載入器?

需求

您必須滿足下列需求,才能完成本教學課程:

關於數據集

此範例中使用的資料集是百萬首歌資料集的子集,這是當代音樂曲目的特徵和中繼資料集合。 此資料集可在 Azure Databricks 工作區中包含的範例資料集中使用。

步驟 1:建立管線

首先,使用管線語法在檔案 (稱為 原始程式碼) 中定義資料集,以建立管線。 每個原始程式碼檔案只能包含一種語言,但您可以在管線中新增多個語言特定的檔案。 若要深入瞭解,請參閱 Lakeflow Spark 宣告式管線

本教學課程使用無伺服器計算和 Unity 目錄。 針對未指定的所有組態選項,請使用預設設定。 如果您的工作區中未啟用或支援無伺服器計算,您可以使用預設計算設定來完成本教學課程。

若要建立新的管線,請遵循下列步驟:

  1. 在您的工作區中,按一下加號圖示中的新增,然後在側邊欄中選取ETL 管線
  2. 為您的管線提供唯一的名稱。
  3. 在名稱下方,選取您產生之資料的預設目錄和結構描述。 您可以在轉換中指定其他目的地,但本教學課程會使用這些預設值。 您必須擁有您建立之目錄和結構描述的許可權。 請參閱 需求
  4. 在本教學課程中,請選取 [從空白檔案開始]。
  5. [資料夾路徑] 中,指定來源檔案的位置,或接受預設值 (您的使用者資料夾)。
  6. 選擇 PythonSQL 作為第一個來源檔案的語言 (管線可以混合和比對語言,但每個檔案都必須使用單一語言)。
  7. 按一下 ‹選取›。

新管線的管線編輯器隨即顯示。 系統會建立您語言的空白原始檔,為您的第一次轉換做好準備。

步驟 2:開發管線邏輯

在此步驟中,您將使用 Lakeflow 管線編輯器 以互動方式開發和驗證管線的原始程式碼。

程序代碼會使用自動載入器進行累加式數據擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。 若要深入瞭解,請參閱 什麼是自動載入器?

系統會自動為管線建立並設定空白原始程式碼檔案。 檔案會在管線的轉換資料夾中建立。 根據預設,轉換資料夾中的所有 *.py 和 *.sql 檔案都是管線來源的一部分。

  1. 將下列程式碼複製並貼到原始檔中。 請務必使用您在步驟 1 中為檔案選取的語言。

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    此原始碼包含三個查詢的程式碼。 您也可以將這些查詢放在個別檔案中,以您偏好的方式組織檔案和程式碼。

  2. 按一下 播放圖示。執行檔案或執行管線,以啟動已連線管線的更新。 如果您的管線中只有一個來源檔案,這些檔案在功能上是等效的。

更新完成時,編輯器將會顯示更新後的管線資訊。

  • 程式碼右側側邊欄中的管線圖表 (DAG) 會顯示三個資料表 songs_raw、 和 songs_preparedtop_artists_by_year
  • 更新的摘要會顯示在管線資產瀏覽器的頂端。
  • 所產生資料表的詳細資料會顯示在底部窗格中,您可以選取其中一個資料表來瀏覽資料表中的資料。

這包括原始和清理的數據,以及一些簡單的分析,以按年份找到頂級藝術家。 在下一個步驟中,您會在管線中的個別檔案中建立臨機操作查詢,以進一步分析。

步驟 3:探索管線所建立的資料集

在此步驟中,您會對 ETL 管線中處理的資料執行臨機操作查詢,以分析 Databricks SQL 編輯器中的歌曲資料。 這些查詢會使用在上一個步驟中建立的備妥記錄。

首先,執行查詢,尋找自1990年以來每年發行最多歌曲的藝術家。

  1. 從管道資產瀏覽器側邊欄中,按一下加號圖示。然後新增探索

  2. 輸入 名稱 ,然後選取探索檔案的 SQL 。 SQL 筆記本會在新 explorations 資料夾中建立。 根據預設,資料夾中的 explorations 檔案不會作為管線更新的一部分執行。 SQL 筆記本具有您可以一起執行或個別執行的儲存格。

  3. 若要建立 1990 年之後每年發行最多歌曲的藝人表格,請在新的 SQL 檔案中輸入下列程式碼 (如果檔案中有範例程式碼,請取代它)。 因為此筆記本不是管線的一部分,所以它不會使用預設目錄和資料庫結構。 將 <catalog>.<schema> 替換為您用作管線預設值的目錄和結構描述。

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. 按一下 [播放] 圖示。 或按下 Shift + Enter 以執行此查詢。

現在,執行另一個查詢,以 4/4 節拍和舞蹈節奏尋找歌曲。

  1. 將下列程式碼新增至相同檔案中的下一個儲存格。 再次以您用作管線預設值的目錄和結構描述取代 <catalog>.<schema>

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. 按一下 [播放] 圖示。 或按下 Shift + Enter 以執行此查詢。

步驟 4:建立作業以執行管線

接下來,建立工作流程,以使用依排程執行的 Databricks 作業,將資料擷取、處理和分析步驟自動化。

  1. 在編輯器頂端,選擇 Schedule (排程 ) 按鈕。
  2. 如果出現 Schedules (排程) 對話方塊,請選擇 Add schedule (新增排程)。
  3. 這會開啟 [新增排程 ] 對話方塊,您可以在其中建立作業,以依排程執行管線。
  4. 可以選擇為任務命名。
  5. 依預設,排程設定為每天執行一次。 您可以接受這一點,也可以設定自己的時間表。 選擇 進 可讓您選擇設定工作執行的特定時間。 選取 [ 更多選項 ] 可讓您在工作執行時建立通知。
  6. 選取 [ 建立 ] 以套用變更並建立作業。

現在,作業將每天執行,讓您的管線保持最新狀態。 您可以再次選擇 Schedule (排程 ) 以檢視排程清單。 您可以從該對話方塊管理管線的排程,包括新增、編輯或移除排程。

按一下排程 (或作業) 的名稱,會帶您前往 [作業與管線 ] 清單中的作業頁面。 您可以從該處檢視有關工作執行的詳細資訊,包括執行歷史記錄,或使用 [立即執行] 按鈕立即執行工作。

如需作業執行的詳細資訊,請參閱 Lakeflow 作業的監視和可觀察性

深入了解