@table裝飾工具可用來定義管線中的串流資料表。
若要定義串流資料表,請將 @table 套用至對資料來源執行串流讀取的查詢,或使用 create_streaming_table () 函式。
備註
在較舊 dlt 的模組中, @table 運算子用於建立串流資料表和具體化檢視。
@table模組中的pyspark.pipelines運算子仍會以這種方式運作,但 Databricks 建議使用@materialized_view運算子來建立實體化檢視。
語法
from pyspark import pipelines as dp
@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
參數
@dp.expect() 是可選的 Lakeflow Spark 宣告性管道期望子句。 您可以包含多個期望。 請參閱 預期。
| 參數 | 類型 | Description |
|---|---|---|
| 函數 | function |
必須的。 用來從使用者定義的查詢中返回 Apache Spark 串流 DataFrame 的函數。 |
name |
str |
數據表名稱。 如果未提供,則預設為函式名稱。 |
comment |
str |
數據表的描述。 |
spark_conf |
dict |
用於執行此查詢的 Spark 組態清單 |
table_properties |
dict |
一個用於資料表的dict資料表屬性。 |
path |
str |
數據表數據的儲存位置。 如果未設定,請使用包含數據表之架構的受控儲存位置。 |
partition_cols |
list |
用於分割數據表的一或多個欄列表。 |
cluster_by_auto |
bool |
在桌子上啟用自動液體聚集。 這可以與要用作初始叢集索引鍵的直欄結合 cluster_by 並定義,然後根據工作負載進行監視和自動索引鍵選擇更新。 請參閱 自動液體群集。 |
cluster_by |
list |
在數據表上啟用液體叢集,並定義要當做叢集索引鍵使用的數據行。 請參閱 針對數據表使用液體叢集。 |
schema |
str 或 StructType |
數據表的架構定義。 架構可以定義為 SQL DDL 字串,或使用 Python StructType。 |
private |
bool |
建立數據表,但不會將數據表發佈至中繼存放區。 該數據表可供管線使用,但無法在管線外部存取。 私人資料表會在管線的生命周期內持續存在。 預設值為 False。私人資料表先前是使用參數 temporary 建立的。 |
row_filter |
str |
(公開預覽)數據表的數據列篩選子句。 請參閱 使用資料列篩選和欄位遮罩發佈資料表,。 |
指定架構是選擇性的,而且可以使用 PySpark StructType 或 SQL DDL 來完成。 當您指定架構時,可以選擇性地包含產生的數據行、數據行遮罩,以及主鍵和外鍵。 See:
範例
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")