Partager via


Développer du code de pipeline avec SQL

Lakeflow Declarative Pipelines introduit plusieurs nouveaux mots clés et fonctions SQL pour définir des vues matérialisées et des tables de diffusion en continu dans des pipelines. La prise en charge de SQL pour le développement de pipelines s’appuie sur les principes de base de Spark SQL et ajoute la prise en charge des fonctionnalités Structured Streaming.

Les utilisateurs familiarisés avec les DataFrames PySpark peuvent préférer développer du code de pipeline avec Python. Python prend en charge des tests et des opérations plus étendus qui sont difficiles à implémenter avec SQL, comme les opérations de métagrammation. Consultez Développer du code de pipeline avec Python.

Pour obtenir une référence complète de la syntaxe SQL des pipelines déclaratifs Lakeflow, consultez la référence du langage SQL Des pipelines déclaratifs Lakeflow.

Notions de base de SQL pour le développement de pipelines

Le code SQL qui crée des jeux de données Lakeflow Pipelines déclaratifs utilise la CREATE OR REFRESH syntaxe pour définir des vues matérialisées et des tables en streaming à partir des résultats de requête.

Le mot clé STREAM indique si la source de données référencée dans une clause SELECT doit être lue avec les sémantiques de flux.

Les lectures et écritures par défaut se font dans le catalogue et le schéma spécifiés lors de la configuration du pipeline. Voir Définir le catalogue cible et le schéma.

Le code source des pipelines déclaratifs Lakeflow diffère critiquement des scripts SQL : Les pipelines déclaratifs Lakeflow évaluent toutes les définitions de jeu de données sur tous les fichiers de code source configurés dans un pipeline et créent un graphique de flux de données avant l’exécution de toutes les requêtes. L’ordre des requêtes apparaissant dans un notebook ou un script définit l’ordre d’évaluation du code, mais pas l’ordre d’exécution des requêtes.

Créer une vue matérialisée avec SQL

L’exemple de code suivant illustre la syntaxe de base pour la création d’une vue matérialisée avec SQL :

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Créer une table de diffusion en continu avec SQL

L’exemple de code suivant illustre la syntaxe de base pour la création d’une table de diffusion en continu avec SQL. Lors de la lecture d’une source pour une table de diffusion en continu, le STREAM mot clé indique d’utiliser la sémantique de diffusion en continu pour la source. N’utilisez pas le mot clé STREAM lors de la création d’une vue matérialisée :

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Remarque

Utilisez le mot clé STREAM pour utiliser la sémantique de diffusion en continu pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement. Pour ingérer des données ayant des validations de modification, vous pouvez utiliser Python et l’option SkipChangeCommits permettant de gérer les erreurs.

Charger des données à partir du stockage d’objets

Lakeflow Declarative Pipelines prend en charge le chargement de données à partir de tous les formats pris en charge par Azure Databricks. Consultez Options de format de données.

Remarque

Ces exemples utilisent des données disponibles sous les /databricks-datasets automatiquement montés sur votre espace de travail. Databricks recommande d’utiliser des chemins de volume ou des URI cloud pour référencer les données stockées dans le stockage d’objets cloud. Consultez Qu’est-ce que les volumes de catalogue Unity ?.

Databricks recommande d'utiliser Auto Loader et les tables en streaming lors de la configuration des flux de traitement d'ingestion incrémentielle sur les données stockées dans le cloud. Consultez Qu’est-ce que Auto Loader ?.

SQL utilise la fonction read_files pour appeler la fonctionnalité de chargement automatique. Vous devez également utiliser le STREAM mot clé pour configurer une lecture en continu avec read_files.

La syntaxe read_files suivante est décrite dans SQL :

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

Les options du chargeur automatique sont des paires clé-valeur. Pour plus d’informations sur les formats et options pris en charge, consultez Options.

L’exemple suivant crée une table de streaming à partir de fichiers JSON à l’aide du chargeur automatique :

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

La fonction read_files prend également en charge la sémantique de lot pour créer des vues matérialisées. L’exemple suivant utilise la sémantique de traitement par lots pour lire un répertoire JSON et créer une vue matérialisée :

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Valider les données avec des attentes

Vous pouvez utiliser des attentes pour définir et appliquer des contraintes de qualité des données. Voir Gérer la qualité des données avec les attentes de la chaîne de traitement.

Le code suivant définit une attente nommée valid_data qui supprime les enregistrements null lors de l’ingestion des données :

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Interroger les vues matérialisées et les tables de diffusion en continu définies dans votre pipeline

L’exemple suivant définit quatre jeux de données :

  • Table de diffusion en continu nommée orders qui charge des données JSON.
  • Vue matérialisée nommée customers qui charge les données CSV.
  • Une vue matérialisée nommée customer_orders qui joint des enregistrements à partir des jeux de données orders et customers, convertit l’horodatage de la commande en une date et sélectionne les champs customer_id, order_number, state et order_date.
  • Vue matérialisée nommée daily_orders_by_state qui agrège le nombre quotidien de commandes pour chaque état.

Remarque

Lorsque vous interrogez des vues ou des tables dans votre pipeline, vous pouvez spécifier le catalogue et le schéma directement, ou vous pouvez utiliser les valeurs par défaut configurées dans votre pipeline. Dans cet exemple, les tables orders, customerset customer_orders sont écrites et lues à partir du catalogue et du schéma par défaut configurés pour votre pipeline.

Le mode de publication hérité utilise le schéma LIVE pour interroger d'autres vues matérialisées et tables de streaming définies dans votre pipeline. Dans les nouveaux pipelines, la syntaxe de schéma LIVE est ignorée silencieusement. Consultez le schéma LIVE (hérité).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Définir une table privée

Vous pouvez utiliser la PRIVATE clause lors de la création d’une vue matérialisée ou d’une table de diffusion en continu. Lorsque vous créez une table privée, vous créez la table, mais ne créez pas les métadonnées de la table. La clause PRIVATE indique aux pipelines déclaratifs Lakeflow de créer une table qui est disponible pour le pipeline, mais ne doit pas être accessible en dehors de celui-ci. Pour réduire le temps de traitement, une table privée persiste pendant la durée de vie du pipeline qui la crée et pas uniquement pour une seule mise à jour.

Les tables privées peuvent avoir le même nom que les tables du catalogue. Si vous spécifiez un nom non qualifié pour une table dans un pipeline, s’il existe à la fois une table privée et une table de catalogue portant ce nom, la table privée sera utilisée.

Les tables privées étaient précédemment appelées tables temporaires.

Supprimer définitivement les enregistrements d’une vue matérialisée ou d’une table de diffusion en continu

Pour supprimer définitivement les enregistrements d’une table de diffusion en continu avec des vecteurs de suppression activés, comme pour la conformité RGPD, des opérations supplémentaires doivent être effectuées sur les tables Delta sous-jacentes de l’objet. Pour garantir la suppression d’enregistrements d’une table de diffusion en continu, consultez Supprimer définitivement les enregistrements d’une table de diffusion en continu.

Les vues matérialisées reflètent toujours les données des tables sous-jacentes lorsqu’elles sont actualisées. Pour supprimer des données dans une vue matérialisée, vous devez supprimer les données de la source et actualiser la vue matérialisée.

Paramétrer les valeurs utilisées lors de la déclaration de tables ou de vues avec SQL

Utilisez SET pour spécifier une valeur de configuration dans une requête qui déclare une table ou une vue, y compris des configurations Spark. Toute table ou vue que vous définissez dans un notebook après l’instruction SET a accès à la valeur définie. Toutes les configurations Spark spécifiées à l’aide de l’instruction SET sont utilisées lors de l’exécution de la requête Spark pour n’importe quelle table ou vue en suivant l’instruction SET. Pour lire une valeur de configuration dans une requête, utilisez la syntaxe d’interpolation de chaîne ${}. L’exemple suivant définit une valeur de configuration Spark nommée startDate, puis utilise cette valeur dans une requête :

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Pour spécifier plusieurs valeurs de configuration, utilisez une instruction SET distincte pour chaque valeur.

Limites

La clause PIVOT n'est pas prise en charge. L’opération pivot dans Spark nécessite le chargement hâtif des données d’entrée pour calculer le schéma de sortie. Cette fonctionnalité n’est pas prise en charge dans les pipelines déclaratifs Lakeflow.

Remarque

La syntaxe CREATE OR REFRESH LIVE TABLE pour créer une vue matérialisée est déconseillée. Au lieu de cela, utilisez CREATE OR REFRESH MATERIALIZED VIEW.