本教學課程說明如何使用 Lakeflow Spark 宣告式管線和自動載入器來建立和部署 ETL (擷取、轉換和載入) 管線,以進行資料協調流程。 ETL 管線會實作從來源系統讀取數據的步驟、根據需求轉換該數據,例如數據品質檢查和記錄重複數據,以及將數據寫入目標系統,例如數據倉儲或數據湖。
在本教學課程中,您將使用管線和自動載入器來:
- 將原始源數據內嵌至目標數據表。
- 轉換原始源數據,並將轉換的數據寫入兩個目標具體化檢視。
- 查詢已轉換的數據。
- 使用 Databricks 作業將 ETL 管線自動化。
如需管線和自動載入器的詳細資訊,請參閱 Lakeflow Spark 宣告式管線 和 什麼是自動載入器?
需求
您必須滿足下列需求,才能完成本教學課程:
- 登入 Azure Databricks 工作區。
- 為您的工作區啟用 Unity 目錄 。
- 針對您的帳戶啟用 無伺服器計算 。 無伺服器 Lakeflow Spark 宣告式管線並非在所有工作區區域都可用。 如需了解具有有限區域可用性的功能,可用於哪些區域,請參閱 區域列表。
- 有權 建立計算資源 或 存取計算資源。
- 具有在 目錄中建立新架構的許可權。 必要的權限為
ALL PRIVILEGES或USE CATALOG與 。CREATE SCHEMA - 具有在 現有架構中建立新磁碟區的許可權。 必要的權限為
ALL PRIVILEGES或USE SCHEMA與 。CREATE VOLUME
關於數據集
此範例中使用的資料集是百萬首歌資料集的子集,這是當代音樂曲目的特徵和中繼資料集合。 此資料集可在 Azure Databricks 工作區中包含的範例資料集中使用。
步驟 1:建立管線
首先,使用管線語法在檔案 (稱為 原始程式碼) 中定義資料集,以建立管線。 每個原始程式碼檔案只能包含一種語言,但您可以在管線中新增多個語言特定的檔案。 若要深入瞭解,請參閱 Lakeflow Spark 宣告式管線
本教學課程使用無伺服器計算和 Unity 目錄。 針對未指定的所有組態選項,請使用預設設定。 如果您的工作區中未啟用或支援無伺服器計算,您可以使用預設計算設定來完成本教學課程。
若要建立新的管線,請遵循下列步驟:
- 在您的工作區中,按一下
中的新增,然後在側邊欄中選取ETL 管線。
- 為您的管線提供唯一的名稱。
- 在名稱下方,選取您產生之資料的預設目錄和結構描述。 您可以在轉換中指定其他目的地,但本教學課程會使用這些預設值。 您必須擁有您建立之目錄和結構描述的許可權。 請參閱 需求。
- 在本教學課程中,請選取 [從空白檔案開始]。
- 在 [資料夾路徑] 中,指定來源檔案的位置,或接受預設值 (您的使用者資料夾)。
- 選擇 Python 或 SQL 作為第一個來源檔案的語言 (管線可以混合和比對語言,但每個檔案都必須使用單一語言)。
- 按一下 ‹選取›。
新管線的管線編輯器隨即顯示。 系統會建立您語言的空白原始檔,為您的第一次轉換做好準備。
步驟 2:開發管線邏輯
在此步驟中,您將使用 Lakeflow 管線編輯器 以互動方式開發和驗證管線的原始程式碼。
程序代碼會使用自動載入器進行累加式數據擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。 若要深入瞭解,請參閱 什麼是自動載入器?
系統會自動為管線建立並設定空白原始程式碼檔案。 檔案會在管線的轉換資料夾中建立。 根據預設,轉換資料夾中的所有 *.py 和 *.sql 檔案都是管線來源的一部分。
將下列程式碼複製並貼到原始檔中。 請務必使用您在步驟 1 中為檔案選取的語言。
Python
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;此原始碼包含三個查詢的程式碼。 您也可以將這些查詢放在個別檔案中,以您偏好的方式組織檔案和程式碼。
按一下
執行檔案或執行管線,以啟動已連線管線的更新。 如果您的管線中只有一個來源檔案,這些檔案在功能上是等效的。
更新完成時,編輯器將會顯示更新後的管線資訊。
- 程式碼右側側邊欄中的管線圖表 (DAG) 會顯示三個資料表
songs_raw、 和songs_preparedtop_artists_by_year。 - 更新的摘要會顯示在管線資產瀏覽器的頂端。
- 所產生資料表的詳細資料會顯示在底部窗格中,您可以選取其中一個資料表來瀏覽資料表中的資料。
這包括原始和清理的數據,以及一些簡單的分析,以按年份找到頂級藝術家。 在下一個步驟中,您會在管線中的個別檔案中建立臨機操作查詢,以進一步分析。
步驟 3:探索管線所建立的資料集
在此步驟中,您會對 ETL 管線中處理的資料執行臨機操作查詢,以分析 Databricks SQL 編輯器中的歌曲資料。 這些查詢會使用在上一個步驟中建立的備妥記錄。
首先,執行查詢,尋找自1990年以來每年發行最多歌曲的藝術家。
從管道資產瀏覽器側邊欄中,按一下
然後新增探索。
輸入 名稱 ,然後選取探索檔案的 SQL 。 SQL 筆記本會在新
explorations資料夾中建立。 根據預設,資料夾中的explorations檔案不會作為管線更新的一部分執行。 SQL 筆記本具有您可以一起執行或個別執行的儲存格。若要建立 1990 年之後每年發行最多歌曲的藝人表格,請在新的 SQL 檔案中輸入下列程式碼 (如果檔案中有範例程式碼,請取代它)。 因為此筆記本不是管線的一部分,所以它不會使用預設目錄和資料庫結構。 將
<catalog>.<schema>替換為您用作管線預設值的目錄和結構描述。-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;按一下
或按下
Shift + Enter以執行此查詢。
現在,執行另一個查詢,以 4/4 節拍和舞蹈節奏尋找歌曲。
將下列程式碼新增至相同檔案中的下一個儲存格。 再次以您用作管線預設值的目錄和結構描述取代
<catalog>.<schema>。-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;按一下
或按下
Shift + Enter以執行此查詢。
步驟 4:建立作業以執行管線
接下來,建立工作流程,以使用依排程執行的 Databricks 作業,將資料擷取、處理和分析步驟自動化。
- 在編輯器頂端,選擇 Schedule (排程 ) 按鈕。
- 如果出現 Schedules (排程) 對話方塊,請選擇 Add schedule (新增排程)。
- 這會開啟 [新增排程 ] 對話方塊,您可以在其中建立作業,以依排程執行管線。
- 可以選擇為任務命名。
- 依預設,排程設定為每天執行一次。 您可以接受這一點,也可以設定自己的時間表。 選擇 進 階 可讓您選擇設定工作執行的特定時間。 選取 [ 更多選項 ] 可讓您在工作執行時建立通知。
- 選取 [ 建立 ] 以套用變更並建立作業。
現在,作業將每天執行,讓您的管線保持最新狀態。 您可以再次選擇 Schedule (排程 ) 以檢視排程清單。 您可以從該對話方塊管理管線的排程,包括新增、編輯或移除排程。
按一下排程 (或作業) 的名稱,會帶您前往 [作業與管線 ] 清單中的作業頁面。 您可以從該處檢視有關工作執行的詳細資訊,包括執行歷史記錄,或使用 [立即執行] 按鈕立即執行工作。
如需作業執行的詳細資訊,請參閱 Lakeflow 作業的監視和可觀察性 。
深入了解
- 若要深入瞭解資料處理管線,請參閱 Lakeflow Spark 宣告式管線
- 若要深入瞭解 Databricks Notebooks,請參閱 Databricks 筆記本。
- 若要深入瞭解 Lakeflow 作業,請參閱 什麼是作業?
- 若要深入瞭解 Delta Lake,請參閱 什麼是 Azure Databricks 中的 Delta Lake?