共用方式為


使用 Python 開發管線程式代碼

Lakeflow 宣告式管線引進數個新的 Python 程式代碼建構,以定義管線中的具體化檢視和串流數據表。 Python 支援開發管線是以 PySpark DataFrame 和結構化串流 API 的基本概念為基礎。

對於不熟悉 Python 和 DataFrame 的使用者,Databricks 建議使用 SQL 介面。 請參閱 使用 SQL開發管線程式代碼。

如需 Lakeflow 宣告式管線 Python 語法的完整參考,請參閱 Lakeflow 宣告式管線 Python 語言參考

適用於管線開發的 Python 基本概念

建立 Lakeflow 宣告式管線數據集的 Python 程式代碼必須傳回 DataFrame。

所有 Lakeflow 宣告式管線 Python API 都會在 dlt 模組中實作。 使用 Python 實作的 Lakeflow 宣告式管線代碼,必須在 Python 筆記本和檔案的頂端明確匯入 dlt 模組。

讀取和寫入預設為管線組態期間指定的目錄和架構。 請參閱 設定目標目錄和架構

Lakeflow 宣告式管線特定 Python 程式代碼與其他類型的 Python 程式代碼有一個重要差異:Python 管線程序代碼不會直接呼叫執行數據擷取和轉換的函式,以建立 Lakeflow 宣告式管線數據集。 相反地,Lakeflow 宣告式管線會從管線中配置的所有原始程式碼檔案的 dlt 模組中解譯裝飾器函數,並建構數據流圖。

重要

若要避免管線執行時發生非預期的行為,請勿在定義數據集的函式中包含可能有副作用的程序代碼。 若要深入瞭解,請參閱 Python 參考

使用 Python 建立具體化檢視或串流數據表

裝飾器 @dlt.table 會告訴 Lakeflow 宣告式管線,根據函式傳回的結果建立實體化檢視或串流表。 批次讀取的結果會建立具體化檢視,而串流讀取的結果會建立串流數據表。

根據預設,具體化檢視和串流數據表名稱會從函式名稱推斷。 下列程式代碼範例示範建立具體化檢視表和串流數據表的基本語法:

註記

這兩個函式都會參考 samples 目錄中的相同數據表,並使用相同的裝飾函式。 這些範例強調具體化檢視和串流數據表的基本語法唯一差異是使用 spark.readspark.readStream

並非所有數據源都支援串流讀取。 某些數據源應該一律使用串流語意來處理。

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

或者,您可以使用 name 裝飾專案中的 @dlt.table 自變數來指定資料表名稱。 下列範例示範具體化檢視表和串流數據表的這個模式:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

從物件記憶體載入資料

Lakeflow 宣告式管線支援從 Azure Databricks 支援的所有格式載入數據。 請參閱 數據格式選項。

註記

這些範例會使用自動掛接到工作區的/databricks-datasets下可用的資料。 Databricks 建議使用磁碟區路徑或雲端 URI 來參考儲存在雲端物件記憶體中的數據。 請參閱 什麼是 Unity 目錄磁碟區?

Databricks 建議在設定針對儲存在雲端物件儲存中的資料進行增量擷取的工作負載時,使用 Auto Loader 和串流資料表。 請參閱 什麼是自動載入器?

下列範例會使用自動載入器從 JSON 檔案建立串流資料表:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

下列範例會使用批次語意來讀取 JSON 目錄,並建立具體化檢視:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

用預期驗證數據

您可以使用預期來設定及強制執行資料品質條件約束。 請參閱 使用期望值管理管線中的資料品質

下列程式代碼會使用 @dlt.expect_or_drop 來定義名為 valid_data 的預期,以在數據擷取期間卸除 null 的記錄:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

查詢管線中定義的實體化視圖和串流表

下列範例會定義四個資料集:

  • 名為 orders 的串流數據表,會載入 JSON 數據。
  • 名為 customers 的具體化檢視,可載入 CSV 數據。
  • 名為 customer_orders 的具體化檢視,聯結來自 orderscustomers 數據集的記錄、將順序時間戳轉換成日期,然後選取 customer_idorder_numberstateorder_date 字段。
  • 名為 daily_orders_by_state 的具體化檢視,會匯總每個州的每日訂單計數。

註記

在查詢管線中的檢視或資料表時,您可以直接指定目錄和架構,也可以使用管線中設定的預設值。 在此範例中,orderscustomerscustomer_orders 數據表會從您管線設定的預設目錄和架構中寫入和讀取。

舊版發佈模式會使用 LIVE 架構來查詢管線中定義的其他具體化檢視和串流數據表。 在新管線中,LIVE 架構語法會被默默地忽略。 請參閱 LIVE 架構 (舊版)

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

for 迴圈中建立數據表

您可以使用 Python for 循環,以程式設計方式建立多個數據表。 當您有許多數據源或目標數據集只因少數參數而有所不同時,這可能會很有用,這導致維護所需的總程式碼較少,而且程式碼冗贅較少。

for 迴圈會依序列順序評估邏輯,但一旦為數據集完成規劃,管線就會平行執行邏輯。

重要

使用此模式來定義資料集時,請確定傳遞至 for 迴圈的值清單一律是加總的。 如果先前在管線中定義的資料集於未來的管線運行中被省略,該資料集會自動從目標結構中移除。

下列範例會建立五個數據表,依區域篩選客戶訂單。 在這裡,區域名稱是用來設定目標具體化檢視的名稱,以及篩選源數據。 暫存檢視可用來定義建構最終具體化檢視時所用源數據表的聯結。

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

以下是此管線數據流圖形的範例:

兩個檢視引導進五個區域數據表的數據流圖表。

疑難解答:for 迴圈會建立許多具有相同值的數據表

執行管線用來評估 Python 程式碼的延遲執行模型,需要在調用經 @dlt.table() 裝飾的函數時,您的邏輯必須直接參考個別值。

下列範例示範兩個正確的方法來定義具有 for 循環的數據表。 在這兩個範例中,tables 清單中的每個資料表名稱都會在由 @dlt.table()裝飾的函式中被明確引用。

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

下列範例 未正確引用 的值。 此範例會建立具有不同名稱的數據表,但所有數據表都會從 for 迴圈中的最後一個值載入數據:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)

從實體化視圖或串流表永久刪除記錄

若要從已啟用刪除向量的具體化檢視或串流資料表中永久刪除記錄,例如針對GDPR合規性,必須在對象的 Delta 基礎資料表上執行其他作業。 若要確保從具體化檢視刪除記錄,請參閱 永久刪除具有啟用刪除向量之具體化檢視中的記錄。 若要確保從串流資料表刪除記錄,請參閱 從串流資料表永久刪除記錄