Partager via


CREATE STREAMING TABLE (canaux)

Une table de diffusion en continu est une table qui prend en charge la diffusion en continu ou le traitement incrémentiel des données. Les tables de diffusion en continu sont soutenues par des pipelines. Chaque fois qu’une table de diffusion en continu est actualisée, les données ajoutées aux tables sources sont ajoutées à la table de diffusion en continu. Vous pouvez actualiser les tables de diffusion en continu manuellement ou selon une planification.

Pour en savoir plus sur l’exécution ou la planification des actualisations, consultez Exécuter une mise à jour de pipeline.

Syntaxe

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]

Paramètres

  • REFRESH

    Si elle est spécifiée, crée la table ou met à jour une table existante et son contenu.

  • PRIVÉ

    Crée une table de diffusion en continu privée.

    • Ils ne sont pas ajoutés au catalogue et sont uniquement accessibles dans le pipeline de définition
    • Ils peuvent avoir le même nom qu’un objet existant dans le catalogue. Dans le pipeline, si une table de diffusion en continu privée et un objet dans le catalogue ont le même nom, les références à ce nom se référeront à la table de diffusion en continu privée.
    • Les tables de diffusion en continu privées sont sauvegardées pendant la durée de vie complète du pipeline, et non pas seulement pour une mise à jour unique.

    Les tables de diffusion en continu privées ont été créées précédemment avec le TEMPORARY paramètre.

  • table_name

    Le nom de la table nouvellement créée. Le nom complet de la table doit être unique.

  • spécification_de_table

    Cette clause facultative définit la liste des colonnes, leurs types, leurs propriétés, leurs descriptions et leurs contraintes de colonne.

  • contrainte_de_table

    Important

    Cette fonctionnalité est disponible en préversion publique.

    Lorsque vous spécifiez un schéma, vous pouvez définir des clés primaires et étrangères. Les contraintes sont informationnelles et ne sont pas appliquées. Consultez la clause CONSTRAINT dans la référence du langage SQL.

    Note

    Pour définir des contraintes de table, votre pipeline doit être compatible avec le Unity Catalog.

  • table_des_clauses

    Spécifiez éventuellement les propriétés de partitionnement, de commentaires et définies par l’utilisateur pour la table. Chaque sous-clause ne peut être spécifiée qu’une seule fois.

    • UTILISATION DE DELTA

      Spécifie le format de données. La seule option est DELTA.

      Cette clause est facultative et est définie par défaut sur DELTA.

    • PARTITIONNÉ PAR

      Liste facultative d’une ou plusieurs colonnes à utiliser pour le partitionnement dans la table. Mutuellement exclusif avec CLUSTER BY.

      Le clustering liquide offre une solution flexible et optimisée pour le regroupement. Envisagez d’utiliser CLUSTER BY plutôt que PARTITIONED BY pour les pipelines.

    • CLUSTER BY

      Activez le clustering liquide sur la table et définissez les colonnes à utiliser comme clés de clustering. Utilisez le clustering liquide automatique avec CLUSTER BY AUTO, et Databricks choisit intelligemment les clés de clustering pour optimiser les performances des requêtes. Mutuellement exclusif avec PARTITIONED BY.

      Consultez Utilisation de Liquid Clustering pour les tables.

    • EMPLACEMENT

      Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline.

    • COMMENTAIRE

      Littéral STRING facultatif pour décrire la colonne.

    • TBLPROPERTIES

      Liste facultative des propriétés de table disponibles pour la table.

    • AVEC ROW FILTER

    Important

    Cette fonctionnalité est disponible en préversion publique.

    Ajoute une fonction de filtre de ligne au tableau. Toutes les requêtes futures de cette table reçoivent un sous-ensemble de lignes pour lesquelles la fonction prend la valeur TRUE. Cela est utile pour le contrôle d’accès affiné, car la fonction peut inspecter l’identité et les appartenances à un groupe de l’utilisateur appelant afin de décider s’il convient de filtrer certaines lignes.

    Consultez la clause ROW FILTER.

  • requête

    Cette clause remplit la table à l’aide des données de query. Cette requête doit être une requête de diffusion en continu . Utilisez le mot clé STREAM pour utiliser la sémantique de diffusion en continu pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement. Pour ingérer des données ayant des validations de modification, vous pouvez utiliser Python et l’option SkipChangeCommits permettant de gérer les erreurs.

    Lorsque vous spécifiez un query et un table_specification ensemble, le schéma de table spécifié dans table_specification doit contenir toutes les colonnes retournées par le query, sinon vous obtenez une erreur. Toutes les colonnes spécifiées dans table_specification mais pas renvoyées par query renvoient des valeurs null lors de la requête.

    Pour plus d’informations sur la diffusion en continu des données, consultez Transformer des données avec des pipelines.

Autorisations requises

L’utilisateur d’identification pour un pipeline doit avoir les autorisations suivantes :

  • Le privilège SELECT sur les tables de base référencées par la table de diffusion en continu.
  • Le privilège USE CATALOG sur le catalogue parent et le privilège USE SCHEMA sur le schéma parent.
  • Le privilège CREATE MATERIALIZED VIEW sur le schéma pour la table de diffusion en continu.

Pour qu’un utilisateur puisse mettre à jour le pipeline dans lequel la table de diffusion en continu est définie, il a besoin des éléments suivants :

  • Le privilège USE CATALOG sur le catalogue parent et le privilège USE SCHEMA sur le schéma parent.
  • La propriété de la table de diffusion en continu ou le privilège REFRESH sur la table de diffusion en continu.
  • Le propriétaire de la table de streaming doit avoir le privilège SELECT sur les tables de base référencées par la table de streaming.

Pour qu’un utilisateur puisse interroger la table de diffusion en continu résultante, il a besoin des éléments suivants :

  • Le privilège USE CATALOG sur le catalogue parent et le privilège USE SCHEMA sur le schéma parent.
  • Le privilège SELECT sur la table de diffusion en continu.

Limites

  • Seuls les propriétaires de tables peuvent actualiser les tables de streaming pour obtenir les données les plus récentes.
  • ALTER TABLE les commandes ne sont pas autorisées sur les tables de streaming. La définition et les propriétés de la table doivent être modifiées par le biais de l'instruction CREATE OR REFRESH ou de l'instruction ALTER STREAMING TABLE.
  • L’évolution du schéma de table via des commandes DML telles que INSERT INTOet MERGE n’est pas prise en charge.
  • Les commandes suivantes ne sont pas prises en charge sur les tables de streaming :
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Le changement de nom de la table ou du propriétaire n'est pas supporté.
  • Les colonnes générées, les colonnes d’identité et les colonnes par défaut ne sont pas prises en charge.

Examples

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;