Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Le @materialized_view décorateur peut être utilisé pour définir des vues matérialisées dans un pipeline.
Pour définir une vue matérialisée, appliquez @materialized_view une requête qui effectue une lecture par lots sur une source de données.
Syntaxe
from pyspark import pipelines as dp
@dp.materialized_view(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Paramètres
@dp.expect() est une clause d’attente facultative. Vous pouvez inclure plusieurs attentes. Voir les attentes.
| Paramètre | Type | Descriptif |
|---|---|---|
| fonction | function |
Obligatoire. Fonction qui retourne un DataFrame batch Apache Spark à partir d’une requête définie par l’utilisateur. |
name |
str |
Nom de la table. S’il n’est pas fourni, la valeur par défaut est le nom de la fonction. |
comment |
str |
Description de la table. |
spark_conf |
dict |
Liste des configurations Spark pour l’exécution de cette requête |
table_properties |
dict |
Une dict de propriétés de table pour la table. |
path |
str |
Emplacement de stockage pour les données de table. Si ce n’est pas le cas, utilisez l’emplacement de stockage managé pour le schéma contenant la table. |
partition_cols |
list |
Liste d’une ou de plusieurs colonnes à utiliser pour partitionner la table. |
cluster_by_auto |
bool |
Activez le clustering liquide automatique sur la table. Cela peut être combiné avec cluster_by et définir les colonnes à utiliser comme clés de clustering initiales, suivie de la surveillance et des mises à jour de sélection automatique de clés en fonction de la charge de travail. Consultez le regroupement automatique de liquide. |
cluster_by |
list |
Activez le clustering liquide sur la table et définissez les colonnes à utiliser comme clés de clustering. Consultez Utilisation de Liquid Clustering pour les tables. |
schema |
str ou StructType |
Définition de schéma pour la table. Les schémas peuvent être définis en tant que chaîne SQL DDL ou avec StructTypePython. |
private |
bool |
Créez une table, mais ne publiez pas la table dans le metastore. Cette table est disponible pour le pipeline, mais elle n’est pas accessible en dehors du pipeline. Les tables privées sont persistantes pendant toute la durée de vie du pipeline. La valeur par défaut est False. |
row_filter |
str |
(Préversion publique) Clause de filtre de ligne pour la table. Consultez Publier des tables avec des filtres de lignes et des masques de colonne. |
La spécification d’un schéma est facultative et peut être effectuée avec PySpark StructType ou SQL DDL. Lorsque vous spécifiez un schéma, vous pouvez éventuellement inclure des colonnes générées, des masques de colonne et des clés primaires et étrangères. Voir:
- Colonnes générées par Delta Lake
- Contraintes sur Azure Databricks
- Publiez des tables avec des filtres de lignes et des masques de colonne.
Examples
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.materialized_view(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.materialized_view(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")