Queryparallelisatie gebruiken in Azure Stream Analytics

In dit artikel leest u hoe u kunt profiteren van parallelle uitvoering in Azure Stream Analytics. U leert hoe u Stream Analytics-taken schaalt door invoerpartities te configureren en de definitie van de analysequery af te stemmen.

Als vereiste wilt u misschien vertrouwd zijn met het begrip streaming-eenheid dat wordt beschreven in Begrijpen en streaming-eenheden aanpassen.

Wat zijn de onderdelen van een Stream Analytics-taak?

Een Stream Analytics-taakdefinitie bevat ten minste één streaming-invoer, een query en uitvoer. Invoer is de plaats waar de taak de gegevensstroom leest. De query wordt gebruikt om de gegevensinvoerstroom te transformeren en de uitvoer is waar de taakresultaten naartoe worden verzonden.

Partities in invoer en uitvoer

Met partitionering kunt u gegevens onderverdelen in subsets op basis van een partitiesleutel. Als uw invoer (bijvoorbeeld Event Hubs) wordt gepartitioneerd door een sleutel, raden we u aan de partitiesleutel op te geven bij het toevoegen van invoer aan uw Stream Analytics-taak. Het schalen van een Stream Analytics-taak maakt gebruik van partities in de invoer en uitvoer. Een Stream Analytics-taak kan verschillende partities parallel verbruiken en schrijven, waardoor de doorvoer toeneemt.

Invoerwaarden

Alle streaming-invoer van Azure Stream Analytics kan profiteren van partitionering: Event Hubs, IoT Hub, Blob Storage, Data Lake Storage Gen2.

Notitie

Voor compatibiliteitsniveau 1.2 en hoger moet de partitiesleutel worden ingesteld als invoereigenschap, zonder dat het sleutelwoord PARTITION BY in de query nodig is. Voor compatibiliteitsniveau 1.1 en lager moet de partitiesleutel worden gedefinieerd met het trefwoord PARTITION BY in de query.

Uitvoerwaarden

Wanneer u met Stream Analytics werkt, kunt u profiteren van partitionering in de uitvoer:

  • Azure Data Lake Storage
  • Azure Functions
  • Azure-tabel
  • Blob Storage (kan de partitiesleutel expliciet instellen)
  • Azure Cosmos DB (moet de partitiesleutel expliciet instellen)
  • Event Hubs (moet de partitiesleutel expliciet instellen)
  • IoT Hub (moet de partitiesleutel expliciet instellen)
  • Service Bus
  • SQL en Azure Synapse Analytics met optionele partitionering: zie meer informatie over de pagina Uitvoer naar Azure SQL Database.

Power BI biedt geen ondersteuning voor partitionering. U kunt de invoer echter nog steeds partitioneren zoals beschreven in deze sectie.

Zie de volgende artikelen voor meer informatie over partities:

Query

Om een taak parallel te laten zijn, moeten partitiesleutels worden uitgelijnd tussen alle invoer, alle querylogicastappen en alle uitvoer. De partitionering van querylogica wordt bepaald door de sleutels die worden gebruikt voor joins en aggregaties (GROUP BY). Deze laatste vereiste kan worden genegeerd als de querylogica niet is gesleuteld (projectie, filters, referentiële joins...).

  • Als een invoer en uitvoer worden gepartitioneerd op WarehouseId, en de query wordt ProductId gegroepeerd zonder WarehouseId, is de taak niet parallel.
  • Als twee invoer die moet worden gekoppeld, worden gepartitioneerd door verschillende partitiesleutels (WarehouseId en ProductId), is de taak niet parallel.
  • Als twee of meer onafhankelijke gegevensstromen zich in één taak bevinden, elk met een eigen partitiesleutel, is de taak niet parallel.

Alleen wanneer alle invoer-, uitvoer- en querystappen dezelfde sleutel gebruiken, is de taak parallel.

Gênant parallelle taken

Een gênant parallelle taak is het meest schaalbare scenario in Azure Stream Analytics. Er wordt één partitie van de invoer verbonden met één exemplaar van de query met één partitie van de uitvoer. Deze parallelle uitvoering heeft de volgende vereisten:

  • Als uw querylogica afhankelijk is van dezelfde sleutel die door hetzelfde query-exemplaar wordt verwerkt, moet u ervoor zorgen dat de gebeurtenissen naar dezelfde partitie van uw invoer gaan. Voor Event Hubs of IoT Hub betekent dit dat de gebeurtenisgegevens de PartitionKey-waarde moeten hebben ingesteld. U kunt ook gepartitioneerde afzenders gebruiken. Voor blobopslag betekent dit dat de gebeurtenissen naar dezelfde partitiemap worden verzonden. Een voorbeeld hiervan is een query-exemplaar waarmee gegevens per gebruikers-id worden geaggregeerd, waarbij de input Event Hub wordt gepartitioneerd met behulp van de userID als partitiesleutel. Als uw querylogica echter niet vereist dat dezelfde sleutel door hetzelfde query-exemplaar wordt verwerkt, kunt u deze vereiste negeren. Een voorbeeld van deze logica is een eenvoudige selectieprojectfilterquery.

  • De volgende stap bestaat uit het partitioneren van uw query. Voor taken met compatibiliteitsniveau 1.2 of hoger (aanbevolen) kan een aangepaste kolom worden opgegeven als partitiesleutel in de invoerinstellingen en wordt de taak automatisch parallel uitgevoerd. Voor taken met compatibiliteitsniveau 1.0 of 1.1 moet u PARTITION BY PartitionId gebruiken in alle stappen van uw query. Er zijn meerdere stappen toegestaan, maar ze moeten allemaal worden gepartitioneerd met dezelfde sleutel.

  • De meeste uitvoer die wordt ondersteund in Stream Analytics, kan profiteren van partitionering. Als u een uitvoertype gebruikt dat geen ondersteuning biedt voor partitionering van uw taak, is dit niet gênant parallel. Voor Event Hubs-uitvoer moet u ervoor zorgen dat de kolom Partitiesleutel is ingesteld op dezelfde partitiesleutel die in de query wordt gebruikt. Zie de uitvoersectie voor meer informatie.

  • Het aantal invoerpartities moet gelijk zijn aan het aantal uitvoerpartities. Blob Storage-uitvoer kan partities ondersteunen en het partitioneringsschema van de upstream-query overnemen. Wanneer een partitiesleutel voor Blob Storage is opgegeven, worden gegevens gepartitioneerd per invoerpartitie, waardoor het resultaat nog steeds volledig parallel is. Hier volgen voorbeelden van partitiewaarden die een volledig parallelle taak toestaan:

    • Acht event hub-invoerpartities en acht event hub-uitvoerpartities
    • Acht Event Hub-invoerpartities en blobopslaguitvoer
    • Acht Event Hub-invoerpartities en blobopslaguitvoer gepartitioneerd door een aangepast veld met willekeurige kardinaliteit
    • Acht blob-opslaginvoerpartities en blobopslaguitvoer
    • Acht blob storage-invoerpartities en acht event hub-uitvoerpartities

In de volgende secties worden enkele voorbeeldscenario's besproken die gênant parallel zijn.

Eenvoudige query

  • Invoer: Een Event Hub met acht partities
  • Uitvoer: Een Event Hub met acht partities ('Partitiesleutelkolom' moet worden ingesteld op gebruik PartitionId)

Query:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Deze query is een eenvoudig filter. Daarom hoeven we ons geen zorgen te maken over het partitioneren van de invoer die naar de Event Hub wordt verzonden. U ziet dat taken met compatibiliteitsniveau vóór 1.2 de PARTITION BY PartitionId-component moeten bevatten, zodat deze voldoet aan de vereiste #2 van eerder. Voor de uitvoer moeten we de Event Hub-uitvoer in de taak configureren om de partitiesleutel op PartitionId te laten instellen. Een laatste controle is om ervoor te zorgen dat het aantal invoerpartities gelijk is aan het aantal uitvoerpartities.

Query uitvoeren met een groeperingssleutel

  • Invoer: Event Hub met acht partities
  • Uitvoer: Blob Storage

Query:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Deze query heeft een groeperingssleutel. Daarom moeten de gebeurtenissen die zijn gegroepeerd, worden verzonden naar dezelfde Event Hubs-partitie. Aangezien we in dit voorbeeld op TollBoothID groeperen, moeten we ervoor zorgen dat TollBoothID deze wordt gebruikt als de partitiesleutel wanneer de gebeurtenissen naar Event Hubs worden verzonden. In Azure Stream Analytics kunt u PARTITION BY PartitionId gebruiken om dit partitieschema over te nemen en volledige parallellisatie in te schakelen. Omdat de uitvoer blobopslag is, hoeft u zich geen zorgen te maken over het configureren van een partitiesleutelwaarde, volgens vereiste #4.

Voorbeeld van scenario's die niet* gênant parallel zijn

In de vorige sectie heeft het artikel betrekking op een aantal gênante parallelle scenario's. In deze sectie leert u meer over scenario's die niet voldoen aan alle vereisten om gênant parallel te zijn.

Aantal niet-overeenkomende partities

  • Invoer: Een Event Hub met acht partities
  • Uitvoer: Een Event Hub met 32 partities

Als het aantal invoerpartities niet overeenkomt met het aantal uitvoerpartities, is de topologie niet gênant parallel, ongeacht de query. We kunnen echter nog steeds een niveau van parallelle uitvoering krijgen.

Query uitvoeren met niet-gepartitioneerde uitvoer

  • Invoer: Een Event Hub met acht partities
  • Uitvoer: Power BI

Power BI-uitvoer biedt momenteel geen ondersteuning voor partitionering. Daarom is dit scenario niet gênant parallel.

Query met meerdere stappen met verschillende PARTITION BY-waarden

  • Invoer: Event Hub met acht partities
  • Uitvoer: Event Hub met acht partities
  • Compatibiliteitsniveau: 1.0 of 1.1

Query:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Zoals u kunt zien, gebruikt de tweede stap TollBoothId als de partitioneringssleutel. Deze stap is niet hetzelfde als de eerste stap en daarom moeten we een willekeurige volgorde uitvoeren.

Query met meerdere stappen met verschillende PARTITION BY-waarden

  • Invoer: Event Hub met acht partities ('Partitiesleutelkolom' niet ingesteld, standaard ingesteld op 'PartitionId')
  • Uitvoer: Event Hub met acht partities ('Partitiesleutelkolom' moet worden ingesteld op 'TollBoothId')
  • Compatibiliteitsniveau - 1.2 of hoger

Query:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Compatibiliteitsniveau 1.2 of hoger zorgt ervoor dat parallelle query's standaard worden uitgevoerd. Query uit de vorige sectie wordt bijvoorbeeld gepartitioneerd zolang de kolom TollBoothId is ingesteld als invoerpartitiesleutel. PARTITION BY PartitionId-component is niet vereist.

De maximale streaming-eenheden van een taak berekenen

Het totale aantal streaming-eenheden dat door een Stream Analytics-taak kan worden gebruikt, is afhankelijk van het aantal stappen in de query die is gedefinieerd voor de taak en het aantal partities voor elke stap.

Stappen in een query

Een query kan een of meer stappen hebben. Elke stap is een subquery die is gedefinieerd door het trefwoord WITH . De query die zich buiten het trefwoord WITH bevindt (alleen één query) wordt ook geteld als een stap, zoals de SELECT-instructie in de volgende query:

Query:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Deze query heeft twee stappen.

Notitie

Deze query wordt verderop in het artikel uitgebreider besproken.

Een stap partitioneren

Voor het partitioneren van een stap zijn de volgende voorwaarden vereist:

  • De invoerbron moet worden gepartitioneerd.
  • De SELECT-instructie van de query moet worden gelezen uit een gepartitioneerde invoerbron.
  • De query in de stap moet het sleutelwoord PARTITION BY hebben.

Wanneer een query wordt gepartitioneerd, worden de invoergebeurtenissen verwerkt en samengevoegd in afzonderlijke partitiegroepen en worden uitvoergebeurtenissen gegenereerd voor elk van de groepen. Als u een gecombineerde aggregatie wilt, moet u een tweede niet-gepartitioneerde stap maken om te aggregeren.

Het maximum aantal streaming-eenheden voor een taak berekenen

Alle niet-gepartitioneerde stappen kunnen worden geschaald tot één streaming-eenheid (SU V2s) voor een Stream Analytics-taak. Daarnaast kunt u één SU V2 toevoegen voor elke partitie in een gepartitioneerde stap. In de volgende tabel ziet u enkele voorbeelden .

Query Maximum aantal RU's voor de taak
  • De query bevat één stap.
  • De stap is niet gepartitioneerd.
1 SU V2
  • De invoergegevensstroom wordt gepartitioneerd door 16.
  • De query bevat één stap.
  • De stap is gepartitioneerd.
16 SU V2 (1 * 16 partities)
  • De query bevat twee stappen.
  • Geen van de stappen wordt gepartitioneerd.
1 SU V2
  • De invoergegevensstroom wordt gepartitioneerd door 3.
  • De query bevat twee stappen. De invoerstap is gepartitioneerd en de tweede stap is niet.
  • De SELECT-instructie leest uit de gepartitioneerde invoer.
4 SU V2's (3 voor gepartitioneerde stappen + 1 voor niet-gepartitioneerde stappen

Voorbeelden van schalen

De volgende query berekent het aantal auto's binnen een venster van drie minuten dat door een tolstation met drie tolbooths gaat. Deze query kan worden geschaald tot één SU V2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Als u meer RU's voor de query wilt gebruiken, moeten zowel de invoergegevensstroom als de query worden gepartitioneerd. Omdat de gegevensstroompartitie is ingesteld op 3, kan de volgende gewijzigde query worden geschaald tot 3 SU V2's:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Wanneer een query wordt gepartitioneerd, worden de invoergebeurtenissen verwerkt en samengevoegd in afzonderlijke partitiegroepen. Uitvoer gebeurtenissen worden ook gegenereerd voor elk van de groepen. Partitionering kan onverwachte resultaten veroorzaken wanneer het veld GROUP BY niet de partitiesleutel in de invoergegevensstroom is. Het veld TollBoothId in de vorige query is bijvoorbeeld niet de partitiesleutel van Input1. Het resultaat is dat de gegevens van TollBooth #1 in meerdere partities kunnen worden verspreid.

Elk van de Input1-partities wordt afzonderlijk verwerkt door Stream Analytics. Als gevolg hiervan worden meerdere records van het aantal auto's voor dezelfde tolbooth in hetzelfde Tumblingvenster gemaakt. Als de invoerpartitiesleutel niet kan worden gewijzigd, kan dit probleem worden opgelost door een niet-partitiestap toe te voegen om waarden over partities samen te voegen, zoals in het volgende voorbeeld:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Deze query kan worden geschaald naar 4 SU V2's.

Notitie

Als u twee streams koppelt, moet u ervoor zorgen dat de streams worden gepartitioneerd door de partitiesleutel van de kolom die u gebruikt om de joins te maken. Zorg er ook voor dat u hetzelfde aantal partities in beide streams hebt.

Hogere doorvoer op schaal bereiken

Een gênant parallelle taak is noodzakelijk, maar niet voldoende om een hogere doorvoer op schaal te behouden. Elk opslagsysteem en de bijbehorende Stream Analytics-uitvoer hebben variaties op het bereiken van de best mogelijke schrijfdoorvoer. Net als bij elk scenario op schaal zijn er enkele uitdagingen die kunnen worden opgelost met behulp van de juiste configuraties. In deze sectie worden configuraties voor enkele algemene uitvoer besproken en worden voorbeelden geboden voor het ondersteunen van opnamesnelheden van 1 K, 5 K en 10 K per seconde.

De volgende waarnemingen gebruiken een Stream Analytics-taak met stateless (passthrough)-query, een eenvoudige JavaScript UDF die schrijft naar Event Hubs, Azure SQL of Azure Cosmos DB.

Event Hubs

Opnamesnelheid (gebeurtenissen per seconde) Streamingeenheden Uitvoerbronnen
1 K 1/3 2 TU
5 K 1 6 TU
10k 2 10 TU

De Event Hubs-oplossing wordt lineair geschaald in termen van streaming-eenheden (SU) en doorvoer, waardoor het de meest efficiënte en performante manier is om gegevens uit Stream Analytics te analyseren en te streamen. Taken kunnen worden geschaald tot 66 SU V2's, wat ongeveer vertaalt in verwerking tot 400 MB/s, of 38 biljoen gebeurtenissen per dag.

Azure SQL

Opnamesnelheid (gebeurtenissen per seconde) Streamingeenheden Uitvoerbronnen
1 K 2/3 S3
5 K 3 P4
10k 6 P6

Azure SQL biedt ondersteuning voor het parallel schrijven, ook wel Partitionering overnemen genoemd, maar deze functie is niet standaard ingeschakeld. Het inschakelen van Partitionering overnemen, samen met een volledig parallelle query, is echter mogelijk niet voldoende om hogere doorvoer te bereiken. SQL-schrijfdoorvoer is aanzienlijk afhankelijk van uw databaseconfiguratie en tabelschema. In het artikel SQL Output Performance vindt u meer informatie over de parameters waarmee de schrijfdoorvoer kan worden gemaximaliseerd. Zoals vermeld in het artikel azure Stream Analytics-uitvoer naar Azure SQL Database , wordt deze oplossing niet lineair geschaald als een volledig parallelle pijplijn buiten 8 partities en moet deze mogelijk opnieuw worden gepartitioneerde voordat SQL-uitvoer (zie INTO) wordt uitgevoerd. Premium-SKU's zijn nodig om hoge I/O-tarieven te ondersteunen, samen met overhead van logboekback-ups die elke paar minuten plaatsvinden.

Azure Cosmos DB

Opnamesnelheid (gebeurtenissen per seconde) Streamingeenheden Uitvoerbronnen
1 K 2/3 20 K RU
5 K 4 60 K RU
10k 8 120 K RU

Azure Cosmos DB-uitvoer van Stream Analytics is bijgewerkt voor gebruik van systeemeigen integratie onder compatibiliteitsniveau 1.2. Compatibiliteitsniveau 1.2 maakt aanzienlijk hogere doorvoer mogelijk en vermindert het RU-verbruik in vergelijking met 1.1. Dit is het standaardcompatibiliteitsniveau voor nieuwe taken. De oplossing maakt gebruik van Azure Cosmos DB-containers die zijn gepartitioneerd op /deviceId en de rest van de oplossing is identiek geconfigureerd.

Alle streaming-voorbeelden op schaal van Azure gebruiken Event Hubs als invoer die wordt ingevoerd door het simuleren van testclients. Elke invoergebeurtenis is een JSON-document van 1 KB, waarmee geconfigureerde opnamesnelheden eenvoudig worden omgezet in doorvoersnelheden (1 MB/s, 5 MB/s en 10 MB/s). Gebeurtenissen simuleren een IoT-apparaat dat de volgende JSON-gegevens verzendt (in een verkorte vorm) voor maximaal 1000 apparaten:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Notitie

De configuraties kunnen worden gewijzigd vanwege de verschillende onderdelen die in de oplossing worden gebruikt. Voor een nauwkeurigere schatting kunt u de voorbeelden aanpassen aan uw scenario.

Knelpunten identificeren

Gebruik het deelvenster Metrische gegevens in uw Azure Stream Analytics-taak om knelpunten in uw pijplijn te identificeren. Bekijk invoer-/uitvoergebeurtenissen voor doorvoer en 'Watermerkvertraging' of 'Backloggebeurtenissen' om te zien of de taak de invoersnelheid bijhoudt. Voor metrische gegevens van Event Hubs zoekt u naar vertraagde aanvragen en past u de drempelwaarde-eenheden dienovereenkomstig aan. Raadpleeg voor metrische gegevens van Azure Cosmos DB het maximum aantal verbruikte RU/s per partitiesleutelbereik onder Doorvoer om ervoor te zorgen dat uw partitiesleutelbereiken uniform worden gebruikt. Voor Azure SQL DB bewaakt u logboek-IO en CPU.

Hulp vragen

Probeer onze microsoft Q&A-vragenpagina voor Azure Stream Analytics voor meer hulp.

Volgende stappen