Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Databricks recommande d’utiliser des tables de diffusion en continu pour ingérer des données à l’aide de Databricks SQL. Une table de diffusion en continu est une table inscrite dans le catalogue Unity avec une prise en charge supplémentaire de la diffusion en continu ou du traitement incrémentiel des données. Un pipeline est automatiquement créé pour chaque table de streaming. Vous pouvez utiliser des tables de diffusion en continu pour le chargement incrémentiel des données à partir de Kafka et du stockage d’objets cloud.
Remarque
Pour en savoir plus sur comment utiliser des tables Delta Lake comme sources et récepteurs de diffusion en continu, consultez Lectures et écritures en continu de table Delta.
Spécifications
Pour utiliser des tables de diffusion en continu, vous devez répondre aux exigences suivantes.
Exigences pour l’espace de travail :
Les tables en flux créées dans Databricks SQL sont prises en charge par des pipelines déclaratifs Lakeflow sans serveur. Votre espace de travail doit prendre en charge les pipelines serverless pour utiliser cette fonctionnalité.
- Un compte Azure Databricks avec le mode sans serveur activé. Pour plus d’informations, consultez Activer des entrepôts SQL sans serveur.
- Un espace de travail avec le catalogue Unity activé. Pour plus d’informations, consultez Prise en main du catalogue Unity.
Exigences de calcul :
Vous devez utiliser une des options suivantes :
- Un entrepôt SQL qui utilise le canal
Current
. - Calcul avec le mode d’accès standard (anciennement mode d’accès partagé) sur Databricks Runtime 13.3 LTS ou version ultérieure.
Calcul avec mode d’accès dédié (anciennement mode d’accès utilisateur unique) sur Databricks Runtime 15.4 LTS ou version ultérieure.
Sur Databricks Runtime 15.3 et ci-dessous, vous ne pouvez pas utiliser de calcul dédié pour interroger des tables de streaming appartenant à d’autres utilisateurs. Vous pouvez utiliser le calcul dédié sur Databricks Runtime 15.3 ou versions antérieures uniquement si vous êtes propriétaire de la table de streaming. Le créateur de la table est le propriétaire.
Databricks Runtime 15.4 LTS et versions ultérieures prennent en charge l’interrogation des tables générées par des pipelines déclaratifs Lakeflow sur le calcul dédié, même si vous n’êtes pas le propriétaire de la table. Vous pouvez être facturé pour les ressources de calcul serverless lorsque vous utilisez le calcul dédié pour exécuter des opérations de filtrage des données. Consultez le contrôle d’accès affiné sur le calcul dédié.
Conditions requises pour les autorisations :
- Privilèges
USE CATALOG
etUSE SCHEMA
sur le catalogue et le schéma dans lesquels vous créez la table de diffusion en continu. - Privilège
CREATE TABLE
sur le schéma dans lequel vous créez la table de streaming. - Privilèges d’accès aux tables ou aux emplacements fournissant les données sources de votre table de diffusion en continu.
Créer des tables de diffusion en continu
Une table de streaming est définie par une requête SQL dans Databricks SQL. Lorsque vous créez une table de diffusion en continu, les données actuellement contenues dans les tables sources sont utilisées pour générer la table de diffusion en continu. Après cela, vous actualisez la table, généralement selon une planification, pour extraire toutes les données ajoutées dans les tables sources à ajouter à la table de diffusion en continu.
Lorsque vous créez une table de diffusion en continu, vous êtes considéré comme le propriétaire de la table.
Pour créer une table de diffusion en continu à partir d’une table existante, utilisez l’instructionCREATE STREAMING TABLE, comme dans l’exemple suivant :
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
Dans ce cas, la table sales
de diffusion en continu est créée à partir de colonnes spécifiques de la raw_data
table, avec une planification à actualiser toutes les heures. La requête utilisée doit être une requête de diffusion en continu . Utilisez le mot clé STREAM
pour utiliser la sémantique de streaming pour lire à partir de la source.
Lorsque vous créez une table de diffusion en continu à l’aide de l’instruction CREATE OR REFRESH STREAMING TABLE
, l’actualisation initiale des données et la population commencent immédiatement. Ces opérations n’utilisent pas le calcul de l’entrepôt DBSQL. Au lieu de cela, la table de streaming s’appuie sur des pipelines déclaratifs Lakeflow serverless pour la création et l’actualisation. Un pipeline serverless dédié est créé et géré automatiquement par le système pour chaque table de streaming.
Charger des fichiers avec le chargeur automatique
Pour créer une table en flux continu à partir de fichiers dans un volume, vous utilisez Auto Loader. Utilisez le chargeur automatique avec des pipelines déclaratifs Lakeflow pour la plupart des tâches d’ingestion de données à partir du stockage d’objets cloud. Le chargeur automatique et les pipelines déclaratifs Lakeflow sont conçus pour charger de manière incrémentielle et idempotente des données toujours croissantes à mesure qu’elles arrivent dans le stockage cloud.
Pour utiliser le chargeur automatique dans Databricks SQL, utilisez la read_files
fonction. Les exemples suivants illustrent l’utilisation du chargeur automatique pour lire un volume de fichiers JSON dans une table de diffusion en continu :
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
Pour lire des données à partir du stockage cloud, vous pouvez également utiliser le chargeur automatique :
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Pour en savoir plus sur le chargeur automatique, consultez Qu’est-ce que le chargeur automatique ?. Pour en savoir plus sur l’utilisation du chargeur automatique dans SQL, avec des exemples, consultez Charger des données à partir du stockage d’objets.
Ingestion de streaming à partir d’autres sources
Pour un exemple d'ingestion à partir d'autres sources, y compris Kafka, consultez Charger des données avec des pipelines déclaratifs Lakeflow.
Ingérer de nouvelles données uniquement
Par défaut, la fonction read_files
lit toutes les données existantes dans le répertoire source lors de la création de la table, puis traite les enregistrements nouvellement arrivés avec chaque actualisation.
Pour éviter d’ingérer des données qui existent déjà dans le répertoire source au moment de la création de la table, définissez l’option includeExistingFiles
sur false
. Cela signifie que seules les données qui arrivent dans le répertoire après la création de la table sont traitées. Par exemple:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
Configurer le canal d’exécution
Les tables de streaming créées à l’aide d’entrepôts SQL sont automatiquement actualisées à l’aide d’un pipeline. Les pipelines déclaratifs Lakeflow utilisent le runtime dans le current
canal par défaut. Consultez les notes de publication des pipelines déclaratifs Lakeflow et le processus de mise à niveau pour en savoir plus sur le processus de publication.
Databricks recommande d’utiliser le canal current
pour les charges de travail de production. Les nouvelles fonctionnalités sont d’abord publiées sur le canal preview
. Vous pouvez définir un pipeline sur le canal Pipelines déclaratifs Lakeflow en préversion pour tester les nouvelles fonctionnalités en spécifiant preview
comme propriété de table. Vous pouvez spécifier cette propriété lorsque vous créez la table ou après la création de la table à l'aide d'une instruction ALTER.
L’exemple de code suivant montre comment configurer le canal en préversion dans une instruction CREATE :
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
Masquer les données sensibles
Important
Cette fonctionnalité est disponible en préversion publique.
Vous pouvez utiliser des tables de streaming pour masquer les données sensibles aux utilisateurs accédant à la table. Une approche consiste à définir la requête afin qu’elle exclut entièrement les colonnes ou lignes sensibles. Vous pouvez également appliquer des masques de colonne ou des filtres de lignes en fonction des autorisations de l’utilisateur interrogeant. Par exemple, vous pouvez masquer la tax_id
colonne pour les utilisateurs qui ne figurent pas dans le groupe HumanResourcesDept
. Pour ce faire, utilisez la syntaxe ROW FILTER
et MASK
pendant la création de la table en streaming. Pour plus d’informations, consultez Filtrer les données de table sensibles à l’aide de filtres de lignes et de masques de colonne.
Actualiser une table de diffusion en continu
Les actualisations peuvent être planifiées automatiquement lorsque vous créez la table de diffusion en continu. Vous pouvez également actualiser manuellement les tables de diffusion en continu. Même si vous avez une actualisation planifiée, vous pouvez appeler une actualisation manuelle à tout moment. Les actualisations sont gérées par le même pipeline que celui qui a été créé automatiquement avec la table de diffusion en continu.
Pour actualiser une table de diffusion en continu :
REFRESH STREAMING TABLE sales;
Vous pouvez vérifier l’état de la dernière actualisation avec DESCRIBE TABLE EXTENDED.
Remarque
Seul le propriétaire de la table peut actualiser une table de flux pour obtenir les données les plus récentes. L’utilisateur qui crée la table est le propriétaire et le propriétaire ne peut pas être modifié. Vous devrez peut-être rafraîchir votre table de diffusion en continu avant d’utiliser des requêtes de voyage dans le temps.
Fonctionnement de l’actualisation
Une actualisation de la table de diffusion en continu évalue uniquement les nouvelles lignes qui sont arrivées depuis la dernière mise à jour et ajoute uniquement les nouvelles données.
Chaque actualisation utilise la définition actuelle de la table de diffusion en continu pour traiter ces nouvelles données. La modification d’une définition de table de diffusion en continu ne recalcule pas automatiquement les données existantes. Si une modification n’est pas compatible avec les données existantes (par exemple, modification d’un type de données), l’actualisation suivante échoue avec une erreur.
Les exemples suivants expliquent comment les modifications apportées à une définition de table de diffusion en continu affectent le comportement d’actualisation :
- La suppression d’un filtre ne retraite pas les lignes précédemment filtrées.
- La modification des projections de colonnes n’affecte pas la façon dont les données existantes ont été traitées.
- Les jointures avec des instantanés statiques utilisent l’état de l’instantané au moment du traitement initial. Les données tardives qui auraient été mises en correspondance avec l’instantané mis à jour seront ignorées. Ceci peut entraîner la suppression des faits si des dimensions sont tardives.
- La modification du CAST d’une colonne existante entraîne une erreur.
Si vos données changent d’une manière qui ne peut pas être prise en charge dans la table de diffusion en continu existante, vous pouvez effectuer une actualisation complète.
Actualiser entièrement une table de diffusion en continu
Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique entier des données ou qui ont des périodes de rétention courtes, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.
Par exemple:
REFRESH STREAMING TABLE sales FULL;
Modifier le calendrier d’une table de diffusion en continu
Vous pouvez modifier (ou définir) une planification d’actualisation automatique pour votre table de diffusion en continu. Les exemples suivants vous montrent comment définir une planification à l’aide ALTER STREAMING TABLEde :
ALTER STREAMING TABLE sales
ADD SCHEDULE every 1 hour;
Pour obtenir des exemples de requêtes de planification d’actualisation, consultez ALTER STREAMING TABLE.
Suivre l’état d’une actualisation
Vous pouvez afficher l’état d’une actualisation d’une table de diffusion en continu en consultant le pipeline qui gère la table de diffusion en continu dans l’interface utilisateur des pipelines déclaratifs Lakeflow ou en affichant les informations d’actualisation retournées par la DESCRIBE EXTENDED
commande de la table de diffusion en continu.
DESCRIBE TABLE EXTENDED <table-name>;
Vous pouvez aussi visualiser la table de diffusion en continu dans l’Explorateur de catalogues et y voir l’état d’actualisation :
- Cliquez sur
Catalogue dans la barre latérale.
- Dans l’arborescence de l’Explorateur de catalogues à gauche, ouvrez le catalogue et sélectionnez le schéma où se trouve votre table de diffusion en continu.
- Ouvrez l’élément Tables sous le schéma que vous avez sélectionné, puis cliquez sur la table de diffusion en continu.
À partir de là, vous pouvez utiliser les onglets sous le nom de la table de diffusion en continu pour afficher et modifier des informations sur la table de diffusion en continu, notamment :
- Actualiser l’état et l’historique
- Schéma de table
- Exemples de données (nécessite un calcul actif)
- Autorisations
- Traçabilité, y compris les tables et les pipelines dont dépend cette table de diffusion en continu
- Insights sur l’utilisation
- Moniteurs que vous avez créés pour cette table de diffusion en continu
Contrôler l’accès aux tables de streaming
Les tables de diffusion en continu prennent en charge les contrôles d’accès enrichis pour prendre en charge le partage de données tout en évitant d’exposer des données potentiellement privées. Un propriétaire de table de diffusion en continu ou un utilisateur disposant du privilège MANAGE
peut accorder des privilèges SELECT
à d’autres utilisateurs. Les utilisateurs ayant SELECT
accès à la table de diffusion en continu n’ont pas besoin SELECT
d’accéder aux tables référencées par la table de diffusion en continu. Ce contrôle d’accès permet le partage de données tout en contrôlant l’accès aux données sous-jacentes.
Accorder des privilèges à une table de diffusion en continu
Pour accorder l’accès à une table de diffusion en continu, utilisez l’instructionGRANT :
GRANT <privilege_type> ON <st_name> TO <principal>;
Le privilege_type
peut être :
-
SELECT
– l’utilisateur peut effectuer une opérationSELECT
sur la table de diffusion en continu. -
REFRESH
– l’utilisateur peut effectuer une opérationREFRESH
sur la table de diffusion en continu. Les actualisations sont exécutées à l’aide des autorisations du propriétaire.
L’exemple suivant crée une table de diffusion en continu et accorde des privilèges de sélection et d’actualisation aux utilisateurs :
CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
Pour plus d'informations sur l'octroi de privilèges concernant les objets sécurisables dans Unity Catalog, consultez les privilèges et objets sécurisables Unity Catalog.
Révoquer des privilèges sur une table de streaming
Pour révoquer l’accès à partir d’une table de diffusion en continu, utilisez l’instructionREVOKE :
REVOKE privilege_type ON <st_name> FROM principal;
Lorsque SELECT
des privilèges sur une table source sont révoqués du propriétaire de la table de diffusion en continu ou d’un autre utilisateur qui a reçu MANAGE
ou SELECT
des privilèges sur la table de diffusion en continu, ou que la table source est supprimée, le propriétaire de la table de diffusion en continu ou l’utilisateur autorisé à interroger la table de diffusion en continu est toujours en mesure d’interroger la table de diffusion en continu. Toutefois, le comportement suivant se produit :
- Le propriétaire de la table de diffusion en continu ou d’autres personnes qui ont perdu l’accès à une table de diffusion en continu ne peuvent plus effectuer d’opération
REFRESH
sur cette table, et celle-ci devient obsolète. - En cas d’automatisation avec une planification, l’opération
REFRESH
suivante échoue ou n’est pas exécutée.
L’exemple suivant révoque le privilège SELECT
de read_only_user
:
REVOKE SELECT ON st_name FROM read_only_user;
Supprimer définitivement les enregistrements d’une table de diffusion en continu
Important
La prise en charge de l’instruction REORG
avec des tables de diffusion en continu est en préversion publique.
Remarque
- L’utilisation d’une instruction
REORG
avec une table de diffusion en continu nécessite Databricks Runtime 15.4 et versions ultérieures. - Bien que vous puissiez utiliser l’instruction
REORG
avec n’importe quelle table de diffusion en continu, elle n’est nécessaire que lors de la suppression d’enregistrements d’une table de diffusion en continu avec des vecteurs de suppression activés . La commande n’a aucun effet lorsqu’elle est utilisée avec une table de diffusion en continu sans vecteurs de suppression activés.
Pour supprimer physiquement les enregistrements du stockage sous-jacent d'une table de streaming avec des vecteurs de suppression activés, il faut prendre des mesures supplémentaires pour garantir qu'une opération VACUUM s'exécute sur les données de la table de streaming, par exemple pour la conformité RGPD.
Pour supprimer physiquement les enregistrements du stockage sous-jacent :
- Mettez à jour les enregistrements ou supprimez des enregistrements de la table de diffusion en continu.
- Exécutez une instruction
REORG
sur la table de streaming, en spécifiant le paramètreAPPLY (PURGE)
. Par exemple,REORG TABLE <streaming-table-name> APPLY (PURGE);
. - Attendez que la période de rétention des données de la table de diffusion en continu passe. La période de rétention des données par défaut est de sept jours, mais elle peut être configurée avec la propriété de table
delta.deletedFileRetentionDuration
. Consultez Configurer la conservation des données pour des requêtes de voyage dans le temps. -
REFRESH
la table de diffusion en continu. Consultez Actualiser une table de diffusion en continu. Dans les 24 heures suivant l’opérationREFRESH
, les tâches de maintenance des pipelines déclaratifs Lakeflow, y compris l’opérationVACUUM
requise pour garantir que les enregistrements sont supprimés définitivement, sont exécutées automatiquement.
Monitorer les exécutions en utilisant l’Historique des requêtes
Vous pouvez utiliser la page d'historique des requêtes pour accéder aux détails des requêtes et aux profils de requêtes qui peuvent vous aider à identifier les requêtes peu performantes et les goulots d'étranglement dans les pipelines déclaratifs Lakeflow utilisés pour exécuter vos mises à jour de table en streaming. Pour obtenir une vue d’ensemble du type d’informations disponibles dans les historiques de requête et les profils de requête, consultez Historique des requêtes et Profil des requêtes.
Important
Cette fonctionnalité est disponible en préversion publique. Les administrateurs d’espace de travail peuvent activer cette fonctionnalité à partir de la page Aperçus. Consultez Gérer les préversions d’Azure Databricks.
Toutes les déclarations liées aux tables de lecture en continu apparaissent dans l’historique des requêtes. Vous pouvez utiliser le filtre déroulant Requête pour sélectionner une commande et inspecter les requêtes associées. Toutes les CREATE
instructions sont suivies d’une REFRESH
instruction qui s’exécute de manière asynchrone sur un pipeline. Les instructions REFRESH
incluent généralement des plans de requête détaillés qui fournissent des insights sur l’optimisation des performances.
Pour accéder aux instructions REFRESH
de l’interface utilisateur de l’Historique des requêtes, procédez comme suit :
- Cliquez sur
Dans la barre latérale gauche pour ouvrir l’interface utilisateur de l’historique des requêtes .
- Sélectionnez la case à cocher REFRESH dans le filtre déroulant de la déclaration .
- Cliquez sur le nom de l'instruction de requête pour afficher les détails récapitulatifs tels que la durée de la requête et les métriques agrégées.
- Cliquez sur Afficher le profil de requête pour ouvrir le profil de requête. Pour plus d’informations sur la navigation dans le profil de requête, consultez Profil de requête.
- Si vous le souhaitez, vous pouvez utiliser les liens dans la section Source de requête pour ouvrir la requête ou le pipeline associé.
Vous pouvez également accéder aux détails de la requête à l’aide de liens dans l’éditeur SQL ou à partir d’un bloc-notes attaché à un entrepôt SQL.