Informations de référence sur le langage SQL dans Delta Live Tables
Cet article contient des détails sur l’interface de programmation SQL de Delta Live Tables.
- Pour des informations sur l’API Python, consultez Informations de référence sur le langage Python dans Delta Live Tables.
- Pour plus d’informations sur les commandes SQL, consultez Référence sur le langage SQL.
Vous pouvez utiliser des fonctions définies par l’utilisateur (UDF) Python dans vos requêtes SQL, mais vous devez définir ces fonctions UDF dans les fichiers Python avant de les appeler dans les fichiers sources SQL. Consultez Fonctions scalaires définies par l'utilisateur – Python.
Limites
La clause PIVOT
n'est pas prise en charge. L’opération pivot
dans Spark nécessite le chargement hâtif des données d’entrée pour calculer le schéma de sortie. Cette fonctionnalité n’est pas prise en charge dans Delta Live Tables.
Créer une vue matérialisée ou une table de diffusion en continu Delta Live Tables
Remarque
- La syntaxe
CREATE OR REFRESH LIVE TABLE
pour créer une vue matérialisée est déconseillée. Utilisez plutôtCREATE OR REFRESH MATERIALIZED VIEW
. - Pour utiliser la clause
CLUSTER BY
afin d’activer le clustering liquide, votre pipeline doit être configuré pour utiliser la chaîne d’aperçu.
Vous utilisez la même syntaxe SQL de base lors de la déclaration d’une table de diffusion en continu ou d’une vue matérialisée.
Déclarer une vue matérialisée Delta Live Tables avec SQL
L’exemple suivant décrit la syntaxe permettant de déclarer une vue matérialisée dans les tables dynamiques Delta avec SQL :
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Déclarer une table de streaming Delta Live Tables avec SQL
Vous pouvez uniquement déclarer des tables de diffusion en continu à l’aide de requêtes qui lisent sur une source de diffusion en continu. Databricks recommande d’utiliser Auto Loader pour l’ingestion d’une diffusion en continu de fichiers à partir du stockage d’objets cloud. Consultez Syntaxe SQL Auto Loader.
Lorsque vous spécifiez d’autres tables ou vues dans votre pipeline en tant que sources de diffusion en continu, vous devez inclure la fonction STREAM()
autour du nom du jeu de données.
La syntaxe suivante décrit la syntaxe de déclaration d’une table de diffusion en continu dans Delta Live Tables avec SQL :
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Créer une vue Delta Live Tables
Ce qui suit décrit la syntaxe pour déclarer des vues dans SQL :
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Syntaxe SQL Auto Loader
Vous trouverez ci-après une description de la syntaxe permettant d’utiliser Auto Loader dans SQL :
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Vous pouvez utiliser des options de format prises en charge avec Auto Loader. À l’aide de la fonction map()
, vous pouvez transmettre des options à la méthode read_files()
. Les options sont des paires clé-valeur, où les clés et les valeurs sont des chaînes. Pour plus d’informations sur les formats et options de prise en charge, consultez Options de format de fichier.
Exemple : définir des tables
Vous pouvez créer un jeu de données en lisant les données à partir d’une source de données externe ou des jeux de données définis dans un pipeline. Pour lire dans un jeu de données interne, ajoutez le mot clé LIVE
au début le nom du jeu de données. L’exemple suivant définit deux jeux de données différents : une table appelée taxi_raw
qui prend un fichier JSON comme source d’entrée et une table appelée filtered_data
qui prend la table taxi_raw
comme entrée :
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Exemple : lecture à partir d’une source de diffusion en continu
Pour lire des données à partir d’une source de streaming, par exemple Auto Loader ou un jeu de données interne, définissez une table STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
Pour plus d’informations sur la diffusion en continu de données, consultez Transformer les données avec Delta Live Tables.
Contrôler la façon dont les tables sont matérialisées
Les tables offrent également un contrôle supplémentaire de leur matérialisation :
- Avec la propriété
PARTITIONED BY
, indiquez de quelle manière les tables sont partitionnées. Le partitionnement vous permet d’accélérer les requêtes. - Vous pouvez définir les propriétés des tables en utilisant
TBLPROPERTIES
. Voir Propriétés des tables Delta Live Tables. - Définissez un emplacement de stockage à l’aide du paramètre
LOCATION
. Par défaut, les données des tables sont stockées dans l’emplacement de stockage du pipeline siLOCATION
n’est pas défini. - Vous pouvez utiliser des colonnes générées dans votre définition de schéma. Voir Exemple : Spécifier un schéma et des colonnes de partition.
Remarque
Pour les tables d’une taille inférieure à 1 To, Databricks recommande de laisser Delta Live Tables contrôler l’organisation des données. Si vous ne prévoyez pas de faire croître votre table de plus d’un téra-octet, Databricks vous recommande de ne pas spécifier de colonnes de partition.
Exemple : spécifier un schéma et des colonnes de partition
Vous pouvez éventuellement spécifier un schéma lorsque vous définissez une table. L’exemple suivant spécifie le schéma de la table cible, notamment l’utilisation de colonnes générées par Delta Lake et la définition de colonnes de partition pour la table :
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Par défaut, Delta Live Tables déduit le schéma de la définition table
si vous ne spécifiez pas de schéma.
Exemple : définir des contraintes de table
Remarque
La prise en charge de Delta Live Tables pour les contraintes de table est disponible en préversion publique. Pour définir des contraintes de table, votre pipeline doit être compatible avec Unity Catalog et configuré pour utiliser le canal preview
.
Lorsque vous spécifiez un schéma, vous pouvez définir des clés primaires et étrangères. Les contraintes sont informationnelles et ne sont pas appliquées. Consultez la clause CONSTRAINT dans les informations de référence sur le langage SQL.
L’exemple suivant définit une table avec une contrainte de clé primaire et étrangère :
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Paramétrer les valeurs utilisées lors de la déclaration de tables ou de vues avec SQL
Utilisez SET
pour spécifier une valeur de configuration dans une requête qui déclare une table ou une vue, y compris des configurations Spark. Toute table ou vue que vous définissez dans un notebook après l’instruction SET
a accès à la valeur définie. Toutes les configurations Spark spécifiées avec l’instruction SET
sont appliquées lors de l’exécution de la requête Spark sur une table ou une vue définie à la suite de l’instruction SET. Pour lire une valeur de configuration dans une requête, utilisez la syntaxe d’interpolation de chaîne ${}
. L’exemple suivant définit une valeur de configuration Spark nommée startDate
, puis utilise cette valeur dans une requête :
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Pour spécifier plusieurs valeurs de configuration, utilisez une instruction SET
distincte pour chaque valeur.
Exemple : définition d’un filtre de lignes et d’un masque de colonnes
Important
Les filtres de lignes et les masques de colonne sont en préversion publique.
Pour créer une vue matérialisée ou une table en continu avec un filtre de ligne et un masque de colonne, utilisez la clause ROW FILTER et la clause MASK. L’exemple suivant montre comment définir une vue matérialisée et une table de diffusion en continu avec un filtre de ligne et un masque de colonne :
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze
Pour plus d’informations sur les filtres de lignes et les masques de colonnes, consultez Publier des tables avec des filtres de lignes et des masques de colonnes.
Propriétés SQL
Remarque
Pour utiliser la clause CLUSTER BY
afin d’activer le clustering liquide, votre pipeline doit être configuré pour utiliser la chaîne d’aperçu.
CREATE TABLE ou VIEW |
---|
TEMPORARY Créez une table mais ne publiez pas de métadonnées pour la table. La clause TEMPORARY indique à Delta Live Tables de créer une table disponible pour le pipeline, mais qui ne doit pas être accessible en dehors du pipeline. Pour réduire le temps de traitement, une table temporaire persiste pendant la durée de vie du pipeline qui la crée, pas uniquement pour une seule mise à jour. |
STREAMING Crée une table qui lit un jeu de données d’entrée en tant que flux. Le jeu de données d’entrée doit être une source de données en streaming, par exemple Auto Loader ou une table STREAMING . |
CLUSTER BY Activez le clustering liquide sur la table et définissez les colonnes à utiliser comme clés de clustering. Consultez Utilisation des clustering liquides pour les tableaux Delta. |
PARTITIONED BY Liste facultative d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table. |
LOCATION Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline. |
COMMENT Description facultative de la table. |
column_constraint Une contrainte de clé primaire ou de clé étrangère facultative et informationnelle sur la colonne. |
MASK clause (Préversion publique)Permet d’ajouter une fonction de masque de colonne pour anonymiser les données sensibles. Les futures requêtes pour cette colonne retournent le résultat de la fonction évaluée au lieu de la valeur d’origine de la colonne. Cela est utile pour le contrôle d’accès affiné, car la fonction peut vérifier l’identité et les appartenances à un groupe de l’utilisateur afin de décider s’il faut masquer la valeur. Voir Clause de masque de colonne. |
table_constraint Une contrainte de clé primaire ou de clé étrangère facultative et informationnelle sur la table. |
TBLPROPERTIES Liste facultative des propriétés de table disponibles pour la table. |
WITH ROW FILTER clause (Préversion publique)Permet d’ajouter une fonction de filtre de ligne à la table. Toutes les requêtes futures de cette table reçoivent un sous-ensemble de lignes pour lesquelles la fonction prend la valeur TRUE. Cela est utile pour le contrôle d’accès affiné, car la fonction peut inspecter l’identité et les appartenances à un groupe de l’utilisateur appelant afin de décider s’il convient de filtrer certaines lignes. Consultez Clause ROW FILTER. |
select_statement Requête Delta Live Tables qui définit le jeu de données pour la table. |
Clause CONSTRAINT |
---|
EXPECT expectation_name Définit la contrainte de qualité des données expectation_name . Si la contrainte ON VIOLATION n’est pas définie, ajoutez les lignes qui ne la respectent pas dans le jeu de données cible. |
ON VIOLATION Action facultative à effectuer pour les lignes incriminées : - FAIL UPDATE : arrêter immédiatement l’exécution du pipeline.- DROP ROW : supprimer l’enregistrement et continuer le traitement. |
Capture des changements de données avec SQL dans Delta Live Tables
Utilisez l’instruction APPLY CHANGES INTO
pour utiliser la fonctionnalité de capture des changements de données Delta Live Tables, comme décrit dans les éléments suivants :
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Vous définissez des contraintes de qualité des données pour une cible APPLY CHANGES
à l’aide de la même clause CONSTRAINT
que les requêtes non-APPLY CHANGES
. Consultez Gérer la qualité des données avec Delta Live Tables.
Remarque
Le comportement par défaut pour les événements INSERT
et UPDATE
consiste à effectuer un upsert des événements de capture des changements de données à partir de la source : mettre à jour les lignes de la table cible qui correspondent aux clés spécifiées, ou insérer une nouvelle ligne quand un enregistrement correspondant n’existe pas dans la table cible. La gestion des événements DELETE
peut être spécifiée avec la condition APPLY AS DELETE WHEN
.
Important
Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible APPLY CHANGES
, vous devez également inclure les colonnes __START_AT
et __END_AT
avec le même type de données que le champ sequence_by
.
Consultez Les API APPLY CHANGES : Simplifiez la capture des changements de données avec Delta Live Tables.