Partager via


Qu’est-ce que le mode de notification de fichier Auto Loader ?

Dans le mode Notification de fichiers, Auto Loader configure automatiquement un service de notification et un service de file d’attente qui s’abonnent aux événements de fichiers du répertoire d’entrée. Vous pouvez utiliser les notifications de fichiers pour adapter l'Auto Loader à l'ingestion de millions de fichiers par heure. Comparé au mode de liste de répertoire, le mode Notification de fichiers est plus performant et évolutif pour les répertoires d’entrée volumineux ou un nombre élevé de fichiers, mais il nécessite des autorisations supplémentaires sur le cloud.

Vous pouvez basculer entre les notifications de fichiers et la liste de répertoires à tout moment tout en conservant la garantie de traitement des données « une seule fois ».

Remarque

Le mode de notification de fichier n’est pas pris en charge pour les comptes Stockage Premium Azure, car les comptes Premium ne prennent pas en charge le stockage en file d’attente.

Avertissement

La modification du chemin d’accès source pour Auto Loader n’est pas prise en charge pour le mode de notification de fichier. Si le mode de notification de fichier est utilisé et que le chemin d’accès a été modifié, vous risquez de ne pas ingérer les fichiers qui sont déjà présents dans le nouveau répertoire lors de sa mise à jour.

Ressources cloud utilisées en mode de notification de fichier Auto Loader

Important

Vous avez besoin d’autorisations élevées pour configurer automatiquement l’infrastructure cloud pour le mode de notification de fichier. Contactez votre administrateur cloud ou administrateur d’espace de travail. Voir :

Auto Loader peut configurer automatiquement des notifications de fichiers pour vous lorsque vous définissez l’option cloudFiles.useNotifications sur true et que vous fournissez les autorisations nécessaires pour créer des ressources cloud. En outre, vous devrez peut-être fournir des options supplémentaires pour donner à Auto Loader l’autorisation de créer ces ressources.

Le tableau suivant résume les ressources qui sont créées par Auto Loader.

Cloud Storage Service d’abonnement Service File d’attente Préfixe * Limite **
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 100 par compartiment S3
ADLS Gen2 Azure Event Grid Stockage File d’attente Azure databricks 500 par compte de stockage
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest 100 par compartiment GCS
Stockage Blob Azure Azure Event Grid Stockage File d’attente Azure databricks 500 par compte de stockage
  • Auto Loader nomme les ressources avec ce préfixe.

** Nombre de pipelines de notification de fichiers pouvant être lancés simultanément

Si vous avez besoin d’exécuter plus que le nombre limité de pipelines de notification de fichiers pour un compte de stockage donné, vous pouvez :

  • Utiliser un service tel que AWS Lambda, Azure Functions ou Google Cloud Functions pour répartir les notifications d’une file d’attente unique qui écoute un conteneur ou un compartiment entier dans des files d’attente spécifiques aux répertoires.

Événements de notification de fichiers

AWS S3 fournit un événement ObjectCreated lorsqu’un fichier est chargé dans un compartiment S3, qu’il ait été chargé par un chargement put ou en plusieurs parties.

ADLS Gen2 fournit différentes notifications d’événements pour les fichiers qui apparaissent dans votre conteneur Gen2.

  • Auto Loader écoute l’événement FlushWithClose pour traiter un fichier.
  • Les flux Auto Loader prennent en charge l’action RenameFile de découverte de fichiers. Les actions RenameFile nécessitent l’envoi d’une demande d’API au système de stockage pour obtenir la taille du fichier renommé.
  • Les flux Auto Loader créés avec Databricks Runtime 9.0 et versions ultérieures prennent en charge l’action RenameDirectory pour découvrir les fichiers. Les actions RenameDirectory nécessitent l’envoi de demandes d’API au système de stockage pour lister le contenu du répertoire renommé.

Google Cloud Storage fournit un événement OBJECT_FINALIZE lorsqu’un fichier est chargé, ce qui inclut les remplacements et les copies de fichiers. Les échecs de chargement ne génèrent pas cet événement.

Notes

Les fournisseurs de cloud ne garantissent pas une livraison à 100 % de tous les événements de fichiers dans des conditions très rares et ne fournissent pas de SLA stricts sur la latence des événements de fichiers. Databricks vous recommande de déclencher des renvois standard avec Auto Loader à l’aide de l’option cloudFiles.backfillInterval pour garantir que tous les fichiers sont découverts dans un SLA donné si l’exhaustivité des données est obligatoire. Le déclenchement de renvois normaux n’entraîne pas de doublons.

Autorisations requises pour configurer la notification de fichier pour ADLS Gen2 et Stockage Blob Azure

Vous devez disposer d’autorisations de lecture pour le répertoire d’entrée. Consultez Stockage Blob Azure.

Pour utiliser le mode de notification de fichiers, vous devez fournir des informations d’authentification pour configurer les services de notification d’événements et y accéder. Vous avez uniquement besoin d’un principal de service pour l’authentification.

  • Principal de service - Utilisation de rôles intégrés Azure

    Créez une application et un principal de service Microsoft Entra ID (anciennement Azure Active Directory) sous la forme d’un ID client et d’une clé secrète client.

    Attribuez à cette application les rôles suivants pour le compte de stockage dans lequel se trouve le chemin d’entrée :

    • Contributor : Ce rôle sert à configurer les ressources de votre compte de stockage, telles que les files d’attente et les abonnements aux événements.
    • Storage Queue Data Contributor : Ce rôle sert à effectuer des opérations sur les files d’attente, telles que la récupération et la suppression de messages dans les files d’attente. Ce rôle est obligatoire uniquement lorsque vous fournissez un principal de service sans chaîne de connexion.

    Attribuez à cette application le rôle suivant pour le groupe de ressources associé :

    • EventGrid EventSubscription Contributor : Ce rôle sert à effectuer des opérations sur les abonnements à la grille d’événements, telles que la création ou l’énumération des abonnements aux événements.

    Pour plus d’informations, consultez Attribuer des rôles Azure en utilisant le portail Azure.

  • Principal de service - utilisation d’un rôle personnalisé

    Si vous êtes préoccupé par les autorisations excessives requises pour les rôles précédents, vous pouvez créer un Rôle personnalisé avec au moins les autorisations suivantes, listées ci-dessous au format JSON de rôle Azure :

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Ensuite, vous pouvez attribuer ce rôle personnalisé à votre application.

    Pour plus d’informations, consultez Attribuer des rôles Azure en utilisant le portail Azure.

Autorisations d’Auto loader

Résolution des erreurs courantes

Erreur :

java.lang.RuntimeException: Failed to create event grid subscription.

Si ce message d’erreur s’affiche lorsque vous exécutez Auto Loader pour la première fois, cela signifie que la grille d’événements n’est pas inscrite en tant que fournisseur de ressources dans votre abonnement Azure. Pour l’inscrire sur Portail Azure :

  1. Accédez à votre abonnement.
  2. Cliquez sur Fournisseurs de ressources sous la section Paramètres.
  3. Inscrivez le fournisseur Microsoft.EventGrid.

Erreur :

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Si ce message d’erreur s’affiche lorsque vous exécutez Auto Loader pour la première fois, vérifiez que vous avez attribué le rôle Contributeur à votre principal de service pour Event Grid, ainsi qu’à votre compte de stockage.

Autorisations requises pour configurer la notification de fichier pour AWS S3

Vous devez disposer d’autorisations de lecture pour le répertoire d’entrée. Pour plus d’informations, consultez les détails de la connexion S3.

Pour utiliser le mode Notification de fichiers, joignez le document de stratégie JSON suivant à votre utilisateur ou rôle IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

où :

  • <bucket-name> : Nom du compartiment S3 dans lequel votre flux lira des fichiers, par exemple auto-logs. Vous pouvez utiliser * comme caractère générique, par exemple databricks-*-logs. Pour connaître le compartiment S3 sous-jacent de votre chemin d’accès DBFS, vous pouvez répertorier tous les points de montage DBFS d’un notebook en exécutant %fs mounts.
  • <region> : Région AWS dans laquelle réside le compartiment S3, par exemple us-west-2. Si vous ne souhaitez pas spécifier la région, utilisez *.
  • <account-number> : Numéro de compte AWS qui possède le compartiment S3, par exemple 123456789012. Si vous ne souhaitez pas spécifier le numéro de compte, utilisez *.

La chaîne databricks-auto-ingest-* dans la spécification ARN SQS et SNS est le préfixe de nom que la source cloudFiles utilise lors de la création des services SQS et SNS. Étant donné qu’Azure Databricks configure les services de notification lors de l’exécution initiale du flux, vous pouvez utiliser une stratégie avec des autorisations réduites après l’exécution initiale (par exemple, arrêter le flux, puis le redémarrer).

Notes

La stratégie précédente concerne uniquement les autorisations nécessaires pour configurer les services de notification de fichiers, à savoir les services de notification de compartiment S3, SNS et SQS, et suppose que vous disposez déjà d’un accès en lecture au compartiment S3. Si vous devez ajouter des autorisations S3 en lecture seule, ajoutez ce qui suit à la liste Action dans l’instruction DatabricksAutoLoaderSetup dans le document JSON :

  • s3:ListBucket
  • s3:GetObject

Autorisations réduites après la configuration initiale

Les autorisations de configuration des ressources décrites ci-dessus sont requises uniquement lors de l’exécution initiale du flux. Après la première exécution, vous pouvez passer à la stratégie IAM suivante avec des autorisations réduites.

Important

Avec les autorisations réduites, vous ne pouvez pas démarrer de nouvelles requêtes de streaming ni recréer des ressources en cas d’échec (par exemple, la file d’attente SQS a été supprimée par inadvertance). Vous ne pouvez pas non plus utiliser l’API de gestion des ressources cloud pour répertorier ou détruire des ressources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Autorisations requises pour configurer la notification de fichier pour GCS

Vous devez disposer des autorisations list et get sur votre compartiment GCS et sur tous les objets. Pour plus d’informations, consultez la documentation de Google sur les autorisations IAM.

Pour utiliser le mode Notification de fichiers, vous devez ajouter des autorisations pour le compte de service GCS et pour le compte utilisé pour accéder aux ressources Google Cloud Pub/Sub.

Ajoutez le rôle Pub/Sub Publisher au compte de service GCS. Cela permet au compte de publier des messages de notification d’événements de vos compartiments GCS vers Google Cloud Pub/Sub.

Comme pour le compte de service utilisé pour les ressources Google Cloud Pub/Sub, vous devez ajouter les autorisations suivantes :

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Pour ce faire, vous pouvez soit créer un rôle personnalisé IAM avec ces autorisations, soit attribuer des rôles GCP préexistants pour couvrir ces autorisations.

Recherche du compte de service GCS

Dans la console Google Cloud pour le projet correspondant, accédez à Cloud Storage > Settings. La section « Compte de service de stockage cloud » contient l’e-mail du compte de service GCS.

Compte de service GCS

Création d’un rôle IAM Google Cloud personnalisé pour le mode Notification de fichiers

Dans la console Google Cloud pour le projet correspondant, accédez à IAM & Admin > Roles. Ensuite, créez un rôle en haut de l’écran ou mettez à jour un rôle existant. Dans l’écran de création ou de modification du rôle, cliquez sur Add Permissions. Dans le menu qui s’affiche, vous pouvez ajouter les autorisations souhaitées au rôle.

Rôles personnalisés GCP IAM

Configurer ou gérer manuellement des ressources de notification de fichier

Les utilisateurs privilégiés peuvent configurer ou gérer manuellement des ressources de notification de fichier.

  • Configurez manuellement les services de notification de fichiers via le fournisseur de cloud et spécifiez manuellement l’identificateur de file d’attente. Pour plus d’informations, consultez Options de notification de fichiers.
  • Utilisez les API Scala pour créer ou gérer les services de notifications et de mise en file d’attente, comme illustré dans l’exemple suivant :

Remarque

Vous devez disposer des autorisations appropriées pour configurer ou modifier l’infrastructure cloud. Consultez la documentation sur les autorisations pour Azure, S3 ou GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Utilisez setUpNotificationServices(<resource-suffix>) pour créer une file d’attente et un abonnement avec le nom <prefix>-<resource-suffix> (le préfixe dépend du système de stockage résumé dans Ressources cloud utilisées dans le mode de notification de fichier Auto Loader. S’il existe une ressource portant le même nom, Azure Databricks réutilise la ressource qui existe déjà au lieu d’en créer une nouvelle. Cette fonction renvoie un identificateur de file d’attente que vous pouvez transmettre à la source cloudFiles à l’aide de l’identificateur figurant dans Options de notification de fichiers. Cela permet à l’utilisateur de la source cloudFiles d’avoir moins d’autorisations que l’utilisateur qui crée les ressources.

Fournissez l’option "path" à newManager uniquement si vous appelez setUpNotificationServices ; elle n’est pas nécessaire pour listNotificationServices ou tearDownNotificationServices. Il s’agit du même path que celui utilisé lors de l’exécution d’une requête de diffusion en continu.

La matrice suivante indique quelles méthodes d’API sont prises en charge dans Databricks Runtime pour chaque type de stockage :

Cloud Storage API Setup API List API Tear down
AWS S3 Toutes les versions Toutes les versions Toutes les versions
ADLS Gen2 Toutes les versions Toutes les versions Toutes les versions
GCS Databricks Runtime 9.1 et versions ultérieures Databricks Runtime 9.1 et versions ultérieures Databricks Runtime 9.1 et versions ultérieures
Stockage Blob Azure Toutes les versions Toutes les versions Toutes les versions
ADLS Gen1 Non pris en charge Non pris en charge Non pris en charge