Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Lakeflow Declarative Pipelines introduit plusieurs nouveaux mots clés et fonctions SQL pour définir des vues matérialisées et des tables de diffusion en continu dans des pipelines. La prise en charge de SQL pour le développement de pipelines s’appuie sur les principes de base de Spark SQL et ajoute la prise en charge des fonctionnalités Structured Streaming.
Les utilisateurs familiarisés avec les DataFrames PySpark peuvent préférer développer du code de pipeline avec Python. Python prend en charge des tests et des opérations plus étendus qui sont difficiles à implémenter avec SQL, comme les opérations de métagrammation. Consultez Développer du code de pipeline avec Python.
Pour obtenir une référence complète de la syntaxe SQL des pipelines déclaratifs Lakeflow, consultez la référence du langage SQL Des pipelines déclaratifs Lakeflow.
Notions de base de SQL pour le développement de pipelines
Le code SQL qui crée des jeux de données Lakeflow Pipelines déclaratifs utilise la CREATE OR REFRESH
syntaxe pour définir des vues matérialisées et des tables en streaming à partir des résultats de requête.
Le mot clé STREAM
indique si la source de données référencée dans une clause SELECT
doit être lue avec les sémantiques de flux.
Les lectures et écritures par défaut se font dans le catalogue et le schéma spécifiés lors de la configuration du pipeline. Voir Définir le catalogue cible et le schéma.
Le code source des pipelines déclaratifs Lakeflow diffère critiquement des scripts SQL : Les pipelines déclaratifs Lakeflow évaluent toutes les définitions de jeu de données sur tous les fichiers de code source configurés dans un pipeline et créent un graphique de flux de données avant l’exécution de toutes les requêtes. L’ordre des requêtes apparaissant dans un notebook ou un script définit l’ordre d’évaluation du code, mais pas l’ordre d’exécution des requêtes.
Créer une vue matérialisée avec SQL
L’exemple de code suivant illustre la syntaxe de base pour la création d’une vue matérialisée avec SQL :
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Créer une table de diffusion en continu avec SQL
L’exemple de code suivant illustre la syntaxe de base pour la création d’une table de diffusion en continu avec SQL. Lors de la lecture d’une source pour une table de diffusion en continu, le STREAM
mot clé indique d’utiliser la sémantique de diffusion en continu pour la source. N’utilisez pas le mot clé STREAM
lors de la création d’une vue matérialisée :
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Remarque
Utilisez le mot clé STREAM pour utiliser la sémantique de diffusion en continu pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement. Pour ingérer des données ayant des validations de modification, vous pouvez utiliser Python et l’option SkipChangeCommits
permettant de gérer les erreurs.
Charger des données à partir du stockage d’objets
Lakeflow Declarative Pipelines prend en charge le chargement de données à partir de tous les formats pris en charge par Azure Databricks. Consultez Options de format de données.
Remarque
Ces exemples utilisent des données disponibles sous les /databricks-datasets
automatiquement montés sur votre espace de travail. Databricks recommande d’utiliser des chemins de volume ou des URI cloud pour référencer les données stockées dans le stockage d’objets cloud. Consultez Qu’est-ce que les volumes de catalogue Unity ?.
Databricks recommande d'utiliser Auto Loader et les tables en streaming lors de la configuration des flux de traitement d'ingestion incrémentielle sur les données stockées dans le cloud. Consultez Qu’est-ce que Auto Loader ?.
SQL utilise la fonction read_files
pour appeler la fonctionnalité de chargement automatique. Vous devez également utiliser le STREAM
mot clé pour configurer une lecture en continu avec read_files
.
La syntaxe read_files
suivante est décrite dans SQL :
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Les options du chargeur automatique sont des paires clé-valeur. Pour plus d’informations sur les formats et options pris en charge, consultez Options.
L’exemple suivant crée une table de streaming à partir de fichiers JSON à l’aide du chargeur automatique :
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
La fonction read_files
prend également en charge la sémantique de lot pour créer des vues matérialisées. L’exemple suivant utilise la sémantique de traitement par lots pour lire un répertoire JSON et créer une vue matérialisée :
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Valider les données avec des attentes
Vous pouvez utiliser des attentes pour définir et appliquer des contraintes de qualité des données. Voir Gérer la qualité des données avec les attentes de la chaîne de traitement.
Le code suivant définit une attente nommée valid_data
qui supprime les enregistrements null lors de l’ingestion des données :
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Interroger les vues matérialisées et les tables de diffusion en continu définies dans votre pipeline
L’exemple suivant définit quatre jeux de données :
- Table de diffusion en continu nommée
orders
qui charge des données JSON. - Vue matérialisée nommée
customers
qui charge les données CSV. - Une vue matérialisée nommée
customer_orders
qui joint des enregistrements à partir des jeux de donnéesorders
etcustomers
, convertit l’horodatage de la commande en une date et sélectionne les champscustomer_id
,order_number
,state
etorder_date
. - Vue matérialisée nommée
daily_orders_by_state
qui agrège le nombre quotidien de commandes pour chaque état.
Remarque
Lorsque vous interrogez des vues ou des tables dans votre pipeline, vous pouvez spécifier le catalogue et le schéma directement, ou vous pouvez utiliser les valeurs par défaut configurées dans votre pipeline. Dans cet exemple, les tables orders
, customers
et customer_orders
sont écrites et lues à partir du catalogue et du schéma par défaut configurés pour votre pipeline.
Le mode de publication hérité utilise le schéma LIVE
pour interroger d'autres vues matérialisées et tables de streaming définies dans votre pipeline. Dans les nouveaux pipelines, la syntaxe de schéma LIVE
est ignorée silencieusement. Consultez le schéma LIVE (hérité).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Définir une table privée
Vous pouvez utiliser la PRIVATE
clause lors de la création d’une vue matérialisée ou d’une table de diffusion en continu. Lorsque vous créez une table privée, vous créez la table, mais ne créez pas les métadonnées de la table. La clause PRIVATE
indique aux pipelines déclaratifs Lakeflow de créer une table qui est disponible pour le pipeline, mais ne doit pas être accessible en dehors de celui-ci. Pour réduire le temps de traitement, une table privée persiste pendant la durée de vie du pipeline qui la crée et pas uniquement pour une seule mise à jour.
Les tables privées peuvent avoir le même nom que les tables du catalogue. Si vous spécifiez un nom non qualifié pour une table dans un pipeline, s’il existe à la fois une table privée et une table de catalogue portant ce nom, la table privée sera utilisée.
Les tables privées étaient précédemment appelées tables temporaires.
Supprimer définitivement les enregistrements d’une vue matérialisée ou d’une table de diffusion en continu
Pour supprimer définitivement les enregistrements d’une table de diffusion en continu avec des vecteurs de suppression activés, comme pour la conformité RGPD, des opérations supplémentaires doivent être effectuées sur les tables Delta sous-jacentes de l’objet. Pour garantir la suppression d’enregistrements d’une table de diffusion en continu, consultez Supprimer définitivement les enregistrements d’une table de diffusion en continu.
Les vues matérialisées reflètent toujours les données des tables sous-jacentes lorsqu’elles sont actualisées. Pour supprimer des données dans une vue matérialisée, vous devez supprimer les données de la source et actualiser la vue matérialisée.
Paramétrer les valeurs utilisées lors de la déclaration de tables ou de vues avec SQL
Utilisez SET
pour spécifier une valeur de configuration dans une requête qui déclare une table ou une vue, y compris des configurations Spark. Toute table ou vue que vous définissez dans un notebook après l’instruction SET
a accès à la valeur définie. Toutes les configurations Spark spécifiées à l’aide de l’instruction SET
sont utilisées lors de l’exécution de la requête Spark pour n’importe quelle table ou vue en suivant l’instruction SET. Pour lire une valeur de configuration dans une requête, utilisez la syntaxe d’interpolation de chaîne ${}
. L’exemple suivant définit une valeur de configuration Spark nommée startDate
, puis utilise cette valeur dans une requête :
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Pour spécifier plusieurs valeurs de configuration, utilisez une instruction SET
distincte pour chaque valeur.
Limites
La clause PIVOT
n'est pas prise en charge. L’opération pivot
dans Spark nécessite le chargement hâtif des données d’entrée pour calculer le schéma de sortie. Cette fonctionnalité n’est pas prise en charge dans les pipelines déclaratifs Lakeflow.
Remarque
La syntaxe CREATE OR REFRESH LIVE TABLE
pour créer une vue matérialisée est déconseillée. Au lieu de cela, utilisez CREATE OR REFRESH MATERIALIZED VIEW
.