Använda frågeparallellisering i Azure Stream Analytics

Den här artikeln visar hur du drar nytta av parallellisering i Azure Stream Analytics. Du lär dig hur du skalar Stream Analytics-jobb genom att konfigurera indatapartitioner och justera analysfrågedefinitionen.

Som en förutsättning kanske du vill känna till begreppet strömningsenhet som beskrivs i Förstå och justera strömningsenheter.

Vilka är delarna i ett Stream Analytics-jobb?

En Stream Analytics-jobbdefinition innehåller minst en strömmande indata, en fråga och utdata. Indata är där jobbet läser dataströmmen från. Frågan används för att transformera dataindataströmmen och utdata är där jobbet skickar jobbresultatet till.

Partitioner i indata och utdata

Med partitionering kan du dela upp data i delmängder baserat på en partitionsnyckel. Om dina indata (till exempel Event Hubs) partitioneras av en nyckel rekommenderar vi att du anger partitionsnyckeln när du lägger till indata i Stream Analytics-jobbet. Skalning av ett Stream Analytics-jobb drar nytta av partitioner i indata och utdata. Ett Stream Analytics-jobb kan använda och skriva olika partitioner parallellt, vilket ökar dataflödet.

Indata

Alla Indata för Azure Stream Analytics-strömning kan dra nytta av partitionering: Event Hubs, IoT Hub, Blob Storage, Data Lake Storage Gen2.

Kommentar

För kompatibilitetsnivå 1.2 och senare ska partitionsnyckeln anges som en indataegenskap, utan att du behöver nyckelordet PARTITION BY i frågan. För kompatibilitetsnivå 1.1 och lägre måste partitionsnyckeln i stället definieras med nyckelordet PARTITION BY i frågan.

Utdata

När du arbetar med Stream Analytics kan du dra nytta av partitionering i utdata:

  • Azure Data Lake Storage
  • Azure Functions
  • Azure Table
  • Blob storage (kan uttryckligen ange partitionsnyckeln)
  • Azure Cosmos DB (måste ange partitionsnyckeln explicit)
  • Event Hubs (måste ange partitionsnyckeln explicit)
  • IoT Hub (måste ange partitionsnyckeln explicit)
  • Service Bus
  • SQL och Azure Synapse Analytics med valfri partitionering: se mer information på sidan Utdata till Azure SQL Database.

Power BI stöder inte partitionering. Du kan dock fortfarande partitioneras indata enligt beskrivningen i det här avsnittet.

Mer information om partitioner finns i följande artiklar:

Fråga

För att ett jobb ska vara parallellt måste partitionsnycklar justeras mellan alla indata, alla frågelogiksteg och alla utdata. Frågelogikpartitioneringen bestäms av de nycklar som används för kopplingar och sammansättningar (GROUP BY). Det här sista kravet kan ignoreras om frågelogik inte är nyckelad (projektion, filter, referenskopplingar...).

  • Om indata och utdata partitioneras av WarehouseId, och frågan grupperas ProductId utan WarehouseId, är jobbet inte parallellt.
  • Om två indata som ska anslutas partitioneras av olika partitionsnycklar (WarehouseId och ProductId) är jobbet inte parallellt.
  • Om två eller flera oberoende dataflöden finns i ett enda jobb, var och en med en egen partitionsnyckel, är jobbet inte parallellt.

Endast när alla indata, utdata och frågesteg använder samma nyckel är jobbet parallellt.

Pinsamt parallella jobb

Ett pinsamt parallellt jobb är det mest skalbara scenariot i Azure Stream Analytics. Den ansluter en partition av indata till en instans av frågan till en partition av utdata. Den här parallelliteten har följande krav:

  • Om din frågelogik är beroende av att samma nyckel bearbetas av samma frågeinstans måste du se till att händelserna går till samma partition av dina indata. För Event Hubs eller IoT Hub innebär det att händelsedata måste ha värdet PartitionKey inställt. Du kan också använda partitionerade avsändare. För bloblagring innebär det att händelserna skickas till samma partitionsmapp. Ett exempel är en frågeinstans som aggregerar data per userID där indatahändelsehubben partitioneras med userID som partitionsnyckel. Men om frågelogik inte kräver att samma nyckel bearbetas av samma frågeinstans kan du ignorera det här kravet. Ett exempel på den här logiken är en enkel select-project-filter-fråga.

  • Nästa steg är att partitionera frågan. För jobb med kompatibilitetsnivå 1.2 eller senare (rekommenderas) kan anpassad kolumn anges som partitionsnyckel i indatainställningarna och jobbet kommer att vara parallellt automatiskt. Jobb med kompatibilitetsnivå 1.0 eller 1.1 kräver att du använder PARTITION BY PartitionId i alla steg i frågan. Flera steg är tillåtna, men alla måste partitioneras med samma nyckel.

  • De flesta utdata som stöds i Stream Analytics kan dra nytta av partitionering. Om du använder en utdatatyp som inte stöder partitionering av jobbet blir det inte pinsamt parallellt. För Event Hubs-utdata kontrollerar du att kolumnen Partitionsnyckel är inställd på samma partitionsnyckel som används i frågan. Mer information finns i avsnittet utdata.

  • Antalet indatapartitioner måste vara lika med antalet utdatapartitioner. Blob Storage-utdata kan stödja partitioner och ärver partitioneringsschemat för den överordnade frågan. När en partitionsnyckel för Blob Storage anges partitioneras data per indatapartition, vilket innebär att resultatet fortfarande är helt parallellt. Här är exempel på partitionsvärden som tillåter ett helt parallellt jobb:

    • Åtta indatapartitioner för händelsehubben och åtta utdatapartitioner för händelsehubben
    • Åtta indatapartitioner för händelsehubben och bloblagringsutdata
    • Åtta indatapartitioner för händelsehubben och bloblagringsutdata partitionerade av ett anpassat fält med godtycklig kardinalitet
    • Åtta bloblagringsindatapartitioner och bloblagringsutdata
    • Åtta indatapartitioner för bloblagring och åtta utdatapartitioner för händelsehubben

I följande avsnitt beskrivs några exempelscenarier som är pinsamt parallella.

Exempelfråga

  • Indata: En händelsehubb med åtta partitioner
  • Utdata: En händelsehubb med åtta partitioner ("Partitionsnyckelkolumn" måste anges att använda PartitionId)

Fråga:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Den här frågan är ett enkelt filter. Därför behöver vi inte bekymra oss om partitionering av indata som skickas till händelsehubben. Observera att jobb med kompatibilitetsnivå före 1.2 måste innehålla PARTITION BY PartitionId-satsen , så det uppfyller krav nr 2 från tidigare. För utdata måste vi konfigurera händelsehubbens utdata i jobbet så att partitionsnyckeln är inställd på PartitionId. En sista kontroll är att se till att antalet indatapartitioner är lika med antalet utdatapartitioner.

Fråga med en grupperingsnyckel

  • Indata: Händelsehubb med åtta partitioner
  • Utdata: Blob Storage

Fråga:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Den här frågan har en grupperingsnyckel. Därför måste händelserna grupperade tillsammans skickas till samma Event Hubs-partition. Eftersom vi i det här exemplet grupperar efter TollBoothID bör vi vara säkra på att TollBoothID det används som partitionsnyckel när händelserna skickas till Event Hubs. I Azure Stream Analytics kan du sedan använda PARTITION BY PartitionId för att ärva från det här partitionsschemat och aktivera fullständig parallellisering. Eftersom utdata är bloblagring behöver vi inte bekymra oss om att konfigurera ett partitionsnyckelvärde enligt krav nr 4.

Exempel på scenarier som inte är* pinsamt parallella

I föregående avsnitt beskrev artikeln några pinsamt parallella scenarier. I det här avsnittet får du lära dig om scenarier som inte uppfyller alla krav för att vara pinsamt parallella.

Felmatchat partitionsantal

  • Indata: En händelsehubb med åtta partitioner
  • Utdata: En händelsehubb med 32 partitioner

Om antalet indatapartitioner inte matchar antalet utdatapartitioner är topologin inte pinsamt parallell oavsett fråga. Men vi kan fortfarande få en viss nivå av parallellisering.

Fråga med icke-partitionerade utdata

  • Indata: En händelsehubb med åtta partitioner
  • Utdata: Power BI

Power BI-utdata stöder för närvarande inte partitionering. Därför är det här scenariot inte pinsamt parallellt.

Flerstegsfråga med olika PARTITION BY-värden

  • Indata: Händelsehubb med åtta partitioner
  • Utdata: Händelsehubb med åtta partitioner
  • Kompatibilitetsnivå: 1.0 eller 1.1

Fråga:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Som du ser använder det andra steget TollBoothId som partitioneringsnyckel. Det här steget är inte detsamma som det första steget, och det kräver därför att vi gör en blandning.

Flerstegsfråga med olika PARTITION BY-värden

  • Indata: Händelsehubb med åtta partitioner ("Partitionsnyckelkolumn" har inte angetts, standardvärdet "PartitionId")
  • Utdata: Händelsehubb med åtta partitioner ("Partitionsnyckelkolumn" måste anges för att använda "TollBoothId")
  • Kompatibilitetsnivå – 1,2 eller högre

Fråga:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Kompatibilitetsnivå 1.2 eller senare möjliggör parallell frågekörning som standard. Till exempel partitioneras frågan från föregående avsnitt så länge kolumnen "TollBoothId" anges som indatapartitionsnyckel. PARTITION BY PartitionId-satsen krävs inte.

Beräkna de maximala strömningsenheterna för ett jobb

Det totala antalet strömmande enheter som kan användas av ett Stream Analytics-jobb beror på antalet steg i frågan som definierats för jobbet och antalet partitioner för varje steg.

Steg i en fråga

En fråga kan ha ett eller flera steg. Varje steg är en underfråga som definieras av nyckelordet WITH . Frågan som ligger utanför nyckelordet WITH (endast en fråga) räknas också som ett steg, till exempel SELECT-instruktioneni följande fråga:

Fråga:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Den här frågan har två steg.

Kommentar

Den här frågan beskrivs mer detaljerat senare i artikeln.

Partition ett steg

Partitionering av ett steg kräver följande villkor:

  • Indatakällan måste partitioneras.
  • SELECT-instruktionen för frågan måste läsas från en partitionerad indatakälla.
  • Frågan i steget måste ha nyckelordet PARTITION BY .

När en fråga partitioneras bearbetas och aggregeras indatahändelserna i separata partitionsgrupper, och utdatahändelser genereras för var och en av grupperna. Om du vill ha en kombinerad aggregering måste du skapa ett andra icke-partitionerat steg för att aggregera.

Beräkna maximalt antal strömmande enheter för ett jobb

Alla icke-partitionerade steg tillsammans kan skala upp till en strömningsenhet (SU V2s) för ett Stream Analytics-jobb. Dessutom kan du lägga till en SU V2 för varje partition i ett partitionerat steg. Du kan se några exempel i följande tabell.

Fråga Maximalt antal SUS:er för jobbet
  • Frågan innehåller ett steg.
  • Steget är inte partitionerat.
1 SU V2
  • Indataströmmen partitioneras med 16.
  • Frågan innehåller ett steg.
  • Steget är partitionerat.
16 SU V2 (1 * 16 partitioner)
  • Frågan innehåller två steg.
  • Inget av stegen partitioneras.
1 SU V2
  • Indataströmmen partitioneras med 3.
  • Frågan innehåller två steg. Indatasteget partitioneras och det andra steget är inte det.
  • SELECT-instruktionen läser från de partitionerade indata.
4 SU V2s (3 för partitionerade steg + 1 för icke-partitionerade steg

Exempel på skalning

Följande fråga beräknar antalet bilar inom ett treminutersfönster som går genom en avgiftsbelagd station som har tre vägtullar. Den här frågan kan skalas upp till en SU V2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Om du vill använda fler SU:er för frågan måste både indataströmmen och frågan partitioneras. Eftersom dataströmspartitionen är inställd på 3 kan följande ändrade fråga skalas upp till 3 SU V2:or:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

När en fråga partitioneras bearbetas och aggregeras indatahändelserna i separata partitionsgrupper. Utdatahändelser genereras också för var och en av grupperna. Partitionering kan orsaka oväntade resultat när fältet GROUP BY inte är partitionsnyckeln i indataströmmen. Till exempel är fältet TollBoothId i föregående fråga inte partitionsnyckeln för Input1. Resultatet är att data från TollBooth #1 kan spridas i flera partitioner.

Var och en av Indata1-partitionerna bearbetas separat av Stream Analytics. Därför skapas flera poster av antalet bilar för samma vägtull i samma rullande fönster. Om indatapartitionsnyckeln inte kan ändras kan det här problemet åtgärdas genom att lägga till ett icke-partitionssteg för att aggregera värden mellan partitioner, som i följande exempel:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Den här frågan kan skalas till 4 SU V2s.

Kommentar

Om du ansluter till två strömmar kontrollerar du att strömmarna partitioneras av partitionsnyckeln i kolumnen som du använder för att skapa kopplingarna. Kontrollera också att du har samma antal partitioner i båda strömmarna.

Uppnå högre dataflöden i stor skala

Ett pinsamt parallellt jobb är nödvändigt men inte tillräckligt för att upprätthålla ett högre dataflöde i stor skala. Varje lagringssystem och dess motsvarande Stream Analytics-utdata har variationer på hur du uppnår bästa möjliga skrivdataflöde. Precis som med alla scenarier i stor skala finns det vissa utmaningar som kan lösas med hjälp av rätt konfigurationer. Det här avsnittet beskriver konfigurationer för några vanliga utdata och innehåller exempel för att upprätthålla inmatningshastigheter på 1 K, 5 K och 10 K händelser per sekund.

Följande observationer använder ett Stream Analytics-jobb med tillståndslös (genomströmning) fråga, en grundläggande JavaScript UDF som skriver till Event Hubs, Azure SQL eller Azure Cosmos DB.

Event Hubs

Inmatningshastighet (händelser per sekund) Enheter för strömning Utdataresurser
1 K 1/3 2 TU
5 K 1 6 TU
10 K 2 10 TU

Event Hubs-lösningen skalas linjärt när det gäller strömningsenheter (SU) och dataflöde, vilket gör den till det mest effektiva och högpresterande sättet att analysera och strömma data från Stream Analytics. Jobb kan skalas upp till 66 SU V2s, vilket ungefär innebär bearbetning av upp till 400 MB/s eller 38 biljoner händelser per dag.

Azure SQL

Inmatningshastighet (händelser per sekund) Enheter för strömning Utdataresurser
1 K 2/3 S3
5 K 3 P4
10 K 6 P6

Azure SQL har stöd för att skriva parallellt, med namnet Ärv partitionering, men det är inte aktiverat som standard. Att aktivera ärvningspartitionering, tillsammans med en helt parallell fråga, kanske inte räcker för att uppnå högre dataflöden. SQL-skrivdataflöden beror avsevärt på databaskonfigurationen och tabellschemat. Artikeln SQL-utdataprestanda innehåller mer information om de parametrar som kan maximera ditt skrivdataflöde. Som du ser i artikeln om Azure Stream Analytics-utdata till Azure SQL Database skalas inte den här lösningen linjärt som en helt parallell pipeline utöver 8 partitioner och kan behöva partitioneras om innan SQL-utdata (se INTO). Premium-SKU:er behövs för att upprätthålla höga I/O-priser tillsammans med omkostnader från loggsäkerhetskopior som sker med några minuters mellanrum.

Azure Cosmos DB

Inmatningshastighet (händelser per sekund) Enheter för strömning Utdataresurser
1 K 2/3 20 K RU
5 K 4 60 K RU
10 K 8 120 K RU

Azure Cosmos DB-utdata från Stream Analytics har uppdaterats för att använda intern integrering under kompatibilitetsnivå 1.2. Kompatibilitetsnivå 1.2 möjliggör betydligt högre dataflöde och minskar RU-förbrukningen jämfört med 1.1, vilket är standardkompatibilitetsnivån för nya jobb. Lösningen använder Azure Cosmos DB-containrar partitionerade på /deviceId och resten av lösningen är identiskt konfigurerad.

Alla Azure-exempel för direktuppspelning i skala använder Event Hubs som indata som matas ut genom att belastningssimulera testklienter. Varje indatahändelse är ett JSON-dokument på 1 KB som enkelt översätter konfigurerade inmatningshastigheter till dataflödeshastigheter (1 MB/s, 5 MB/s och 10 MB/s). Händelser simulerar en IoT-enhet som skickar följande JSON-data (i förkortat format) för upp till 1 000 enheter:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Kommentar

Konfigurationerna kan komma att ändras på grund av de olika komponenter som används i lösningen. Om du vill ha en mer exakt uppskattning kan du anpassa exemplen så att de passar ditt scenario.

Identifiera flaskhalsar

Använd fönstret Mått i ditt Azure Stream Analytics-jobb för att identifiera flaskhalsar i din pipeline. Granska Indata-/utdatahändelser för dataflöde och "Vattenstämpelfördröjning" eller Efterloggade händelser för att se om jobbet håller jämna steg med indatahastigheten. För Event Hubs-mått letar du efter begränsade begäranden och justerar tröskelvärdesenheterna i enlighet med detta. För Azure Cosmos DB-mått läser du Max förbrukade RU/s per partitionsnyckelintervall under Dataflöde för att säkerställa att partitionsnyckelintervallen används på ett enhetligt sätt. För Azure SQL DB övervakar du Logg-I/O och CPU.

Få hjälp

Om du vill ha mer hjälp kan du prova vår frågesida för Microsoft Q&A för Azure Stream Analytics.

Nästa steg