Delta Live Tables Python 語言參考

本文提供 Delta Live Tables Python 程式設計介面的詳細數據。

如需 SQL API 的相關信息,請參閱 Delta Live Tables SQL 語言參考

如需設定自動載入器的特定詳細數據,請參閱 什麼是自動載入器?

限制

Delta Live Tables Python 介面具有下列限制:

  • Python 和viewtable式必須傳回 DataFrame。 某些在 DataFrame 上運作的函式不會傳回 DataFrame,而且不應該使用。 由於 DataFrame 轉換會在解析完整數據流圖形之後執行,因此使用這類作業可能會有非預期的副作用。 這些作業包括、、count()toPandas()save()saveAsTable()collect()函式。 不過,您可以將這些函式包含在 或 view 函式定義之外table,因為此程式代碼會在圖形初始化階段執行一次。
  • 不支援函 pivot() 式。 pivot Spark 中的作業需要急切載入輸入數據,才能計算輸出的架構。 Delta Live Tables 不支援此功能。

匯入 dlt Python 模組

差異實時數據表 Python 函式定義於模組中 dlt 。 使用 Python API 實作的管線必須匯入此課程模組:

import dlt

建立差異實時數據表具體化檢視或串流數據表

在 Python 中,Delta Live Tables 會根據定義查詢來決定將數據集更新為具體化檢視或串流數據表。 裝飾 @table 專案可用來定義具體化檢視和串流數據表。

若要在 Python 中定義具體化檢視,請套用 @table 至對數據源執行靜態讀取的查詢。 若要定義串流數據表,請套用 @table 至對數據源執行串流讀取的查詢。 這兩種數據集類型都有相同的語法規格,如下所示:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

建立 Delta Live Tables 檢視

若要在 Python 中定義檢視,請套用 @view 裝飾專案。 @table如同裝飾專案,您可以在差異實時數據表中針對靜態或串流數據集使用檢視。 以下是使用 Python 定義檢視的語法:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

範例:定義數據表和檢視

若要在 Python 中定義資料表或檢視表,請將 或 @dlt.table 裝飾專案套用@dlt.view至函式。 您可以使用函式名稱或 name 參數來指派數據表或檢視名稱。 下列範例會定義兩個不同的數據集:一個稱為 taxi_raw 的檢視,其接受 JSON 檔案做為輸入來源,而名為 filtered_data 的數據表會 taxi_raw 接受檢視做為輸入:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

範例:存取在相同管線中定義的數據集

除了從外部數據源讀取之外,您還可以使用 Delta Live Tables read() 函式存取相同管線中定義的數據集。 下列範例示範如何使用 read() 函式建立customers_filtered資料集:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

您也可以使用 函 spark.table() 式來存取相同管線中定義的數據集。 使用 函 spark.table() 式來存取管線中定義的數據集時,在函式自變數前面加上 LIVE 關鍵詞至資料集名稱:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

範例:從中繼存放區中註冊的數據表讀取

若要從Hive中繼存放區中註冊的數據表讀取數據,請在函式自變數中省略 LIVE 關鍵詞,並選擇性地將數據表名稱限定為資料庫名稱:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

如需從 Unity 目錄資料表讀取的範例,請參閱 將數據內嵌至 Unity 目錄管線

範例:使用 存取數據集 spark.sql

您也可以在查詢函式中使用 spark.sql 表示式傳回數據集。 若要從內部數據集讀取,請在資料集名稱前面加上 LIVE.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

從多個來源數據流寫入串流數據表

重要

的差異 @append_flow 實時數據表支持處於 公開預覽狀態

您可以使用 @append_flow 裝飾項目從多個串流來源寫入串流資料表,以執行下列動作:

  • 新增和移除將數據附加至現有串流數據表的串流來源,而不需要完整重新整理。 例如,您可能有一個數據表,結合您正在運作之每個區域的區域數據。 隨著新區域推出,您可以將新的區域數據新增至數據表,而不需要執行完整重新整理。
  • 藉由附加遺漏的歷程記錄數據來更新串流數據表(回填)。 例如,您有 Apache Kafka 主題寫入的現有串流數據表。 您也會將歷程記錄數據儲存在數據表中,而您需要在串流數據表中插入一次,而且您無法串流數據,因為您需要在插入數據之前執行複雜的匯總。

若要透過 @append_flow 處理建立記錄輸出的目標數據表,請使用 create_streaming_table() 函式。

注意

如果您需要定義具有 預期的數據品質條件約束,請將目標數據表上的期望定義為函式的 create_streaming_table() 一部分。 您無法在 @append_flow 定義中定義預期。

以下是 的 @append_flow語法:

import dlt

dlt.create_streaming_table("<target-table-name>")

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment") # optional
def <function-name>():
  return (<streaming query>)

範例:從多個 Kafka 主題寫入串流數據表

下列範例會建立名為 kafka_target 的串流數據表,並從兩個 Kafka 主題寫入該串流數據表:

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

範例:執行一次性數據回填

下列範例會執行查詢,將歷程記錄數據附加至串流數據表:

注意

若要確保當回填查詢是排程或持續執行的管線一部分時,請移除執行管線一次之後的查詢。 若要在到達回填目錄時附加新數據,請將查詢保留原位。

import dlt

@dlt.table()
def csv_target():
  return spark.readStream.format("csv").load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream.format("csv").load("path/to/backfill/data/dir")

建立數據表以作為串流作業的目標

使用函 create_streaming_table() 式,透過串流作業建立記錄輸出的目標數據表,包括 apply_changes()@append_flow 輸出記錄。

注意

create_streaming_live_table()create_target_table()式已被取代。 Databricks 建議更新現有的程式代碼以使用 函式 create_streaming_table()

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
引數
name

類型:str (英文)

資料表名稱。

此為必要參數。
comment

類型:str (英文)

數據表的選擇性描述。
spark_conf

類型:dict (英文)

執行此查詢的Spark組態選擇性清單。
table_properties

類型:dict (英文)

數據表屬性選擇性清單。
partition_cols

類型:array (英文)

用於分割數據表的一個或多個數據行的選擇性清單。
path

類型:str (英文)

數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。
schema

類型: strStructType

數據表的選擇性架構定義。 架構可以定義為 SQL DDL 字串,或使用 Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

類型:dict (英文)

數據表的選擇性數據品質條件約束。 請參閱 多個期望

控制數據表具體化的方式

資料表也提供其具體化的額外控制:

注意

對於大小小於 1 TB 的數據表,Databricks 建議讓 Delta Live Tables 控制數據組織。 除非您預期數據表成長超過 TB,否則通常不應該指定資料分割數據行。

範例:指定架構和數據分割數據行

您可以選擇性地使用 Python StructType 或 SQL DDL 字串來指定資料表架構。 使用 DDL 字串指定時,定義可以包含 產生的數據行

下列範例會使用 Python StructType所指定的架構建立名為 sales 的數據表:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

下列範例會使用 DDL 字串指定資料表的架構、定義產生的數據列,以及定義資料分割資料列:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

根據預設,如果您未指定架構,Delta Live Tables 會從 table 定義推斷架構。

設定串流數據表以忽略來源串流數據表中的變更

注意

  • 旗標只適用於spark.readStream使用 函skipChangeCommitsoption()式。 您無法在函 dlt.read_stream() 式中使用這個旗標。
  • 當來源串流數據表定義為 apply_changes() 函式的目標時,您無法使用 skipChangeCommits 旗標。

根據預設,串流數據表需要僅限附加的來源。 當串流數據表使用另一個串流數據表做為來源,而來源串流數據表需要更新或刪除時,例如 GDPR「被遺忘的權利」處理時, skipChangeCommits 可以在讀取來源串流數據表時設定旗標來忽略這些變更。 如需此旗標的詳細資訊,請參閱 忽略更新和刪除

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Python Delta Live Tables 屬性

下表描述使用 Delta Live Tables 定義資料表和檢視時,您可以指定的選項和屬性:

@table 或 @view
name

類型:str (英文)

數據表或檢視表的選擇性名稱。 如果未定義,則會使用函式名稱做為數據表或檢視名稱。
comment

類型:str (英文)

數據表的選擇性描述。
spark_conf

類型:dict (英文)

執行此查詢的Spark組態選擇性清單。
table_properties

類型:dict (英文)

數據表屬性選擇性清單。
path

類型:str (英文)

數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。
partition_cols

類型:a collection of str (英文)

選擇性的集合,例如, list用於分割數據表的一或多個數據行。
schema

類型: strStructType

數據表的選擇性架構定義。 架構可以定義為 SQL DDL 字串,或使用 Python
StructType.
temporary

類型:bool (英文)

建立數據表,但不會發佈數據表的元數據。 temporary關鍵詞會指示 Delta Live Tables 建立可供管線使用的數據表,但不應該在管線外部存取。 為了縮短處理時間,臨時表會保存管線的存留期,而不只是單一更新。

預設值為 『False』。
數據表或檢視定義
def <function-name>()

定義數據集的 Python 函式。 name如果未設定 參數,則會<function-name>當做目標數據集名稱使用。
query

會傳回 Spark 數據集或 Koalas DataFrame 的 Spark SQL 語句。

使用 dlt.read()spark.table() 從相同管線中定義的數據集執行完整讀取。 使用 函 spark.table() 式從相同管線中定義的數據集讀取時,請將 關鍵詞前面加上 LIVE 函式自變數中的數據集名稱。 例如,若要從名為 customers的數據集讀取:

spark.table("LIVE.customers")

您也可以使用 函 spark.table() 式,藉 LIVE 由省略 關鍵詞,並選擇性地使用資料庫名稱限定數據表名稱,以從中繼存放區中註冊的數據表讀取:

spark.table("sales.customers")

使用 dlt.read_stream() 來執行從相同管線中定義之數據集的串流讀取。

使用函 spark.sql 式來定義 SQL 查詢來建立傳回數據集。

使用 PySpark 語法來使用 Python 定義 Delta Live Tables 查詢。
預期結果
@expect("description", "constraint")

宣告所識別的數據質量條件約束
description. 如果數據列違反預期,請在目標數據集中包含數據列。
@expect_or_drop("description", "constraint")

宣告所識別的數據質量條件約束
description. 如果數據列違反預期,請從目標數據集卸除該數據列。
@expect_or_fail("description", "constraint")

宣告所識別的數據質量條件約束
description. 如果數據列違反預期,請立即停止執行。
@expect_all(expectations)

宣告一或多個數據質量條件約束。
expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果數據列違反任何預期,請在目標數據集中包含該數據列。
@expect_all_or_drop(expectations)

宣告一或多個數據質量條件約束。
expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果數據列違反任何預期,請從目標數據集卸除該數據列。
@expect_all_or_fail(expectations)

宣告一或多個數據質量條件約束。
expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果數據列違反任何預期,請立即停止執行。

在 Delta 即時數據表中使用 Python 變更數據擷取

使用 Python API 中的 函 apply_changes() 式來使用 Delta Live Tables CDC 功能。 Delta Live Tables Python 介面也提供 create_streaming_table() 函式。 您可以使用此函式來建立函式所需的 apply_changes() 目標資料表。

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

注意

UPDATE 事件的預設行為INSERT是從來源向上插入 CDC 事件:更新目標數據表中符合指定索引鍵的任何數據列,或在目標數據表中不存在相符記錄時插入新數據列。 DELETE您可以使用條件來指定APPLY AS DELETE WHEN事件的處理。

重要

您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定目標資料表的apply_changes架構時,您也必須包含 __START_AT 與欄位具有相同資料類型sequence_by的 和 __END_AT 資料行。

請參閱 使用差異實時數據表中的APPLY CHANGES API 來擷取簡化的變更數據。

引數
target

類型:str (英文)

要更新之數據表的名稱。 在執行函式之前apply_changes(),您可以使用 create_streaming_table() 函式來建立目標數據表。

此為必要參數。
source

類型:str (英文)

包含 CDC 記錄的數據來源。

此為必要參數。
keys

類型:list (英文)

可唯一識別源數據中數據列的數據行或數據行組合。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。

您可以指定下列其中一項:

* 字串清單: ["userId", "orderId"]
* Spark SQL col() 函式的列表: [col("userId"), col("orderId"]

函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)

此為必要參數。
sequence_by

類型: strcol()

指定源數據中 CDC 事件邏輯順序的數據行名稱。 Delta Live Tables 會使用此排序來處理依序抵達的變更事件。

您可以指定下列其中一項:

* 字串: "sequenceNum"
* Spark SQL 函 col() 式: col("sequenceNum")

函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)

此為必要參數。
ignore_null_updates

類型:bool (英文)

允許內嵌包含目標數據行子集的更新。 當 CDC 事件符合現有的數據列且 ignore_null_updatesTrue時,具有的數據 null 行會保留其目標中的現有值。 這也適用於值為的 null巢狀數據行。 當 為 Falseignore_null_updates,現有的值將會以null值覆寫。

這是選擇性參數。

預設值為 False
apply_as_deletes

類型: strexpr()

指定應將 CDC 事件視為 DELETE 而非 upsert 的時機。 為了處理順序錯誤的數據,已刪除的數據列會暫時保留為基礎 Delta 數據表中的墓碑,而檢視會在中繼存放區中建立,以篩選掉這些墓碑。 您可以使用 來設定保留間隔
pipelines.cdc.tombstoneGCThresholdInSecondstable 屬性

您可以指定下列其中一項:

* 字串: "Operation = 'DELETE'"
* Spark SQL 函 expr() 式: expr("Operation = 'DELETE'")

這是選擇性參數。
apply_as_truncates

類型: strexpr()

指定應將 CDC 事件視為完整數據表 TRUNCATE的時機。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。

apply_as_truncates只有 SCD 類型 1 才支援 參數。 SCD 類型 2 不支援截斷。

您可以指定下列其中一項:

* 字串: "Operation = 'TRUNCATE'"
* Spark SQL 函 expr() 式: expr("Operation = 'TRUNCATE'")

這是選擇性參數。
column_list

except_column_list

類型:list (英文)

要包含在目標數據表中的數據行子集。 使用 column_list 指定要包含之資料行的完整清單。 使用 except_column_list 指定要排除的數據行。 您可以將值宣告為字串清單或 Spark SQL 函 col() 式:

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)

這是選擇性參數。

當目標數據表中沒有 column_listexcept_column_list 自變數傳遞至函式時,預設值為包含所有數據行。
stored_as_scd_type

類型: strint

是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。

針對 SCD 型態 1 或 2 SCD 型態 2 設定為 1

這個子句是選擇性的。

預設值為 SCD 類型 1。
track_history_column_list

track_history_except_column_list

類型:list (英文)

要追蹤目標數據表中記錄的輸出數據行子集。 使用 track_history_column_list 指定要追蹤之資料行的完整清單。 使用
track_history_except_column_list 表示指定要從追蹤中排除的數據行。 您可以將值宣告為字串清單或 Spark SQL col() 函式:- track_history_column_list = ["userId", "name", "city"]。 - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)

這是選擇性參數。

預設值是當目標數據表中沒有 track_history_column_list 或 時,將所有數據行包含在目標數據表中
track_history_except_column_list 自變數會傳遞至函式。