共用方式為


使用管線轉換數據

本文說明如何使用管線來宣告資料集的轉換,並指定如何透過查詢邏輯處理記錄。 它也包含建置管線的常見轉換模式範例。

您可以針對傳回 DataFrame 的任何查詢定義數據集。 您可以使用 Apache Spark 的內建操作、使用者自訂函數 (UDF)、自訂邏輯和 MLflow 模型作為 Lakeflow Spark 宣告式管線中的轉換。 將數據匯入至管線之後,您可以針對上游來源定義新的數據集,以建立新的串流數據表、具象化檢視表和檢視表。

若要瞭解如何在管線中有效地執行具狀態處理,請參閱 使用浮水印最佳化具狀態處理

使用檢視、具體化檢視和流式資料表的時機

實作管線查詢時,請選擇最佳的數據集類型,以確保其有效率且可維護。

請考慮使用視圖來進行下列動作:

  • 將您需要的大型或複雜查詢拆分成較容易管理的查詢。
  • 使用預期驗證中繼結果。
  • 減少儲存和計算不需要保留之結果的成本。 由於數據表已具體化,因此需要額外的計算和記憶體資源。

請考慮在下列情況下使用具體化檢視:

  • 多個下游查詢會取用數據表。 因為檢視會視需要計算,因此每次查詢檢視時都會重新計算檢視。
  • 其他管道、工作或查詢會消耗該表格。 由於視圖沒有實現,您只能在相同的管道中使用它們。
  • 您想要在開發期間檢視查詢的結果。 由於數據表已具體化,而且可以在管線外部檢視和查詢,因此在開發期間使用數據表有助於驗證計算的正確性。 驗證之後,將不需要具體化的查詢轉換成檢視。

請考慮在下列情況下使用串流資料表:

  • 查詢是針對持續或累加成長的數據源所定義。
  • 查詢結果應該以累加方式計算。
  • 管線需要高輸送量和低延遲。

備註

串流數據表一律會針對串流來源定義。 您也可以搭配 AUTO CDC ... INTO 使用串流來源,以套用 CDC 資料來源的更新。 請參閱 AUTOTO CDC API:使用管線簡化變更資料擷取

從目標架構中排除數據表

如果您必須計算不打算提供給外部使用的中繼數據表,則可以使用 TEMPORARY 關鍵詞防止它們發佈至架構。 暫存資料表仍會根據 Lakeflow Spark 宣告式管線語意來儲存和處理資料,但不應在目前管線外部存取。 臨時資料表在建立它的管線的整個生命週期中持續存在。 使用下列語法來宣告臨時表:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dp.table(
  temporary=True)
def temp_table():
  return ("...")

在單一管線中結合串流數據表和具體化檢視

串流數據表會繼承 Apache Spark 結構化串流的處理保證,並設定為處理僅附加數據源的查詢,其中新數據列一律會插入源數據表中,而不是修改。

備註

預設情況下,串流表只需要追加的資料來源,但是當串流來源是需要更新或刪除的另一個串流表時,可以使用skipChangeCommits 標誌來覆蓋這種行為。

常見的串流模式牽涉到內嵌源數據,以在管線中建立初始數據集。 這些初始數據集通常稱為銅牌數據表,而且通常會執行簡單的轉換。

相較之下,管線中的最終表格常被稱為黃金表格,通常需要複雜的匯總,或需要從 AUTO CDC ... INTO 作業的目標中讀取資料。 由於這些作業本質上會產生更新操作,而不是附加操作,因此不支援將它們用作串流資料表的輸入。 這些轉換更適合具象化檢視。

藉由將串流數據表和具體化檢視混合成單一管線,您可以簡化管線、避免成本高昂的重新擷取或重新處理原始數據,並具備 SQL 的完整功能,透過有效率編碼和篩選的數據集計算複雜的匯總。 下列範例說明這種類型的混合處理:

備註

這些範例會使用自動載入器從雲端記憶體載入檔案。 若要在已啟用 Unity Catalog 的管線中使用 Auto Loader 載入檔案,您必須使用 外部位置。 若要了解關於使用 Unity 目錄搭配管線的更多資訊,請參閱 使用 Unity 目錄搭配管線

Python

@dp.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dp.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dp.materialized_view
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.read.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

深入瞭解如何使用 自動載入器 從 Azure 記憶體累加內嵌 JSON 檔案。

流-靜態連接

流-靜態聯接是一種理想選擇,適合將僅追加數據的連續流去正規化,並主要使用靜態維度表。

在每次管線更新時,來自資料流的新記錄會與最新靜態數據表快照聯結。 如果在處理串流數據表的對應數據之後,在靜態數據表中新增或更新記錄,除非執行完整重新整理,否則不會重新計算結果記錄。

在設定為觸發執行的管線中,靜態數據表會在更新啟動時傳回結果。 在設定為持續運行的管線中,每當數據表處理更新時,皆會查詢最新版本的固定表格。

以下是數據流靜態聯結的範例:

Python

@dp.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

有效率地計算匯總

您可以使用串流數據表,以累加方式計算簡單的分散式匯總,例如 count、min、max 或 sum,以及平均或標準偏差等代數匯總。 Databricks 建議對具有有限群組的查詢進行累加匯總,例如具有 GROUP BY country 子句的查詢。 每個更新只會讀取新的輸入數據。

若要深入瞭解如何撰寫執行累加彙總的 Lakeflow Spark 宣告式管線查詢,請參閱 使用浮水印執行視窗匯總

在 Lakeflow Spark 宣告式管線中使用 MLflow 模型

備註

若要在已啟用 Unity 目錄的管線中使用 MLflow 模型,您的管線必須設定為使用 preview 通道。 若要使用 current 信道,您必須設定管線以發佈至 Hive 中繼存放區。

您可以在工作流程中使用 MLflow 定型模型。 MLflow 模型會被視為 Azure Databricks 中的轉換,這表示它們會處理 Spark DataFrame 輸入,並將結果傳回為 Spark DataFrame。 由於 Lakeflow Spark 宣告式管線會針對 DataFrame 定義資料集,因此您只需幾行程式碼即可將使用 MLflow 的 Apache Spark 工作負載轉換成管線。 如需 MLflow 的詳細資訊,請參閱 ML 模型生命週期的 MLflow

如果您已經有呼叫 MLflow 模型的 Python 腳本,可以使用 @dp.table@dp.materialized_view 裝飾器,並確定已定義函式以傳回轉換結果,從而將此程式碼調整為管線。 Lakeflow Spark 宣告式管線預設不會安裝 MLflow,因此請確認您已安裝 MLflow 程式庫 %pip install mlflow,並在來源頂端匯入 mlflowdp。 如需管線語法的簡介,請參閱 使用 Python 開發管線程式碼

若要在管線中使用 MLflow 模型,請完成下列步驟:

  1. 取得 MLflow 模型的執行識別碼和模型名稱。 執行標識碼和模型名稱可用來建構 MLflow 模型的 URI。
  2. 使用 URI 來定義 Spark UDF 以載入 MLflow 模型。
  3. 在您的數據表定義中呼叫UDF,以使用MLflow模型。

下列範例顯示此模式的基本語法:

%pip install mlflow

from pyspark import pipelines as dp
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dp.materialized_view
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

作為完整範例,下列程式代碼會定義名為 loaded_model_udf 的Spark UDF,以載入以貸款風險數據定型的MLflow模型。 用來進行預測的數據列作為參數傳遞至 UDF。 數據表 loan_risk_predictions 會計算 loan_risk_input_data中每個數據列的預測。

%pip install mlflow

from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dp.materialized_view(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

保留手動刪除或更新

Lakeflow Spark 宣告式管線可讓您手動刪除或更新資料表中的記錄,並執行重新整理作業以重新計算下游資料表。

根據預設,管線會在每次更新時根據輸入資料重新計算資料表結果,因此您必須確保不會從來源資料重新載入已刪除的記錄。 將 pipelines.reset.allowed 數據表屬性設定為 false 可防止重新整理數據表,但不會防止累加寫入數據表或新的數據流入數據表。

下圖說明使用兩個串流數據表的範例:

  • raw_user_table 從來源匯入原始用戶資料。
  • bmi_table 會使用來自 raw_user_table的體重和身高,逐步計算 BMI 指數。

您要從 raw_user_table 手動刪除或更新使用者記錄,然後重新計算 bmi_table

保留數據圖表

下列程式代碼示範如何將 pipelines.reset.allowed 數據表屬性設定為 false,以停用 raw_user_table 的完整重新整理,以便在一段時間內保留預期的變更,但在執行管線更新時會重新計算下游數據表:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);