Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Dekorátor @table lze použít k definování streamovaných tabulek v datovém toku.
Pokud chcete definovat streamovací tabulku, použijte @table na dotaz, který provádí streamování čtení proti zdroji dat, nebo použijte funkci create_streaming_table().
Poznámka:
Ve starším dlt modulu se operátor @table použil k vytvoření streamovaných tabulek i materializovaných zobrazení. Operátor @table v pyspark.pipelines modulu stále funguje tímto způsobem, ale Databricks doporučuje použít @materialized_view operátor k vytvoření materializovaných zobrazení.
Syntaxe
from pyspark import pipelines as dp
@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Parametry
@dp.expect() je volitelná očekávací podmínka deklarativních kanálů Spark Lakeflow. Můžete zahrnout více očekávání. Podívejte se na očekávání.
| Parameter | Typ | Description |
|---|---|---|
| funkce | function |
Povinné. Funkce, která vrací datový rámec streamování Apache Sparku z uživatelem definovaného dotazu. |
name |
str |
Název tabulky. Pokud není zadaný, nastaví se výchozí hodnota názvu funkce. |
comment |
str |
Popis tabulky. |
spark_conf |
dict |
Seznam konfigurací Sparku pro spuštění tohoto dotazu |
table_properties |
dict |
Seznam dict pro tabulku. |
path |
str |
Úložiště dat tabulky. Pokud není nastavené, použijte spravované umístění úložiště pro schéma obsahující tabulku. |
partition_cols |
list |
Seznam jednoho nebo více sloupců, které se mají použít k rozdělení tabulky. |
cluster_by_auto |
bool |
Povolte automatické shlukování kapalin v tabulce. To lze kombinovat s cluster_by a definovat sloupce, které budou použity jako počáteční klíče clusteringu, následované monitorováním a automatickými aktualizacemi výběru klíčů na základě pracovního zatížení. Viz Automatické shlukování kapalin. |
cluster_by |
list |
Povolte tekuté shlukování v tabulce a definujte sloupce, které se mají použít jako klíče shlukování. Viz Použití metody 'liquid clustering' pro tabulky. |
schema |
str nebo StructType |
Definice schématu pro tabulku. Schémata lze definovat jako řetězec DDL SQL nebo pomocí Pythonu StructType. |
private |
bool |
Vytvořte tabulku, ale tabulku nepublikujte do metastoru. Tato tabulka je dostupná pro kanál, ale není přístupná mimo kanál. Privátní tabulky se uchovávají po celou dobu životnosti kanálu. Výchozí hodnota je False.Privátní tabulky byly dříve vytvořeny pomocí parametru temporary . |
row_filter |
str |
(Public Preview) Klauzule řádkového filtru pro tabulku. Viz Publikování tabulek s filtry řádků a maskami sloupců. |
Zadání schématu je volitelné a lze ho provést pomocí PySpark StructType nebo SQL DDL. Při zadávání schématu můžete volitelně zahrnout vygenerované sloupce, masky sloupců a primární a cizí klíče. Viz:
- sloupce generované službou Delta Lake
- Omezení v Azure Databricks
- Publikujte tabulky s filtry řádků a maskami sloupců.
Examples
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")