Partage via


Consultez Charger des données à l’aide de tables de streaming dans Databricks SQL.

Databricks recommande d’utiliser des tables de streaming pour ingérer des données à l’aide de Databricks SQL. Une table de diffusion en continu est une table enregistrée dans Unity Catalog avec une prise en charge supplémentaire pour la diffusion en continu ou le traitement incrémentiel des données. Un pipeline Delta Live Tables est automatiquement créé pour chaque table de diffusion en continu. Vous pouvez utiliser des tables de streaming pour le chargement incrémentiel des données à partir de Kafka et du stockage d’objets cloud.

Cet article montre comment utiliser des tables de streaming pour charger des données à partir d’un stockage d’objets cloud configuré en tant que volume Unity Catalog (recommandé) ou emplacement externe.

Remarque

Pour en savoir plus sur comment utiliser des tables Delta Lake comme sources et récepteurs de diffusion en continu, consultez Lectures et écritures en continu de table Delta.

Important

Les tables de diffusion en continu créées dans Databricks SQL sont sauvegardées par un pipeline Delta Live Tables serverless. Votre espace de travail doit prendre en charge les pipelines serverless pour utiliser cette fonctionnalité.

Avant de commencer

Avant de commencer, vous devez satisfaire aux exigences suivantes :

Exigences pour l’espace de travail :

Exigences de calcul :

Vous devez utiliser une des options suivantes :

  • Un entrepôt SQL qui utilise le canal Current.

  • Calcul avec mode d’accès partagé sur Databricks Runtime 13.3 LTS ou ultérieur.

  • Calcul avec mode d’accès utilisateur unique sur Databricks Runtime 15.4 LTS ou version ultérieure.

    Sur Databricks Runtime 15.3 et ci-dessous, vous ne pouvez pas utiliser le calcul d’un utilisateur unique pour interroger des tables de diffusion en continu appartenant à d’autres utilisateurs. Vous pouvez utiliser le calcul d’un seul utilisateur sur Databricks Runtime 15.3 et ci-dessous uniquement si vous êtes propriétaire de la table de diffusion en continu. Le créateur de la table est le propriétaire.

    Databricks Runtime 15.4 LTS et versions ultérieures prennent en charge les requêtes sur les tables générées par Delta Live Tables sur le calcul mono-utilisateur, quelle que soit la propriété de la table. Pour tirer parti du filtrage des données fourni dans Databricks Runtime 15.4 LTS et ultérieur, vous devez également vérifier que votre espace de travail est activé pour le calcul serverless, car la fonctionnalité de filtrage des données qui prend en charge les tables générées par Delta Live Tables s’exécute sur un calcul serverless. Vous pouvez être facturé pour les ressources de calcul serverless lorsque vous utilisez le calcul mono-utilisateur pour exécuter des opérations de filtrage des données. Consultez le contrôle d’accès affiné sur le calcul d’un seul utilisateur.

Conditions requises pour les autorisations :

  • Privilège READ FILES sur un emplacement externe Unity Catalog. Pour plus d’informations, consultez Créer un emplacement externe pour connecter le stockage cloud à Azure Databricks.
  • Privilège USE CATALOG sur le catalogue dans lequel vous créez la table de streaming.
  • Privilège USE SCHEMA sur le schéma dans lequel vous créez la table de streaming.
  • Privilège CREATE TABLE sur le schéma dans lequel vous créez la table de streaming.

Autres exigences :

  • Chemin d’accès à vos données sources.

    Exemple de chemin d’accès au volume : /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Exemple de chemin d’accès à l’emplacement externe : abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Remarque

    Cet article suppose que les données que vous voulez charger se trouvent dans un emplacement de stockage cloud qui correspond à un volume Unity Catalog ou à un emplacement externe auquel vous avez accès.

Découvrir et afficher un aperçu des données sources

  1. Dans la barre latérale de votre espace de travail, cliquez sur Requêtes, puis sur Créer une requête.

  2. Dans l’éditeur de requête, sélectionnez un entrepôt SQL qui utilise le canal Current dans la liste déroulante.

  3. Collez ce qui suit dans l’éditeur, en remplaçant les valeurs entre crochets (<>) pour les informations identifiant vos données sources, puis cliquez sur Exécuter.

    Remarque

    Vous pouvez rencontrer des erreurs d’inférence de schéma lors de l’exécution de la fonction table read_files si les valeurs par défaut de la fonction ne peuvent pas analyser vos données. Par exemple, vous devrez peut-être configurer le mode multiligne pour les fichiers CSV ou JSON multilignes. Pour obtenir la liste des options de l’analyseur, consultez read_files fonction table.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Charger des données dans une table de streaming

Pour créer une table de streaming à partir de données dans le stockage d’objets cloud, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Actualiser une table de streaming à l’aide d’un pipeline DLT

Cette section décrit les modèles d’actualisation d’une table de streaming avec les dernières données disponibles à partir des sources définies dans la requête.

Lorsque vous CREATE ou REFRESH une table de diffusion en continu, les processus de mise à jour utilisent un pipeline Delta Live Tables serverless. Chaque table de diffusion en continu que vous définissez a un pipeline Delta Live Tables associé.

Après avoir exécuté la commande REFRESH, le lien de pipeline DLT est retourné. Vous pouvez utiliser le lien de pipeline DLT pour vérifier l’état d’actualisation.

Remarque

Seul le propriétaire de la table peut actualiser une table de diffusion en continu pour obtenir les données les plus récentes. L’utilisateur qui crée la table est le propriétaire et le propriétaire ne peut pas être modifié. Vous devrez peut-être actualiser votre table de diffusion en continu avant d’utiliser des requêtes de voyage dans le temps.

Consultez l’article Qu’est-ce que Delta Live Tables ?.

Ingérer de nouvelles données uniquement

Par défaut, la fonction read_files lit toutes les données existantes dans le répertoire source lors de la création de la table, puis traite les enregistrements nouvellement arrivés à chaque actualisation.

Pour éviter d’ingérer des données qui existent déjà dans le répertoire source au moment de la création de la table, utilisez définissez l’option includeExistingFiles sur false. Cela signifie que seules les données qui arrivent dans le répertoire après la création de la table sont traitées. Par exemple :

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Actualiser entièrement une table de diffusion en continu

Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique complet des données ou qui ont de courtes périodes de rétention, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.

Par exemple :

REFRESH STREAMING TABLE my_bronze_table FULL

Planifier une table de streaming pour l’actualisation automatique

Pour configurer une table de streaming pour l’actualiser automatiquement en fonction d’une planification définie, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Pour obtenir des exemples de requêtes de planification d’actualisation, consultez ALTER STREAMING TABLE.

Suivre l’état de l’opération d’actualisation

Vous pouvez afficher l’état d’une actualisation de table de diffusion en continu en affichant le pipeline qui gère la table de diffusion en continu dans l’interface utilisateur Delta Live Tables ou en affichant les informations d’actualisation retournées par la commande DESCRIBE EXTENDED pour la table de diffusion en continu.

DESCRIBE EXTENDED <table-name>

Ingestion en streaming à partir de Kafka

Pour obtenir un exemple d’ingestion en streaming à partir de Kafka, consultez read_kafka.

Accorder aux utilisateurs l’accès à une table de streaming

Pour accorder aux utilisateurs le privilège SELECT sur la table de diffusion en continu afin qu’ils puissent l’interroger, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Pour plus d’informations sur les privilèges Unity Catalog, consultez les privilèges et objets sécurisables Unity Catalog.

Ressources supplémentaires