Consultez Charger des données à l’aide de tables de streaming dans Databricks SQL.
Databricks recommande d’utiliser des tables de streaming pour ingérer des données à l’aide de Databricks SQL. Une table de diffusion en continu est une table enregistrée dans Unity Catalog avec une prise en charge supplémentaire pour la diffusion en continu ou le traitement incrémentiel des données. Un pipeline Delta Live Tables est automatiquement créé pour chaque table de diffusion en continu. Vous pouvez utiliser des tables de streaming pour le chargement incrémentiel des données à partir de Kafka et du stockage d’objets cloud.
Cet article montre comment utiliser des tables de streaming pour charger des données à partir d’un stockage d’objets cloud configuré en tant que volume Unity Catalog (recommandé) ou emplacement externe.
Remarque
Pour en savoir plus sur comment utiliser des tables Delta Lake comme sources et récepteurs de diffusion en continu, consultez Lectures et écritures en continu de table Delta.
Important
Les tables de diffusion en continu créées dans Databricks SQL sont sauvegardées par un pipeline Delta Live Tables serverless. Votre espace de travail doit prendre en charge les pipelines serverless pour utiliser cette fonctionnalité.
Avant de commencer
Avant de commencer, vous devez satisfaire aux exigences suivantes :
Exigences pour l’espace de travail :
- Un compte Azure Databricks avec serverless activé. Pour plus d’informations, consultez Activer des entrepôts SQL serverless.
- Un espace de travail avec Unity Catalog activé. Pour plus d’informations, consultez Configurer et gérer Unity Catalog.
Exigences de calcul :
Vous devez utiliser une des options suivantes :
Un entrepôt SQL qui utilise le canal
Current
.Calcul avec mode d’accès partagé sur Databricks Runtime 13.3 LTS ou ultérieur.
Calcul avec mode d’accès utilisateur unique sur Databricks Runtime 15.4 LTS ou version ultérieure.
Sur Databricks Runtime 15.3 et ci-dessous, vous ne pouvez pas utiliser le calcul d’un utilisateur unique pour interroger des tables de diffusion en continu appartenant à d’autres utilisateurs. Vous pouvez utiliser le calcul d’un seul utilisateur sur Databricks Runtime 15.3 et ci-dessous uniquement si vous êtes propriétaire de la table de diffusion en continu. Le créateur de la table est le propriétaire.
Databricks Runtime 15.4 LTS et versions ultérieures prennent en charge les requêtes sur les tables générées par Delta Live Tables sur le calcul mono-utilisateur, quelle que soit la propriété de la table. Pour tirer parti du filtrage des données fourni dans Databricks Runtime 15.4 LTS et ultérieur, vous devez également vérifier que votre espace de travail est activé pour le calcul serverless, car la fonctionnalité de filtrage des données qui prend en charge les tables générées par Delta Live Tables s’exécute sur un calcul serverless. Vous pouvez être facturé pour les ressources de calcul serverless lorsque vous utilisez le calcul mono-utilisateur pour exécuter des opérations de filtrage des données. Consultez le contrôle d’accès affiné sur le calcul d’un seul utilisateur.
Conditions requises pour les autorisations :
- Privilège
READ FILES
sur un emplacement externe Unity Catalog. Pour plus d’informations, consultez Créer un emplacement externe pour connecter le stockage cloud à Azure Databricks. - Privilège
USE CATALOG
sur le catalogue dans lequel vous créez la table de streaming. - Privilège
USE SCHEMA
sur le schéma dans lequel vous créez la table de streaming. - Privilège
CREATE TABLE
sur le schéma dans lequel vous créez la table de streaming.
Autres exigences :
Chemin d’accès à vos données sources.
Exemple de chemin d’accès au volume :
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Exemple de chemin d’accès à l’emplacement externe :
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Remarque
Cet article suppose que les données que vous voulez charger se trouvent dans un emplacement de stockage cloud qui correspond à un volume Unity Catalog ou à un emplacement externe auquel vous avez accès.
Découvrir et afficher un aperçu des données sources
Dans la barre latérale de votre espace de travail, cliquez sur Requêtes, puis sur Créer une requête.
Dans l’éditeur de requête, sélectionnez un entrepôt SQL qui utilise le canal
Current
dans la liste déroulante.Collez ce qui suit dans l’éditeur, en remplaçant les valeurs entre crochets (
<>
) pour les informations identifiant vos données sources, puis cliquez sur Exécuter.Remarque
Vous pouvez rencontrer des erreurs d’inférence de schéma lors de l’exécution de la fonction table
read_files
si les valeurs par défaut de la fonction ne peuvent pas analyser vos données. Par exemple, vous devrez peut-être configurer le mode multiligne pour les fichiers CSV ou JSON multilignes. Pour obtenir la liste des options de l’analyseur, consultez read_files fonction table./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Charger des données dans une table de streaming
Pour créer une table de streaming à partir de données dans le stockage d’objets cloud, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Définir le canal d’exécution
Les tables de streaming créées à l’aide d’entrepôts SQL sont automatiquement actualisées à l’aide d’un pipeline Delta Live Tables. Les pipelines Delta Live Tables utilisent le runtime dans le current
canal par défaut. Consultez les notes de publication de Delta Live Tables et le processus de mise à niveau des versions pour en savoir plus sur le processus de mise en production.
Databricks recommande d’utiliser le current
canal pour les charges de travail de production. Les nouvelles fonctionnalités sont d’abord publiées sur le preview
canal. Vous pouvez définir un pipeline sur le canal Delta Live Tables en préversion pour tester de nouvelles fonctionnalités en spécifiant preview
comme propriété de table. Vous pouvez spécifier cette propriété lorsque vous créez la table ou une fois la table créée à l’aide d’une instruction ALTER.
L’exemple de code suivant montre comment définir le canal en préversion dans une instruction CREATE :
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
## <a id="refresh"></a> Refresh a <st> using a DLT pipeline
This section describes patterns for refreshing a <st> with the latest data available from the sources defined in the query.
When you `CREATE` or `REFRESH` a <st>, the update processes using a serverless <DLT> pipeline. Each <st> you define has an associated <DLT> pipeline.
After you run the `REFRESH` command, the DLT pipeline link is returned. You can use the DLT pipeline link to check the status of the refresh.
.. note:: Only the table owner can refresh a <st> to get the latest data. The user that creates the table is the owner, and the owner can't be changed. You might need to refresh your <st> before using [time travel](/delta/history.md#time-travel) queries.
See [_](/delta-live-tables/index.md).
### Ingest new data only
By default, the `read_files` function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.
To avoid ingesting data that already exists in the source directory at the time of table creation, set the `includeExistingFiles` option to `false`. This means that only data that arrives in the directory after table creation is processed. For example:
.. azure::
```sql
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
Actualiser entièrement une table de diffusion en continu
Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique complet des données ou qui ont de courtes périodes de rétention, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.
Par exemple :
REFRESH STREAMING TABLE my_bronze_table FULL
Planifier une table de streaming pour l’actualisation automatique
Pour configurer une table de streaming pour l’actualiser automatiquement en fonction d’une planification définie, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Pour obtenir des exemples de requêtes de planification d’actualisation, consultez ALTER STREAMING TABLE.
Suivre l’état de l’opération d’actualisation
Vous pouvez afficher l’état d’une actualisation de table de diffusion en continu en affichant le pipeline qui gère la table de diffusion en continu dans l’interface utilisateur Delta Live Tables ou en affichant les informations d’actualisation retournées par la commande DESCRIBE EXTENDED
pour la table de diffusion en continu.
DESCRIBE EXTENDED <table-name>
Ingestion en streaming à partir de Kafka
Pour obtenir un exemple d’ingestion en streaming à partir de Kafka, consultez read_kafka.
Accorder aux utilisateurs l’accès à une table de streaming
Pour accorder aux utilisateurs le privilège SELECT
sur la table de diffusion en continu afin qu’ils puissent l’interroger, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Pour plus d’informations sur les privilèges Unity Catalog, consultez les privilèges et objets sécurisables Unity Catalog.
Surveiller les exécutions à l’aide de l’historique des requêtes
Vous pouvez utiliser la page d’historique des requêtes pour accéder aux détails de la requête et aux profils de requête qui peuvent vous aider à identifier les requêtes et les goulots d’étranglement médiocres dans le pipeline Delta Live Tables utilisés pour exécuter vos mises à jour de table de diffusion en continu. Pour obtenir une vue d’ensemble du type d’informations disponibles dans les historiques de requête et les profils de requête, consultez l’historique des requêtes et le profil de requête.
Important
Cette fonctionnalité est disponible en préversion publique. Les administrateurs d’espace de travail peuvent activer cette fonctionnalité à partir de la page Aperçus. Consultez Gérer les préversions d’Azure Databricks.
Toutes les instructions relatives aux tables de diffusion en continu apparaissent dans l’historique des requêtes. Vous pouvez utiliser le filtre déroulant d’instruction pour sélectionner n’importe quelle commande et inspecter les requêtes associées. Toutes les CREATE
instructions sont suivies d’une REFRESH
instruction qui s’exécute de manière asynchrone sur un pipeline Delta Live Tables. Les REFRESH
instructions incluent généralement des plans de requête détaillés qui fournissent des insights sur l’optimisation des performances.
Pour accéder aux REFRESH
instructions de l’interface utilisateur de l’historique des requêtes, procédez comme suit :
- Cliquez dans la barre latérale gauche pour ouvrir l’interface utilisateur de l’historique des requêtes.
- Cochez la case ACTUALISER dans le filtre déroulant Instruction .
- Cliquez sur le nom de l’instruction de requête pour afficher les détails du résumé, comme la durée de la requête et les métriques agrégées.
- Cliquez sur Afficher le profil de requête pour ouvrir le profil de requête. Pour plus d’informations sur la navigation dans le profil de requête, consultez le profil de requête.
- Si vous le souhaitez, vous pouvez utiliser les liens dans la section Source de requête pour ouvrir la requête ou le pipeline associé.
Vous pouvez également accéder aux détails de requête à l’aide de liens dans l’éditeur SQL ou à partir d’un notebook attaché à un entrepôt SQL.
Remarque
Votre table de diffusion en continu doit être configurée pour s’exécuter à l’aide du canal d’aperçu. Consultez Définir le canal d’exécution.