Qu’est-ce que le mode de notification de fichier Auto Loader ?
Dans le mode Notification de fichiers, Auto Loader configure automatiquement un service de notification et un service de file d’attente qui s’abonnent aux événements de fichiers du répertoire d’entrée. Vous pouvez utiliser les notifications de fichiers pour adapter l'Auto Loader à l'ingestion de millions de fichiers par heure. Comparé au mode de liste de répertoire, le mode Notification de fichiers est plus performant et évolutif pour les répertoires d’entrée volumineux ou un nombre élevé de fichiers, mais il nécessite des autorisations supplémentaires sur le cloud.
Vous pouvez basculer entre les notifications de fichiers et la liste de répertoires à tout moment tout en conservant la garantie de traitement des données « une seule fois ».
Remarque
Le mode de notification de fichier n’est pas pris en charge pour les comptes Stockage Premium Azure, car les comptes Premium ne prennent pas en charge le stockage en file d’attente.
Avertissement
La modification du chemin d’accès source pour Auto Loader n’est pas prise en charge pour le mode de notification de fichier. Si le mode de notification de fichier est utilisé et que le chemin d’accès a été modifié, vous risquez de ne pas ingérer les fichiers qui sont déjà présents dans le nouveau répertoire lors de sa mise à jour.
Le mode Notification de fichier n’est pris en charge que sur le calcul à utilisateur unique.
Ressources cloud utilisées en mode de notification de fichier Auto Loader
Important
Vous avez besoin d’autorisations élevées pour configurer automatiquement l’infrastructure cloud pour le mode de notification de fichier. Contactez votre administrateur cloud ou administrateur d’espace de travail. Voir :
Auto Loader peut configurer automatiquement des notifications de fichiers pour vous lorsque vous définissez l’option cloudFiles.useNotifications
sur true
et que vous fournissez les autorisations nécessaires pour créer des ressources cloud. En outre, vous devrez peut-être fournir des options supplémentaires pour donner à Auto Loader l’autorisation de créer ces ressources.
Le tableau suivant résume les ressources qui sont créées par Auto Loader.
Cloud Storage | Service d’abonnement | Service File d’attente | Préfixe * | Limite ** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 100 par compartiment S3 |
ADLS Gen2 | Azure Event Grid | Stockage File d’attente Azure | databricks | 500 par compte de stockage |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-ingest | 100 par compartiment GCS |
Stockage Blob Azure | Azure Event Grid | Stockage File d’attente Azure | databricks | 500 par compte de stockage |
- Auto Loader nomme les ressources avec ce préfixe.
** Nombre de pipelines de notification de fichiers pouvant être lancés simultanément
Si vous avez besoin d’exécuter plus que le nombre limité de pipelines de notification de fichiers pour un compte de stockage donné, vous pouvez :
- Utiliser un service tel que AWS Lambda, Azure Functions ou Google Cloud Functions pour répartir les notifications d’une file d’attente unique qui écoute un conteneur ou un compartiment entier dans des files d’attente spécifiques aux répertoires.
Événements de notification de fichiers
AWS S3 fournit un événement ObjectCreated
lorsqu’un fichier est chargé dans un compartiment S3, qu’il ait été chargé par un chargement put ou en plusieurs parties.
ADLS Gen2 fournit différentes notifications d’événements pour les fichiers qui apparaissent dans votre conteneur Gen2.
- Auto Loader écoute l’événement
FlushWithClose
pour traiter un fichier. - Les flux Auto Loader prennent en charge l’action
RenameFile
de découverte de fichiers. Les actionsRenameFile
nécessitent l’envoi d’une demande d’API au système de stockage pour obtenir la taille du fichier renommé. - Les flux Auto Loader créés avec Databricks Runtime 9.0 et versions ultérieures prennent en charge l’action
RenameDirectory
pour découvrir les fichiers. Les actionsRenameDirectory
nécessitent l’envoi de demandes d’API au système de stockage pour lister le contenu du répertoire renommé.
Google Cloud Storage fournit un événement OBJECT_FINALIZE
lorsqu’un fichier est chargé, ce qui inclut les remplacements et les copies de fichiers. Les échecs de chargement ne génèrent pas cet événement.
Notes
Les fournisseurs de cloud ne garantissent pas une livraison à 100 % de tous les événements de fichiers dans des conditions très rares et ne fournissent pas de SLA stricts sur la latence des événements de fichiers. Databricks vous recommande de déclencher des renvois standard avec Auto Loader à l’aide de l’option cloudFiles.backfillInterval
pour garantir que tous les fichiers sont découverts dans un SLA donné si l’exhaustivité des données est obligatoire. Le déclenchement de renvois normaux n’entraîne pas de doublons.
Autorisations requises pour configurer la notification de fichier pour ADLS Gen2 et Stockage Blob Azure
Vous devez disposer d’autorisations de lecture pour le répertoire d’entrée. Consultez Stockage Blob Azure.
Pour utiliser le mode de notification de fichiers, vous devez fournir des informations d’authentification pour configurer les services de notification d’événements et y accéder. Vous avez uniquement besoin d’un principal de service pour l’authentification.
Principal de service - Utilisation de rôles intégrés Azure
Créez une application et un principal de service Microsoft Entra ID (anciennement Azure Active Directory) sous la forme d’un ID client et d’une clé secrète client.
Attribuez à cette application les rôles suivants pour le compte de stockage dans lequel se trouve le chemin d’entrée :
- Contributor : Ce rôle sert à configurer les ressources de votre compte de stockage, telles que les files d’attente et les abonnements aux événements.
- Storage Queue Data Contributor : Ce rôle sert à effectuer des opérations sur les files d’attente, telles que la récupération et la suppression de messages dans les files d’attente. Ce rôle est obligatoire uniquement lorsque vous fournissez un principal de service sans chaîne de connexion.
Attribuez à cette application le rôle suivant pour le groupe de ressources associé :
- EventGrid EventSubscription Contributor : Ce rôle sert à effectuer des opérations sur les abonnements à la grille d’événements, telles que la création ou l’énumération des abonnements aux événements.
Pour plus d’informations, consultez Attribuer des rôles Azure en utilisant le portail Azure.
Principal de service - utilisation d’un rôle personnalisé
Si vous êtes préoccupé par les autorisations excessives requises pour les rôles précédents, vous pouvez créer un Rôle personnalisé avec au moins les autorisations suivantes, listées ci-dessous au format JSON de rôle Azure :
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Ensuite, vous pouvez attribuer ce rôle personnalisé à votre application.
Pour plus d’informations, consultez Attribuer des rôles Azure en utilisant le portail Azure.
Résolution des erreurs courantes
Erreur :
java.lang.RuntimeException: Failed to create event grid subscription.
Si ce message d’erreur s’affiche lorsque vous exécutez Auto Loader pour la première fois, cela signifie que la grille d’événements n’est pas inscrite en tant que fournisseur de ressources dans votre abonnement Azure. Pour l’inscrire sur Portail Azure :
- Accédez à votre abonnement.
- Cliquez sur Fournisseurs de ressources sous la section Paramètres.
- Inscrivez le fournisseur
Microsoft.EventGrid
.
Erreur :
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Si ce message d’erreur s’affiche lorsque vous exécutez Auto Loader pour la première fois, vérifiez que vous avez attribué le rôle Contributeur à votre principal de service pour Event Grid, ainsi qu’à votre compte de stockage.
Autorisations requises pour configurer la notification de fichier pour AWS S3
Vous devez disposer d’autorisations de lecture pour le répertoire d’entrée. Pour plus d’informations, consultez les détails de la connexion S3.
Pour utiliser le mode Notification de fichiers, joignez le document de stratégie JSON suivant à votre utilisateur ou rôle IAM.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
où :
<bucket-name>
: Nom du compartiment S3 dans lequel votre flux lira des fichiers, par exempleauto-logs
. Vous pouvez utiliser*
comme caractère générique, par exempledatabricks-*-logs
. Pour connaître le compartiment S3 sous-jacent de votre chemin d’accès DBFS, vous pouvez répertorier tous les points de montage DBFS d’un notebook en exécutant%fs mounts
.<region>
: Région AWS dans laquelle réside le compartiment S3, par exempleus-west-2
. Si vous ne souhaitez pas spécifier la région, utilisez*
.<account-number>
: Numéro de compte AWS qui possède le compartiment S3, par exemple123456789012
. Si vous ne souhaitez pas spécifier le numéro de compte, utilisez*
.
La chaîne databricks-auto-ingest-*
dans la spécification ARN SQS et SNS est le préfixe de nom que la source cloudFiles
utilise lors de la création des services SQS et SNS. Étant donné qu’Azure Databricks configure les services de notification lors de l’exécution initiale du flux, vous pouvez utiliser une stratégie avec des autorisations réduites après l’exécution initiale (par exemple, arrêter le flux, puis le redémarrer).
Notes
La stratégie précédente concerne uniquement les autorisations nécessaires pour configurer les services de notification de fichiers, à savoir les services de notification de compartiment S3, SNS et SQS, et suppose que vous disposez déjà d’un accès en lecture au compartiment S3. Si vous devez ajouter des autorisations S3 en lecture seule, ajoutez ce qui suit à la liste Action
dans l’instruction DatabricksAutoLoaderSetup
dans le document JSON :
s3:ListBucket
s3:GetObject
Autorisations réduites après la configuration initiale
Les autorisations de configuration des ressources décrites ci-dessus sont requises uniquement lors de l’exécution initiale du flux. Après la première exécution, vous pouvez passer à la stratégie IAM suivante avec des autorisations réduites.
Important
Avec les autorisations réduites, vous ne pouvez pas démarrer de nouvelles requêtes de streaming ni recréer des ressources en cas d’échec (par exemple, la file d’attente SQS a été supprimée par inadvertance). Vous ne pouvez pas non plus utiliser l’API de gestion des ressources cloud pour répertorier ou détruire des ressources.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Autorisations requises pour configurer la notification de fichier pour GCS
Vous devez disposer des autorisations list
et get
sur votre compartiment GCS et sur tous les objets. Pour plus d’informations, consultez la documentation de Google sur les autorisations IAM.
Pour utiliser le mode Notification de fichiers, vous devez ajouter des autorisations pour le compte de service GCS et pour le compte utilisé pour accéder aux ressources Google Cloud Pub/Sub.
Ajoutez le rôle Pub/Sub Publisher
au compte de service GCS. Cela permet au compte de publier des messages de notification d’événements de vos compartiments GCS vers Google Cloud Pub/Sub.
Comme pour le compte de service utilisé pour les ressources Google Cloud Pub/Sub, vous devez ajouter les autorisations suivantes :
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
Pour ce faire, vous pouvez soit créer un rôle personnalisé IAM avec ces autorisations, soit attribuer des rôles GCP préexistants pour couvrir ces autorisations.
Recherche du compte de service GCS
Dans la console Google Cloud pour le projet correspondant, accédez à Cloud Storage > Settings
.
La section « Compte de service de stockage cloud » contient l’e-mail du compte de service GCS.
Création d’un rôle IAM Google Cloud personnalisé pour le mode Notification de fichiers
Dans la console Google Cloud pour le projet correspondant, accédez à IAM & Admin > Roles
. Ensuite, créez un rôle en haut de l’écran ou mettez à jour un rôle existant. Dans l’écran de création ou de modification du rôle, cliquez sur Add Permissions
. Dans le menu qui s’affiche, vous pouvez ajouter les autorisations souhaitées au rôle.
Configurer ou gérer manuellement des ressources de notification de fichier
Les utilisateurs privilégiés peuvent configurer ou gérer manuellement des ressources de notification de fichier.
- Configurez manuellement les services de notification de fichiers via le fournisseur de cloud et spécifiez manuellement l’identificateur de file d’attente. Pour plus d’informations, consultez Options de notification de fichiers.
- Utilisez les API Scala pour créer ou gérer les services de notifications et de mise en file d’attente, comme illustré dans l’exemple suivant :
Remarque
Vous devez disposer des autorisations appropriées pour configurer ou modifier l’infrastructure cloud. Consultez la documentation sur les autorisations pour Azure, S3 ou GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Utilisez setUpNotificationServices(<resource-suffix>)
pour créer une file d’attente et un abonnement avec le nom <prefix>-<resource-suffix>
(le préfixe dépend du système de stockage résumé dans Ressources cloud utilisées dans le mode de notification de fichier Auto Loader. S’il existe une ressource portant le même nom, Azure Databricks réutilise la ressource qui existe déjà au lieu d’en créer une nouvelle. Cette fonction renvoie un identificateur de file d’attente que vous pouvez transmettre à la source cloudFiles
à l’aide de l’identificateur figurant dans Options de notification de fichiers. Cela permet à l’utilisateur de la source cloudFiles
d’avoir moins d’autorisations que l’utilisateur qui crée les ressources.
Fournissez l’option "path"
à newManager
uniquement si vous appelez setUpNotificationServices
; elle n’est pas nécessaire pour listNotificationServices
ou tearDownNotificationServices
. Il s’agit du même path
que celui utilisé lors de l’exécution d’une requête de diffusion en continu.
La matrice suivante indique quelles méthodes d’API sont prises en charge dans Databricks Runtime pour chaque type de stockage :
Cloud Storage | API Setup | API List | API Tear down |
---|---|---|---|
AWS S3 | Toutes les versions | Toutes les versions | Toutes les versions |
ADLS Gen2 | Toutes les versions | Toutes les versions | Toutes les versions |
GCS | Databricks Runtime 9.1 et versions ultérieures | Databricks Runtime 9.1 et versions ultérieures | Databricks Runtime 9.1 et versions ultérieures |
Stockage Blob Azure | Toutes les versions | Toutes les versions | Toutes les versions |
ADLS Gen1 | Non pris en charge | Non pris en charge | Non pris en charge |