Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Der @materialized_view Dekorateur kann verwendet werden, um materialisierte Ansichten in einer Pipeline zu definieren.
Um eine materialisierte Ansicht zu definieren, wenden Sie @materialized_view auf eine Abfrage an, die einen Batchlesevorgang für eine Datenquelle durchführt.
Syntax
from pyspark import pipelines as dp
@dp.materialized_view(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Die Parameter
@dp.expect() ist eine optionale Erwartungsklausel. Sie können mehrere Erwartungen einbeziehen. Siehe Erwartungen.
| Parameter | Typ | Description |
|---|---|---|
| Funktion | function |
Erforderlich. Eine Funktion, die einen Apache Spark Batch DataFrame aus einer benutzerdefinierten Abfrage zurückgibt. |
name |
str |
Der Tabellenname. Wenn nicht angegeben, wird standardmäßig der Funktionsname verwendet. |
comment |
str |
Eine Beschreibung für die Tabelle. |
spark_conf |
dict |
Eine Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage |
table_properties |
dict |
Eine dict von Tabelleneigenschaften für die Tabelle |
path |
str |
Ein Speicherort für Tabellendaten. Wenn sie nicht festgelegt ist, verwenden Sie den verwalteten Speicherort für das Schema, das die Tabelle enthält. |
partition_cols |
list |
Eine Liste mit einer oder mehreren Spalten, die für die Partitionierung der Tabelle verwendet werden sollen. |
cluster_by_auto |
bool |
Aktivieren Sie die automatische Flüssigkeitsgruppierung auf dem Tisch. Dies kann mit cluster_by kombiniert werden, und die Spalten werden definiert, die als anfängliche Clusteringschlüssel verwendet werden sollen, gefolgt von der Überwachung und automatischen Aktualisierungen der Schlüsselauswahl basierend auf der Arbeitslast. Siehe Automatische Flüssigkeitsclusterung. |
cluster_by |
list |
Aktivieren des Liquid Clustering für die Tabelle und Definieren der Spalten, die als Clusterschlüssel verwendet werden sollen. Siehe Verwenden von Flüssigclustering für Tabellen. |
schema |
str oder StructType |
Eine Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python StructType definiert werden |
private |
bool |
Erstellen Sie eine Tabelle, veröffentlichen Sie die Tabelle jedoch nicht im Metastore. Diese Tabelle ist für die Pipeline verfügbar, aber nicht außerhalb der Pipeline zugänglich. Private Tabellen bleiben für die Lebensdauer der Pipeline erhalten. Der Standardwert lautet False. |
row_filter |
str |
(Öffentliche Vorschau) Eine Zeilenfilterklausel für die Tabelle. Siehe Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken. |
Die Angabe eines Schemas ist optional und kann mit PySpark StructType oder SQL DDL erfolgen. Wenn Sie ein Schema angeben, können Sie optional generierte Spalten, Spaltenmasken und Primär- und Fremdschlüssel einschließen. Siehe:
- Von Delta Lake generierte Spalten
- Einschränkungen für Azure Databricks
- Veröffentlichen Sie Tabellen mit Zeilenfiltern und Spaltenmasken.
Examples
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.materialized_view(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.materialized_view(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")