Szerkesztés

Megosztás a következőn keresztül:


Streamek feldolgozása az Azure Databricksszel

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Ez a referenciaarchitektúra egy végpontok közötti streamfeldolgozási folyamatot mutat be. Ez a folyamattípus négy fázisból áll: betöltés, folyamat, tárolás, elemzés és jelentéskészítés. Ebben a referenciaarchitektúrában a folyamat két forrásból betölti az adatokat, összekapcsolja az egyes streamekből származó kapcsolódó rekordokat, gazdagítja az eredményt, és valós időben kiszámítja az átlagot. Az eredmények tárolása további elemzés céljából történik.

GitHub-emblémaAz architektúra referencia-implementációja elérhető a GitHubon.

Architektúra

Az Azure Databricks szolgáltatással végzett streamfeldolgozás referenciaarchitektúrájának ábrája.

Töltse le az architektúra Visio-fájlját.

Munkafolyamat

Az architektúra az alábbi összetevőkből áll:

Adatforrások. Ebben az architektúrában két adatforrás létezik, amelyek valós időben hoznak létre adatfolyamokat. Az első stream tartalmazza a menetadatokat, a második pedig a viteldíjadatokat. A referenciaarchitektúra egy szimulált adatgenerátort tartalmaz, amely statikus fájlokból olvas be, és leküldi az adatokat az Event Hubsba. A valódi alkalmazások adatforrásai a taxifülkékbe telepített eszközök.

Azure Event Hubs. Az Event Hubs egy eseménybetöltési szolgáltatás. Ez az architektúra két eseményközpont-példányt használ, egyet minden adatforráshoz. Minden adatforrás adatstreamet küld a társított eseményközpontnak.

Azure Databricks. A Databricks egy Apache Spark-alapú elemzési platform, amely a Microsoft Azure felhőszolgáltatási platformhoz van optimalizálva. A Databricks a taxiút és a viteldíj adatainak korrelációjára szolgál, valamint a korrelált adatok a Databricks fájlrendszerben tárolt szomszédos adatokkal való gazdagítására.

Azure Cosmos DB. Az Azure Databricks-feladatok kimenete rekordsorozat, amelyet az Apache Cassandra azure Cosmos DB-be írnak. Az Apache Cassandra-hoz készült Azure Cosmos DB azért használatos, mert támogatja az idősorok adatmodellezését.

  • Az Azure Cosmos DB-hez készült Azure Synapse Link lehetővé teszi a közel valós idejű elemzések futtatását az Azure Cosmos DB-ben, anélkül, hogy a tranzakciós számítási feladatra bármilyen teljesítményre vagy költségre hatással lenne, az Azure Synapse-munkaterületről elérhető két elemzési motor használatával: az SQL Serverless és a Spark-készletek használatával.

Azure Log Analytics. Az Azure Monitor által gyűjtött alkalmazásnapló-adatokat egy Log Analytics-munkaterület tárolja. A Log Analytics-lekérdezések metrikák elemzésére és vizualizációira, valamint naplóüzenetek vizsgálatára használhatók az alkalmazáson belüli problémák azonosításához.

Alternatívák

  • A Synapse Link a Microsoft által előnyben részesített megoldás az Azure Cosmos DB-adatokon alapuló elemzésekhez.

Forgatókönyv részletei

Forgatókönyv: A taxitársaság minden taxiútról adatokat gyűjt. Ebben a forgatókönyvben feltételezzük, hogy két különálló eszköz küld adatokat. A taxi egy mérőeszközt tartalmaz, amely információkat küld az egyes utazásokról – az időtartamról, a távolságról, a csomagfelvételről és a legördülő helyekről. Egy külön eszköz fogadja az ügyfelektől érkező kifizetéseket, és adatokat küld a viteldíjakról. A ridership trendek észlelése érdekében a taxitársaság valós időben szeretné kiszámítani az egyes környékek átlagos, mérföldenkénti tippjeit.

Lehetséges használati esetek

Ez a megoldás a kiskereskedelmi iparág számára van optimalizálva.

Adatok betöltése

Az adatforrás szimulálásához ez a referenciaarchitektúra a New York-i taxiadatkészletet[1] használja. Ez az adatkészlet a New York-i taxiutakra vonatkozó adatokat tartalmazza négyéves időszak alatt (2010–2013). Kétféle rekordot tartalmaz: Ride-adatokat és viteldíjakat. A menetadatok magukban foglalják az utazás időtartamát, az utazás távolságát, valamint a csomagfelvételi és a legördülő helyet. A viteldíjadatok tartalmazzák a viteldíjakat, az adó- és a tippösszegeket. Mindkét rekordtípus gyakori mezői közé tartozik a medálszám, a feltört licenc és a szállító azonosítója. Ez a három mező együttesen azonosítja a taxit és a sofőrt. Az adatok CSV formátumban lesznek tárolva.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Illinois-i Egyetem, Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Az adatgenerátor egy .NET Core-alkalmazás, amely beolvassa a rekordokat, és elküldi őket az Azure Event Hubsnak. A generátor JSON formátumban küldi el a menetadatokat, a menetdíjak adatait PEDIG CSV formátumban.

Az Event Hubs partíciókkal szegmentálta az adatokat. A partíciók lehetővé teszik, hogy a felhasználó párhuzamosan olvassa be az egyes partíciókat. Amikor adatokat küld az Event Hubsnak, explicit módon megadhatja a partíciókulcsot. Ellenkező esetben a rekordok ciklikus időszeleteléses módon vannak hozzárendelve a partíciókhoz.

Ebben a forgatókönyvben a menetadatoknak és a viteldíjadatoknak ugyanazzal a partícióazonosítóval kell rendelkeznie egy adott taxifülkéhez. Ez lehetővé teszi, hogy a Databricks bizonyos fokú párhuzamosságot alkalmazzon a két stream korrelációja esetén. A menetadatok n partíciójában lévő rekord megegyezik a viteldíjadatok n partíciójában lévő rekorddal.

Az Azure Databricks és az Event Hubs streamfeldolgozásának diagramja.

Töltse le az architektúra Visio-fájlját.

Az adatgenerátorban mindkét rekordtípus közös adatmodellje olyan PartitionKey tulajdonságot Medallionhasznál, amely az , HackLicenseés VendorIda .

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Ez a tulajdonság explicit partíciókulcs megadására szolgál az Event Hubsba való küldéskor:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Az Event Hubs átviteli kapacitását átviteli egységekben mérik. Az eseményközpontok automatikus méretezéséhez engedélyezze az automatikus felfújást, amely automatikusan skálázza az átviteli egységeket a forgalom alapján, egy konfigurált maximális értékre.

Streamfeldolgozás

Az Azure Databricksben az adatfeldolgozást egy feladat végzi. A feladat egy fürthöz van rendelve, és egy fürtön fut. A feladat lehet java nyelven írt egyéni kód vagy Spark-jegyzetfüzet.

Ebben a referenciaarchitektúrában a feladat egy Java-archívum, amely Java és Scala nyelven is ír osztályokat. Egy Databricks-feladat Java-archívumának megadásakor az osztályt a Databricks-fürt hajtja végre. Itt az maincom.microsoft.pnp.TaxiCabReader osztály metódusa tartalmazza az adatfeldolgozási logikát.

A stream olvasása a két eseményközpont-példányból

Az adatfeldolgozási logika Spark strukturált streamelést használ a két Azure-eseményközpont-példányból való olvasáshoz:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Az adatok bővítése a környék információival

A menetadatok tartalmazzák a fel- és levételi helyek szélességi és hosszúsági koordinátáit. Bár ezek a koordináták hasznosak, nem könnyen használhatók elemzéshez. Ezért ezek az adatok egy alakzatfájlból beolvasott szomszédsági adatokkal bővülnek.

Az alakzatfájl formátuma bináris, és nem könnyen elemezhető, de a GeoTools könyvtár eszközöket biztosít az alakzatfájl formátumot használó térinformatikai adatokhoz. Ez a kódtár az com.microsoft.pnp.GeoFinder osztályban a fel- és levételi koordináták alapján határozza meg a környék nevét.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Csatlakozás az utazáshoz és a viteldíjadatokhoz

Először a menet- és viteldíjadatok lesznek átalakítva:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Ezután a menetadatok a viteldíjadatokkal lesznek összekapcsolva:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Az adatok feldolgozása és az Azure Cosmos DB-be való beszúrás

Az egyes környékek átlagos viteldíj-összegét egy adott időintervallumra számítjuk ki:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Ezt követően beszúrjuk az Azure Cosmos DB-be:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Megfontolások

Ezek a szempontok implementálják az Azure Well-Architected Framework alappilléreit, amely a számítási feladatok minőségének javítására használható vezérelvek halmaza. További információ: Microsoft Azure Well-Architected Framework.

Biztonság

A biztonság biztosítékokat nyújt a szándékos támadások és az értékes adatokkal és rendszerekkel való visszaélés ellen. További információ: A biztonsági pillér áttekintése.

Az Azure Databricks-munkaterülethez való hozzáférést a rendszergazdai konzolon lehet szabályozni. A rendszergazdai konzol olyan funkciókat tartalmaz, amelyek lehetővé teszi a felhasználók hozzáadását, a felhasználói engedélyek kezelését és az egyszeri bejelentkezés beállítását. A munkaterületek, fürtök, feladatok és táblák hozzáférés-vezérlése a rendszergazdai konzolon is beállítható.

Titkos kódok kezelése

Az Azure Databricks tartalmaz egy titkos tárat, amely titkos kulcsok tárolására szolgál, beleértve a kapcsolati sztring, a hozzáférési kulcsokat, a felhasználóneveket és a jelszavakat. Az Azure Databricks titkos tárában lévő titkos kulcsok hatókörök szerint vannak particionálva:

databricks secrets create-scope --scope "azure-databricks-job"

A titkos kulcsok a hatókör szintjén vannak hozzáadva:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Feljegyzés

Az Azure Key Vault által támogatott hatókör használható a natív Azure Databricks-hatókör helyett. További információ: Azure Key Vault által támogatott hatókörök.

A kódban a titkos kódok az Azure Databricks titkos kulcsok segédprogramjaival érhetők el.

Figyelés

Az Azure Databricks az Apache Sparkon alapul, és mindkettő a log4j-t használja a naplózás szabványos kódtáraként. Az Apache Spark által biztosított alapértelmezett naplózás mellett az Azure Databricks monitorozása című cikk alapján az Azure Log Analyticsben is implementálhatja a naplózást.

Mivel az com.microsoft.pnp.TaxiCabReader osztály menet- és viteldíj-üzeneteket dolgoz fel, lehetséges, hogy valamelyik hibás, ezért érvénytelen. Éles környezetben fontos elemezni ezeket a hibásan formázott üzeneteket, hogy azonosítani lehessen az adatforrásokkal kapcsolatos problémát, hogy gyorsan javítható legyen az adatvesztés megakadályozása érdekében. Az com.microsoft.pnp.TaxiCabReader osztály regisztrál egy Apache Spark-akkumulátort, amely nyomon követi a hibásan formázott viteldíjak és a menetjegyek számát:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Az Apache Spark a Dropwizard-kódtárat használja metrikák küldéséhez, és a natív Dropwizard-metrikák egyes mezői nem kompatibilisek az Azure Log Analyticsszel. Ezért ez a referenciaarchitektúra egy egyéni Dropwizard fogadót és riportert tartalmaz. A metrikákat az Azure Log Analytics által várt formátumban formázza. Amikor az Apache Spark metrikákat jelent, a rendszer elküldi a hibás út és a viteldíjadatok egyéni metrikáit is.

Az alábbi példalekérdezések az Azure Log Analytics-munkaterületen a streamelési feladat végrehajtásának figyelésére használhatók. Az egyes lekérdezések argumentumai ago(1d) az utolsó napon létrehozott összes rekordot visszaadják, és módosíthatók egy másik időszak megtekintéséhez.

Stream-lekérdezés végrehajtása során naplózott kivételek

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Helytelen viteldíj és menetadatok felhalmozása

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Feladat végrehajtása az idő függvényében

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

További információ: Az Azure Databricks monitorozása.

DevOps

  • Külön erőforráscsoportokat hozhat létre éles, fejlesztési és tesztelési környezetekhez. A külön erőforráscsoportok használata megkönnyíti az üzemelő példányok felügyeletét, a tesztkörnyezetek törlését és a hozzáférési jogok kiosztását.

  • Az Azure Resource Manager-sablonnal üzembe helyezheti az Azure-erőforrásokat az infrastruktúra kódolási (IaC) folyamataként. Sablonokkal egyszerűbb automatizálni az üzembe helyezéseket az Azure DevOps Services vagy más CI/CD-megoldások használatával.

  • Helyezze az egyes számítási feladatokat egy külön üzembehelyezési sablonba, és tárolja az erőforrásokat a forrásvezérlő rendszerekben. A sablonokat egy CI/CD-folyamat részeként együtt vagy egyenként is üzembe helyezheti, így egyszerűbbé teheti az automatizálási folyamatot.

    Ebben az architektúrában az Azure Event Hubs, a Log Analytics és az Azure Cosmos DB egyetlen számítási feladatként van azonosítva. Ezeket az erőforrásokat egyetlen ARM-sablon tartalmazza.

  • Fontolja meg a számítási feladatok átmeneti előkészítését. Helyezze üzembe a különböző szakaszokban, és futtassa az érvényesítési ellenőrzéseket minden egyes szakaszban, mielőtt továbblépne a következő fázisra. Így szigorúan ellenőrzött módon küldheti le a frissítéseket az éles környezetekbe, és minimalizálhatja a nem várt üzembe helyezési problémákat.

    Ebben az architektúrában több üzembe helyezési fázis is létezik. Érdemes lehet létrehozni egy Azure DevOps-folyamatot, és hozzáadni ezeket a fázisokat. Íme néhány példa az automatizálható szakaszokra:

    • Databricks-fürt indítása
    • A Databricks parancssori felületének konfigurálása
    • A Scala Tools telepítése
    • A Databricks titkos kulcsainak hozzáadása

    Emellett érdemes lehet automatizált integrációs teszteket írni a Databricks-kód és életciklusa minőségének és megbízhatóságának javítása érdekében.

  • Fontolja meg az Azure Monitor használatát a streamfeldolgozási folyamat teljesítményének elemzéséhez. További információ: Az Azure Databricks monitorozása.

További információt a Microsoft Azure Well-Architected Framework DevOps szakaszában talál.

Költségoptimalizálás

A költségoptimalizálás a szükségtelen kiadások csökkentésének és a működési hatékonyság javításának módjairól szól. További információ: A költségoptimalizálási pillér áttekintése.

Az Azure díjkalkulátorával megbecsülheti költségeit. Íme néhány szempont a referenciaarchitektúrában használt szolgáltatásokra vonatkozóan.

Event Hubs

Ez a referenciaarchitektúra az Event Hubsot a Standard szinten helyezi üzembe. A díjszabási modell az átviteli egységeken, a bejövő eseményeken és az események rögzítésén alapul. Egy belépő esemény egy legfeljebb 64 KB nagyságú adategység. A nagyobb üzenetek számlázása a 64 KB többszörösei alapján történik. Az átviteli egységeket az Azure Portalon vagy az Event Hubs felügyeleti API-ján keresztül adhatja meg.

Ha több megőrzési napra van szüksége, fontolja meg a dedikált szintet. Ez a szint a legigényesebb követelményekkel rendelkező egybérlős üzemelő példányokat kínál. Ez az ajánlat egy olyan fürtöt hoz létre a kapacitásegységek (CU) alapján, amelyet nem kötnek össze átviteli egységek.

A standard szint számlázása a bejövő események és az átviteli egységek alapján is történik.

Az Event Hubs díjszabásával kapcsolatos információkért tekintse meg az Event Hubs díjszabását.

Azure Databricks

Az Azure Databricks két standard és prémium szintű szolgáltatást kínál, amelyek mindegyike három számítási feladatot támogat. Ez a referenciaarchitektúra üzembe helyezi az Azure Databricks-munkaterületet a Prémium szinten.

A könnyű számítási feladatok adatmérnök és adatmérnök az adatmérnökök feladata a feladatok létrehozása és végrehajtása. A Data Analytics számítási feladat célja, hogy az adattudósok interaktívan feltárják, vizualizálják, manipulálják és megosztják az adatokat és az elemzéseket.

Az Azure Databricks számos díjszabási modellt kínál.

  • Használatalapú fizetési csomag

    A kiválasztott virtuálisgép-példány alapján a fürtökben és a Databricks-egységekben (DBU-kban) kiépített virtuális gépekért kell fizetnie. A Databricks-egység a feldolgozási képesség egysége, amely másodpercenkénti használat alapján van számlázva. A DBU-felhasználás az Azure Databrickst futtató példány méretétől és típusától függ. A díjszabás a kiválasztott számítási feladattól és szinttől függ.

  • Elővásárlási csomag

    Az Azure Databricks Units (DBU) kötelezettséget vállal databricks véglegesítési egységként (DBCU) egy vagy három évre. A használatalapú fizetéses modellhez képest akár 37%-ot is megtakaríthat.

További információkért lásd az Azure Databricks díjszabását.

Azure Cosmos DB

Ebben az architektúrában az Azure Databricks-feladat rekordokat ír az Azure Cosmos DB-be. A lekérhető kapacitásért a rendszer másodpercenkénti kérelemegységekben (RU/s) kifejezve számít fel díjat a beszúrási műveletek végrehajtásához. A számlázási egység óránként 100 RU/s. A 100 KB-os elemek írásának költsége például 50 RU/s.

Az írási műveletekhez elegendő kapacitást kell kiépíteni a másodpercenként szükséges írások számának támogatásához. A kiosztott átviteli sebesség növeléséhez használja a portált vagy az Azure CLI-t az írási műveletek végrehajtása előtt, majd csökkentse az átviteli sebességet a műveletek befejezése után. Az írási időszak átviteli sebessége a megadott adatokhoz szükséges minimális átviteli sebesség, valamint a beszúrási művelethez szükséges átviteli sebesség, feltéve, hogy más számítási feladat nem fut.

Példa költségelemzésre

Tegyük fel, hogy 1000 RU/s átviteli sebességű értéket konfigurál egy tárolón. 30 napig, összesen 720 órán át 24 órán keresztül van üzembe helyezve.

A tároló számlázása óránként 10 egység 100 RU/s/s sebességgel történik. 10 egység 0,008 dollárért (óránként 100 RU/s-ért) 0,08 USD/óra díjat számítunk fel.

720 órán át vagy 7200 egységért (100 kérelemegységből) 57,60 dollárt számlázunk ki a hónapra.

A tárterületet is kiszámlázzuk a tárolt adatokhoz és indexekhez használt minden GB-ra vonatkozóan. További információkért tekintse meg az Azure Cosmos DB díjszabási modelljét.

Az Azure Cosmos DB kapacitáskalkulátorával gyors becslést kaphat a számítási feladatok költségeiről.

További információért lásd a Microsoft Azure Well-Architected Framework költségekkel kapcsolatos részét.

A forgatókönyv üzembe helyezése

A referencia-implementáció üzembe helyezéséhez és futtatásához kövesse a GitHub-olvasás lépéseit.

Következő lépések