Gegevensstromen verwerken met Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Deze referentiearchitectuur toont een end-to-end stroomverwerkingspijplijn. Dit type pijplijn heeft vier fasen: opnemen, verwerken, opslaan en analyseren en rapporteren. Voor deze referentiearchitectuur worden gegevens uit twee bronnen opgenomen in de pijplijn, wordt een join uitgevoerd op gerelateerde records uit elke stream, wordt het resultaat verrijkt en wordt een gemiddelde in realtime berekend. De resultaten worden opgeslagen voor verdere analyse.

GitHub logo Een referentie-implementatie voor deze architectuur is beschikbaar op GitHub.

Architectuur

Diagram showing a reference architecture for stream processing with Azure Databricks.

Download een Visio-bestand van deze architectuur.

Werkstroom

De architectuur bestaat uit de volgende onderdelen:

Gegevensbronnen. In deze architectuur zijn er twee gegevensbronnen die in realtime gegevensstromen genereren. De eerste stream bevat ritgegevens en de tweede bevat tariefinformatie. De referentiearchitectuur bevat een gesimuleerde gegevensgenerator die uit een set statische bestanden leest en de gegevens naar Event Hubs pusht. De gegevensbronnen in een echte toepassing zijn apparaten die in de taxi's zijn geïnstalleerd.

Azure Event Hubs. Event Hubs is een gebeurtenisopnameservice. Deze architectuur maakt gebruik van twee Event Hub-exemplaren, één voor elke gegevensbron. Elke gegevensbron verzendt een gegevensstroom naar de bijbehorende Event Hub.

Azure Databricks. Databricks is een op Apache Spark gebaseerd analyseplatform dat is geoptimaliseerd voor het Microsoft Azure-cloudservicesplatform. Databricks wordt gebruikt om de taxirit- en ritgegevens te correleren en om de gecorreleerde gegevens te verrijken met buurtgegevens die zijn opgeslagen in het Databricks-bestandssysteem.

Azure Cosmos DB. De uitvoer van een Azure Databricks-taak is een reeks records, die worden geschreven naar Azure Cosmos DB voor Apache Cassandra. Azure Cosmos DB voor Apache Cassandra wordt gebruikt omdat het modellering van tijdreeksgegevens ondersteunt.

  • Met Azure Synapse Link voor Azure Cosmos DB kunt u bijna realtime analyses uitvoeren op operationele gegevens in Azure Cosmos DB, zonder dat dit gevolgen heeft voor de prestaties of kosten van uw transactionele workload, met behulp van de twee analyse-engines die beschikbaar zijn in uw Azure Synapse-werkruimte: SQL Serverloze en Spark-pools.

Azure Log Analytics. Toepassingslogboekgegevens die door Azure Monitor worden verzameld, worden opgeslagen in een Log Analytics-werkruimte. Log Analytics-query's kunnen worden gebruikt om metrische gegevens te analyseren en te visualiseren en logboekberichten te inspecteren om problemen in de toepassing te identificeren.

Alternatieven

  • Synapse Link is de voorkeursoplossing van Microsoft voor analyses boven op Azure Cosmos DB-gegevens.

Scenariodetails

Scenario: Een taxibedrijf verzamelt gegevens over elke taxirit. Voor dit scenario wordt ervan uitgegaan dat er twee afzonderlijke apparaten zijn die gegevens verzenden. De taxi heeft een meter die informatie over elke rit verzendt: de duur, afstand en ophaal- en afleverlocaties. Een afzonderlijk apparaat accepteert betalingen van klanten en verzendt gegevens over tarieven. Om trends in de ruiterij te ontdekken, wil het taxibedrijf de gemiddelde tip per mijl rijden, in realtime, berekenen voor elke buurt.

Potentiële gebruikscases

Deze oplossing is geoptimaliseerd voor de detailhandel.

Gegevensopname

Voor het simuleren van een gegevensbron maakt deze referentiearchitectuur gebruik van de gegevensset New York City Taxi Data[1]. Deze gegevensset bevat gegevens over taxiritten in New York City gedurende een periode van vier jaar (2010 – 2013). Het bevat twee typen records: Ritgegevens en ritgegevens. Ritgegevens omvatten reisduur, reisafstand en ophaal- en afleverlocatie. Tariefgegevens omvatten tarief-, belasting- en fooibedragen. Algemene velden in beide recordtypen zijn het nummer van de medaille- en hacklicentie en de leverancier-id. Samen identificeren deze drie velden een taxi plus een chauffeur. De gegevens worden opgeslagen in CSV-indeling.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universiteit van Illinois bij Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

De gegevensgenerator is een .NET Core-toepassing waarmee de records worden gelezen en naar Azure Event Hubs worden verzonden. De generator verzendt ritgegevens in JSON-indeling en ritgegevens in CSV-indeling.

Event Hubs maakt gebruik van partities om de gegevens te segmenteren. Met partities kan een consument elke partitie parallel lezen. Wanneer u gegevens naar Event Hubs verzendt, kunt u de partitiesleutel expliciet opgeven. Anders worden records op round robin-wijze toegewezen aan partities.

In dit scenario moeten ritgegevens en ritgegevens uiteindelijk dezelfde partitie-id voor een bepaalde taxicabine hebben. Hierdoor kan Databricks een mate van parallelle uitvoering toepassen wanneer deze de twee streams correleert. Een record in partitie n van de ritgegevens komt overeen met een record in partitie n van de ritgegevens.

Diagram of stream processing with Azure Databricks and Event Hubs.

Download een Visio-bestand van deze architectuur.

In de gegevensgenerator heeft het algemene gegevensmodel voor beide recordtypen een PartitionKey eigenschap die de samenvoeging is van Medallion, HackLicenseen VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Deze eigenschap wordt gebruikt om een expliciete partitiesleutel op te geven bij het verzenden naar Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

De doorvoercapaciteit van Event Hubs wordt gemeten in doorvoereenheden. U kunt een Event Hub automatisch schalen door automatisch vergroten in te schakelen , waardoor de doorvoereenheden automatisch worden geschaald op basis van verkeer, tot een geconfigureerd maximum.

Stroomverwerking

In Azure Databricks wordt gegevensverwerking uitgevoerd door een taak. De taak wordt toegewezen aan en wordt uitgevoerd op een cluster. De taak kan aangepaste code zijn die is geschreven in Java of een Spark-notebook.

In deze referentiearchitectuur is de taak een Java-archief met klassen die zijn geschreven in zowel Java als Scala. Wanneer u het Java-archief voor een Databricks-taak opgeeft, wordt de klasse opgegeven voor uitvoering door het Databricks-cluster. Hier bevat de belangrijkste methode van de klasse com.microsoft.pnp.TaxiCabReader de logica voor gegevensverwerking.

De stream lezen vanuit de twee Event Hub-exemplaren

De logica voor gegevensverwerking maakt gebruik van gestructureerd streamen van Spark om te lezen uit de twee Azure Event Hub-exemplaren:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

De gegevens verrijken met de buurtinformatie

De ritgegevens omvatten de breedte- en lengtegraadcoördinaten van de ophaal- en afgiftelocaties. Hoewel deze coördinaten nuttig zijn, worden ze niet eenvoudig gebruikt voor analyse. Daarom worden deze gegevens verrijkt met buurtgegevens die worden gelezen uit een shapebestand.

De indeling shapefile is binair en kan niet eenvoudig worden geparseerd, maar de GeoTools-bibliotheek biedt hulpprogramma's voor georuimtelijke gegevens die de indeling van het shapebestand gebruiken. Deze bibliotheek wordt gebruikt in de klasse com.microsoft.pnp.GeoFinder om de naam van de buurt te bepalen op basis van de coördinaten voor ophalen en afzetten.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Deelnemen aan de rit- en ritgegevens

Eerst worden de rit- en ritgegevens getransformeerd:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Vervolgens worden de ritgegevens samengevoegd met de ritgegevens:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

De gegevens verwerken en invoegen in Azure Cosmos DB

Het gemiddelde tariefbedrag voor elke buurt wordt berekend voor een bepaald tijdsinterval:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Deze wordt vervolgens ingevoegd in Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Overwegingen

Met deze overwegingen worden de pijlers van het Azure Well-Architected Framework geïmplementeerd. Dit is een set richtlijnen die kunnen worden gebruikt om de kwaliteit van een workload te verbeteren. Zie Microsoft Azure Well-Architected Framework voor meer informatie.

Beveiliging

Beveiliging biedt garanties tegen opzettelijke aanvallen en misbruik van uw waardevolle gegevens en systemen. Zie Overzicht van de beveiligingspijler voor meer informatie.

Toegang tot de Azure Databricks-werkruimte wordt beheerd met behulp van de beheerconsole. De beheerconsole bevat functionaliteit voor het toevoegen van gebruikers, het beheren van gebruikersmachtigingen en het instellen van eenmalige aanmelding. Toegangsbeheer voor werkruimten, clusters, taken en tabellen kan ook worden ingesteld via de beheerconsole.

Geheimen beheren

Azure Databricks bevat een geheimarchief dat wordt gebruikt voor het opslaan van geheimen, waaronder verbindingsreeks s, toegangssleutels, gebruikersnamen en wachtwoorden. Geheimen in het Azure Databricks-geheimarchief worden gepartitioneerd op basis van bereiken:

databricks secrets create-scope --scope "azure-databricks-job"

Geheimen worden toegevoegd op bereikniveau:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Notitie

Een door Azure Key Vault ondersteund bereik kan worden gebruikt in plaats van het systeemeigen Azure Databricks-bereik. Zie Azure Key Vault-bereiken met ondersteuning voor meer informatie.

In code worden geheimen geopend via de azure Databricks-hulpprogramma's voor geheimen.

Controleren

Azure Databricks is gebaseerd op Apache Spark en gebruiken log4j als de standaardbibliotheek voor logboekregistratie. Naast de standaardlogboekregistratie van Apache Spark kunt u logboekregistratie implementeren in Azure Log Analytics volgens het artikel Azure Databricks bewaken.

Aangezien de klasse com.microsoft.pnp.TaxiCabReader rit- en ritberichten verwerkt, is het mogelijk dat een van beide ongeldige en daarom ongeldige berichten is. In een productieomgeving is het belangrijk om deze onjuiste berichten te analyseren om een probleem met de gegevensbronnen te identificeren, zodat deze snel kunnen worden opgelost om gegevensverlies te voorkomen. De klasse com.microsoft.pnp.TaxiCabReader registreert een Apache Spark Accumulator die het aantal onjuiste ritten en ritrecords bijhoudt:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark maakt gebruik van de bibliotheek Dropwizard voor het verzenden van metrische gegevens en sommige systeemeigen velden voor metrische gegevens van Dropwizard zijn niet compatibel met Azure Log Analytics. Daarom bevat deze referentiearchitectuur een aangepaste Dropwizard-sink en reporter. Hiermee worden de metrische gegevens opgemaakt in de indeling die wordt verwacht door Azure Log Analytics. Wanneer Apache Spark metrische gegevens rapporteert, worden de aangepaste metrische gegevens voor de onjuiste rit en ritgegevens ook verzonden.

Hier volgen voorbeeldquery's die u in uw Azure Log Analytics-werkruimte kunt gebruiken om de uitvoering van de streamingtaak te controleren. Het argument ago(1d) in elke query retourneert alle records die zijn gegenereerd op de laatste dag en kan worden aangepast om een andere periode weer te geven.

Uitzonderingen die zijn vastgelegd tijdens het uitvoeren van streamquery's

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Accumulatie van onjuiste ritten en ritgegevens

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Taakuitvoering in de loop van de tijd

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Zie Bewaking van Azure Databricks voor meer informatie.

DevOps

  • Maak afzonderlijke resourcegroepen voor productie-, ontwikkelings- en testomgevingen. Met afzonderlijke resourcegroepen kunt u eenvoudiger implementaties beheren, testimplementaties verwijderen en toegangsrechten verlenen.

  • Gebruik een Azure Resource Manager-sjabloon om de Azure-resources te implementeren die volgen op het proces infrastructuur als code (IaC). Met sjablonen is het automatiseren van implementaties met Behulp van Azure DevOps Services of andere CI/CD-oplossingen eenvoudiger.

  • Plaats elke workload in een afzonderlijke implementatiesjabloon en sla de resources op in broncodebeheersystemen. U kunt de sjablonen samen of afzonderlijk implementeren als onderdeel van een CI/CD-proces, waardoor het automatiseringsproces eenvoudiger wordt.

    In deze architectuur worden Azure Event Hubs, Log Analytics en Azure Cosmos DB geïdentificeerd als één workload. Deze resources zijn opgenomen in één ARM-sjabloon.

  • Overweeg om uw workloads te faseren. Implementeer in verschillende fasen en voer validatiecontroles uit in elke fase voordat u naar de volgende fase gaat. Op die manier kunt u op een zeer gecontroleerde manier updates naar uw productieomgevingen pushen en onverwachte implementatieproblemen minimaliseren.

    In deze architectuur zijn er meerdere implementatiefasen. Overweeg om een Azure DevOps-pijplijn te maken en deze fasen toe te voegen. Hier volgen enkele voorbeelden van fasen die u kunt automatiseren:

    • Een Databricks-cluster starten
    • Databricks CLI configureren
    • Scala-hulpprogramma's installeren
    • De Databricks-geheimen toevoegen

    Overweeg ook geautomatiseerde integratietests te schrijven om de kwaliteit en betrouwbaarheid van de Databricks-code en de levenscyclus ervan te verbeteren.

  • Overweeg om Azure Monitor te gebruiken om de prestaties van uw stroomverwerkingspijplijn te analyseren. Zie Bewaking van Azure Databricks voor meer informatie.

Zie de sectie DevOps in Microsoft Azure Well-Architected Framework voor meer informatie.

Kostenoptimalisatie

Kostenoptimalisatie gaat over manieren om onnodige uitgaven te verminderen en operationele efficiëntie te verbeteren. Zie Overzicht van de pijler kostenoptimalisatie voor meer informatie.

Gebruik de Azure-prijscalculator om een schatting van de kosten te maken. Hier volgen enkele overwegingen voor services die in deze referentiearchitectuur worden gebruikt.

Event Hubs

Met deze referentiearchitectuur worden Event Hubs geïmplementeerd in de Standard-laag . Het prijsmodel is gebaseerd op doorvoereenheden, inkomend verkeer en vastleggen van gebeurtenissen. Een ingangsgebeurtenis is een eenheid gegevens van 64 kB of minder. Grotere berichten worden gefactureerd in meervouden van 64 kB. U geeft doorvoereenheden op via Azure Portal of Event Hubs-beheer-API's.

Als u meer retentiedagen nodig hebt, kunt u de dedicated-laag overwegen. Deze laag biedt implementaties met één tenant met de meest veeleisende vereisten. Met deze aanbieding wordt een cluster gebouwd op basis van capaciteitseenheden (CU) die niet afhankelijk is van doorvoereenheden.

De Standard-laag wordt ook gefactureerd op basis van ingangsgebeurtenissen en doorvoereenheden.

Zie de prijzen van Event Hubs voor meer informatie over prijzen voor Event Hubs.

Azure Databricks

Azure Databricks biedt twee lagen Standard en Premium die elk drie workloads ondersteunen. Met deze referentiearchitectuur wordt de Azure Databricks-werkruimte geïmplementeerd in de Premium-laag .

Data-engineer ing en Data-engineer ing Light-workloads zijn bedoeld voor data engineers om taken te bouwen en uit te voeren. De Data Analytics-workload is bedoeld voor gegevenswetenschappers om gegevens en inzichten interactief te verkennen, visualiseren, bewerken en delen.

Azure Databricks biedt veel prijsmodellen.

  • Abonnement met betalen naar gebruik

    U wordt gefactureerd voor virtuele machines (VM's) die zijn ingericht in clusters en Databricks Units (DBU's) op basis van het geselecteerde VM-exemplaar. Een DBU is een verwerkingseenheid die wordt gefactureerd op basis van een gebruik per seconde. Het DBU-verbruik is afhankelijk van de grootte en het type exemplaar waarop Azure Databricks wordt uitgevoerd. Prijzen zijn afhankelijk van de geselecteerde workload en laag.

  • Abonnement vooraf aanschaffen

    U gaat azure Databricks Units (DBU) doorvoeren als Databricks Commit Units (DBCU) voor een of drie jaar. In vergelijking met het model voor betalen per gebruik kunt u tot 37% besparen.

Zie Prijzen voor Azure Databricks voor meer informatie.

Azure Cosmos DB

In deze architectuur wordt een reeks records naar Azure Cosmos DB geschreven door de Azure Databricks-taak. Er worden kosten in rekening gebracht voor de capaciteit die u reserveert, uitgedrukt in aanvraageenheden per seconde (RU/s), die worden gebruikt om invoegbewerkingen uit te voeren. De eenheid voor facturering is 100 RU/sec per uur. De kosten voor het schrijven van items van 100 kB zijn bijvoorbeeld 50 RU/s.

Voor schrijfbewerkingen moet u voldoende capaciteit inrichten ter ondersteuning van het aantal schrijfbewerkingen dat per seconde nodig is. U kunt de ingerichte doorvoer verhogen met behulp van de portal of Azure CLI voordat u schrijfbewerkingen uitvoert en vervolgens de doorvoer verminderen nadat deze bewerkingen zijn voltooid. Uw doorvoer voor de schrijfperiode is de minimale doorvoer die nodig is voor de opgegeven gegevens plus de doorvoer die is vereist voor de invoegbewerking, ervan uitgaande dat er geen andere workload wordt uitgevoerd.

Voorbeeld van kostenanalyse

Stel dat u een doorvoerwaarde van 1000 RU/sec voor een container configureert. Het wordt gedurende 24 uur gedurende 30 dagen geïmplementeerd, in totaal 720 uur.

De container wordt gefactureerd op 10 eenheden van 100 RU/sec per uur voor elk uur. 10 eenheden voor $ 0,008 (per 100 RU per seconde per uur) worden $ 0,08 per uur in rekening gebracht.

Voor 720 uur of 7.200 eenheden (van 100 RU's) wordt u $ 57,60 gefactureerd voor de maand.

Opslag wordt ook gefactureerd voor elke GB die wordt gebruikt voor uw opgeslagen gegevens en index. Zie het prijsmodel van Azure Cosmos DB voor meer informatie.

Gebruik de Azure Cosmos DB-capaciteitscalculator om een snelle schatting te krijgen van de workloadkosten.

Raadpleeg de kostensectie in Microsoft Azure Well-Architected Framework voor meer informatie.

Dit scenario implementeren

Volg de stappen in de GitHub-leesmij om de referentie-implementatie te implementeren en uit te voeren.

Volgende stappen