Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
De @table decorator kan worden gebruikt om streamingtabellen in een pijplijn te definiëren.
Als u een streamingtabel wilt definiëren, past u @table toe op een query die een streaming-leesbewerking uitvoert op een gegevensbron, of gebruik de functie create_streaming_table().
Opmerking
In de oudere dlt module werd de @table operator gebruikt om zowel streamingtabellen als gerealiseerde weergaven te maken. De @table operator in de pyspark.pipelines module werkt nog steeds op deze manier, maar Databricks raadt aan de @materialized_view operator te gebruiken om gerealiseerde weergaven te maken.
Syntaxis
from pyspark import pipelines as dp
@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Parameterwaarden
@dp.expect() is een optionele declaratieve verwachtingclausule voor pijplijnen van Lakeflow Spark. U kunt meerdere verwachtingen opnemen. Zie verwachtingen.
| Kenmerk | Typologie | Description |
|---|---|---|
| functie | function |
Verplicht. Een functie die een Streaming DataFrame van Apache Spark retourneert vanuit een door de gebruiker gedefinieerde query. |
name |
str |
De naam van de tabel. Als deze niet is opgegeven, wordt standaard de functienaam gebruikt. |
comment |
str |
Een beschrijving voor de tabel. |
spark_conf |
dict |
Een lijst met Spark-configuraties voor de uitvoering van deze query |
table_properties |
dict |
Een dict van tabeleigenschappen voor de tabel. |
path |
str |
Een opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, gebruikt u de beheerde opslaglocatie voor het schema met de tabel. |
partition_cols |
list |
Een lijst met een of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel. |
cluster_by_auto |
bool |
Schakel automatische vloeistofclustering in op de tabel. Dit kan worden gecombineerd met cluster_by en definieer de kolommen die moeten worden gebruikt als initiële clusteringsleutels, gevolgd door bewaking en automatische updates voor sleutelselectie op basis van de workload. Zie Automatische vloeistofclustering. |
cluster_by |
list |
Schakel vloeistofclustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clustersleutels. Zie Liquid Clustering gebruiken voor tabellen. |
schema |
str of StructType |
Een schemadefinitie voor de tabel. Schema's kunnen worden gedefinieerd als een SQL DDL-tekenreeks of met een Python StructType. |
private |
bool |
Maak een tabel, maar publiceer de tabel niet in de metastore. Deze tabel is beschikbaar voor de pijplijn, maar is niet toegankelijk buiten de pijplijn. Privétabellen blijven bestaan voor de levensduur van de pijplijn. De standaardwaarde is False.Er zijn eerder privétabellen gemaakt met de temporary parameter. |
row_filter |
str |
(Openbare preview) Een rijfilterclausule voor de tabel. Zie Tabellen publiceren met rijfilters en kolommaskers. |
Het opgeven van een schema is optioneel en kan worden uitgevoerd met PySpark StructType of SQL DDL. Wanneer u een schema opgeeft, kunt u eventueel gegenereerde kolommen, kolommaskers en primaire en refererende sleutels opnemen. See:
- door Delta Lake gegenereerde kolommen
- Beperkingen voor Azure Databricks
- Tabellen publiceren met rijfilters en kolommaskers.
Voorbeelden
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")