Lekérdezés-párhuzamosítás használata az Azure Stream Analyticsben

Ez a cikk bemutatja, hogyan használhatja ki a párhuzamosítás előnyeit az Azure Stream Analyticsben. Megtudhatja, hogyan méretezheti a Stream Analytics-feladatokat a bemeneti partíciók konfigurálásával és az elemzési lekérdezés definíciójának finomhangolásával.

Előfeltételként érdemes lehet megismerkednie a streamelési egység fogalmával, amely a streamelési egységek értelmezésében és módosításában szerepel.

Melyek a Stream Analytics-munka részei?

A Stream Analytics-feladat definíciója legalább egy stream bemenetet, lekérdezést és kimenetet tartalmaz. A bemenetek azok, amelyekből a feladat beolvassa az adatfolyamot. A lekérdezés az adatbemeneti adatfolyam átalakítására szolgál, és a kimenet az, ahová a feladat elküldi a feladat eredményeit.

Partíciók bemenetekben és kimenetekben

A particionálás lehetővé teszi az adatok részhalmazokra való felosztását egy partíciókulcs alapján. Ha a bemenetet (például az Event Hubsot) egy kulcs particionálta, javasoljuk, hogy adja meg a partíciókulcsot, amikor bemenetet ad hozzá a Stream Analytics-feladathoz. A Stream Analytics-feladatok méretezése kihasználja a bemeneti és kimeneti partíciók előnyeit. A Stream Analytics-feladatok különböző partíciókat használhatnak és írhatnak párhuzamosan, ami növeli az átviteli sebességet.

Bevitelek

Minden Azure Stream Analytics-streambemenet kihasználhatja a particionálás előnyeit: Event Hubs, IoT Hub, Blob Storage, Data Lake Storage Gen2.

Feljegyzés

Az 1.2-es és újabb kompatibilitási szinthez állítsa be a partíciókulcsot bemeneti tulajdonságként, és nincs szükség a PARTITION BY kulcsszóra a lekérdezésben. Az 1.1-es és újabb kompatibilitási szinthez adja meg a partíciókulcsot a PARTÍCIÓ BY kulcsszóval a lekérdezésben.

Kimenetek

A Stream Analytics használatakor használja ki a particionálás előnyeit a következő kimenetekben:

  • Azure Data Lake Storage
  • Azure Functions
  • Azure-tábla
  • Blob Storage (kifejezetten állítsa be a Partition Key-t)
  • Azure Cosmos DB (állítsa be explicit módon a partíció kulcsot)
  • Event Hubs (állítsa be kifejezetten a partíciókulcsot)
  • IoT Hub (állítsa be explicit módon a partíciókulcsot)
  • Service Bus
  • SQL és Azure Synapse Analytics opcionális particionálással: további információk az Azure SQL Database-beli kimenetről.

A Power BI nem támogatja a particionálást. A bemenetet azonban továbbra is particionálásra használhatja az ebben a szakaszban leírtak szerint.

A partíciókkal kapcsolatos további információkért tekintse meg a következő cikkeket:

Lekérdezés

Ahhoz, hogy egy feladat párhuzamos legyen, a partíciókulcsokat az összes bemenethez, az összes lekérdezési logikai lépéshez és az összes kimenethez kell igazítani. A lekérdezési logikai particionálást az illesztésekhez és aggregációkhoz (GROUP BY) használt kulcsok határozzák meg. Az utolsó követelmény figyelmen kívül hagyható, ha a lekérdezési logika nincs kulcsban (kivetítés, szűrők, hivatkozási illesztések...).

  • Ha egy bemenetet és egy kimenetet WarehouseId alapján particionálnak, és a lekérdezés ProductId alapján csoportosít WarehouseId nélkül, a feladat nem párhuzamos.
  • Ha két csatlakoztatni kívánt bemenetet különböző partíciókulcsok (WarehouseId és ProductId) particionálnak, a feladat nem párhuzamos.
  • Ha egy feladat két vagy több független adatfolyamot tartalmaz, mindegyiknek saját partíciókulcsa van, a feladat nem párhuzamos.

A feladat csak akkor párhuzamos, ha minden bemenet, kimenet és lekérdezési lépés ugyanazt a kulcsot használja.

Kínosan párhuzamos feladatok

Az nyilvánvalóan párhuzamos feladat az Azure Stream Analytics legskálázhatóbb forgatókönyve. A bemenet egy partícióját csatlakoztatja a lekérdezés egy példányához a kimenet egyik partíciójára. Ez a párhuzamosság a következő követelményekkel rendelkezik:

  • Ha a lekérdezési logika attól függ, hogy ugyanazt a kulcsot dolgozza fel ugyanaz a lekérdezéspéldány, győződjön meg arról, hogy az események a bemenet ugyanazon partíciójára kerülnek. Az Event Hubs vagy az IoT Hub esetében ez azt jelenti, hogy az eseményadatoknak a PartitionKey értékkészlettel kell rendelkezniük. Másik lehetőségként particionált feladókat is használhat. Blob Storage esetén ez azt jelenti, hogy az események ugyanabba a partíciómappába kerülnek. Ilyen például egy olyan lekérdezéspéldány, amely felhasználóazonosítónként összesíti az adatokat, ahol a bemeneti eseményközpont particionálása a userID használatával történik partíciókulcsként. Azonban, ha a lekérdezési logikájának nem szükséges, hogy ugyanaz a kulcs ugyanazzal a lekérdezés példánnyal legyen feldolgozása, figyelmen kívül hagyhatja ezt a követelményt. Erre a logikára példa egy egyszerű select-project-filter lekérdezés.

  • Ezután particionálja a lekérdezést. Az 1.2-es vagy újabb kompatibilitási szinttel rendelkező feladatok esetén (ajánlott) adjon meg egy egyéni oszlopot partíciókulcsként a bemeneti beállításokban, és a feladat automatikusan párhuzamos lesz. Az 1.0-s vagy 1.1-es kompatibilitási szinttel rendelkező feladatokhoz használja a PARTITION BY PartitionId parancsot a lekérdezés minden lépésében. Több lépés is lehet, de mindegyiknek ugyanazzal a kulccsal kell particionálást végeznie.

  • A Stream Analyticsben támogatott kimenetek többsége kihasználhatja a particionálás előnyeit. Ha olyan kimeneti típust használ, amely nem támogatja a particionálást, a feladat nem kínosan párhuzamos. Event Hubs-kimenetek esetén győződjön meg arról, hogy a partíciókulcs oszlopa a lekérdezésben használt partíciókulcsra van beállítva. További információkért lásd a kimeneti szakaszt.

  • A bemeneti partíciók számának meg kell egyezik a kimeneti partíciók számával. A Blob Storage-kimenet támogatja a partíciókat, és örökli a felsőbb rétegbeli lekérdezés particionálási sémáját. Amikor megad egy partíciókulcsot a Blob Storage-hoz, az adatok particionálása bemeneti partíciónként történik, így az eredmény továbbra is teljesen párhuzamos lesz. Íme néhány példa a partícióértékekre, amelyek teljes mértékben párhuzamos feladatokat engedélyeznek:

    • Nyolc eseményközpont bemeneti partíciója és nyolc eseményközpont kimeneti partíciója
    • Nyolc eseményközpont bemeneti partíciója és blobtároló kimenete
    • Nyolc eseményközpont bemeneti partíciója és egy egyéni mező által particionált Blob Storage-kimenet tetszőleges számossággal
    • Nyolc blobtároló bemeneti partíciója és blobtároló kimenete
    • Nyolc blobtároló bemeneti partíciója és nyolc eseményközpont kimeneti partíciója

Az alábbi szakaszok néhány olyan példaforgatókönyvet mutatnak be, amelyek kínosan párhuzamosak.

Egyszerű lekérdezés

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Egy nyolc partícióval rendelkező eseményközpont ("A partíciókulcs oszlopnak be kell lennie állítva" a használathoz PartitionId)

Lekérdezés:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Ez a lekérdezés egy egyszerű szűrő. Ezért nem kell aggódnia az eseményközpontba küldött bemenet particionálása miatt. Figyelje meg, hogy az 1.2 előtti kompatibilitási szinttel rendelkező feladatoknak tartalmazniuk kell a PARTITION BY PartitionId záradékot, így teljesíti a korábbi 2. követelményt. A kimenethez konfigurálnia kell az eseményközpont kimenetét a feladatban, hogy a partíciókulcs a PartitionId értékre legyen állítva. Az utolsó ellenőrzés annak ellenőrzése, hogy a bemeneti partíciók száma megegyezik-e a kimeneti partíciók számával.

Lekérdezés csoportosítási kulccsal

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Blob-tárolás

Lekérdezés:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Ez a lekérdezés egy csoportosítási kulccsal rendelkezik. Ezért a csoportosított eseményeket ugyanarra az Event Hubs-partícióra kell elküldeni. Mivel ebben a példában a TollBoothID szerint csoportosítja az adatokat, győződjön meg arról, hogy TollBoothID ezt használja partíciókulcsként, amikor a rendszer elküldi az eseményeket az Event Hubsnak. Ezután az Azure Stream Analyticsben a PARTITION BY PartitionId használatával örökölheti ezt a partíciós sémát, és engedélyezheti a teljes párhuzamosítást. Mivel a kimenet blobtároló, nem kell aggódnia egy partíciókulcs-érték konfigurálásával, a 4. követelménynek megfelelően.

Példa olyan forgatókönyvekre, amelyek nem* kínosan párhuzamosak

Az előző szakaszban a cikk néhány kínosan párhuzamos forgatókönyvet tárgyalt. Ebben a szakaszban olyan forgatókönyveket ismerhet meg, amelyek nem felelnek meg az összes követelménynek, hogy kínosan párhuzamosak legyenek.

Nem egyező partíciók száma

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Eseményközpont 32 partícióval

Ha a bemeneti partíciók száma nem egyezik meg a kimeneti partíciók számával, a topológia nem kínosan párhuzamos a lekérdezéstől függetlenül. Azonban továbbra is elérhet némi párhuzamosítást.

Lekérdezés nem particionált kimenettel

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Power BI

A Power BI-kimenet jelenleg nem támogatja a particionálást. Ezért ez a forgatókönyv nem kínosan párhuzamos.

Többlépéses lekérdezés különböző PARTITION BY értékekkel

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Eseményközpont nyolc partícióval
  • Kompatibilitási szint: 1.0 vagy 1.1

Lekérdezés:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Mint látható, a második lépés a TollBoothId azonosítót használja particionálási kulcsként. Ez a lépés nem ugyanaz, mint az első lépés, ezért shuffle-t igényel.

Többlépéses lekérdezés különböző PARTITION BY értékekkel

  • Bemenet: Eseményközpont nyolc partícióval ("Partíciókulcs oszlopa" nincs beállítva, alapértelmezés szerint "PartitionId")
  • Kimenet: Nyolc partícióval rendelkező eseményközpont ("A partíciókulcs oszlopát a "TollBoothId" használatára kell beállítani)
  • Kompatibilitási szint – 1,2 vagy újabb

Lekérdezés:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Az 1.2-es vagy újabb kompatibilitási szint alapértelmezés szerint lehetővé teszi a párhuzamos lekérdezések végrehajtását. Az előző szakasz lekérdezése például particionálásra kerül, ha a "TollBoothId" oszlop bemeneti partíciókulcsként van beállítva. A PARTITION BY PartitionId záradék nem szükséges.

Feladat maximális streamelési egységeinek kiszámítása

A Stream Analytics-feladatok által használható streamelési egységek teljes száma a feladathoz definiált lekérdezés lépéseinek számától és az egyes lépések partícióinak számától függ.

A lekérdezés lépései

A lekérdezések egy vagy több lépéssel is rendelkezhetnek. Minden lépés egy, a WITH kulcsszó által definiált részquery. A WITH kulcsszón kívül eső lekérdezés (csak egy lekérdezés) is lépésnek számít, például a SELECT utasítást a következő lekérdezésben:

Lekérdezés:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Ez a lekérdezés két lépésből áll.

Feljegyzés

Ezt a lekérdezést a cikk későbbi részében részletesebben tárgyaljuk.

Lépés particionálása

A lépés particionálásához a következő feltételek szükségesek:

  • A bemeneti forrást particionáltnak kell lennie.
  • A lekérdezés SELECT utasításának particionált bemeneti forrásból kell olvasnia.
  • A lépésen belüli lekérdezésnek a PARTITION BY kulcsszóval kell rendelkeznie.

A lekérdezés particionálásakor a bemeneti események külön partíciócsoportokban lesznek feldolgozva és összesítve, a kimeneti események pedig mindegyik csoporthoz létre lesznek hozva. Ha kombinált aggregátumot szeretne, létre kell hoznia egy második nem particionált lépést az összesítéshez.

Feladat maximális streamelési egységeinek kiszámítása

Az összes nem particionált lépés együtt skálázható fel egy streamelési egységre (SU V2s) egy Stream Analytics-feladathoz. Emellett egy particionált lépésben minden partícióhoz hozzáadhat egy SU V2-t. Az alábbi táblázatban néhány példát láthat.

Lekérdezés A feladathoz tartozó termékváltozatok maximális száma
  • A lekérdezés egy lépést tartalmaz.
  • A lépés nincs particionálva.
1 SU V2
  • A bemeneti adatfolyam particionálása 16-tal történik.
  • A lekérdezés egy lépést tartalmaz.
  • A lépés particionálása történik.
16 SU V2 (1 * 16 partíciók)
  • A lekérdezés két lépést tartalmaz.
  • Egyik lépés sem particionált.
1 SU V2
  • A bemeneti adatfolyam particionálása 3-ra történik.
  • A lekérdezés két lépést tartalmaz. A bemeneti lépés particionálva van, a második lépés pedig nem.
  • A SELECT utasítás a particionált bemenetből olvas be.
4 SU V2 (3 particionált lépésekhez + 1 nem particionált lépésekhez)

Példák a skálázásra

Az alábbi lekérdezés kiszámítja az autók számát egy háromperces időkeretben, amelyek egy három útdíjkapuval rendelkező útdíj-állomáson haladnak át. Ezt a lekérdezést egy SU V2-re skálázhatja.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Ha több SU-t szeretne használni a lekérdezéshez, ossza fel a bemeneti adatfolyamot és a lekérdezést is külön partíciókra. Mivel az adatfolyam partíciója 3-ra van állítva, a következő módosított lekérdezés 3 SU V2-re skálázható:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Lekérdezés particionálásakor a bemeneti események feldolgozása és összesítése külön partíciócsoportokban történik. A lekérdezés kimeneti eseményeket hoz létre az egyes csoportokhoz. A particionálás váratlan eredményeket okozhat, ha a GROUP BY mező nem a bemeneti adatfolyam partíciókulcsa. Az előző lekérdezés TollBoothId mezője például nem az Input1 partíciókulcsa. Ennek az az eredménye, hogy a TollBooth #1 adatai több partíción is eloszthatók.

A Stream Analytics minden egyes Input1-partíciót külön dolgoz fel. Ennek eredményeképpen a lekérdezés több bejegyzést hoz létre az autók számáról ugyanazon útdíjfizető kapuhoz ugyanabban az időablakban. Ha nem tudja módosítani a bemeneti partíciókulcsot, a probléma megoldásához adjon hozzá egy nempartíciós lépést a partíciók értékeinek összesítéséhez, ahogyan az alábbi példában is látható:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Ezt a lekérdezést 4 SU V2-re skálázhatja.

Feljegyzés

Ha két streamhez csatlakozik, győződjön meg arról, hogy a streamek particionálása az illesztések létrehozásához használt oszlop partíciókulcsával történik. Győződjön meg arról is, hogy mindkét streamben ugyanannyi partíció található.

Nagyobb átviteli sebesség elérése nagy méretekben

Egy kínosan párhuzamos feladat szükséges, de nem elegendő a nagyobb átviteli sebesség nagy léptékű fenntartásához. Minden tárolórendszer és annak megfelelő Stream Analytics-kimenete eltérően rendelkezik a lehető legjobb írási átviteli sebesség elérésének módjával. Mint minden nagy léptékű forgatókönyv esetében, egyes kihívásokhoz a megfelelő konfigurációk szükségesek. Ez a szakasz néhány gyakori kimenet konfigurációit ismerteti, és mintákat tartalmaz az 1 K, 5 K és 10 K események másodpercenkénti betöltési sebességének fenntartásához.

Az alábbi megfigyelések egy Stream Analytics-feladatot használnak állapot nélküli (átmenő) lekérdezéssel, amely egy alapszintű JavaScript-felhasználó által definiált függvény (UDF), amely az Event Hubsba, az Azure SQL-be vagy az Azure Cosmos DB-be ír.

Event Hubs

Betöltési sebesség (események másodpercenként) Streaming egységek Kimeneti erőforrások
1 kelvin 1/3 2 TU
5 kilométer 1 6 TU
10 ezer 2 10 TU

Az Event Hubs-megoldás lineárisan skálázható a streamelési egységek (SU) és az átviteli sebesség szempontjából, így ez a leghatékonyabb és leghatékonyabb módja az adatok Stream Analyticsből való elemzésének és streamelésének. A feladatokat akár 66 SU V2-esre is skálázhatja, ami nagyjából 400 MB/s vagy 38 billió esemény feldolgozását jelenti naponta.

Azure SQL

Betöltési sebesség (események másodpercenként) Streaming egységek Kimeneti erőforrások
1 kelvin 2/3 S3
5 kilométer 3 P4
10 ezer 6 P6

Az Azure SQL támogatja a párhuzamos írást, a particionálás öröklését, de alapértelmezés szerint nincs engedélyezve. Előfordulhat azonban, hogy a particionálás öröklésének engedélyezése egy teljesen párhuzamos lekérdezéssel együtt nem elegendő a magasabb átviteli sebesség eléréséhez. Az SQL írási átviteli sebessége jelentősen függ az adatbázis konfigurációjától és a táblasémától. Az SQL Kimeneti teljesítmény című cikk részletesebben ismerteti azokat a paramétereket, amelyek maximalizálhatják az írási teljesítményt. Amint azt az Azure Stream Analytics Azure SQL Database-hez készült kimenetében említettük, ez a megoldás nem lineárisan skálázható teljesen párhuzamos folyamatként 8 partíción túl, és előfordulhat, hogy újraparticionálásra van szükség az SQL-kimenet előtt (lásd: INTO). A magas I/O-műveleti sebesség fenntartásához prémium termékváltozatokra van szükség, valamint néhány percenkénti naplómentési terhelés kezelésére is.

Azure Cosmos DB

Betöltési sebesség (események másodpercenként) Streaming egységek Kimeneti erőforrások
1 kelvin 2/3 20 K RU
5 kilométer 4 60 K RU
10 ezer 8 120 000 RU

Azure Cosmos DB Stream Analytics-kimenet úgy frissül, hogy natív integrációt használjon az kompatibilitási szinten 1.2. Az 1.2-es kompatibilitási szint jelentősen nagyobb átviteli sebességet tesz lehetővé, és csökkenti az ru-használatot az 1.1-es szinthez képest, amely az új feladatok alapértelmezett kompatibilitási szintje. A megoldás a /deviceId-en particionált Azure Cosmos DB-tárolókat használja, a többi megoldás pedig azonos módon van konfigurálva.

A Scale Azure-mintákban a streamelés az Event Hubsot használja bemenetként, amelyet a tesztelési ügyfelek terhelésének szimulálása táplál. Minden bemeneti esemény egy 1 KB-os JSON-dokumentum, amely egyszerűen lefordítja a konfigurált betöltési sebességet az átviteli sebességre (1 MB/s, 5 MB/s és 10 MB/s). Az események egy olyan IoT-eszközt szimulálnak, amely legfeljebb 1000 eszközhöz küldi a következő JSON-adatokat (rövidített formában):

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Feljegyzés

A konfigurációk a megoldásban használt különböző összetevők miatt változhatnak. A pontosabb becslés érdekében szabja testre a mintákat a forgatókönyvnek megfelelően.

Szűk keresztmetszetek azonosítása

Az Azure Stream Analytics-feladat Metrikák paneljén azonosíthatja a folyamat szűk keresztmetszeteit. Tekintse át a bemeneti/kimeneti eseményeket az átviteli teljesítmény, a "Vízjel késleltetése" vagy a Torlódott események között annak ellenőrzéséhez, hogy a feladat lépést tart-e a bemeneti sebességgel. Event Hubs-metrikák esetén keresse meg a korlátozott kéréseket, és ennek megfelelően módosítsa a küszöbérték-egységeket. Az Azure Cosmos DB metrikák esetében tekintse át a partíciókulcs-tartományonkénti maximálisan felhasznált RU/s-t az Átviteli sebesség alatt, hogy a partíciókulcs-tartományok egységesen legyenek kihasználva. Az Azure SQL DB-hez figyelje a Log IO-t és a CPU-t.

Segítség kérése

További segítségért próbálja ki a Microsoft Q&A kérdésoldalát az Azure Stream Analyticshez.

Következő lépések