Azure Blob Storage-bestandsbron met Azure Queue Storage (verouderd)
Belangrijk
Deze documentatie is buiten gebruik gesteld en wordt mogelijk niet bijgewerkt. De producten, services of technologieën die in deze inhoud worden genoemd, worden niet meer ondersteund. Zie Wat is automatisch laadprogramma?
De ABS-AQS-connector biedt een geoptimaliseerde bestandsbron die gebruikmaakt van Azure Queue Storage (AQS) om nieuwe bestanden te vinden die zijn geschreven naar een Azure Blob Storage-container (ABS) zonder dat alle bestanden herhaaldelijk worden vermeld. Dit biedt twee voordelen:
- Lagere latentie: u hoeft geen geneste mapstructuren op ABS weer te geven, wat traag en resource-intensief is.
- Lagere kosten: geen kostbare LIST API-aanvragen die zijn gedaan bij ABS.
Notitie
De ABS-AQS-bron verwijdert berichten uit de AQS-wachtrij wanneer deze gebeurtenissen verbruikt. Als u wilt dat andere pijplijnen berichten uit deze wachtrij gebruiken, stelt u een afzonderlijke AQS-wachtrij in voor de geoptimaliseerde lezer. U kunt meerdere Event Grid-abonnementen instellen om te publiceren naar verschillende wachtrijen.
De ABS-AQS-bestandsbron gebruiken
Als u de ABS-AQS-bestandsbron wilt gebruiken, moet u het volgende doen:
Stel ABS-gebeurtenismeldingen in door gebruik te maken van Azure Event Grid-abonnementen en deze door te sturen naar AQS. Zie Reageren op Blob Storage-gebeurtenissen.
Geef de
fileFormat
opties enqueueUrl
een schema op. Voorbeeld:spark.readStream \ .format("abs-aqs") \ .option("fileFormat", "json") \ .option("queueName", ...) \ .option("connectionString", ...) \ .schema(...) \ .load()
Verifiëren met Azure Queue Storage en Blob Storage
Als u wilt verifiëren met Azure Queue Storage en Blob Storage, gebruikt u SAS-tokens (Shared Access Signature) of opslagaccountsleutels. U moet een verbindingsreeks opgeven voor het opslagaccount waarin uw wachtrij is geïmplementeerd met uw SAS-token of toegangssleutels voor uw opslagaccount. Zie Azure Storage-verbindingsreeks s configureren voor meer informatie.
U moet ook toegang bieden tot uw Azure Blob Storage-containers. Zie Verbinding maken naar Azure Data Lake Storage Gen2 en Blob Storage voor informatie over het configureren van toegang tot uw Azure Blob Storage-container.
Notitie
We raden u ten zeerste aan geheimen te gebruiken voor het leveren van uw verbindingsreeks s.
Configuratie
Optie | Type | Standaard | Beschrijving |
---|---|---|---|
allowOverwrites | Booleaanse waarde | true |
Of een blob die wordt overschreven, opnieuw moet worden verwerkt. |
connectionString | String | Geen (vereiste param) | De verbindingsreeks voor toegang tot uw wachtrij. |
fetchParallelism | Geheel getal | 1 | Het aantal threads dat moet worden gebruikt bij het ophalen van berichten uit de wachtrijservice. |
fileFormat | String | Geen (vereiste param) | De indeling van de bestanden, zoals parquet , json , csv , , text enzovoort. |
ignoreFileDeletion | Booleaanse waarde | false |
Als u levenscyclusconfiguraties hebt of als u de bronbestanden handmatig verwijdert, moet u deze optie instellen op true . |
maxFileAge | Geheel getal | 604800 | Bepaalt hoe lang (in seconden) bestandsmeldingen worden opgeslagen als status om dubbele verwerking te voorkomen. |
pathRewrites | Een JSON-tekenreeks. | "{}" |
Als u koppelpunten gebruikt, kunt u het voorvoegsel van het container@storageAccount/key pad herschrijven met het koppelpunt. Alleen voorvoegsels kunnen opnieuw worden geschreven. Voor de configuratie {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"} wordt het pad wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json bijvoorbeeld herschreven naardbfs:/mnt/data-warehouse/2017/08/fileA.json . |
queueFetchInterval | Een duurtekenreeks, bijvoorbeeld 2m voor 2 minuten. |
"5s" |
Hoe lang moet worden gewacht tussen het ophalen als de wachtrij leeg is. Azure-kosten per API-aanvraag voor AQS. Als gegevens niet vaak binnenkomen, kan deze waarde daarom worden ingesteld op een lange duur. Zolang de wachtrij niet leeg is, worden we continu opgehaald. Als er om de 5 minuten nieuwe bestanden worden gemaakt, wilt u mogelijk een hoge prijs queueFetchInterval instellen om de AQS-kosten te verlagen. |
queueName | String | Geen (vereiste param) | De naam van de AQS-wachtrij. |
Als u veel berichten in de stuurprogrammalogboeken ziet die er als Fetched 0 new events and 3 old events.
volgt uitzien, waar u vaak veel meer oude gebeurtenissen dan nieuw ziet, moet u het triggerinterval van uw stream verminderen.
Als u bestanden gebruikt vanaf een locatie in Blob Storage waar u verwacht dat sommige bestanden kunnen worden verwijderd voordat ze kunnen worden verwerkt, kunt u de volgende configuratie instellen om de fout te negeren en door te gaan met verwerken:
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
Veelgestelde vragen
Als ignoreFileDeletion
onwaar (standaard) is en het object is verwijderd, mislukt de hele pijplijn?
Ja, als we een gebeurtenis ontvangen waarin staat dat het bestand is verwijderd, mislukt de hele pijplijn.
Hoe moet ik instellen maxFileAge
?
Azure Queue Storage biedt ten minste één keer de semantiek voor de bezorging van berichten. Daarom moeten we de status voor ontdubbeling behouden. De standaardinstelling is maxFileAge
7 dagen, die gelijk is aan de maximale TTL van een bericht in de wachtrij.