Partager via


Utiliser des tables de streaming dans Databricks SQL

Databricks recommande d’utiliser des tables de diffusion en continu pour ingérer des données à l’aide de Databricks SQL. Une table de diffusion en continu est une table inscrite dans le catalogue Unity avec une prise en charge supplémentaire de la diffusion en continu ou du traitement incrémentiel des données. Un pipeline est automatiquement créé pour chaque table de streaming. Vous pouvez utiliser des tables de diffusion en continu pour le chargement incrémentiel des données à partir de Kafka et du stockage d’objets cloud.

Remarque

Pour en savoir plus sur comment utiliser des tables Delta Lake comme sources et récepteurs de diffusion en continu, consultez Lectures et écritures en continu de table Delta.

Spécifications

Pour utiliser des tables de diffusion en continu, vous devez répondre aux exigences suivantes.

Exigences pour l’espace de travail :

Les tables en flux créées dans Databricks SQL sont prises en charge par des pipelines déclaratifs Lakeflow sans serveur. Votre espace de travail doit prendre en charge les pipelines serverless pour utiliser cette fonctionnalité.

Exigences de calcul :

Vous devez utiliser une des options suivantes :

  • Un entrepôt SQL qui utilise le canal Current.
  • Calcul avec le mode d’accès standard (anciennement mode d’accès partagé) sur Databricks Runtime 13.3 LTS ou version ultérieure.
  • Calcul avec mode d’accès dédié (anciennement mode d’accès utilisateur unique) sur Databricks Runtime 15.4 LTS ou version ultérieure.

    Sur Databricks Runtime 15.3 et ci-dessous, vous ne pouvez pas utiliser de calcul dédié pour interroger des tables de streaming appartenant à d’autres utilisateurs. Vous pouvez utiliser le calcul dédié sur Databricks Runtime 15.3 ou versions antérieures uniquement si vous êtes propriétaire de la table de streaming. Le créateur de la table est le propriétaire.

    Databricks Runtime 15.4 LTS et versions ultérieures prennent en charge l’interrogation des tables générées par des pipelines déclaratifs Lakeflow sur le calcul dédié, même si vous n’êtes pas le propriétaire de la table. Vous pouvez être facturé pour les ressources de calcul serverless lorsque vous utilisez le calcul dédié pour exécuter des opérations de filtrage des données. Consultez le contrôle d’accès affiné sur le calcul dédié.

Conditions requises pour les autorisations :

  • Privilèges USE CATALOG et USE SCHEMA sur le catalogue et le schéma dans lesquels vous créez la table de diffusion en continu.
  • Privilège CREATE TABLE sur le schéma dans lequel vous créez la table de streaming.
  • Privilèges d’accès aux tables ou aux emplacements fournissant les données sources de votre table de diffusion en continu.

Créer des tables de diffusion en continu

Une table de streaming est définie par une requête SQL dans Databricks SQL. Lorsque vous créez une table de diffusion en continu, les données actuellement contenues dans les tables sources sont utilisées pour générer la table de diffusion en continu. Après cela, vous actualisez la table, généralement selon une planification, pour extraire toutes les données ajoutées dans les tables sources à ajouter à la table de diffusion en continu.

Lorsque vous créez une table de diffusion en continu, vous êtes considéré comme le propriétaire de la table.

Pour créer une table de diffusion en continu à partir d’une table existante, utilisez l’instructionCREATE STREAMING TABLE, comme dans l’exemple suivant :

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT product, price FROM STREAM raw_data;

Dans ce cas, la table sales de diffusion en continu est créée à partir de colonnes spécifiques de la raw_data table, avec une planification à actualiser toutes les heures. La requête utilisée doit être une requête de diffusion en continu . Utilisez le mot clé STREAM pour utiliser la sémantique de streaming pour lire à partir de la source.

Lorsque vous créez une table de diffusion en continu à l’aide de l’instruction CREATE OR REFRESH STREAMING TABLE , l’actualisation initiale des données et la population commencent immédiatement. Ces opérations n’utilisent pas le calcul de l’entrepôt DBSQL. Au lieu de cela, la table de streaming s’appuie sur des pipelines déclaratifs Lakeflow serverless pour la création et l’actualisation. Un pipeline serverless dédié est créé et géré automatiquement par le système pour chaque table de streaming.

Charger des fichiers avec le chargeur automatique

Pour créer une table en flux continu à partir de fichiers dans un volume, vous utilisez Auto Loader. Utilisez le chargeur automatique avec des pipelines déclaratifs Lakeflow pour la plupart des tâches d’ingestion de données à partir du stockage d’objets cloud. Le chargeur automatique et les pipelines déclaratifs Lakeflow sont conçus pour charger de manière incrémentielle et idempotente des données toujours croissantes à mesure qu’elles arrivent dans le stockage cloud.

Pour utiliser le chargeur automatique dans Databricks SQL, utilisez la read_files fonction. Les exemples suivants illustrent l’utilisation du chargeur automatique pour lire un volume de fichiers JSON dans une table de diffusion en continu :

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT * FROM STREAM read_files(
    "/Volumes/my_catalog/my_schema/my_volume/path/to/data",
    format => "json"
  );

Pour lire des données à partir du stockage cloud, vous pouvez également utiliser le chargeur automatique :

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Pour en savoir plus sur le chargeur automatique, consultez Qu’est-ce que le chargeur automatique ?. Pour en savoir plus sur l’utilisation du chargeur automatique dans SQL, avec des exemples, consultez Charger des données à partir du stockage d’objets.

Ingestion de streaming à partir d’autres sources

Pour un exemple d'ingestion à partir d'autres sources, y compris Kafka, consultez Charger des données avec des pipelines déclaratifs Lakeflow.

Ingérer de nouvelles données uniquement

Par défaut, la fonction read_files lit toutes les données existantes dans le répertoire source lors de la création de la table, puis traite les enregistrements nouvellement arrivés avec chaque actualisation.

Pour éviter d’ingérer des données qui existent déjà dans le répertoire source au moment de la création de la table, définissez l’option includeExistingFiles sur false. Cela signifie que seules les données qui arrivent dans le répertoire après la création de la table sont traitées. Par exemple:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    '/path/to/files',
    includeExistingFiles => false
  );

Configurer le canal d’exécution

Les tables de streaming créées à l’aide d’entrepôts SQL sont automatiquement actualisées à l’aide d’un pipeline. Les pipelines déclaratifs Lakeflow utilisent le runtime dans le current canal par défaut. Consultez les notes de publication des pipelines déclaratifs Lakeflow et le processus de mise à niveau pour en savoir plus sur le processus de publication.

Databricks recommande d’utiliser le canal current pour les charges de travail de production. Les nouvelles fonctionnalités sont d’abord publiées sur le canal preview. Vous pouvez définir un pipeline sur le canal Pipelines déclaratifs Lakeflow en préversion pour tester les nouvelles fonctionnalités en spécifiant preview comme propriété de table. Vous pouvez spécifier cette propriété lorsque vous créez la table ou après la création de la table à l'aide d'une instruction ALTER.

L’exemple de code suivant montre comment configurer le canal en préversion dans une instruction CREATE :

CREATE OR REFRESH STREAMING TABLE sales
  TBLPROPERTIES ('pipelines.channel' = 'preview')
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM raw_data;

Masquer les données sensibles

Important

Cette fonctionnalité est disponible en préversion publique.

Vous pouvez utiliser des tables de streaming pour masquer les données sensibles aux utilisateurs accédant à la table. Une approche consiste à définir la requête afin qu’elle exclut entièrement les colonnes ou lignes sensibles. Vous pouvez également appliquer des masques de colonne ou des filtres de lignes en fonction des autorisations de l’utilisateur interrogeant. Par exemple, vous pouvez masquer la tax_id colonne pour les utilisateurs qui ne figurent pas dans le groupe HumanResourcesDept. Pour ce faire, utilisez la syntaxe ROW FILTER et MASK pendant la création de la table en streaming. Pour plus d’informations, consultez Filtrer les données de table sensibles à l’aide de filtres de lignes et de masques de colonne.

Actualiser une table de diffusion en continu

Les actualisations peuvent être planifiées automatiquement lorsque vous créez la table de diffusion en continu. Vous pouvez également actualiser manuellement les tables de diffusion en continu. Même si vous avez une actualisation planifiée, vous pouvez appeler une actualisation manuelle à tout moment. Les actualisations sont gérées par le même pipeline que celui qui a été créé automatiquement avec la table de diffusion en continu.

Pour actualiser une table de diffusion en continu :

REFRESH STREAMING TABLE sales;

Vous pouvez vérifier l’état de la dernière actualisation avec DESCRIBE TABLE EXTENDED.

Remarque

Seul le propriétaire de la table peut actualiser une table de flux pour obtenir les données les plus récentes. L’utilisateur qui crée la table est le propriétaire et le propriétaire ne peut pas être modifié. Vous devrez peut-être rafraîchir votre table de diffusion en continu avant d’utiliser des requêtes de voyage dans le temps.

Fonctionnement de l’actualisation

Une actualisation de la table de diffusion en continu évalue uniquement les nouvelles lignes qui sont arrivées depuis la dernière mise à jour et ajoute uniquement les nouvelles données.

Chaque actualisation utilise la définition actuelle de la table de diffusion en continu pour traiter ces nouvelles données. La modification d’une définition de table de diffusion en continu ne recalcule pas automatiquement les données existantes. Si une modification n’est pas compatible avec les données existantes (par exemple, modification d’un type de données), l’actualisation suivante échoue avec une erreur.

Les exemples suivants expliquent comment les modifications apportées à une définition de table de diffusion en continu affectent le comportement d’actualisation :

  • La suppression d’un filtre ne retraite pas les lignes précédemment filtrées.
  • La modification des projections de colonnes n’affecte pas la façon dont les données existantes ont été traitées.
  • Les jointures avec des instantanés statiques utilisent l’état de l’instantané au moment du traitement initial. Les données tardives qui auraient été mises en correspondance avec l’instantané mis à jour seront ignorées. Ceci peut entraîner la suppression des faits si des dimensions sont tardives.
  • La modification du CAST d’une colonne existante entraîne une erreur.

Si vos données changent d’une manière qui ne peut pas être prise en charge dans la table de diffusion en continu existante, vous pouvez effectuer une actualisation complète.

Actualiser entièrement une table de diffusion en continu

Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique entier des données ou qui ont des périodes de rétention courtes, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.

Par exemple:

REFRESH STREAMING TABLE sales FULL;

Modifier le calendrier d’une table de diffusion en continu

Vous pouvez modifier (ou définir) une planification d’actualisation automatique pour votre table de diffusion en continu. Les exemples suivants vous montrent comment définir une planification à l’aide ALTER STREAMING TABLEde :

ALTER STREAMING TABLE sales
  ADD SCHEDULE every 1 hour;

Pour obtenir des exemples de requêtes de planification d’actualisation, consultez ALTER STREAMING TABLE.

Suivre l’état d’une actualisation

Vous pouvez afficher l’état d’une actualisation d’une table de diffusion en continu en consultant le pipeline qui gère la table de diffusion en continu dans l’interface utilisateur des pipelines déclaratifs Lakeflow ou en affichant les informations d’actualisation retournées par la DESCRIBE EXTENDED commande de la table de diffusion en continu.

DESCRIBE TABLE EXTENDED <table-name>;

Vous pouvez aussi visualiser la table de diffusion en continu dans l’Explorateur de catalogues et y voir l’état d’actualisation :

  1. Cliquez sur l’icône Données.Catalogue dans la barre latérale.
  2. Dans l’arborescence de l’Explorateur de catalogues à gauche, ouvrez le catalogue et sélectionnez le schéma où se trouve votre table de diffusion en continu.
  3. Ouvrez l’élément Tables sous le schéma que vous avez sélectionné, puis cliquez sur la table de diffusion en continu.

À partir de là, vous pouvez utiliser les onglets sous le nom de la table de diffusion en continu pour afficher et modifier des informations sur la table de diffusion en continu, notamment :

  • Actualiser l’état et l’historique
  • Schéma de table
  • Exemples de données (nécessite un calcul actif)
  • Autorisations
  • Traçabilité, y compris les tables et les pipelines dont dépend cette table de diffusion en continu
  • Insights sur l’utilisation
  • Moniteurs que vous avez créés pour cette table de diffusion en continu

Contrôler l’accès aux tables de streaming

Les tables de diffusion en continu prennent en charge les contrôles d’accès enrichis pour prendre en charge le partage de données tout en évitant d’exposer des données potentiellement privées. Un propriétaire de table de diffusion en continu ou un utilisateur disposant du privilège MANAGE peut accorder des privilèges SELECT à d’autres utilisateurs. Les utilisateurs ayant SELECT accès à la table de diffusion en continu n’ont pas besoin SELECT d’accéder aux tables référencées par la table de diffusion en continu. Ce contrôle d’accès permet le partage de données tout en contrôlant l’accès aux données sous-jacentes.

Accorder des privilèges à une table de diffusion en continu

Pour accorder l’accès à une table de diffusion en continu, utilisez l’instructionGRANT :

GRANT <privilege_type> ON <st_name> TO <principal>;

Le privilege_type peut être :

  • SELECT – l’utilisateur peut effectuer une opération SELECT sur la table de diffusion en continu.
  • REFRESH – l’utilisateur peut effectuer une opération REFRESH sur la table de diffusion en continu. Les actualisations sont exécutées à l’aide des autorisations du propriétaire.

L’exemple suivant crée une table de diffusion en continu et accorde des privilèges de sélection et d’actualisation aux utilisateurs :

CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;

-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;

-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;

Pour plus d'informations sur l'octroi de privilèges concernant les objets sécurisables dans Unity Catalog, consultez les privilèges et objets sécurisables Unity Catalog.

Révoquer des privilèges sur une table de streaming

Pour révoquer l’accès à partir d’une table de diffusion en continu, utilisez l’instructionREVOKE :

REVOKE privilege_type ON <st_name> FROM principal;

Lorsque SELECT des privilèges sur une table source sont révoqués du propriétaire de la table de diffusion en continu ou d’un autre utilisateur qui a reçu MANAGE ou SELECT des privilèges sur la table de diffusion en continu, ou que la table source est supprimée, le propriétaire de la table de diffusion en continu ou l’utilisateur autorisé à interroger la table de diffusion en continu est toujours en mesure d’interroger la table de diffusion en continu. Toutefois, le comportement suivant se produit :

  • Le propriétaire de la table de diffusion en continu ou d’autres personnes qui ont perdu l’accès à une table de diffusion en continu ne peuvent plus effectuer d’opération REFRESH sur cette table, et celle-ci devient obsolète.
  • En cas d’automatisation avec une planification, l’opération REFRESH suivante échoue ou n’est pas exécutée.

L’exemple suivant révoque le privilège SELECT de read_only_user :

REVOKE SELECT ON st_name FROM read_only_user;

Supprimer définitivement les enregistrements d’une table de diffusion en continu

Important

La prise en charge de l’instruction REORG avec des tables de diffusion en continu est en préversion publique.

Remarque

  • L’utilisation d’une instruction REORG avec une table de diffusion en continu nécessite Databricks Runtime 15.4 et versions ultérieures.
  • Bien que vous puissiez utiliser l’instruction REORG avec n’importe quelle table de diffusion en continu, elle n’est nécessaire que lors de la suppression d’enregistrements d’une table de diffusion en continu avec des vecteurs de suppression activés . La commande n’a aucun effet lorsqu’elle est utilisée avec une table de diffusion en continu sans vecteurs de suppression activés.

Pour supprimer physiquement les enregistrements du stockage sous-jacent d'une table de streaming avec des vecteurs de suppression activés, il faut prendre des mesures supplémentaires pour garantir qu'une opération VACUUM s'exécute sur les données de la table de streaming, par exemple pour la conformité RGPD.

Pour supprimer physiquement les enregistrements du stockage sous-jacent :

  1. Mettez à jour les enregistrements ou supprimez des enregistrements de la table de diffusion en continu.
  2. Exécutez une instruction REORG sur la table de streaming, en spécifiant le paramètre APPLY (PURGE). Par exemple, REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. Attendez que la période de rétention des données de la table de diffusion en continu passe. La période de rétention des données par défaut est de sept jours, mais elle peut être configurée avec la propriété de table delta.deletedFileRetentionDuration. Consultez Configurer la conservation des données pour des requêtes de voyage dans le temps.
  4. REFRESH la table de diffusion en continu. Consultez Actualiser une table de diffusion en continu. Dans les 24 heures suivant l’opération REFRESH, les tâches de maintenance des pipelines déclaratifs Lakeflow, y compris l’opération VACUUM requise pour garantir que les enregistrements sont supprimés définitivement, sont exécutées automatiquement.

Monitorer les exécutions en utilisant l’Historique des requêtes

Vous pouvez utiliser la page d'historique des requêtes pour accéder aux détails des requêtes et aux profils de requêtes qui peuvent vous aider à identifier les requêtes peu performantes et les goulots d'étranglement dans les pipelines déclaratifs Lakeflow utilisés pour exécuter vos mises à jour de table en streaming. Pour obtenir une vue d’ensemble du type d’informations disponibles dans les historiques de requête et les profils de requête, consultez Historique des requêtes et Profil des requêtes.

Important

Cette fonctionnalité est disponible en préversion publique. Les administrateurs d’espace de travail peuvent activer cette fonctionnalité à partir de la page Aperçus. Consultez Gérer les préversions d’Azure Databricks.

Toutes les déclarations liées aux tables de lecture en continu apparaissent dans l’historique des requêtes. Vous pouvez utiliser le filtre déroulant Requête pour sélectionner une commande et inspecter les requêtes associées. Toutes les CREATE instructions sont suivies d’une REFRESH instruction qui s’exécute de manière asynchrone sur un pipeline. Les instructions REFRESH incluent généralement des plans de requête détaillés qui fournissent des insights sur l’optimisation des performances.

Pour accéder aux instructions REFRESH de l’interface utilisateur de l’Historique des requêtes, procédez comme suit :

  1. Cliquez sur l’icône Historique. Dans la barre latérale gauche pour ouvrir l’interface utilisateur de l’historique des requêtes .
  2. Sélectionnez la case à cocher REFRESH dans le filtre déroulant de la déclaration .
  3. Cliquez sur le nom de l'instruction de requête pour afficher les détails récapitulatifs tels que la durée de la requête et les métriques agrégées.
  4. Cliquez sur Afficher le profil de requête pour ouvrir le profil de requête. Pour plus d’informations sur la navigation dans le profil de requête, consultez Profil de requête.
  5. Si vous le souhaitez, vous pouvez utiliser les liens dans la section Source de requête pour ouvrir la requête ou le pipeline associé.

Vous pouvez également accéder aux détails de la requête à l’aide de liens dans l’éditeur SQL ou à partir d’un bloc-notes attaché à un entrepôt SQL.

Ressources supplémentaires