瞭解如何使用 Lakeflow Spark 說明式管線 (SDP) 建立和部署 ETL (擷取、轉換和載入) 流程,並利用 Auto Loader 和變更資料擷取 (CDC) 進行資料編排。 ETL 管線會實作從來源系統讀取數據的步驟、根據需求轉換該數據,例如數據品質檢查和記錄重複數據,以及將數據寫入目標系統,例如數據倉儲或數據湖。
在本教學課程中,您將使用 MySQL customers 資料庫中資料表的數據來:
- 使用 Debezium 或其他工具從交易式資料庫擷取變更,並將其儲存至雲端物件儲存 (S3、ADLS 或 GCS)。 在本教學課程中,您會略過設定外部 CDC 系統,而是產生虛假資料以簡化教學課程。
- 使用自動載入器,以累加方式從雲端物件記憶體載入訊息,並將原始訊息儲存在數據表中
customers_cdc。 自動載入器會推斷結構描述並處理結構描述演進。 - 建立
customers_cdc_clean表格以使用預期檢查資料品質。 例如,id永遠不應該是null,因為它是用於執行 upsert 作業。 - 對已清除的 CDC 資料執行,
AUTO CDC ... INTO以將變更更新插入到最終customers表格中。 - 顯示管線如何建立類型 2 緩慢變更維度 (SCD2) 表格,以追蹤所有變更。
目標是近乎即時地內嵌原始數據,併為分析師小組建置數據表,同時確保數據品質。
本教學課程使用 medallion Lakehouse 架構,其會透過銅層內嵌原始數據、使用銀層清理和驗證數據,並使用金層套用維度模型化和匯總。 如需詳細資訊,請參閱獎章湖屋架構的概念。
實作的流程如下所示:
如需管線、自動載入器和 CDC 的詳細資訊,請參閱 Lakeflow Spark 宣告式管線、 什麼是自動載入器?和 什麼是變更資料擷取 (CDC)?
需求
您必須滿足下列需求,才能完成本教學課程:
- 登入 Azure Databricks 工作區。
- 為您的工作區啟用 Unity 目錄 。
- 針對您的帳戶啟用 無伺服器計算 。 無伺服器化 Lakeflow Spark 宣告式管線並非在所有工作區域中都可用。 如需了解具有有限區域可用性的功能,可用於哪些區域,請參閱 區域列表。 如果您的帳戶未啟用無伺服器計算,則這些步驟應該可以使用您工作區的預設運算資源。
- 有權 建立計算資源 或 存取計算資源。
- 具有在 目錄中建立新架構的許可權。 必要的權限為
ALL PRIVILEGES或USE CATALOG與 。CREATE SCHEMA - 具有在 現有架構中建立新磁碟區的許可權。 必要的權限為
ALL PRIVILEGES或USE SCHEMA與 。CREATE VOLUME
變更 ETL 管線中的數據擷取
變更資料擷取 (CDC) 是擷取對交易式資料庫 (例如 MySQL 或 PostgreSQL) 或資料倉儲所做的記錄變更的程序。 CDC 會擷取資料刪除、附加和更新等作業,通常作為串流來重新具體化外部系統中的資料表。 CDC 具備增量載入的能力,同時消除大量載入更新的需求。
備註
若要簡化本教學課程,請略過設定外部 CDC 系統。 假設系統正在執行並將 CDC 資料儲存為雲端物件儲存服務(例如 S3、ADLS 或 GCS)中的 JSON 檔案。 本教學課程使用 Faker 程式庫來產生教學課程中使用的資料。
捕獲 CDC
有各種 CDC 工具可供使用。 Debezium 是領先的開源解決方案之一,但也存在其他簡化資料來源的實作,例如 Fivetran、Qlik Replicate、StreamSets、Talend、Oracle GoldenGate 和 AWS DMS。
在本教學課程中,您會使用來自外部系統的 CDC 數據,例如 Debezium 或 DMS。 Debezium 會擷取每個變更的數據列。 它通常會將資料變更的歷史記錄傳送至 Kafka 主題,或將其儲存為檔案。
您必須從 customers 資料表(JSON 格式)中擷取 CDC 資訊,檢查其正確無誤,然後在 Lakehouse 中具現化 customers 資料表。
從 Debezium 獲得的 CDC 輸入
對於每個變更,您會收到一則 JSON 訊息,其中包含要更新之列的所有欄位 (id , firstname, lastnameemail, , )。 address 該消息還包括其他元數據:
-
operation:作業程序代碼,通常是 (DELETE、APPEND、 。UPDATE -
operation_date:每個作業動作記錄的日期和時間。
Debezium 之類的工具可以產生更進階的輸出,例如變更前的數據列值,但本教學課程會省略它們以求簡單。
步驟 1:建立管線
建立新的 ETL 管線來查詢您的 CDC 資料來源,並在工作區中產生資料表。
在您的工作區中,按一下左上角的
新增。加號圖示。 按一下 ETL 管線。
將管線的標題變更為
Pipelines with CDC tutorial或您偏好的名稱。在標題下,選擇您具有寫入許可權的目錄和結構描述。
如果您未在程式碼中指定目錄或結構描述,則預設會使用此目錄和結構描述。 您的程式碼可以寫入至任何目錄或模式,透過指定完整路徑。 本教學課程會使用您在此處指定的預設值。
從 進階選項中,選取從 空白檔案開始。
為您的程式碼選擇一個資料夾。 您可以選取 瀏覽 以瀏覽工作區中的資料夾清單。 您可以選擇您擁有寫入權限的任何資料夾。
若要使用版本控制,請選取 Git 資料夾。 如果您需要建立新資料夾,請選取
按鈕。
根據您要用於教學課程的語言,選擇 Python 或 SQL 作為檔案的語言。
按一下 [選取] 以建立具有這些設定的管線,然後開啟 Lakeflow 管線編輯器。
您現在有一個空白管道,其中包含預設目錄和結構描述。 接下來,設定要在教學課程中匯入的範例資料。
步驟 2:建立要在本教學課程中匯入的範例資料
如果您要從現有來源匯入自己的資料,則不需要此步驟。 在本教學課程中,請產生虛假資料作為教學課程的範例。 建立筆記本以執行 Python 資料產生指令碼。 此程式碼只需要執行一次即可產生範例資料,因此請在管線的 explorations 資料夾中建立它,這不會作為管線更新的一部分執行。
備註
此程式碼使用 Faker 產生範例 CDC 資料。 Faker 可以自動安裝,因此本教學使用 %pip install faker。 您也可以為筆記本設定對 *faker* 的依賴。 請參閱 將相依性新增至筆記本。
在 Lakeflow 管線編輯器中,在編輯器左側的資產瀏覽器側邊欄中,按一下
新增,然後選擇探索。
為其 指定名稱,例如
Setup data,選取 Python。 您可以保留預設目標資料夾,也就是新explorations資料夾。點擊 建立。 這會在新資料夾中建立筆記本。
在第一個儲存格中輸入下列程式碼。 您必須變更
<my_catalog>和<my_schema>的定義以符合您在上一個程序中選取的預設型錄和模式:%pip install faker # Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = dbName = db = "<my_schema>" spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`') volume_folder = f"/Volumes/{catalog}/{db}/raw_data" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exist, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")若要產生教學課程中使用的資料集,請輸入 Shift + Enter 以執行程式碼:
選擇性。 若要預覽本教學課程中使用的資料,請在下一個儲存格中輸入下列程式碼,然後執行程式碼。 更新目錄和結構描述,以符合先前程式碼中的路徑。
# Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = "<my_schema>" display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
這會產生一個大型資料集(包含虛假的 CDC 資料),您可以在教學課程的其餘部分使用。 在下一個步驟中,使用自動載入器擷取資料。
步驟 3:使用 Auto Loader 以增量方式擷取資料
下一步是將原始資料從(偽造的)雲端儲存擷取到青銅層中。
這可能會因為多種原因而面臨挑戰,因為您必須:
- 大規模運作,可能會處理數百萬個小檔案。
- 推斷架構和 JSON 類型。
- 處理使用不正確 JSON 架構的錯誤記錄。
- 處理架構演進(例如,客戶表中的新欄位)。
自動載入器可簡化此擷取,包括結構描述推斷和結構描述演變,同時擴展至數百萬個傳入檔案。 自動載入器可在 Python 中使用 cloudFiles 和 在 SQL 中使用 SELECT * FROM STREAM read_files(...) ,而且可以搭配各種格式使用(JSON、CSV、Apache Avro 等):
將資料表定義為串流資料表,可保證您只取用新的傳入資料。 如果您未將它定義為串流表格,它會掃描並擷取所有可用的資料。 如需詳細資訊 ,請參閱串流數據表 。
若要使用自動載入器擷取傳入的 CDC 資料,請將下列程式碼複製並貼到使用管線建立的程式碼檔中 (稱為
my_transformation.py)。 您可以根據您在建立管線時選擇的語言,使用 Python 或 SQL。 請務必將<catalog>和<schema>替換為您為此管道設定的預設值。Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # Replace with the catalog and schema name that # you are using: path = "/Volumes/<catalog>/<schema>/raw_data/customers" # Create the target bronze table dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @dp.append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(f"{path}") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( -- replace with the catalog/schema you are using: "/Volumes/<catalog>/<schema>/raw_data/customers", format => "json", inferColumnTypes => "true" )按一下
執行檔案或執行管線,以啟動已連線管線的更新。 如果您的管線中只有一個來源檔案,這些檔案在功能上是等效的。
更新完成時,編輯器將會顯示更新後的管線資訊。
- 程式碼右側側邊欄中的管線圖形 (DAG) 會顯示單一資料表
customers_cdc_bronze。 - 更新的摘要會顯示在管線資產瀏覽器的頂端。
- 所產生表格的詳細資料會顯示在底部窗格中,您可以選取表格來瀏覽資料表中的資料。
這是從雲端儲存匯入的原始青銅層資料。 在下一個步驟中,清理資料以建立銀層資料表。
步驟 4:整理和期望來追蹤資料品質
定義青銅層之後,透過新增期望來建立銀層,以控制資料品質。 檢查以下條件:
- 識別子永遠不得為
null。 - CDC 作業類型必須有效。
- 自動載入器必須正確讀取 JSON。
不符合這些條件的資料列會被移除。
如需更多資訊,請參閱使用管道期望管理數據品質。
從管道資產瀏覽器側邊欄中,按一下
新增,然後 轉換。
輸入 名稱 ,然後選擇原始程式碼檔案的語言 (Python 或 SQL)。 您可以在管線中混合使用語言,因此可以為此步驟選擇任意一種語言。
若要建立具有已清理資料表的銀色圖層並施加限制,請將下列程式碼複製並貼到新檔案中 (根據檔案的語言選擇 Python 或 SQL)。
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @dp.append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( spark.readStream.table("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;按一下
執行檔案或執行管線,以啟動已連線管線的更新。
因為現在有兩個原始檔,所以它們不會做同樣的事情,但在這種情況下,輸出是相同的。
- 執行管線 會運行您的整個管線,包括步驟 3 中的程式碼。 如果您的輸入資料正在更新,這會將來源的任何變更匯入到您的青銅層。 這不會從資料設定步驟執行程式碼,因為該步驟位於探索資料夾中,而不是管線來源的一部分。
- 執行檔案只會 執行目前的來源檔案。 在這種情況下,在不更新輸入資料的情況下,這會從快取的青銅表產生白銀資料。 在建立或編輯管線程式碼時,只執行此檔案以加快反覆專案速度會很有用。
更新完成後,您可以看到管線圖表現在顯示兩個表格(銀色圖層取決於青銅層),底部面板顯示兩個表格的詳細資訊。 現在在管線資產瀏覽器的頂端會顯示多次執行的時間,但只會提供最近一次執行的詳細資料。
接下來,創建表格的 customers 最終黃金層版本。
步驟 5:使用 AUTO CDC 流程具體化客戶資料表
到目前為止,表格只是在每個步驟中傳遞了 CDC 資料。 現在,建立 customers 資料表以包含最新的視圖,並且作為原始資料表的複本,而不是包含建立該資料表的 CDC 作業清單。
這不容易手動實作。 您必須考慮像資料去重這樣的因素,以保留最新的數據列。
不過,Lakeflow Spark 宣告式管線透過 AUTO CDC 作業來解決這些挑戰。
從管道資產瀏覽器的側邊欄中,按一下
,然後選擇 添加 和 轉換。
輸入 名稱 ,然後為新的原始程式碼檔案選擇語言 (Python 或 SQL)。 您可以再次為此步驟選擇任一語言,但請使用下面的正確程式碼。
若要在 Lakeflow Spark 宣告式管道中使用
AUTO CDC處理 CDC 資料,請將以下程式碼複製並貼上至新檔案中。Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table(name="customers", comment="Clean, materialized customers") dp.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;按一下
執行檔案以 啟動已連線管線的更新。
更新完成後,您可以看到管線圖表顯示 3 個表格,從銅牌到銀牌再到金牌。
步驟 6:使用緩慢變更維度類型 2 (SCD2) 追蹤更新歷程記錄
建立表格通常需要追蹤由APPEND、UPDATE和DELETE所產生的所有變更:
- 歷程記錄:您想要保留資料表所有變更的歷程記錄。
- 可追溯性:您想要查看發生的操作。
帶有 Lakeflow SDP 的 SCD2
Delta 支援變更資料流程 (CDF),並 table_change 可在 SQL 和 Python 中查詢資料表修改。 不過,CDF 的主要使用案例是擷取管道中的變更,而不是從一開始就建立資料表變更的完整檢視。
如果您有順序錯亂的事件,實作會變得特別複雜。 如果您必須依時間戳記來排序變更,並接收過去發生的修改,則必須在 SCD 表格中附加新項目,並更新先前的項目。
Lakeflow SDP 消除了這種複雜性,並允許您建立一個單獨的資料表,其中包含從一開始的所有修改。 然後,可以大規模使用此表格,並視需要使用特定分割區或 ZORDER 直欄。 亂序欄位會根據_sequence_by自動處理。
若要建立 SCD2 表格,請使用 SQL 中的選項 STORED AS SCD TYPE 2 或 Python 中的選項 stored_as_scd_type="2"。
備註
您也可以使用選項來限制功能追蹤的欄位:TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
從管道資產瀏覽器的側邊欄中,按一下
,然後選擇 添加 和 轉換。
輸入 名稱 ,然後為新的原始程式碼檔案選擇語言 (Python 或 SQL)。
將以下程式碼複製並貼到新檔案中。
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # create the table dp.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dp.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updatesSQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW customers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;按一下
執行檔案以 啟動已連線管線的更新。
更新完成後,管線圖表將包含新 customers_history 表格,並且依賴於銀層表格,而底部面板將顯示所有四個表格的詳細資訊。
步驟 7:建立具體化檢視,以追蹤誰變更其資訊最多
數據表 customers_history 包含使用者對其資訊所做的所有歷程記錄變更。 在黃金層中建立簡單的具體化檢視,以追蹤最常更改其資訊的是誰。 這可用於真實世界中的詐騙偵測分析或用戶建議。 此外,應用 SCD2 變更後已經刪除重複項,因此您可以直接計算每個使用者 ID 的行數。
從管道資產瀏覽器的側邊欄中,按一下
,然後選擇 添加 和 轉換。
輸入 名稱 ,然後為新的原始程式碼檔案選擇語言 (Python 或 SQL)。
將下列程式碼複製並貼到新的原始檔中。
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * @dp.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( spark.read.table("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY id按一下
執行檔案以 啟動已連線管線的更新。
更新完成後,管道圖中會出現一個依賴於 customers_history 該表的新表,您可以在底部面板中查看它。 您的管線現已完成。 您可以執行完整的 執行流程來測試它。 剩下的唯一任務是排程管線以定期更新。
步驟 8:建立執行 ETL 流程的作業
接下來,建立工作流程,以使用 Databricks 作業將管線中的資料擷取、處理和分析步驟自動化。
- 在編輯器頂端,選擇 Schedule (排程 ) 按鈕。
- 如果出現 Schedules (排程) 對話方塊,請選擇 Add schedule (新增排程)。
- 這會開啟 [新增排程 ] 對話方塊,您可以在其中建立作業,以依排程執行管線。
- 可以選擇為任務命名。
- 依預設,排程設定為每天執行一次。 您可以接受此預設值,或設定自己的排程。 選擇 進 階 可讓您選擇設定工作執行的特定時間。 選取 [ 更多選項 ] 可讓您在工作執行時建立通知。
- 選取 [ 建立 ] 以套用變更並建立作業。
現在,作業將每天執行,讓您的管線保持最新狀態。 您可以再次選擇 Schedule (排程 ) 以檢視排程清單。 您可以從該對話方塊管理管線的排程,包括新增、編輯或移除排程。
按一下排程 (或作業) 的名稱,會帶您前往 [作業與管線 ] 清單中的作業頁面。 您可以從該處檢視有關工作執行的詳細資訊,包括執行歷史記錄,或使用 [立即執行] 按鈕立即執行工作。
如需作業執行的詳細資訊,請參閱 Lakeflow 作業的監視和可觀察性 。