Informations de référence sur le langage SQL dans Delta Live Tables

Cet article fournit des détails sur l’interface de programmation SQL de Delta Live Tables.

Vous pouvez utiliser des fonctions définies par l’utilisateur (UDF) Python dans vos requêtes SQL, mais vous devez définir ces fonctions UDF dans les fichiers Python avant de les appeler dans les fichiers sources SQL. Consultez Fonctions scalaires définies par l'utilisateur – Python.

Limites

La clause PIVOT n'est pas prise en charge. L’opération pivot dans Spark nécessite un chargement hâtif des données d’entrée pour calculer le schéma de la sortie. Cette fonctionnalité n’est pas prise en charge dans Delta Live Tables.

Créer une vue matérialisée ou une table de diffusion en continu Delta Live Tables

Vous utilisez la même syntaxe SQL de base lors de la déclaration d’une table de diffusion en continu ou d’une vue matérialisée (également appelée LIVE TABLE).

Vous pouvez uniquement déclarer des tables de diffusion en continu à l’aide de requêtes qui lisent sur une source de diffusion en continu. Databricks recommande d’utiliser Auto Loader pour l’ingestion d’une diffusion en continu de fichiers à partir du stockage d’objets cloud. Consultez Syntaxe SQL Auto Loader.

Vous devez inclure la fonction STREAM() autour d’un nom de jeu de données lors de la spécification d’autres tables ou vues dans votre pipeline en tant que source de diffusion en continu.

Ce qui suit décrit la syntaxe pour déclarer des vues matérialisées et des tables de diffusion en continu avec SQL :

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Créer une vue Delta Live Tables

Ce qui suit décrit la syntaxe pour déclarer des vues dans SQL :

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Syntaxe SQL Auto Loader

Vous trouverez ci-après une description de la syntaxe permettant d’utiliser Auto Loader dans SQL :

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Vous pouvez utiliser des options de format prises en charge avec Auto Loader. À l’aide de la fonction map(), vous pouvez passer n’importe quel nombre d’options à la méthode cloud_files(). Les options sont des paires clé-valeur, où les clés et les valeurs sont des chaînes. Pour plus d’informations sur les formats et options de prise en charge, consultez Options de format de fichier.

Exemple : définir des tables

Vous pouvez créer un jeu de données en lisant les données à partir d’une source de données externe ou des jeux de données définis dans un pipeline. Pour lire dans un jeu de données interne, ajoutez le mot clé LIVE au début le nom du jeu de données. L’exemple suivant définit deux jeux de données différents : une table appelée taxi_raw qui prend un fichier JSON comme source d’entrée et une table appelée filtered_data qui prend la table taxi_raw comme entrée :

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Exemple : lecture à partir d’une source de diffusion en continu

Pour lire des données à partir d’une source de streaming, par exemple Auto Loader ou un jeu de données interne, définissez une table STREAMING :

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Pour plus d’informations sur la diffusion en continu de données, consultez Transformer les données avec Delta Live Tables.

Contrôler la façon dont les tables sont matérialisées

Les tables offrent également un contrôle supplémentaire de leur matérialisation :

Remarque

Pour les tables d’une taille inférieure à 1 To, Databricks recommande de laisser Delta Live Tables contrôler l’organisation des données. À moins que vous ne vous attendiez à ce que votre table dépasse un téraoctet, vous ne devez généralement pas spécifier de colonnes de partition.

Exemple : spécifier un schéma et des colonnes de partition

Vous pouvez éventuellement spécifier un schéma lorsque vous définissez une table. L’exemple suivant spécifie le schéma de la table cible, notamment l’utilisation de colonnes générées par Delta Lake et la définition de colonnes de partition pour la table :

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Par défaut, Delta Live Tables déduit le schéma de la définition table si vous ne spécifiez pas de schéma.

Exemple : définir des contraintes de table

Remarque

La prise en charge des contraintes de table est en préversion publique. Pour définir des contraintes de table, votre pipeline doit être compatible avec Unity Catalog et configuré pour utiliser le canal preview.

Lorsque vous spécifiez un schéma, vous pouvez définir la clé primaire et les clés étrangères. Les contraintes sont informationnelles et ne sont pas appliquées. L’exemple suivant définit une table avec une contrainte de clé primaire et étrangère :

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Définir les valeurs de configuration d’une table ou d’une vue

Utilisez SET pour spécifier une valeur de configuration d’une table ou d’une vue, y compris des configurations Spark. Toute table ou vue que vous définissez dans un notebook après l’instruction SET a accès à la valeur définie. Toutes les configurations Spark spécifiées avec l’instruction SET sont appliquées lors de l’exécution de la requête Spark sur une table ou une vue définie à la suite de l’instruction SET. Pour lire une valeur de configuration dans une requête, utilisez la syntaxe d’interpolation de chaîne ${}. L’exemple suivant définit une valeur de configuration Spark nommée startDate, puis utilise cette valeur dans une requête :

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Pour spécifier plusieurs valeurs de configuration, utilisez une instruction SET distincte pour chaque valeur.

Propriétés SQL

CREATE TABLE ou VIEW
TEMPORARY

Créez une table mais ne publiez pas de métadonnées pour la table. La clause TEMPORARY indique à Delta Live Tables de créer une table disponible pour le pipeline, mais qui ne doit pas être accessible en dehors du pipeline. Pour réduire le temps de traitement, une table temporaire persiste pendant la durée de vie du pipeline qui la crée, pas uniquement pour une seule mise à jour.
STREAMING

Crée une table qui lit un jeu de données d’entrée en tant que flux. Le jeu de données d’entrée doit être une source de données en streaming, par exemple Auto Loader ou une table STREAMING.
PARTITIONED BY

Liste facultative d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table.
LOCATION

Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline.
COMMENT

Description facultative de la table.
column_constraint

Une contrainte de clé primaire ou de clé étrangère facultative et informationnelle sur la colonne.
table_constraint

Une contrainte de clé primaire ou de clé étrangère facultative et informationnelle sur la table.
TBLPROPERTIES

Liste facultative des propriétés de table disponibles pour la table.
select_statement

Requête Delta Live Tables qui définit le jeu de données pour la table.
Clause CONSTRAINT
EXPECT expectation_name

Définit la contrainte de qualité des données expectation_name. Si la contrainte ON VIOLATION n’est pas définie, ajoutez les lignes qui ne respectent pas la contrainte dans le jeu de données cible.
ON VIOLATION

Action facultative à effectuer pour les lignes incriminées :

* FAIL UPDATE : arrêter immédiatement l’exécution du pipeline.
* DROP ROW : supprimer l’enregistrement et continuer le traitement.

Capture des changements de données avec SQL dans Delta Live Tables

Utilisez l’instruction APPLY CHANGES INTO pour utiliser la fonctionnalité de capture des changements de données Delta Live Tables, comme décrit dans les éléments suivants :

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Vous définissez des contraintes de qualité des données pour une cible APPLY CHANGES à l’aide de la même clause CONSTRAINT que les requêtes non-APPLY CHANGES. Consultez Gérer la qualité des données avec Delta Live Tables.

Remarque

Le comportement par défaut pour les événements INSERT et UPDATE consiste à effectuer un upsert des événements de capture des changements de données à partir de la source : mettre à jour les lignes de la table cible qui correspondent aux clés spécifiées, ou insérer une nouvelle ligne quand un enregistrement correspondant n’existe pas dans la table cible. La gestion des événements DELETE peut être spécifiée avec la condition APPLY AS DELETE WHEN.

Important

Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible APPLY CHANGES, vous devez également inclure les colonnes __START_AT et __END_AT avec le même type de données que le champ sequence_by.

Consultez Capture des changements de données simplifiée avec l’API APPLY CHANGES dans Delta Live Tables.

Clauses
KEYS

Colonne ou combinaison de colonnes identifiant de façon unique une ligne dans les données sources. Utilisée pour identifier les événements de capture des changements de données qui s’appliquent à des enregistrements spécifiques dans la table cible.

La clause est obligatoire.
IGNORE NULL UPDATES

Autoriser l’ingestion des mises à jour contenant un sous-ensemble des colonnes cibles. Quand un événement de capture des changements de données correspond à une ligne existante et que la commande IGNORE NULL UPDATES est spécifiée, les colonnes contenant null conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec une valeur de null.

Cette clause est facultative.

La valeur par défaut consiste à remplacer les colonnes existantes par des valeurs null.
APPLY AS DELETE WHEN

Spécifie quand un événement de capture des changements de données doit être traité en tant qu’opération DELETE plutôt qu’opération upsert. Pour gérer des données non ordonnées, la ligne supprimée est conservée temporairement en tant qu’objet tombstone dans la table Delta sous-jacente, et un affichage est créé dans le metastore, qui filtre ces objets tombstone. Vous pouvez configurer l’intervalle de conservation avec la
pipelines.cdc.tombstoneGCThresholdInSecondsPropriété de tableau.

Cette clause est facultative.
APPLY AS TRUNCATE WHEN

Spécifie quand un événement de capture des changements de données doit être traité en tant que TRUNCATE de table complet. Étant donné que cette clause déclenche une troncation complète de la table cible, elle doit être utilisée uniquement pour des cas d’usage spécifiques nécessitant cette fonctionnalité.

La clause APPLY AS TRUNCATE WHEN est prise en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge la troncation.

Cette clause est facultative.
SEQUENCE BY

Nom de colonne spécifiant l’ordre logique des événements de capture des changements de données dans les données sources. Delta Live Tables utilise ce séquencement pour gérer les événements de modification qui se produisent dans le désordre.

La clause est obligatoire.
COLUMNS

Spécifie un sous-ensemble de colonnes à inclure dans la table cible. Vous pouvez :

* Spécifier la liste complète des colonnes à inclure : COLUMNS (userId, name, city).
* Spécifier une liste de colonnes à exclure : COLUMNS * EXCEPT (operation, sequenceNum).

Cette clause est facultative.

Par défaut, quand la clause COLUMNS n’est pas spécifiée, toutes les colonnes dans la table cible sont incluses.
STORED AS

Indique s’il faut stocker des enregistrements en tant que méthode SCD de type 1 ou méthode SCD de type 2.

Cette clause est facultative.

La valeur par défaut est la méthode SCD de type 1.
TRACK HISTORY ON

Spécifie un sous-ensemble de colonnes de sortie pour générer des enregistrements d’historique en cas de modification de ces colonnes spécifiées. Vous pouvez :

* Spécifier la liste complète des colonnes à suivre : COLUMNS (userId, name, city).
* Spécifiez une liste de colonnes à exclure du suivi : COLUMNS * EXCEPT (operation, sequenceNum)

Cette clause est facultative. La valeur par défaut est l’historique de suivi pour toutes les colonnes de sortie en cas de modifications, équivalent à TRACK HISTORY ON *.