Streamek feldolgozása az Azure Stream Analyticsszel

Azure Cosmos DB
Azure Event Hubs
Azure Monitor
Azure Stream Analytics

Ez a referenciaarchitektúra egy végpontok közötti streamfeldolgozási folyamatot mutat be. A folyamat két forrásból betölti az adatokat, korrelálja a két stream rekordjait, és kiszámítja a gördülő átlagot egy időablakban. Az eredmények tárolása további elemzés céljából történik.

GitHub logoAz architektúra referencia-implementációja elérhető a GitHubon.

Felépítés

Diagram showing reference architecture for creating a stream processing pipeline with Azure Stream Analytics.

Töltse le az architektúra Visio-fájlját.

Munkafolyamat

Az architektúra az alábbi összetevőkből áll:

Adatforrások. Ebben az architektúrában két adatforrás létezik, amelyek valós időben hoznak létre adatfolyamokat. Az első stream tartalmazza a menetadatokat, a második pedig a viteldíjadatokat. A referenciaarchitektúra egy szimulált adatgenerátort tartalmaz, amely statikus fájlokból olvas be, és leküldi az adatokat az Event Hubsba. Egy valós alkalmazásban az adatforrások a taxifülkékbe telepített eszközök.

Azure Event Hubs. Az Event Hubs egy eseménybetöltési szolgáltatás. Ez az architektúra két eseményközpont-példányt használ, egyet minden adatforráshoz. Minden adatforrás adatstreamet küld a társított eseményközpontnak.

Azure Stream Analytics. A Stream Analytics egy eseményfeldolgozó motor. A Stream Analytics-feladat beolvassa az adatfolyamokat a két eseményközpontból, és elvégzi a streamfeldolgozást.

Azure Cosmos DB. A Stream Analytics-feladat kimenete rekordsorozat, amely JSON-dokumentumként íródik egy Azure Cosmos DB-dokumentum-adatbázisba.

Microsoft Power BI. A Power BI üzleti elemzési eszközökből álló csomagja az üzleti elemzésekhez szükséges adatok elemzéséhez. Ebben az architektúrában betölti az Adatokat az Azure Cosmos DB-ből. Így a felhasználók elemezhetik az összegyűjtött előzményadatok teljes készletét. Az eredményeket közvetlenül streamelheti a Stream Analyticsből a Power BI-ba az adatok valós idejű megtekintéséhez. További információ: Valós idejű streamelés a Power BI-ban.

Azure Monitor. Az Azure Monitor a megoldásban üzembe helyezett Azure-szolgáltatások teljesítménymetrikáit gyűjti. Ha ezeket egy irányítópulton vizualizálja, betekintést nyerhet a megoldás állapotába.

Forgatókönyv részletei

Forgatókönyv: A taxitársaság minden taxiútról adatokat gyűjt. Ebben a forgatókönyvben feltételezzük, hogy két különálló eszköz küld adatokat. A taxi egy mérőórával rendelkezik, amely információkat küld az egyes utazásokról – az időtartamról, a távolságról, valamint a csomagfelvétel és a legördülő helyekről. Egy külön eszköz fogadja az ügyfelektől érkező kifizetéseket, és adatokat küld a viteldíjakról. A taxi vállalat szeretné kiszámítani az átlagos tipp egy mérföld meghajtott, valós időben, annak érdekében, hogy a trendek.

Lehetséges használati esetek

Ez a megoldás a kiskereskedelmi forgatókönyvhöz van optimalizálva.

Adatok betöltése

Az adatforrás szimulálásához ez a referenciaarchitektúra a New York-i taxiadatkészletet[1] használja. Ez az adatkészlet a New York-i taxiutakra vonatkozó adatokat tartalmazza négyéves időszak alatt (2010–2013). Kétféle rekordot tartalmaz: a menetadatokat és a viteldíjakat. A menetadatok magukban foglalják az utazás időtartamát, az utazás távolságát, valamint a csomagfelvételi és a legördülő helyet. A viteldíjadatok tartalmazzák a viteldíjakat, az adó- és a tippösszegeket. Mindkét rekordtípus gyakori mezői közé tartozik a medálszám, a feltört licenc és a szállító azonosítója. Ez a három mező együttesen azonosítja a taxit és a sofőrt. Az adatok CSV formátumban lesznek tárolva.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Illinois-i Egyetem, Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Az adatgenerátor egy .NET Core-alkalmazás, amely beolvassa a rekordokat, és elküldi őket az Azure Event Hubsnak. A generátor JSON formátumban küldi el a menetadatokat, a menetdíjak adatait PEDIG CSV formátumban.

Az Event Hubs partíciókkal szegmentálta az adatokat. A partíciók lehetővé teszik, hogy a felhasználó párhuzamosan olvassa be az egyes partíciókat. Amikor adatokat küld az Event Hubsnak, explicit módon megadhatja a partíciókulcsot. Ellenkező esetben a rekordok ciklikus időszeleteléses módon vannak hozzárendelve a partíciókhoz.

Ebben a konkrét forgatókönyvben a menetadatoknak és a viteldíjadatoknak egy adott taxifülkéhez ugyanazzal a partícióazonosítóval kell rendelkeznie. Ez lehetővé teszi, hogy a Stream Analytics bizonyos fokú párhuzamosságot alkalmazzon a két stream korrelációja esetén. A menetadatok n partíciójában lévő rekord megegyezik a viteldíjadatok n partíciójában lévő rekorddal.

Diagram of stream processing with Azure Stream Analytics and Event Hubs

Az adatgenerátorban a két rekordtípus közös adatmodellje rendelkezik olyan PartitionKey tulajdonságdal, amely az , HackLicenseés VendorIda Medallion.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Ez a tulajdonság explicit partíciókulcs megadására szolgál az Event Hubsba való küldéskor:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Streamfeldolgozás

A streamfeldolgozási feladat egy SQL-lekérdezéssel van definiálva, amely több különböző lépésből áll. Az első két lépés egyszerűen kiválasztja a rekordokat a két bemeneti streamből.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

A következő lépés összekapcsolja a két bemeneti streamet az egyes streamek megfelelő rekordjainak kiválasztásához.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Ez a lekérdezés olyan mezők rekordjait illeszti össze, amelyek egyedileg azonosítják az egyező rekordokat (PartitionId és PickupTime).

Megjegyzés:

Azt szeretnénk, hogy a streamek és a TaxiRide streamek az , VendorIdHackLicenseés PickupTimeaz Medallion.TaxiFare Ebben az esetben a PartitionId mezőket és VendorId a Medallionmezőket HackLicense fedi le, de általában nem szabad ezt figyelembe venni.

A Stream Analyticsben az illesztések időbeliek, ami azt jelenti, hogy a rekordok egy adott időkereten belül lesznek összekapcsolva. Ellenkező esetben előfordulhat, hogy a feladatnak határozatlan ideig várnia kell egyezésre. A DATEDIFF függvény azt határozza meg, hogy egy egyezéshez mennyi ideig lehet két egyező rekordot elválasztani.

A feladat utolsó lépése kiszámítja az átlagos csúcsot mérföldenként, egy 5 perces felugró ablak szerint csoportosítva.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

A Stream Analytics számos ablakfüggvényt biztosít. A felugró ablak előrehalad egy meghatározott időszakkal, ebben az esetben ugrásonként 1 perccel. Az eredmény az elmúlt 5 perc mozgóátlagának kiszámítása.

Az itt látható architektúra csak a Stream Analytics-feladat eredményeit menti az Azure Cosmos DB-be. Big Data-forgatókönyv esetén érdemes lehet az Event Hubs Capture használatával is menteni a nyers eseményadatokat az Azure Blob Storage-ba. A nyers adatok megőrzése lehetővé teszi, hogy kötegelt lekérdezéseket futtasson az előzményadatokon később, hogy új megállapításokat nyerhessen az adatokból.

Considerations

Ezek a szempontok implementálják az Azure Well-Architected Framework alappilléreit, amely a számítási feladatok minőségének javítására használható vezérelvek halmaza. További információ: Microsoft Azure Well-Architected Framework.

Méretezhetőség

Event Hubs

Az Event Hubs átviteli kapacitását átviteli egységekben mérik. Az eseményközpontok automatikus méretezéséhez engedélyezze az automatikus felfújást, amely automatikusan skálázza az átviteli egységeket a forgalom alapján, egy konfigurált maximális értékre.

Stream Analytics

A Stream Analytics esetében a feladathoz lefoglalt számítási erőforrásokat streamelési egységekben mérik. A Stream Analytics-feladatok akkor méretezhetők a legjobban, ha a feladat párhuzamosítható. Így a Stream Analytics több számítási csomóponton is elosztja a feladatot.

Az Event Hubs-bemenethez használja a PARTITION BY kulcsszót a Stream Analytics-feladat particionálásához. Az adatok részhalmazokra lesznek osztva az Event Hubs-partíciók alapján.

Az ablakfüggvények és az időbeli illesztések további su-t igényelnek. Ha lehetséges, használja PARTITION BY az egyes partíciók külön-külön történő feldolgozását. További információ: Streamelési egységek értelmezése és módosítása.

Ha nem lehetséges a teljes Stream Analytics-feladat párhuzamosítása, próbálja meg több lépésre bontani a feladatot, kezdve egy vagy több párhuzamos lépéssel. Így az első lépések párhuzamosan is futtathatók. Ebben a referenciaarchitektúrában például:

  • Az 1. és a 2. lépés egyszerű SELECT utasítások, amelyek egyetlen partíción belül választják ki a rekordokat.
  • A 3. lépés particionált illesztéseket hajt végre két bemeneti adatfolyamon. Ez a lépés kihasználja azt a tényt, hogy az egyező rekordok ugyanazt a partíciókulcsot használják, és így garantáltan ugyanazzal a partícióazonosítóval rendelkeznek az egyes bemeneti adatfolyamokban.
  • A 4. lépés összesíti az összes partíciót. Ez a lépés nem párhuzamos.

A Stream Analytics-feladatdiagram használatával megtekintheti, hogy hány partíció van hozzárendelve a feladat egyes lépéseihez. Az alábbi diagram a referenciaarchitektúra feladatábraét mutatja be:

Diagram showing Stream Analytics jobs.

Azure Cosmos DB

Az Azure Cosmos DB átviteli sebességének mérése kérelemegységekben (RU) történik. Ahhoz, hogy egy Azure Cosmos DB-tároló 10 000 RU-ra skálázható legyen, meg kell adnia egy partíciókulcsot a tároló létrehozásakor, és minden dokumentumban tartalmaznia kell a partíciókulcsot.

Ebben a referenciaarchitektúrában az új dokumentumok percenként csak egyszer jönnek létre (az emelési időintervallum), így az átviteli sebességre vonatkozó követelmények meglehetősen alacsonyak. Ezért ebben a forgatókönyvben nincs szükség partíciókulcs hozzárendelésére.

Figyelés

Bármilyen streamfeldolgozási megoldás esetén fontos a rendszer teljesítményének és állapotának monitorozása. Az Azure Monitor metrikákat és diagnosztikai naplókat gyűjt az architektúrában használt Azure-szolgáltatásokhoz. Az Azure Monitor az Azure platformba van beépítve, és nem igényel további kódot az alkalmazásban.

Az alábbi figyelmeztető jelek bármelyike azt jelzi, hogy fel kell méreteznie a megfelelő Azure-erőforrást:

  • Az Event Hubs szabályozza a kérelmeket, vagy megközelíti a napi üzenetkvótát.
  • A Stream Analytics-feladat következetesen a lefoglalt streamegységek (SU) több mint 80%-át használja.
  • Az Azure Cosmos DB megkezdi a kérések szabályozását.

A referenciaarchitektúra tartalmaz egy egyéni irányítópultot, amely az Azure Portalon van üzembe helyezve. Az architektúra üzembe helyezése után megtekintheti az irányítópultot az Azure Portal megnyitásával és az irányítópultok listájának kiválasztásávalTaxiRidesDashboard. Az egyéni irányítópultok Azure Portalon való létrehozásáról és üzembe helyezéséről további információt az Azure-irányítópultok programozott létrehozása című témakörben talál.

Az alábbi képen az irányítópult látható, miután a Stream Analytics-feladat körülbelül egy órán át futott.

Screenshot of the Taxi Rides dashboard

A bal alsó panelen látható, hogy a Stream Analytics-feladat su-felhasználása az első 15 percben megmászik, majd leesik. Ez egy tipikus minta, mivel a feladat stabil állapotba kerül.

Figyelje meg, hogy az Event Hubs szabályozza a kérelmeket, amely a jobb felső panelen látható. Az időnként szabályozott kérések nem okoznak problémát, mert az Event Hubs ügyféloldali SDK automatikusan újrapróbálkozott, amikor szabályozási hibát kap. Ha azonban konzisztens szabályozási hibákat lát, az azt jelenti, hogy az eseményközpontnak több átviteli egységre van szüksége. Az alábbi grafikon egy tesztfuttatást mutat be az Event Hubs automatikus felfújási funkciójával, amely szükség szerint automatikusan skálázza ki az átviteli egységeket.

Screenshot of Event Hubs autoscaling.

Az automatikus felfújás körülbelül 06:35-kor lett engedélyezve. A szabályozott kérelmek p csökkenése látható, mivel az Event Hubs automatikusan 3 átviteli egységre skálázott.

Érdekes módon ez a Stream Analytics-feladat su-kihasználtságának növelését eredményezte. A szabályozással az Event Hubs mesterségesen csökkentette a Stream Analytics-feladat betöltési sebességét. Valójában gyakori, hogy az egyik teljesítménybeli szűk keresztmetszet feloldása egy másikat fed fel. Ebben az esetben a Stream Analytics-feladathoz tartozó további su kiosztása megoldotta a problémát.

Költségoptimalizálás

A költségoptimalizálás a szükségtelen kiadások csökkentésének és a működési hatékonyság javításának módjairól szól. További információ: A költségoptimalizálási pillér áttekintése.

Az Azure díjkalkulátorával megbecsülheti költségeit. Íme néhány szempont a referenciaarchitektúrában használt szolgáltatásokra vonatkozóan.

Azure Stream Analytics

Az Azure Stream Analytics ára az adatok szolgáltatásba történő feldolgozásához szükséges streamelési egységek száma (0,11 USD/óra).

A Stream Analytics költséges lehet, ha nem valós idejű vagy kis mennyiségű adatot dolgoz fel. Ezekben a használati esetekben érdemes lehet az Azure Functions vagy a Logic Apps használatával adatokat áthelyezni az Azure Event Hubsból egy adattárba.

Az Azure Event Hubs és az Azure Cosmos DB

Az Azure Event Hubsra és az Azure Cosmos DB-re vonatkozó költségekkel kapcsolatos szempontokért tekintse meg az Azure Databricks referenciaarchitektúrájával végzett Stream-feldolgozást.

DevOps

  • Külön erőforráscsoportokat hozhat létre éles, fejlesztési és tesztelési környezetekhez. A külön erőforráscsoportok használata megkönnyíti az üzemelő példányok felügyeletét, a tesztkörnyezetek törlését és a hozzáférési jogok kiosztását.

  • Az Azure Resource Manager-sablonnal üzembe helyezheti az Azure-erőforrásokat az infrastruktúra kódolási (IaC) folyamataként. Sablonokkal egyszerűbb automatizálni az üzembe helyezéseket az Azure DevOps Services vagy más CI/CD-megoldások használatával.

  • Helyezze az egyes számítási feladatokat egy külön üzembehelyezési sablonba, és tárolja az erőforrásokat a forrásvezérlő rendszerekben. A sablonokat egy CI/CD-folyamat részeként együtt vagy egyenként is üzembe helyezheti, így egyszerűbbé teheti az automatizálási folyamatot.

    Ebben az architektúrában az Azure Event Hubs, a Log Analytics és az Azure Cosmos DB egyetlen számítási feladatként van azonosítva. Ezeket az erőforrásokat egyetlen ARM-sablon tartalmazza.

  • Fontolja meg a számítási feladatok átmeneti előkészítését. Helyezze üzembe a különböző szakaszokban, és futtassa az érvényesítési ellenőrzéseket minden egyes szakaszban, mielőtt továbblépne a következő fázisra. Így szigorúan ellenőrzött módon küldheti le a frissítéseket az éles környezetekbe, és minimalizálhatja a nem várt üzembe helyezési problémákat.

  • Fontolja meg az Azure Monitor használatát a streamfeldolgozási folyamat teljesítményének elemzéséhez. További információ: Az Azure Databricks monitorozása.

További információ: a Microsoft Azure Well-Architected Framework működési kiválósági pillére.

A forgatókönyv üzembe helyezése

A referencia-implementáció üzembe helyezéséhez és futtatásához kövesse a GitHub-olvasás lépéseit.

Érdemes lehet áttekinteni a következő Azure-példaforgatókönyveket , amelyek ugyanazon technológiák némelyikét használó konkrét megoldásokat mutatnak be: