Source de fichier Stockage Blob Azure avec Stockage File d’attente Azure (hérité)

Important

Cette documentation a été mise hors service et peut ne pas être mise à jour. Les produits, services ou technologies mentionnés dans ce contenu ne sont plus pris en charge. Consultez Qu’est-ce que Auto Loader ?.

Le connecteur ABS-AQS fournit une source de fichier optimisée qui utilise le Stockage File d’attente Azure (AQS) pour trouver de nouveaux fichiers écrits dans un conteneur de Stockage Blob Azure (ABS) sans devoir répertorier tous les fichiers de manière répétée. Cela offre deux avantages :

  • Latence faible : il n’est pas nécessaire de répertorier les structures de répertoire imbriquées sur ABS, ce qui est lent et gourmand en ressources.
  • Réduction des coûts : aucune demande d’API de liste coûteuse n’est envoyée à ABS.

Notes

La source ABS-AQS supprime les messages de la file d’attente AQS à mesure qu’elle consomme les événements. Si vous souhaitez que d’autres pipelines consomment les messages de cette file d’attente, configurez une file d’attente AQS séparée pour le lecteur optimisé. Vous pouvez configurer plusieurs abonnements Event Grid à publier dans différentes files d’attente.

Utiliser la source du fichier ABS-AQS

Pour utiliser la source du fichier ABS-AQS, vous devez :

  • Configurez des notifications d’événements ABS en tirant parti des abonnements Azure Event Grid et acheminez-les vers AQS. Consultez Réaction aux événements Stockage Blob

  • Spécifiez les options fileFormat et queueUrl et un schéma. Par exemple :

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

S’authentifier avec la Stockage de file d’attente Azure et le stockage d’objets Blob

Pour vous authentifier avec la file d’attente Azure Stockage et le stockage d’objets Blob, utilisez des jetons de Signature d’accès partagé (SAS) ou des clés de compte de stockage. Vous devez fournir une chaîne de connexion pour le compte de stockage dans lequel votre file d’attente est déployée, qui contient votre jeton SAS ou vos clés d’accès à votre compte de stockage. Pour plus d’informations, consultez Configuration des chaînes de connexion Stockage Azure.

Vous devrez également fournir l’accès à vos conteneurs de stockage d’objets BLOB Azure. Pour plus d’informations sur la configuration de l’accès à un conteneur de Stockage Blob Azure, consultez Se connecter à Azure Data Lake Storage Gen2 et au Stockage Blob.

Notes

Nous vous recommandons vivement d’utiliser des secrets pour fournir vos chaînes de connexion.

Configuration

Option Type Default Description
allowOverwrites Boolean true Indique si un objet BLOB qui est remplacé doit être retraité.
connectionString String Aucun (paramètre obligatoire) La chaîne de connexion pour accéder à votre file d'attente.
fetchParallelism Integer 1 Nombre de threads à utiliser lors de la récupération de messages auprès du service de mise en file d’attente.
fileFormat String Aucun (paramètre obligatoire) Format des fichiers tels que parquet, json, csv, text, et ainsi de suite.
ignoreFileDeletion Boolean false Si vous avez des configurations de cycle de vie ou si vous supprimez les fichiers sources manuellement, vous devez définir cette option sur true .
maxFileAge Integer 604800 Détermine la durée (en secondes) pendant laquelle les notifications de fichier sont stockées en tant qu’État pour empêcher le traitement des doublons.
pathRewrites Une chaîne JSON. "{}" Si vous utilisez des points de montage, vous pouvez réécrire le préfixe du chemin d’accès avec le point de container@storageAccount/key montage. Seuls les préfixes peuvent être réécrits. Par exemple, pour la configuration {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}, le chemin d’accès wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json est réécrit dans
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval Chaîne de durée, par exemple, 2m pendant 2 minutes. "5s" Délai d’attente entre les extractions si la file d’attente est vide. Facture Azure par demande d’API à AQS. Par conséquent, si les données n’arrivent pas fréquemment, cette valeur peut être définie sur une durée longue. Tant que la file d’attente n’est pas vide, nous allons procéder à une extraction continue. Si de nouveaux fichiers sont créés toutes les 5 minutes, vous souhaiterez peut-être définir un niveau élevé queueFetchInterval pour réduire les coûts de AQS.
queueName String Aucun (paramètre obligatoire) Nom de la file d’attente AQS.

Si vous observez un grand nombre de messages dans les journaux des pilotes qui ressemblent à Fetched 0 new events and 3 old events., où vous avez tendance à observer un nombre d’événements beaucoup plus ancien que le nouveau, vous devez réduire l’intervalle de déclenchement de votre flux.

Si vous consommez des fichiers à partir d’un emplacement sur le stockage d’objets BLOB où vous vous attendez à ce que certains fichiers puissent être supprimés avant leur traitement, vous pouvez définir la configuration suivante pour ignorer l’erreur et continuer le traitement :

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Forum Aux Questions (FAQ)

Si ignoreFileDeletion a la valeur false (valeur par défaut) et que l’objet a été supprimé, l’ensemble du pipeline échoue-t-il ?

Oui, si nous recevons un événement indiquant que le fichier a été supprimé, l’ensemble du pipeline est en échec.

Comment dois-je définir maxFileAge ?

Azure Queue Stockage fournit une sémantique de remise de messages au moins une fois, par conséquent, nous devons conserver l’état de la déduplication. Le paramètre par défaut pour maxFileAge est de 7 jours, ce qui est égal à la durée de vie maximale d’un message dans la file d’attente.