Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Une table de diffusion en continu est une table qui prend en charge la diffusion en continu ou le traitement incrémentiel des données. Les tables de diffusion en continu sont soutenues par des pipelines. Chaque fois qu’une table de diffusion en continu est actualisée, les données ajoutées aux tables sources sont ajoutées à la table de diffusion en continu. Vous pouvez actualiser les tables de diffusion en continu manuellement ou selon une planification.
Pour en savoir plus sur l’exécution ou la planification des actualisations, consultez Exécuter une mise à jour de pipeline.
Syntaxe
CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
Paramètres
REFRESH
Si elle est spécifiée, crée la table ou met à jour une table existante et son contenu.
PRIVÉ
Crée une table de diffusion en continu privée.
- Ils ne sont pas ajoutés au catalogue et sont uniquement accessibles dans le pipeline de définition
- Ils peuvent avoir le même nom qu’un objet existant dans le catalogue. Dans le pipeline, si une table de diffusion en continu privée et un objet dans le catalogue ont le même nom, les références à ce nom se référeront à la table de diffusion en continu privée.
- Les tables de diffusion en continu privées sont sauvegardées pendant la durée de vie complète du pipeline, et non pas seulement pour une mise à jour unique.
Les tables de diffusion en continu privées ont été créées précédemment avec le
TEMPORARYparamètre.table_name
Le nom de la table nouvellement créée. Le nom complet de la table doit être unique.
spécification_de_table
Cette clause facultative définit la liste des colonnes, leurs types, leurs propriétés, leurs descriptions et leurs contraintes de colonne.
-
Les noms de colonnes doivent être uniques et mappés aux colonnes de sortie de la requête.
-
Spécifie le type de données de la colonne. Tous les types de données pris en charge par Azure Databricks ne sont pas pris en charge par les tables de streaming.
column_comment
Littéral
STRINGfacultatif décrivant la colonne. Cette option doit être spécifiée aveccolumn_type. Si le type de colonne n’est pas spécifié, le commentaire de colonne est ignoré.-
Ajoute une contrainte qui valide les données au fur et à mesure qu’elles circulent dans la table. Voir Gérer la qualité des données avec les attentes de la chaîne de traitement.
-
Important
Cette fonctionnalité est disponible en préversion publique.
Permet d’ajouter une fonction de masque de colonne pour anonymiser les données sensibles.
-
contrainte_de_table
Important
Cette fonctionnalité est disponible en préversion publique.
Lorsque vous spécifiez un schéma, vous pouvez définir des clés primaires et étrangères. Les contraintes sont informationnelles et ne sont pas appliquées. Consultez la clause CONSTRAINT dans la référence du langage SQL.
Note
Pour définir des contraintes de table, votre pipeline doit être compatible avec le Unity Catalog.
table_des_clauses
Spécifiez éventuellement les propriétés de partitionnement, de commentaires et définies par l’utilisateur pour la table. Chaque sous-clause ne peut être spécifiée qu’une seule fois.
UTILISATION DE DELTA
Spécifie le format de données. La seule option est DELTA.
Cette clause est facultative et est définie par défaut sur DELTA.
PARTITIONNÉ PAR
Liste facultative d’une ou plusieurs colonnes à utiliser pour le partitionnement dans la table. Mutuellement exclusif avec
CLUSTER BY.Le clustering liquide offre une solution flexible et optimisée pour le regroupement. Envisagez d’utiliser
CLUSTER BYplutôt quePARTITIONED BYpour les pipelines.CLUSTER BY
Activez le clustering liquide sur la table et définissez les colonnes à utiliser comme clés de clustering. Utilisez le clustering liquide automatique avec
CLUSTER BY AUTO, et Databricks choisit intelligemment les clés de clustering pour optimiser les performances des requêtes. Mutuellement exclusif avecPARTITIONED BY.EMPLACEMENT
Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline.
COMMENTAIRE
Littéral
STRINGfacultatif pour décrire la colonne.TBLPROPERTIES
Liste facultative des propriétés de table disponibles pour la table.
AVEC ROW FILTER
Important
Cette fonctionnalité est disponible en préversion publique.
Ajoute une fonction de filtre de ligne au tableau. Toutes les requêtes futures de cette table reçoivent un sous-ensemble de lignes pour lesquelles la fonction prend la valeur TRUE. Cela est utile pour le contrôle d’accès affiné, car la fonction peut inspecter l’identité et les appartenances à un groupe de l’utilisateur appelant afin de décider s’il convient de filtrer certaines lignes.
Consultez la clause
ROW FILTER.-
Cette clause remplit la table à l’aide des données de
query. Cette requête doit être une requête de diffusion en continu . Utilisez le mot clé STREAM pour utiliser la sémantique de diffusion en continu pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement. Pour ingérer des données ayant des validations de modification, vous pouvez utiliser Python et l’optionSkipChangeCommitspermettant de gérer les erreurs.Lorsque vous spécifiez un
queryet untable_specificationensemble, le schéma de table spécifié danstable_specificationdoit contenir toutes les colonnes retournées par lequery, sinon vous obtenez une erreur. Toutes les colonnes spécifiées danstable_specificationmais pas renvoyées parqueryrenvoient des valeursnulllors de la requête.Pour plus d’informations sur la diffusion en continu des données, consultez Transformer des données avec des pipelines.
Autorisations requises
L’utilisateur d’identification pour un pipeline doit avoir les autorisations suivantes :
- Le privilège
SELECTsur les tables de base référencées par la table de diffusion en continu. - Le privilège
USE CATALOGsur le catalogue parent et le privilègeUSE SCHEMAsur le schéma parent. - Le privilège
CREATE MATERIALIZED VIEWsur le schéma pour la table de diffusion en continu.
Pour qu’un utilisateur puisse mettre à jour le pipeline dans lequel la table de diffusion en continu est définie, il a besoin des éléments suivants :
- Le privilège
USE CATALOGsur le catalogue parent et le privilègeUSE SCHEMAsur le schéma parent. - La propriété de la table de diffusion en continu ou le privilège
REFRESHsur la table de diffusion en continu. - Le propriétaire de la table de streaming doit avoir le privilège
SELECTsur les tables de base référencées par la table de streaming.
Pour qu’un utilisateur puisse interroger la table de diffusion en continu résultante, il a besoin des éléments suivants :
- Le privilège
USE CATALOGsur le catalogue parent et le privilègeUSE SCHEMAsur le schéma parent. - Le privilège
SELECTsur la table de diffusion en continu.
Limites
- Seuls les propriétaires de tables peuvent actualiser les tables de streaming pour obtenir les données les plus récentes.
-
ALTER TABLEles commandes ne sont pas autorisées sur les tables de streaming. La définition et les propriétés de la table doivent être modifiées par le biais de l'instructionCREATE OR REFRESHou de l'instruction ALTER STREAMING TABLE. - L’évolution du schéma de table via des commandes DML telles que
INSERT INTOetMERGEn’est pas prise en charge. - Les commandes suivantes ne sont pas prises en charge sur les tables de streaming :
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- Le changement de nom de la table ou du propriétaire n'est pas supporté.
- Les colonnes générées, les colonnes d’identité et les colonnes par défaut ne sont pas prises en charge.
Examples
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")
-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;