分享方式:


使用 Delta Live Tables 轉換數據

本文說明如何使用 Delta Live Tables 來宣告數據集上的轉換,以及指定如何透過查詢邏輯處理記錄。 它也包含一些常見的轉換模式範例,在建置 Delta Live Tables 管線時可能很有用。

您可以針對傳回 DataFrame 的任何查詢定義數據集。 您可以使用 Apache Spark 內建作業、UDF、自定義邏輯和 MLflow 模型作為 Delta Live Tables 管線中的轉換。 將數據內嵌至 Delta Live Tables 管線之後,您可以針對上游來源定義新的數據集,以建立新的串流數據表、具體化檢視和檢視表。

若要瞭解如何使用 Delta Live Tables 有效地執行具狀態處理,請參閱 使用浮水印優化 Delta 即時數據表中的具狀態處理。

使用檢視、具體化檢視和串流數據表的時機

若要確保您的管線有效率且可維護,請在實作管線查詢時選擇最佳的數據集類型。

請考慮在下列情況下使用檢視:

  • 您有一個大型或複雜的查詢,您想要分成更容易管理的查詢。
  • 您想要使用預期來驗證中繼結果。
  • 您想要降低記憶體和計算成本,而且不需要具體化查詢結果。 由於數據表已具體化,因此需要額外的計算和記憶體資源。

請考慮在下列情況下使用具體化檢視:

  • 多個下游查詢會取用數據表。 因為檢視會視需要計算,因此每次查詢檢視時都會重新計算檢視。
  • 其他管線、作業或查詢會取用數據表。 由於檢視並未具體化,因此您只能在相同的管線中使用它們。
  • 您想要在開發期間檢視查詢的結果。 由於數據表已具體化,而且可以在管線外部檢視和查詢,因此在開發期間使用數據表有助於驗證計算的正確性。 驗證之後,將不需要具體化的查詢轉換成檢視。

請考慮在下列情況下使用串流資料表:

  • 查詢是針對持續或累加成長的數據源所定義。
  • 查詢結果應該以累加方式計算。
  • 管線需要高輸送量和低延遲。

注意

串流數據表一律會針對串流來源定義。 您也可以使用 串流來源 搭配 APPLY CHANGES INTO 來套用 CDC 摘要的更新。 請參閱 套用變更 API:簡化差異實時數據表中的異動數據擷取。

在單一管線中結合串流數據表和具體化檢視

串流數據表會繼承 Apache Spark 結構化串流的處理保證,並設定為處理僅附加數據源的查詢,其中新數據列一律會插入源數據表中,而不是修改。

注意

雖然根據預設,串流數據表需要僅附加數據源,但當串流來源是另一個需要更新或刪除的串流數據表時,您可以使用skipChangeCommits旗標覆寫此行為

常見的串流模式包括內嵌源數據,以在管線中建立初始數據集。 這些初始數據集通常稱為 牌數據表,而且通常會執行簡單的轉換。

相較之下,管線中最後一個數據表通常稱為 黃金 數據表,通常需要複雜的匯總,或從屬於作業目標 APPLY CHANGES INTO 的來源讀取。 由於這些作業原本會建立更新,而不是附加,因此不支援它們做為串流數據表的輸入。 這些轉換更適合具體化檢視。

藉由將串流數據表和具體化檢視混合成單一管線,您可以簡化管線、避免成本高昂的重新擷取或重新處理原始數據,並具備 SQL 的完整功能,透過有效率編碼和篩選的數據集計算複雜的匯總。 下列範例說明這種類型的混合處理:

注意

這些範例會使用自動載入器從雲端記憶體載入檔案。 若要在啟用 Unity 目錄的管線中,使用自動載入器載入檔案,您必須使用外部位置。 若要深入了解搭配 Delta Live Tables 使用 Unity 目錄,請參閱搭配您的 Delta Live Tables 管線使用 Unity 目錄 (英文)。

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

深入瞭解如何使用 自動載入器 ,有效率地從 Azure 記憶體讀取 JSON 檔案以進行增量處理。

Stream-static 聯結

當以主要靜態維度數據表反正規化僅限附加數據的連續數據流時,串流靜態聯結是不錯的選擇。

透過每個管線更新,來自數據流的新記錄會與靜態數據表的最新快照集聯結。 如果在處理串流數據表的對應數據之後,在靜態數據表中新增或更新記錄,除非執行完整重新整理,否則不會重新計算結果記錄。

在設定為觸發執行的管線中,靜態數據表會在更新啟動時傳回結果。 在設定為連續執行的管線中,每次數據表處理更新時,都會查詢最新版的靜態數據表。

以下是數據流靜態聯結的範例:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

有效率地計算匯總

您可以使用串流數據表,以累加方式計算簡單的分散式匯總,例如 count、min、max 或 sum,以及平均或標準偏差等代數匯總。 Databricks 建議對具有有限群組數目的查詢進行累加匯總,例如具有 子句的 GROUP BY country 查詢。 每個更新只會讀取新的輸入數據。

若要深入瞭解如何撰寫執行累加匯總的 Delta Live Tables 查詢,請參閱 使用浮浮水印執行視窗式匯總。

在 Delta Live Tables 管線中使用 MLflow 模型

注意

若要在已啟用 Unity 目錄的管線中使用 MLflow 模型,您的管線必須設定為使用 preview 通道。 若要使用 current 通道,您必須設定管線以發佈至 Hive 中繼存放區。

您可以在 Delta Live Tables 管線中使用 MLflow 定型的模型。 MLflow 模型會被視為 Azure Databricks 中的轉換,這表示它們會處理 Spark DataFrame 輸入,並將結果傳回為 Spark DataFrame。 因為 Delta Live Tables 會針對 DataFrame 定義數據集,因此您可以只使用幾行程式代碼,將利用 MLflow 的 Apache Spark 工作負載轉換成 Delta Live Tables。 如需 MLflow 的詳細資訊,請參閱 使用 MLflow 的 ML 生命週期管理。

如果您已經有呼叫 MLflow 模型的 Python 筆記本,您可以使用裝飾專案將此程式代碼調整為 Delta Live Tables @dlt.table ,並確保已定義函式以傳回轉換結果。 Delta Live Tables 預設不會安裝 MLflow,因此請確定您%pip install mlflow和匯入mlflowdlt筆記本頂端。 如需 Delta Live Tables 語法的簡介,請參閱 使用 Python 實作 Delta Live Tables 管線。

若要在 Delta Live Tables 中使用 MLflow 模型,請完成下列步驟:

  1. 取得 MLflow 模型的執行識別碼和模型名稱。 執行標識碼和模型名稱可用來建構 MLflow 模型的 URI。
  2. 使用 URI 來定義 Spark UDF 以載入 MLflow 模型。
  3. 在您的數據表定義中呼叫UDF,以使用MLflow模型。

下列範例顯示此模式的基本語法:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

作為完整範例,下列程式代碼會定義名為 loaded_model_udf 的Spark UDF,以載入以貸款風險數據定型的MLflow模型。 用來進行預測的數據行會當做自變數傳遞至 UDF。 數據表 loan_risk_predictions 會計算 中每個數據列的 loan_risk_input_data預測。

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

保留手動刪除或更新

Delta Live Tables 可讓您手動刪除或更新數據表中的記錄,並執行重新整理作業來重新計算下游數據表。

根據預設,Delta Live Tables 會在每次更新管線時,根據輸入數據重新計算數據表結果,因此您必須確定已刪除的記錄不會從源數據重載。 pipelines.reset.allowed將數據表屬性設定為false防止重新整理至數據表,但不會防止對數據表進行累加寫入,或防止新的數據流入數據表。

下圖說明使用兩個串流數據表的範例:

  • raw_user_table 從來源擷取原始用戶數據。
  • bmi_table 以累加方式從計算 BMI 分數的權數和高度 raw_user_table

您要從手動刪除或更新使用者記錄 raw_user_tablebmi_table然後重新計算 。

保留數據圖表

下列程式代碼示範將 pipelines.reset.allowed 數據表屬性 false 設定為 以停用完整重新 raw_user_table 整理,以便隨著時間保留預期的變更,但在執行管線更新時會重新計算下游數據表:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);