Dela via


Strömbearbetning med Azure Databricks

Azure Cosmos DB
Azure Databricks
Händelsehubbar för Azure
Azure Log Analytics
Azure Monitor

Den här referensarkitekturen visar en dataströmbearbetningspipeline från slutpunkt till slutpunkt. De fyra stegen i den här pipelinen omfattar inmatning, process, lagring och analys och rapport. För den här referensarkitekturen matar pipelinen in data från två källor, utför en koppling på relaterade poster från varje dataström, berikar resultatet och beräknar ett genomsnitt i realtid. Resultaten lagras sedan för ytterligare analys.

Arkitektur

diagram som visar en referensarkitektur för dataströmbearbetning med Azure Databricks.

Ladda ned en Visio-fil av den här arkitekturen.

Dataflöde

Följande dataflöde motsvarar föregående diagram:

  1. Ingest

    Två driftdataströmmar i realtid matar systemet: biljettdata och resedata . Enheter som är installerade i taxibilar fungerar som datakällor och publicerar händelser till Azure Event Hubs. Varje dataström går till sin egen händelsehubbinstans, som tillhandahåller oberoende inmatningsvägar.

  2. Bearbeta

    Azure Databricks använder båda Event Hubs-strömmarna och kör följande åtgärder:

    • Korrelerar biljettuppgifter med reseuppgifter
    • Berikar data med hjälp av en tredje datauppsättning som innehåller grannskapssökningsdata som lagras i Azure Databricks-filsystemet

    Den här processen skapar en enhetlig, berikad datauppsättning som lämpar sig för nedströmsanalys och lagring.

  3. Butik

    Resultatet av Azure Databricks-jobben är en serie poster. De bearbetade posterna skrivs till Azure Cosmos DB för NoSQL.

  4. Analysera/rapportera

    Fabric speglar driftdata från Azure Cosmos DB för NoSQL för att aktivera analysfrågor utan att påverka transaktionsprestanda. Den här metoden erbjuder en ETL-fri väg för analys. I den här arkitekturen kan du använda spegling i följande syften:

    • Spegla data från Azure Cosmos DB (eller Delta-formaterade data) i Fabric
    • Håll datauppsättningar synkroniserade med driftsystemet
    • Aktivera analys via följande verktyg:
      • Fabric SQL-analysslutpunkter för lakehouses och lager
      • Apache Spark-notebooks
      • Realtidsanalys med hjälp av Kusto Query Language (KQL) för utforskning av tidsserier och loggformat
  5. Monitor

    Azure Monitor samlar in telemetri från Azure Databricks-bearbetningspipelinen. En Log Analytics-arbetsyta lagrar programloggar och mått. Du kan utföra följande åtgärder:

    • Sök i driftsloggar
    • Visualisera mått
    • Inspektera fel, avvikelser och prestandaproblem
    • Skapa instrumentpaneler

Components

  • Azure Databricks är en Spark-baserad analysplattform optimerad för Azure-plattformen. I den här arkitekturen berikar Azure Databricks-jobb taxiresor och biljettdata och lagrar resultaten i Azure Cosmos DB.

  • Event Hubs är en hanterad, distribuerad inmatningstjänst som kan skalas för att mata in stora mängder händelser. Den här arkitekturen använder två händelsehubbinstanser för att ta emot data från taxibilar.

  • Azure Cosmos DB for NoSQL är en hanterad databastjänst med flera modeller. I den här arkitekturen lagras utdata från Azure Databricks-berikningsjobben. Fabric speglar Driftdata för Azure Cosmos DB för att aktivera analysfrågor.

  • Log Analytics är ett verktyg i Azure Monitor som hjälper dig att fråga och analysera loggdata från olika källor. I den här arkitekturen konfigurerar alla resurser Azure Diagnostics för att lagra plattformsloggar på den här arbetsytan. Arbetsytan fungerar också som datainsamlare för Spark-jobbmått som genereras från Azure Databricks bearbetningslinjer.

Information om scenario

Ett taxiföretag samlar in data om varje taxiresa. I det här scenariot förutsätter vi att två separata enheter skickar data. Taxin har en mätare som skickar information om varje resa, inklusive varaktighet, avstånd och upphämtnings- och avlämningsplatser. En separat enhet accepterar betalningar från kunder och skickar data om priser. För att upptäcka ridership-trender vill taxiföretaget beräkna den genomsnittliga dricksen per mil som körs för varje grannskap, i realtid.

Datainsamling

För att simulera en datakälla använder den här referensarkitekturen datauppsättningen taxidata i New York City. Den här datamängden innehåller data om taxiresor i New York City från 2010 till 2013. Den innehåller både rese- och biljettdataposter. Ride-data inkluderar resans varaktighet, reseavstånd och upphämtnings- och avlämningsplatserna. Prisdata inkluderar belopp för biljettpriser, skatter och tips. Fält i båda posttyperna inkluderar medaljongnummer, hacklicens och leverantörs-ID. Kombinationen av dessa tre fält identifierar unikt en taxi och en drivrutin. Data lagras i CSV-format.

Datageneratorn är ett .NET Core-program som läser posterna och skickar dem till Event Hubs. Generatorn skickar kördata i JSON-format och prisdata i CSV-format.

Event Hubs använder partitioner för att segmentera data. Med partitioner kan en konsument läsa alla läsdata parallellt. När du skickar data till Event Hubs kan du ange partitionsnyckeln direkt. I annat fall tilldelas poster till partitioner i resursallokering.

I det här scenariot ska kördata och prisdata tilldelas samma partitions-ID för en specifik taxi. Den här tilldelningen gör det möjligt för Databricks att tillämpa en grad av parallellitet när de korrelerar de två strömmarna. En post i partitionen n av kördata matchar till exempel en post i partitionen n av prisdata.

Diagram över dataströmbearbetning med Azure Databricks och Event Hubs.

Ladda ned en Visio-fil med den här arkitekturen.

I datageneratorn har den gemensamma datamodellen för båda posttyperna en PartitionKey egenskap som är sammanlänkningen av Medallion, HackLicenseoch VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Den här egenskapen tillhandahåller en explicit partitionsnyckel när den skickar data till Event Hubs.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Dataflödeskapaciteten för Event Hubs mäts i dataflödesenheter. Du kan automatiskt skala en händelsehubb genom att aktivera autoinflate. Den här funktionen skalar automatiskt dataflödesenheterna baserat på trafik, upp till ett konfigurerat maxvärde.

Dataströmbearbetning

I Azure Databricks utför ett jobb databehandling. Jobbet tilldelas till ett kluster och körs sedan på det. Jobbet kan vara anpassad kod som skrivits i Java eller en Spark notebook-.

I den här referensarkitekturen är jobbet ett Java-arkiv som har klasser skrivna i Java och Scala. När du anger Java-arkivet för ett Azure Databricks-jobb anger Azure Databricks-klustret klassen för drift. main Här innehåller -metoden för com.microsoft.pnp.TaxiCabReader klassen databearbetningslogik.

Läsa strömmen från de två händelsehubbens instanser

Databearbetningslogik använder Spark-strukturerad strömning för att läsa från de två Azure-händelsehubbens instanser:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Utöka data med grannskapsinformationen

Färddata innehåller koordinaterna för latitud och longitud för upphämtnings- och avlämningsplatserna. Dessa koordinater är användbara men är inte lätta att använda för analys. Så bearbetningskedjan berikar dessa data med grannskapsdata som läses från en shapefile.

Formfilformatet är binärt och inte enkelt parsat. Men GeoTools-biblioteket innehåller verktyg för geospatiala data som använder formfilformatet. Det här biblioteket används i klassen com.microsoft.pnp.GeoFinder för att fastställa grannskapets namn baserat på koordinaterna för upphämtnings- och avlämningsplatser.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Gå med i ride- och fare-data

Först transformeras ride- och fare-data:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Sedan kopplas ride-data till prisdata:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Bearbeta data och infoga dem i Azure Cosmos DB

Det genomsnittliga prisbeloppet för varje grannskap beräknas för ett visst tidsintervall:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Det genomsnittliga prisbeloppet infogas sedan i Azure Cosmos DB:

maxAvgFarePerNeighborhood
  .writeStream
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "<your-cosmos-endpoint>")
  .option("spark.cosmos.accountKey", "<your-cosmos-key>")
  .option("spark.cosmos.database", "<your-database-name>")
  .option("spark.cosmos.container", "<your-container-name>")
  .option("checkpointLocation", "/mnt/checkpoints/maxAvgFarePerNeighborhood")
  .outputMode("append")
  .start()
  .awaitTermination()

Att tänka på

Dessa överväganden implementerar grundpelarna i Azure Well-Architected Framework, som är en uppsättning vägledande grundsatser som du kan använda för att förbättra kvaliteten på en arbetsbelastning. Mer information finns iWell-Architected Framework.

Säkerhet

Säkerhet ger garantier mot avsiktliga attacker och missbruk av dina värdefulla data och system. Mer information finns i checklistan för Designgranskning för Security.

Åtkomst till Azure Databricks-arbetsytan styrs med hjälp av -administratörskonsolen. Administratörskonsolen innehåller funktioner för att lägga till användare, hantera användarbehörigheter och konfigurera enkel inloggning. Åtkomstkontroll för arbetsytor, kluster, jobb och tabeller kan också anges via administratörskonsolen.

Hantera hemligheter

Azure Databricks innehåller ett hemligt arkiv som används för att lagra autentiseringsuppgifter och referera till dem i notebook-filer och jobb. Omfattningar partitionshemligheter i Azure Databricks-hemlighetslagret:

databricks secrets create-scope --scope "azure-databricks-job"

Hemligheter läggs till på omfångsnivå:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Kommentar

Använd ett Azure Key Vault-backat omfång i stället för det interna Azure Databricks-omfånget.

Kod får åtkomst till hemligheter via Azure Databricks-verktyg för hemligheter.

Kostnadsoptimering

Kostnadsoptimering fokuserar på sätt att minska onödiga utgifter och förbättra drifteffektiviteten. Mer information finns i checklistan Designgranskning för kostnadsoptimering.

Normalt beräknar du kostnader med hjälp av priskalkylatorn för Azure. Överväg följande tjänster som används i den här referensarkitekturen.

Kostnadsöverväganden för Event Hubs

Den här referensarkitekturen distribuerar Event Hubs på standardnivån. Prismodellen baseras på dataflödesenheter, ingresshändelser och avbildningshändelser. En ingresshändelse är en dataenhet som är 64 KB eller mindre. Större meddelanden faktureras i multipler av 64 kB. Du anger dataflödesenheter antingen via Azure Portal- eller Event Hubs-hanterings-API:er.

Om du behöver fler kvarhållningsdagar bör du överväga den dedikerade nivån. Den här nivån tillhandahåller distributioner med en enda klientorganisation som har stränga krav. Det här erbjudandet bygger ett kluster baserat på kapacitetsenheter och är inte beroende av dataflödesenheter. Standardnivån faktureras också baserat på ingresshändelser och dataflödesenheter.

Mer information finns i Event Hubs-priser.

Kostnadsöverväganden för Azure Databricks

Azure Databricks tillhandahåller standardnivån och Premium-nivån, som båda har stöd för tre arbetsbelastningar. Den här referensarkitekturen distribuerar en Azure Databricks-arbetsyta på Premium-nivån.

Datateknikarbetsbelastningar ska köras på ett jobbkluster. Datatekniker använder kluster för att skapa och utföra jobb. Dataanalysarbetsbelastningar ska köras i ett kluster för alla syften och är avsedda för dataexperter att utforska, visualisera, manipulera och dela data och insikter interaktivt.

Azure Databricks tillhandahåller flera prismodeller.

  • betala per användning-plan

    Du debiteras för virtuella datorer (VM) som etablerats i kluster och Azure Databricks-enheter (DBUs) baserat på den valda VM-instansen. En DBU är en bearbetningsenhet som Azure fakturerar per användning per sekund. DBU-förbrukningen beror på storleken och typen av instans som körs i Azure Databricks. Prissättningen beror på den valda arbetsbelastningen och nivån.

  • förköpsplan

    Du checkar in på DBU:er som Azure Databricks-incheckningsenheter i antingen ett eller tre år för att minska den totala ägandekostnaden under den tidsperioden jämfört med modellen betala per användning.

Mer information finns i Prissättning för Azure Databricks.

Kostnadsöverväganden för Azure Cosmos DB

I den här arkitekturen skriver Azure Databricks-jobbet en serie poster till Azure Cosmos DB. Du debiteras för den kapacitet som du reserverar, vilket mäts i enheter för programbegäran per sekund (RU/s). Den här kapaciteten används för att utföra infogningsåtgärder. Faktureringsenheten är 100 RU/s per timme. Till exempel är kostnaden för att skriva 100 KB-objekt 50 RU/s.

För skrivåtgärder konfigurerar du tillräckligt med kapacitet för att stödja antalet skrivningar som behövs per sekund. Du kan öka det etablerade dataflödet med hjälp av portalen eller Azure CLI innan du utför skrivåtgärder och sedan minska dataflödet när dessa åtgärder har slutförts. Ditt dataflöde för skrivperioden är summan av det minsta dataflöde som krävs för specifika data och det dataflöde som krävs för infogningsåtgärden. Den här beräkningen förutsätter att det inte finns någon annan arbetsbelastning som körs.

Exempel på kostnadsanalys

Anta att du konfigurerar ett dataflödesvärde på 1 000 RU/s på en container och kör det kontinuerligt i 30 dagar, vilket är lika med 720 timmar.

Containern debiteras med 10 enheter på 100 RU/s per timme för varje timme. Tio enheter till 0,008 USD (per 100 RU/s per timme) debiteras till 0,08 USD per timme.

För 720 timmar eller 7 200 enheter (av 100 RU:er) debiteras du 57,60 USD för månaden.

Lagring faktureras också för varje GB som används för dina lagrade data och index. Mer information finns i Prismodellen för Azure Cosmos DB.

Använd Azure Cosmos DB-kapacitetskalkylatorn för en snabb uppskattning av arbetsbelastningskostnaden.

Operativ skicklighet

Operational Excellence omfattar de driftsprocesser som distribuerar ett program och håller det igång i produktion. Mer information finns i checklistan för Designgranskning för Operational Excellence.

Övervakning

Azure Databricks baseras på Apache Spark. Både Azure Databricks och Apache Spark använder Apache Log4j som standardbibliotek för loggning. Förutom standardloggningen som Apache Spark tillhandahåller kan du implementera loggning i Log Analytics. Mer information finns i Övervaka Azure Databricks.

Eftersom com.microsoft.pnp.TaxiCabReader-klassen bearbetar ride- och fare-meddelanden kan ett meddelande vara felaktigt och därför inte giltigt. I en produktionsmiljö är det viktigt att analysera dessa felaktiga meddelanden för att identifiera ett problem med datakällorna så att de snabbt kan åtgärdas för att förhindra dataförlust. Klassen com.microsoft.pnp.TaxiCabReader registrerar en Apache Spark Accumulator som spårar antalet felformade biljettposter och ride-poster:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark använder Dropwizard-biblioteket för att skicka mått. Vissa av de interna Dropwizard-måttfälten är inte kompatibla med Log Analytics, vilket är anledningen till att den här referensarkitekturen innehåller en anpassad Dropwizard-mottagare och reporter. Den formaterar måtten i det format som Log Analytics förväntar sig. När Apache Spark rapporterar mått skickas också anpassade mått för den felaktiga resan och prisdata.

Du kan använda följande exempelfrågor på Log Analytics-arbetsytan för att övervaka driften av strömningsjobbet. Argumentet ago(1d) i varje fråga returnerar alla poster som genererades under den senaste dagen. Du kan justera den här parametern för att visa en annan tidsperiod.

Undantag som loggas under dataströmfrågeåtgärden

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Ackumulering av felaktiga biljett- och kördata

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Jobbåtgärd över tid

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Resursorganisation och distributioner

  • Skapa separata resursgrupper för produktions-, utvecklings- och testmiljöer. Med separata resursgrupper blir det enklare att hantera distributioner, ta bort testdistributioner och tilldela åtkomsträttigheter.

  • Använd Azure Resource Manager-mallen för att distribuera Azure-resurserna enligt processen infrastruktur som kod. Med hjälp av mallar kan du automatisera distributioner med Azure DevOps-tjänster eller andra ci/CD-lösningar (kontinuerlig integrering och kontinuerlig leverans).

  • Placera varje arbetsbelastning i en separat distributionsmall och lagra resurserna i källkontrollsystemen. Du kan distribuera mallarna tillsammans eller individuellt som en del av en CI/CD-process. Den här metoden förenklar automatiseringsprocessen.

    I den här arkitekturen identifieras Event Hubs, Log Analytics och Azure Cosmos DB som en enda arbetsbelastning. Dessa resurser ingår i en enda Azure Resource Manager-mall.

  • Överväg att mellanlagring av dina arbetsbelastningar. Distribuera till olika faser och kör valideringskontroller i varje steg innan du går vidare till nästa steg. På så sätt kan du styra hur du push-överför uppdateringar till dina produktionsmiljöer och minimera oväntade distributionsproblem.

    I den här arkitekturen finns det flera distributionssteg. Överväg att skapa en Azure DevOps-pipeline och lägga till dessa steg. Du kan automatisera följande steg:

    • Starta ett Azure Databricks-kluster.
    • Konfigurera Azure Databricks CLI.
    • Installera Scala-verktyg.
    • Lägg till Azure Databricks-hemligheterna.

    Överväg att skriva automatiserade integreringstester för att förbättra kvaliteten och tillförlitligheten i Azure Databricks-koden och dess livscykel.

Gå vidare