Share via


Azure Blob Storage-bestandsbron met Azure Queue Storage (verouderd)

Belangrijk

Deze documentatie is buiten gebruik gesteld en wordt mogelijk niet bijgewerkt. De producten, services of technologieën die in deze inhoud worden genoemd, worden niet meer ondersteund. Zie Wat is automatisch laadprogramma?

De ABS-AQS-connector biedt een geoptimaliseerde bestandsbron die gebruikmaakt van Azure Queue Storage (AQS) om nieuwe bestanden te vinden die zijn geschreven naar een Azure Blob Storage-container (ABS) zonder dat alle bestanden herhaaldelijk worden vermeld. Dit biedt twee voordelen:

  • Lagere latentie: u hoeft geen geneste mapstructuren op ABS weer te geven, wat traag en resource-intensief is.
  • Lagere kosten: geen kostbare LIST API-aanvragen die zijn gedaan bij ABS.

Notitie

De ABS-AQS-bron verwijdert berichten uit de AQS-wachtrij wanneer deze gebeurtenissen verbruikt. Als u wilt dat andere pijplijnen berichten uit deze wachtrij gebruiken, stelt u een afzonderlijke AQS-wachtrij in voor de geoptimaliseerde lezer. U kunt meerdere Event Grid-abonnementen instellen om te publiceren naar verschillende wachtrijen.

De ABS-AQS-bestandsbron gebruiken

Als u de ABS-AQS-bestandsbron wilt gebruiken, moet u het volgende doen:

  • Stel ABS-gebeurtenismeldingen in door gebruik te maken van Azure Event Grid-abonnementen en deze door te sturen naar AQS. Zie Reageren op Blob Storage-gebeurtenissen.

  • Geef de fileFormat opties en queueUrl een schema op. Voorbeeld:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

Verifiëren met Azure Queue Storage en Blob Storage

Als u wilt verifiëren met Azure Queue Storage en Blob Storage, gebruikt u SAS-tokens (Shared Access Signature) of opslagaccountsleutels. U moet een verbindingsreeks opgeven voor het opslagaccount waarin uw wachtrij is geïmplementeerd met uw SAS-token of toegangssleutels voor uw opslagaccount. Zie Azure Storage-verbindingsreeks s configureren voor meer informatie.

U moet ook toegang bieden tot uw Azure Blob Storage-containers. Zie Verbinding maken naar Azure Data Lake Storage Gen2 en Blob Storage voor informatie over het configureren van toegang tot uw Azure Blob Storage-container.

Notitie

We raden u ten zeerste aan geheimen te gebruiken voor het leveren van uw verbindingsreeks s.

Configuratie

Optie Type Standaard Beschrijving
allowOverwrites Booleaanse waarde true Of een blob die wordt overschreven, opnieuw moet worden verwerkt.
connectionString String Geen (vereiste param) De verbindingsreeks voor toegang tot uw wachtrij.
fetchParallelism Geheel getal 1 Het aantal threads dat moet worden gebruikt bij het ophalen van berichten uit de wachtrijservice.
fileFormat String Geen (vereiste param) De indeling van de bestanden, zoals parquet, json, csv, , textenzovoort.
ignoreFileDeletion Booleaanse waarde false Als u levenscyclusconfiguraties hebt of als u de bronbestanden handmatig verwijdert, moet u deze optie instellen op true.
maxFileAge Geheel getal 604800 Bepaalt hoe lang (in seconden) bestandsmeldingen worden opgeslagen als status om dubbele verwerking te voorkomen.
pathRewrites Een JSON-tekenreeks. "{}" Als u koppelpunten gebruikt, kunt u het voorvoegsel van het container@storageAccount/key pad herschrijven met het koppelpunt. Alleen voorvoegsels kunnen opnieuw worden geschreven. Voor de configuratie {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}wordt het pad wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json bijvoorbeeld herschreven naar
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval Een duurtekenreeks, bijvoorbeeld 2m voor 2 minuten. "5s" Hoe lang moet worden gewacht tussen het ophalen als de wachtrij leeg is. Azure-kosten per API-aanvraag voor AQS. Als gegevens niet vaak binnenkomen, kan deze waarde daarom worden ingesteld op een lange duur. Zolang de wachtrij niet leeg is, worden we continu opgehaald. Als er om de 5 minuten nieuwe bestanden worden gemaakt, wilt u mogelijk een hoge prijs queueFetchInterval instellen om de AQS-kosten te verlagen.
queueName String Geen (vereiste param) De naam van de AQS-wachtrij.

Als u veel berichten in de stuurprogrammalogboeken ziet die er als Fetched 0 new events and 3 old events.volgt uitzien, waar u vaak veel meer oude gebeurtenissen dan nieuw ziet, moet u het triggerinterval van uw stream verminderen.

Als u bestanden gebruikt vanaf een locatie in Blob Storage waar u verwacht dat sommige bestanden kunnen worden verwijderd voordat ze kunnen worden verwerkt, kunt u de volgende configuratie instellen om de fout te negeren en door te gaan met verwerken:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Veelgestelde vragen

Als ignoreFileDeletion onwaar (standaard) is en het object is verwijderd, mislukt de hele pijplijn?

Ja, als we een gebeurtenis ontvangen waarin staat dat het bestand is verwijderd, mislukt de hele pijplijn.

Hoe moet ik instellen maxFileAge?

Azure Queue Storage biedt ten minste één keer de semantiek voor de bezorging van berichten. Daarom moeten we de status voor ontdubbeling behouden. De standaardinstelling is maxFileAge 7 dagen, die gelijk is aan de maximale TTL van een bericht in de wachtrij.