教學課程:在 Databricks 平臺上使用 Apache Spark 建置 ETL 管線

本教學課程說明如何使用 Apache Spark 開發及部署第一個作為資料編排的 ETL 流程(擷取、轉換和載入)。 雖然本教學課程使用 Databricks 的全用途計算資源,但如果您的工作區已啟用無伺服器計算,也可以使用它。

您也可以使用 Lakeflow Spark 宣告式管線來建置 ETL 管線。 Databricks Lakeflow Spark 宣告式管線可降低建置、部署和維護生產 ETL 管線的複雜性。 請參閱 教學課程:使用 Lakeflow Spark 宣告式管線建置 ETL 管線

讀完本文後,您將知道如何:

  1. 啟動一個 Databricks 多功能運算資源
  2. 建立一個 Databricks 筆記本
  3. 用自動載入器設定增量資料匯入到 Delta Lake
  4. 處理與與資料互動
  5. 將筆記本排程為 Databricks 任務

本教學使用互動筆記本完成常見的 Python 或 Scala 執行 ETL 任務。

您也可以使用 Databricks Terraform 提供者 來建立本文的資源。 請參閱使用 Terraform 建立叢集、筆記本和作業

需求

注意

即使你沒有運算控制權限,只要你 能取得運算資源,仍然可以完成以下大部分步驟。

步驟 1:建立運算資源

若要進行探索性資料分析與資料工程,請建立一個運算資源來執行指令。

  1. 在側邊欄中按下 計算圖示 [計算]
  2. 在計算頁面,點擊 建立計算
  3. 為計算資源指定唯一名稱,剩餘值保持預設狀態,然後點選 「建立運算」。

欲了解更多關於 Databricks 計算的資訊,請參閱 Compute

步驟 2:建立 Databricks 筆記本

若要在工作區中建立筆記本,請按一下提要欄位的 新增圖示新增 ,然後按一下 筆記本。 空白筆記本會在工作區中開啟。

若要深入瞭解如何建立並管理筆記本,請參閱 管理筆記本

步驟 3:設定自動載入器將資料內嵌至 Delta Lake

Databricks 建議使用自動載入器進行累加式資料擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。

Databricks 建議使用 Delta Lake 儲存資料。 Delta Lake 是一個開源儲存層,提供 ACID 交易並啟用資料湖屋。 Delta Lake 是 Databricks 中建立之數據表的預設格式。

若要將自動載入器設定為將數據內嵌至 Delta Lake 數據表,請將下列程式代碼複製並貼到筆記本中的空白數據格:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

程式語言 Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

注意

此程式碼中定義的變數應該可讓您安全地執行它,而不會有與現有工作區資產或其他使用者衝突的風險。 執行此程式碼時,限制的網路或儲存體權限會引發錯誤;請連絡您的工作區系統管理員,以針對這些限制進行疑難排解。

若要深入了解自動載入器,請參閱什麼是自動載入器?

步驟 4:處理資料並與之互動

筆記本會逐個儲存格執行邏輯。 若要執行您儲存格中的邏輯:

  1. 若要執行您在上一個步驟中完成的儲存格,請選取該儲存格,然後按 SHIFT+ENTER

  2. 若要查詢您剛建立的數據表,請將下列程式代碼複製並貼入空白儲存格,然後按 SHIFT+ENTER 以執行儲存格。

    Python

    df = spark.read.table(table_name)
    

    程式語言 Scala

    val df = spark.read.table(table_name)
    
  3. 若要預覽 DataFrame 中的資料,請將下列程式碼複製並貼到空白儲存格中,然後按 SHIFT+ENTER 以執行儲存格。

    Python

    display(df)
    

    程式語言 Scala

    display(df)
    

若要深入瞭解可視化數據的互動式選項,請參閱 Databricks 筆記本和 SQL 編輯器中的視覺效果

步驟 5:排程作業

您可以透過將 Databricks 筆記本新增為 Databricks 作業中的一個任務來執行,將其用作生產指令碼。 在此步驟中,您將建立可手動觸發的新作業。

若要將筆記本排程為任務:

  1. 按一下標題列右側的 [排程]
  2. 針對 [作業名稱] 輸入唯一名稱。
  3. 點擊 手動
  4. 計算 下拉選單中,選擇你在第一步建立的計算資源。
  5. 按一下 [建立]。
  6. 在出現的視窗中,點擊 [立即執行]
  7. 若要查看作業執行結果,請按一下 [外部連結] 圖示,該圖示位於 [上次執行] 時間戳記旁。

如需作業的詳細資訊,請參閱 什麼是作業?

其他整合

了解更多關於與 Azure Databricks 整合與資料工程工具的資訊: