Delta Live Tables Python 語言參考
本文提供 Delta Live Tables Python 程式設計介面的詳細數據。
如需 SQL API 的相關信息,請參閱 Delta Live Tables SQL 語言參考。
如需設定自動載入器的特定詳細數據,請參閱 什麼是自動載入器?。
限制
Delta Live Tables Python 介面具有下列限制:
- Python 和
view
函table
式必須傳回 DataFrame。 某些在 DataFrame 上運作的函式不會傳回 DataFrame,而且不應該使用。 由於 DataFrame 轉換會在解析完整數據流圖形之後執行,因此使用這類作業可能會有非預期的副作用。 這些作業包括、、count()
toPandas()
、save()
和saveAsTable()
等collect()
函式。 不過,您可以將這些函式包含在 或view
函式定義之外table
,因為此程式代碼會在圖形初始化階段執行一次。 - 不支援函
pivot()
式。pivot
Spark 中的作業需要急切載入輸入數據,才能計算輸出的架構。 Delta Live Tables 不支援此功能。
匯入 dlt
Python 模組
差異實時數據表 Python 函式定義於模組中 dlt
。 使用 Python API 實作的管線必須匯入此課程模組:
import dlt
建立差異實時數據表具體化檢視或串流數據表
在 Python 中,Delta Live Tables 會根據定義查詢來決定將數據集更新為具體化檢視或串流數據表。 裝飾 @table
專案可用來定義具體化檢視和串流數據表。
若要在 Python 中定義具體化檢視,請套用 @table
至對數據源執行靜態讀取的查詢。 若要定義串流數據表,請套用 @table
至對數據源執行串流讀取的查詢。 這兩種數據集類型都有相同的語法規格,如下所示:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
建立 Delta Live Tables 檢視
若要在 Python 中定義檢視,請套用 @view
裝飾專案。 @table
如同裝飾專案,您可以在差異實時數據表中針對靜態或串流數據集使用檢視。 以下是使用 Python 定義檢視的語法:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
範例:定義數據表和檢視
若要在 Python 中定義資料表或檢視表,請將 或 @dlt.table
裝飾專案套用@dlt.view
至函式。 您可以使用函式名稱或 name
參數來指派數據表或檢視名稱。 下列範例會定義兩個不同的數據集:一個稱為 taxi_raw
的檢視,其接受 JSON 檔案做為輸入來源,而名為 filtered_data
的數據表會 taxi_raw
接受檢視做為輸入:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
範例:存取在相同管線中定義的數據集
除了從外部數據源讀取之外,您還可以使用 Delta Live Tables read()
函式存取相同管線中定義的數據集。 下列範例示範如何使用 read()
函式建立customers_filtered
資料集:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
您也可以使用 函 spark.table()
式來存取相同管線中定義的數據集。 使用 函 spark.table()
式來存取管線中定義的數據集時,在函式自變數前面加上 LIVE
關鍵詞至資料集名稱:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
範例:從中繼存放區中註冊的數據表讀取
若要從Hive中繼存放區中註冊的數據表讀取數據,請在函式自變數中省略 LIVE
關鍵詞,並選擇性地將數據表名稱限定為資料庫名稱:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
如需從 Unity 目錄資料表讀取的範例,請參閱 將數據內嵌至 Unity 目錄管線。
範例:使用 存取數據集 spark.sql
您也可以在查詢函式中使用 spark.sql
表示式傳回數據集。 若要從內部數據集讀取,請在資料集名稱前面加上 LIVE.
:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
從多個來源數據流寫入串流數據表
重要
的差異 @append_flow
實時數據表支持處於 公開預覽狀態。
您可以使用 @append_flow
裝飾項目從多個串流來源寫入串流資料表,以執行下列動作:
- 新增和移除將數據附加至現有串流數據表的串流來源,而不需要完整重新整理。 例如,您可能有一個數據表,結合您正在運作之每個區域的區域數據。 隨著新區域推出,您可以將新的區域數據新增至數據表,而不需要執行完整重新整理。
- 藉由附加遺漏的歷程記錄數據來更新串流數據表(回填)。 例如,您有 Apache Kafka 主題寫入的現有串流數據表。 您也會將歷程記錄數據儲存在數據表中,而您需要在串流數據表中插入一次,而且您無法串流數據,因為您需要在插入數據之前執行複雜的匯總。
若要透過 @append_flow
處理建立記錄輸出的目標數據表,請使用 create_streaming_table() 函式。
注意
如果您需要定義具有 預期的數據品質條件約束,請將目標數據表上的期望定義為函式的 create_streaming_table()
一部分。 您無法在 @append_flow
定義中定義預期。
以下是 的 @append_flow
語法:
import dlt
dlt.create_streaming_table("<target-table-name>")
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment") # optional
def <function-name>():
return (<streaming query>)
範例:從多個 Kafka 主題寫入串流數據表
下列範例會建立名為 kafka_target
的串流數據表,並從兩個 Kafka 主題寫入該串流數據表:
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
範例:執行一次性數據回填
下列範例會執行查詢,將歷程記錄數據附加至串流數據表:
注意
若要確保當回填查詢是排程或持續執行的管線一部分時,請移除執行管線一次之後的查詢。 若要在到達回填目錄時附加新數據,請將查詢保留原位。
import dlt
@dlt.table()
def csv_target():
return spark.readStream.format("csv").load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream.format("csv").load("path/to/backfill/data/dir")
建立數據表以作為串流作業的目標
使用函 create_streaming_table()
式,透過串流作業建立記錄輸出的目標數據表,包括 apply_changes() 和 @append_flow 輸出記錄。
注意
和 create_streaming_live_table()
函create_target_table()
式已被取代。 Databricks 建議更新現有的程式代碼以使用 函式 create_streaming_table()
。
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
引數 |
---|
name 類型: str (英文)資料表名稱。 此為必要參數。 |
comment 類型: str (英文)數據表的選擇性描述。 |
spark_conf 類型: dict (英文)執行此查詢的Spark組態選擇性清單。 |
table_properties 類型: dict (英文)數據表屬性的選擇性清單。 |
partition_cols 類型: array (英文)用於分割數據表的一個或多個數據行的選擇性清單。 |
path 類型: str (英文)數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。 |
schema 類型: str 或 StructType 數據表的選擇性架構定義。 架構可以定義為 SQL DDL 字串,或使用 Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail 類型: dict (英文)數據表的選擇性數據品質條件約束。 請參閱 多個期望。 |
控制數據表具體化的方式
資料表也提供其具體化的額外控制:
- 指定如何使用 來分割
partition_cols
數據表。 您可以使用資料分割來加速查詢。 - 您可以在定義檢視或資料表時設定資料表屬性。 請參閱 Delta Live Tables 資料表屬性。
- 使用
path
設定來設定數據表數據的儲存位置。 如果未設定,數據表數據預設會儲存在管線儲存位置path
。 - 您可以在架構定義中使用 產生的數據行 。 請參閱 範例:指定架構和數據分割數據行。
注意
對於大小小於 1 TB 的數據表,Databricks 建議讓 Delta Live Tables 控制數據組織。 除非您預期數據表成長超過 TB,否則通常不應該指定資料分割數據行。
範例:指定架構和數據分割數據行
您可以選擇性地使用 Python StructType
或 SQL DDL 字串來指定資料表架構。 使用 DDL 字串指定時,定義可以包含 產生的數據行。
下列範例會使用 Python StructType
所指定的架構建立名為 sales
的數據表:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
下列範例會使用 DDL 字串指定資料表的架構、定義產生的數據列,以及定義資料分割資料列:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
根據預設,如果您未指定架構,Delta Live Tables 會從 table
定義推斷架構。
設定串流數據表以忽略來源串流數據表中的變更
注意
- 旗標只適用於
spark.readStream
使用 函skipChangeCommits
option()
式。 您無法在函dlt.read_stream()
式中使用這個旗標。 - 當來源串流數據表定義為 apply_changes() 函式的目標時,您無法使用
skipChangeCommits
旗標。
根據預設,串流數據表需要僅限附加的來源。 當串流數據表使用另一個串流數據表做為來源,而來源串流數據表需要更新或刪除時,例如 GDPR「被遺忘的權利」處理時, skipChangeCommits
可以在讀取來源串流數據表時設定旗標來忽略這些變更。 如需此旗標的詳細資訊,請參閱 忽略更新和刪除。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Python Delta Live Tables 屬性
下表描述使用 Delta Live Tables 定義資料表和檢視時,您可以指定的選項和屬性:
@table 或 @view |
---|
name 類型: str (英文)數據表或檢視表的選擇性名稱。 如果未定義,則會使用函式名稱做為數據表或檢視名稱。 |
comment 類型: str (英文)數據表的選擇性描述。 |
spark_conf 類型: dict (英文)執行此查詢的Spark組態選擇性清單。 |
table_properties 類型: dict (英文)數據表屬性的選擇性清單。 |
path 類型: str (英文)數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。 |
partition_cols 類型: a collection of str (英文)選擇性的集合,例如, list 用於分割數據表的一或多個數據行。 |
schema 類型: str 或 StructType 數據表的選擇性架構定義。 架構可以定義為 SQL DDL 字串,或使用 Python StructType . |
temporary 類型: bool (英文)建立數據表,但不會發佈數據表的元數據。 temporary 關鍵詞會指示 Delta Live Tables 建立可供管線使用的數據表,但不應該在管線外部存取。 為了縮短處理時間,臨時表會保存管線的存留期,而不只是單一更新。預設值為 『False』。 |
數據表或檢視定義 |
---|
def <function-name>() 定義數據集的 Python 函式。 name 如果未設定 參數,則會<function-name> 當做目標數據集名稱使用。 |
query 會傳回 Spark 數據集或 Koalas DataFrame 的 Spark SQL 語句。 使用 dlt.read() 或 spark.table() 從相同管線中定義的數據集執行完整讀取。 使用 函 spark.table() 式從相同管線中定義的數據集讀取時,請將 關鍵詞前面加上 LIVE 函式自變數中的數據集名稱。 例如,若要從名為 customers 的數據集讀取:spark.table("LIVE.customers") 您也可以使用 函 spark.table() 式,藉 LIVE 由省略 關鍵詞,並選擇性地使用資料庫名稱限定數據表名稱,以從中繼存放區中註冊的數據表讀取:spark.table("sales.customers") 使用 dlt.read_stream() 來執行從相同管線中定義之數據集的串流讀取。使用函 spark.sql 式來定義 SQL 查詢來建立傳回數據集。使用 PySpark 語法來使用 Python 定義 Delta Live Tables 查詢。 |
預期結果 |
---|
@expect("description", "constraint") 宣告所識別的數據質量條件約束 description . 如果數據列違反預期,請在目標數據集中包含數據列。 |
@expect_or_drop("description", "constraint") 宣告所識別的數據質量條件約束 description . 如果數據列違反預期,請從目標數據集卸除該數據列。 |
@expect_or_fail("description", "constraint") 宣告所識別的數據質量條件約束 description . 如果數據列違反預期,請立即停止執行。 |
@expect_all(expectations) 宣告一或多個數據質量條件約束。 expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果數據列違反任何預期,請在目標數據集中包含該數據列。 |
@expect_all_or_drop(expectations) 宣告一或多個數據質量條件約束。 expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果數據列違反任何預期,請從目標數據集卸除該數據列。 |
@expect_all_or_fail(expectations) 宣告一或多個數據質量條件約束。 expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果數據列違反任何預期,請立即停止執行。 |
在 Delta 即時數據表中使用 Python 變更數據擷取
使用 Python API 中的 函 apply_changes()
式來使用 Delta Live Tables CDC 功能。 Delta Live Tables Python 介面也提供 create_streaming_table() 函式。 您可以使用此函式來建立函式所需的 apply_changes()
目標資料表。
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注意
和 UPDATE
事件的預設行為INSERT
是從來源向上插入 CDC 事件:更新目標數據表中符合指定索引鍵的任何數據列,或在目標數據表中不存在相符記錄時插入新數據列。 DELETE
您可以使用條件來指定APPLY AS DELETE WHEN
事件的處理。
重要
您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定目標資料表的apply_changes
架構時,您也必須包含 __START_AT
與欄位具有相同資料類型sequence_by
的 和 __END_AT
資料行。
請參閱 使用差異實時數據表中的APPLY CHANGES API 來擷取簡化的變更數據。
引數 |
---|
target 類型: str (英文)要更新之數據表的名稱。 在執行函式之前 apply_changes() ,您可以使用 create_streaming_table() 函式來建立目標數據表。此為必要參數。 |
source 類型: str (英文)包含 CDC 記錄的數據來源。 此為必要參數。 |
keys 類型: list (英文)可唯一識別源數據中數據列的數據行或數據行組合。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。 您可以指定下列其中一項: * 字串清單: ["userId", "orderId"] * Spark SQL col() 函式的列表: [col("userId"), col("orderId"] 函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。此為必要參數。 |
sequence_by 類型: str 或 col() 指定源數據中 CDC 事件邏輯順序的數據行名稱。 Delta Live Tables 會使用此排序來處理依序抵達的變更事件。 您可以指定下列其中一項: * 字串: "sequenceNum" * Spark SQL 函 col() 式: col("sequenceNum") 函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。此為必要參數。 |
ignore_null_updates 類型: bool (英文)允許內嵌包含目標數據行子集的更新。 當 CDC 事件符合現有的數據列且 ignore_null_updates 為 True 時,具有的數據 null 行會保留其目標中的現有值。 這也適用於值為的 null 巢狀數據行。 當 為 False 時ignore_null_updates ,現有的值將會以null 值覆寫。這是選擇性參數。 預設值為 False 。 |
apply_as_deletes 類型: str 或 expr() 指定應將 CDC 事件視為 DELETE 而非 upsert 的時機。 為了處理順序錯誤的數據,已刪除的數據列會暫時保留為基礎 Delta 數據表中的墓碑,而檢視會在中繼存放區中建立,以篩選掉這些墓碑。 您可以使用 來設定保留間隔pipelines.cdc.tombstoneGCThresholdInSeconds table 屬性。您可以指定下列其中一項: * 字串: "Operation = 'DELETE'" * Spark SQL 函 expr() 式: expr("Operation = 'DELETE'") 這是選擇性參數。 |
apply_as_truncates 類型: str 或 expr() 指定應將 CDC 事件視為完整數據表 TRUNCATE 的時機。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。apply_as_truncates 只有 SCD 類型 1 才支援 參數。 SCD 類型 2 不支援截斷。您可以指定下列其中一項: * 字串: "Operation = 'TRUNCATE'" * Spark SQL 函 expr() 式: expr("Operation = 'TRUNCATE'") 這是選擇性參數。 |
column_list except_column_list 類型: list (英文)要包含在目標數據表中的數據行子集。 使用 column_list 指定要包含之資料行的完整清單。 使用 except_column_list 指定要排除的數據行。 您可以將值宣告為字串清單或 Spark SQL 函 col() 式:* column_list = ["userId", "name", "city"] .* column_list = [col("userId"), col("name"), col("city")] * except_column_list = ["operation", "sequenceNum"] * except_column_list = [col("operation"), col("sequenceNum") 函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。這是選擇性參數。 當目標數據表中沒有 column_list 或 except_column_list 自變數傳遞至函式時,預設值為包含所有數據行。 |
stored_as_scd_type 類型: str 或 int 是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。 針對 SCD 型態 1 或 2 SCD 型態 2 設定為 1 。這個子句是選擇性的。 預設值為 SCD 類型 1。 |
track_history_column_list track_history_except_column_list 類型: list (英文)要追蹤目標數據表中記錄的輸出數據行子集。 使用 track_history_column_list 指定要追蹤之資料行的完整清單。 使用track_history_except_column_list 表示指定要從追蹤中排除的數據行。 您可以將值宣告為字串清單或 Spark SQL col() 函式:- track_history_column_list = ["userId", "name", "city"] 。 - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") 函式的 col() 自變數不能包含限定符。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。這是選擇性參數。 預設值是當目標數據表中沒有 track_history_column_list 或 時,將所有數據行包含在目標數據表中track_history_except_column_list 自變數會傳遞至函式。 |
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應