Share via


Azure Blob Storage-filkälla med Azure Queue Storage (äldre)

Viktigt!

Den här dokumentationen har dragits tillbaka och kanske inte uppdateras. De produkter, tjänster eller tekniker som nämns i det här innehållet stöds inte längre. Se Vad är automatisk inläsare?.

ABS-AQS-anslutningstjänsten tillhandahåller en optimerad filkälla som använder Azure Queue Storage (AQS) för att hitta nya filer som skrivits till en Azure Blob Storage-container (ABS) utan att flera gånger lista alla filer. Detta ger två fördelar:

  • Kortare svarstid: du behöver inte lista kapslade katalogstrukturer i ABS, vilket är långsamt och resursintensivt.
  • Lägre kostnader: inga dyrare API-begäranden till ABS.

Kommentar

ABS-AQS-källan tar bort meddelanden från AQS-kön när den förbrukar händelser. Om du vill att andra pipelines ska använda meddelanden från den här kön konfigurerar du en separat AQS-kö för den optimerade läsaren. Du kan konfigurera flera Event Grid-prenumerationer för att publicera till olika köer.

Använda ABS-AQS-filkällan

Om du vill använda ABS-AQS-filkällan måste du:

  • Konfigurera ABS-händelseaviseringar genom att använda Azure Event Grid-prenumerationer och dirigera dem till AQS. Se Reagera på Blob Storage-händelser.

  • fileFormat Ange alternativen och queueUrl och ett schema. Till exempel:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

Autentisera med Azure Queue Storage och Blob Storage

Om du vill autentisera med Azure Queue Storage och Blob Storage använder du SAS-token (Signatur för delad åtkomst) eller lagringskontonycklar. Du måste ange en anslutningssträng för lagringskontot där din kö distribueras som innehåller antingen din SAS-token eller åtkomstnycklar till ditt lagringskonto. Mer information finns i Konfigurera Azure Storage-anslutningssträng.

Du måste också ge åtkomst till dina Azure Blob Storage-containrar. Mer information om hur du konfigurerar åtkomst till din Azure Blob Storage-container finns i Anslut till Azure Data Lake Storage Gen2 och Blob Storage.

Kommentar

Vi rekommenderar starkt att du använder hemligheter för att tillhandahålla dina anslutningssträng.

Konfiguration

Alternativ Typ Standardvärde beskrivning
allowOverwrites Booleskt true Om en blob som skrivs över ska bearbetas på nytt.
Connectionstring String Ingen (obligatorisk param) Anslutningssträng för att komma åt din kö.
fetchParallelism Integer 1 Antal trådar som ska användas när meddelanden hämtas från kötjänsten.
fileFormat String Ingen (obligatorisk param) Formatet för filer som parquet, json, csv, textoch så vidare.
ignoreFileDeletion Booleskt false Om du har livscykelkonfigurationer eller om du tar bort källfilerna manuellt måste du ange det här alternativet till true.
maxFileAge Integer 604800 Avgör hur länge (i sekunder) filmeddelanden lagras som tillstånd för att förhindra duplicerad bearbetning.
pathRewrites En JSON-sträng. "{}" Om du använder monteringspunkter kan du skriva om prefixet för container@storageAccount/key sökvägen med monteringspunkten. Det går bara att skriva om prefix. För konfigurationen {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}skrivs till exempel sökvägen wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json om till
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval En varaktighetssträng, till exempel 2m i 2 minuter. "5s" Hur lång tid det är att vänta mellan hämtningarna om kön är tom. Azure-avgifter per API-begäran till AQS. Om data inte kommer ofta kan det här värdet därför anges till en lång varaktighet. Så länge kön inte är tom hämtar vi kontinuerligt. Om nya filer skapas var femte minut kanske du vill ange en hög queueFetchInterval för att minska AQS-kostnaderna.
queueName String Ingen (obligatorisk param) Namnet på AQS-kön.

Om du ser många meddelanden i drivrutinsloggarna som ser ut som Fetched 0 new events and 3 old events., där du tenderar att observera många fler gamla händelser än nya, bör du minska utlösarintervallet för strömmen.

Om du använder filer från en plats i Blob Storage där du förväntar dig att vissa filer kan tas bort innan de kan bearbetas kan du ange följande konfiguration för att ignorera felet och fortsätta bearbetningen:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Vanliga frågor och svar

Om ignoreFileDeletion är False (standard) och objektet har tagits bort, kommer hela pipelinen att misslyckas?

Ja, om vi får en händelse som anger att filen har tagits bort misslyckas hela pipelinen.

Hur ska jag ställa in maxFileAge?

Azure Queue Storage tillhandahåller semantik för meddelandeleverans minst en gång, därför måste vi behålla tillståndet för deduplicering. Standardinställningen för är 7 dagar, vilket är lika med maximalT TTL för maxFileAge ett meddelande i kön.