Partager via


CREATE STREAMING TABLE

S’applique à :coché oui Databricks SQL

Crée une table de streaming, une table Delta avec une prise en charge supplémentaire pour le streaming ou le traitement incrémentiel des données.

Les tables de streaming ne sont prises en charge que dans les pipelines déclaratifs Spark Lakeflow et sur Databricks SQL avec le catalogue Unity. L’exécution de cette commande sur le calcul Databricks Runtime pris en charge analyse uniquement la syntaxe. Consultez le développement de pipelines déclaratifs Spark Lakeflow avec SQL.

Syntaxe

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

Paramètres

  • REFRESH

    S’il est spécifié, actualise la table avec les dernières données disponibles à partir des sources définies dans la requête. Seules les nouvelles données qui arrivent avant le début de la requête sont traitées. Les nouvelles données ajoutées aux sources pendant l’exécution de la commande sont ignorées jusqu’à l’actualisation suivante. L’opération d’actualisation de CREATE OR REFRESH est entièrement déclarative. Si une commande d’actualisation ne spécifie pas toutes les métadonnées de l’instruction de création de table d’origine, les métadonnées non spécifiées sont supprimées.

  • SI ELLE N’EXISTE PAS

    Crée la table de diffusion en continu si elle n’existe pas. Si une table portant ce nom existe déjà, l’instruction CREATE STREAMING TABLE est ignorée.

    Vous pouvez spécifier au maximum un seul des paramètres IF NOT EXISTS ou OR REFRESH.

  • table_name

    Le nom de la table à créer. Le nom ne doit pas inclure de spécification temporelle ou de spécification d’options. Si le nom n’est pas qualifié, la table est créée dans le schéma actuel.

  • spécification_de_table

    Cette clause facultative définit la liste des colonnes, leurs types, leurs propriétés, leurs descriptions et leurs contraintes de colonne.

    Si vous ne définissez pas de colonnes dans le schéma de la table, vous devez spécifier AS query.

    • column_identifier

      Nom unique de la colonne.

      • column_type

        Spécifie le type de données de la colonne.

      • NOT NULL

        Si elle est spécifiée, la colonne n’accepte pas de valeurs NULL.

      • COMMENTAIRE Column_comment

        Littéral de chaîne pour décrire la colonne.

      • column_constraint

        Importante

        Cette fonctionnalité est disponible en préversion publique.

        Ajoute une clé primaire ou une contrainte de clé étrangère à la colonne d’une table de diffusion en continu. Les contraintes ne sont pas prises en charge pour les tables du catalogue hive_metastore.

      • Clause MASK

        Permet d’ajouter une fonction de masque de colonne pour anonymiser les données sensibles. Toutes les requêtes suivantes de cette colonne reçoivent le résultat de l’évaluation de cette fonction sur la colonne à la place de la valeur d’origine de la colonne. Cela peut être utile à des fins de contrôle d’accès plus précis, où la fonction peut inspecter l’identité ou l’appartenance à un groupe de l’utilisateur appelant afin de décider s’il convient de modifier la valeur.

      • CONSTRAINT expectation_name ATTENDRE (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Ajoute des attentes de qualité des données à la table. Ces attentes de qualité des données peuvent être suivies au fil du temps et accessibles via le journal des événements de la table de diffusion en continu. Une attente FAIL UPDATE entraîne l’échec du traitement lors de la création de la table et de l’actualisation de la table. Une attente DROP ROW entraîne la suppression de la ligne entière si l’attente n’est pas remplie.

        expectation_expr peut être composé de littéraux, d’identificateurs de colonnes dans la table, et de fonctions ou d’opérateurs SQL déterministes intégrés, à l’exception de :

        exprNe doit pas non plus contenir de sous-requête.

      • contrainte_de_table

        Importante

        Cette fonctionnalité est disponible en préversion publique.

        Ajoute une clé primaire informative ou des contraintes de clés étrangères informatives à une table de diffusion en continu. Les contraintes de clé ne sont pas prises en charge pour les tables du catalogue hive_metastore.

  • table_des_clauses

    Vous pouvez également spécifier le partitionnement, les commentaires, les propriétés définies par l'utilisateur et un calendrier d'actualisation pour la nouvelle table. Chaque sous-clause ne peut être spécifiée qu’une seule fois.

    • PARTITIONNÉ PAR

      Liste facultative des colonnes de la table par lesquelles partitionner la table.

      Remarque

      Le clustering liquide offre une solution flexible et optimisée pour le regroupement. Envisagez d'utiliser CLUSTER BY plutôt que PARTITIONED BY pour les tables de streaming.

    • CLUSTER BY

      Clause facultative pour regrouper par un sous-ensemble de colonnes. Utilisez le clustering liquide automatique avec CLUSTER BY AUTO, et Databricks choisit intelligemment les clés de clustering pour optimiser les performances des requêtes. Consultez Utilisation de Liquid Clustering pour les tables.

      Le clustering liquide ne peut pas être combiné avec PARTITIONED BY.

    • COMMENTAIRE table_comment

      Littéral STRING pour décrire la colonne.

    • COLLATION PAR DÉFAUT UTF8_BINARY

      S’applique à :check marqué oui Databricks SQL vérifié marqué oui Databricks Runtime 17.1 et versions ultérieures

      Force le classement par défaut de la table de streaming à UTF8_BINARY. Cette clause est obligatoire si le schéma dans lequel la table est créée a un classement par défaut autre que UTF8_BINARY. Le classement par défaut de la table de streaming est utilisé comme classement par défaut au sein du query et pour les types de colonnes.

    • TBLPROPERTIES

      (Facultatif) Définit une ou plusieurs propriétés définies par l’utilisateur.

      Utilisez ce paramètre pour spécifier le canal d’exécution de pipelines déclaratifs Spark Lakeflow utilisé pour exécuter cette instruction. Définissez la valeur de la pipelines.channel propriété sur "PREVIEW" ou "CURRENT". La valeur par défaut est "CURRENT". Pour plus d'informations sur les canaux des pipelines déclaratifs Spark Lakeflow, consultez les canaux d'exécution des pipelines déclaratifs Spark Lakeflow.

    • horaire

      La planification peut être une SCHEDULE instruction ou une TRIGGER instruction.

      • PROGRAMME [ REFRESH ] clause_du_programme

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          Pour planifier une actualisation qui se produit régulièrement, utilisez EVERY la syntaxe. Si la syntaxe EVERY est spécifiée, la table de diffusion en continu ou la vue matérialisée est actualisée régulièrement à l’intervalle spécifié en fonction de la valeur fournie, telle que HOUR, HOURS, DAY, DAYS, WEEK, ou WEEKS. Le tableau suivant répertorie les valeurs entières acceptées pour number.

          Unité de temps Valeur de type entier
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1 <= D <= 31
          WEEK or WEEKS 1 <= W <= 8

          Remarque

          Les formes singulières et plurielles de l’unité de temps incluse sont sémantiquement équivalentes.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          Pour planifier une actualisation à l’aide d’une valeur quartz cron. Les time_zone_values valides sont acceptées. La fonction AT TIME ZONE LOCAL n'est pas prise en charge.

          Si AT TIME ZONE est absent, le fuseau horaire de session est utilisé. Si AT TIME ZONE est absent et que le fuseau horaire de session n’est pas défini, une erreur est générée. SCHEDULE est équivalent sémantiquement à SCHEDULE REFRESH.

        La planification peut être fournie dans le cadre de la commande CREATE. Utilisez ALTER STREAMING TABLE ou exécutez la commande CREATE OR REFRESH avec la clause SCHEDULE pour modifier la planification d’une table de streaming après la création.

      • DÉCLENCHEMENT ON UPDATE [ AU PLUS TOUS LES trigger_interval ]

        Importante

        La TRIGGER ON UPDATE fonctionnalité est en version bêta.

        Si vous le souhaitez, définissez la table à actualiser lorsqu’une source de données en amont est mise à jour, au plus une fois par minute. Définissez une valeur pour AT MOST EVERY exiger au moins une durée minimale entre les actualisations.

        Les sources de données en amont doivent être des tables Delta externes ou gérées (y compris des vues matérialisées ou des tables de diffusion en continu) ou des vues managées dont les dépendances sont limitées aux types de tables pris en charge.

        L’activation des événements de fichier peut rendre les déclencheurs plus performants et augmenter certaines des limites des mises à jour du déclencheur.

        Il trigger_interval s’agit d’une instruction INTERVAL qui est d’au moins 1 minute.

        TRIGGER ON UPDATE présente les limitations suivantes

        • Pas plus de 10 sources de données en amont par table de diffusion en continu lors de l’utilisation de TRIGGER ON UPDATE.
        • Un maximum de 1 000 tables de diffusion en continu ou de vues matérialisées peut être spécifié avec TRIGGER ON UPDATE.
        • La AT MOST EVERY clause est par défaut de 1 minute et ne peut pas être inférieure à 1 minute.
  • CLAUSE WITHROW FILTER

    Ajoute une fonction de filtre de ligne au tableau. Toutes les requêtes ultérieures de cette table reçoivent un sous-ensemble de ses lignes pour lesquelles la fonction prend la valeur booléenne TRUE. Cela peut être utile à des fins de contrôle d’accès plus précis, où la fonction peut inspecter l’identité ou l’appartenance à un groupe de l’utilisateur appelant afin de décider s’il convient de filtrer certaines lignes.

  • RequêteAS

    Cette clause remplit la table à l’aide des données de query. Cette requête doit être une requête de streaming . Pour ce faire, ajoutez le STREAM mot clé à n’importe quelle relation que vous souhaitez traiter de manière incrémentielle. Lorsque vous spécifiez un query et un table_specification ensemble, le schéma de table spécifié dans table_specification doit contenir toutes les colonnes retournées par le query, sinon vous obtenez une erreur. Toutes les colonnes spécifiées dans table_specification mais pas renvoyées par query renvoient des valeurs null lors de la requête.

Différences entre les tables de diffusion en continu et les autres tables

Les tables de diffusion en continu sont des tables avec état, conçues pour gérer chaque ligne une seule fois lorsque vous traitez un jeu de données croissant. Étant donné que la plupart des jeux de données croissent continuellement au fil du temps, les tables de diffusion en continu conviennent à la plupart des charges de travail d’ingestion. Les tables de diffusion en continu sont optimales pour les pipelines qui nécessitent une actualisation des données et une faible latence. Les tables de diffusion en continu peuvent également être utiles pour les transformations massives à l’échelle, car les résultats peuvent être calculés de manière incrémentielle à mesure que de nouvelles données arrivent, ce qui permet de maintenir les résultats à jour sans avoir à recalculer entièrement toutes les données sources à chaque mise à jour. Les tables de diffusion en continu sont conçues pour les sources de données en ajout uniquement.

Les tables de streaming acceptent des commandes supplémentaires telles que REFRESH, qui traite les dernières données disponibles dans les sources fournies dans la requête. Les modifications apportées à la requête fournie ne se reflètent sur les nouvelles données que lorsqu'on appelle un REFRESH, et non sur les données déjà traitées. Pour appliquer également les modifications sur les données existantes, vous devez exécuter REFRESH TABLE <table_name> FULL pour effectuer un FULL REFRESH. Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’intégralité de l’historique des données ou qui ont des périodes de rétention courtes, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.

Filtres de lignes et masques de colonne

Les filtres de lignes vous permettent de spécifier une fonction qui s’applique en tant que filtre chaque fois qu’une analyse de table extrait des lignes. Ces filtres permettent de vérifier que les requêtes suivantes retournent uniquement les lignes pour lesquelles le prédicat de filtre a la valeur true.

Les masques de colonne vous permettent de masquer les valeurs d’une colonne chaque fois qu’une analyse de table récupère des lignes. Toutes les requêtes futures impliquant cette colonne recevront le résultat de l’évaluation de la fonction sur la colonne, en remplaçant la valeur d’origine de la colonne.

Pour plus d’informations sur l’utilisation des filtres de lignes et des masques de colonne, consultez Filtres de lignes et masques de colonne.

Gestion des filtres de lignes et des masques de colonne

Les filtres de lignes et les masques de colonne sur les tables de streaming doivent être ajoutés, mis à jour ou supprimés via l’instruction CREATE OR REFRESH.

Comportement

  • Actualisation en tant que définisseur : lorsque les instructions CREATE OR REFRESH ou REFRESH actualisent une table de streaming, les fonctions de filtre de lignes s’exécutent avec les droits du définisseur (en tant que du propriétaire de la table). Cela signifie que l’actualisation de la table utilise le contexte de sécurité de l’utilisateur qui a créé la table de streaming.
  • Requête : Bien que la plupart des filtres s’exécutent avec les droits du definer, les fonctions qui vérifient le contexte utilisateur (par CURRENT_USER exemple et IS_MEMBER) sont des exceptions. Ces fonctions s’exécutent en tant qu’appelant. Cette approche applique la sécurité des données et les contrôles d’accès spécifiques à l’utilisateur en fonction du contexte de l’utilisateur actuel.

Observabilité

Utilisez DESCRIBE EXTENDED, INFORMATION_SCHEMA ou l’Explorateur de catalogues pour examiner les filtres de lignes et les masques de colonne existants qui s’appliquent à une table de streaming donnée. Cette fonctionnalité permet aux utilisateurs d’auditer et de passer en revue l’accès aux données ainsi que les mesures de protection sur les tables de streaming.

Limites

  • Seuls les propriétaires de tables peuvent actualiser les tables de streaming pour obtenir les données les plus récentes.
  • ALTER TABLE les commandes ne sont pas autorisées sur les tables de streaming. La définition et les propriétés de la table doivent être modifiées par le biais de l'instruction CREATE OR REFRESH ou de l'instruction ALTER STREAMING TABLE.
  • L’évolution du schéma de table via des commandes DML telles que INSERT INTOet MERGE n’est pas prise en charge.
  • Les commandes suivantes ne sont pas prises en charge sur les tables de streaming :
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Le partage Delta n’est pas pris en charge.
  • Le changement de nom de la table ou du propriétaire n'est pas supporté.
  • Les contraintes de table telles que PRIMARY KEY et FOREIGN KEY ne sont pas prises en charge pour les tables de streaming dans le catalogue hive_metastore.
  • Les colonnes générées, les colonnes d’identité et les colonnes par défaut ne sont pas prises en charge.

Exemples

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')