Elaborazione di flussi con Azure Databricks

Azure Cosmos DB
Azure Databricks
Hub eventi di Azure
Azure Log Analytics
Monitoraggio di Azure

Questa architettura di riferimento illustra una pipeline di elaborazione di flussi end-to-end. Questo tipo di pipeline include quattro fasi: inserimento, processo, archiviazione, e analisi e creazione di report. Per questa architettura di riferimento, la pipeline inserisce i dati da due origini, esegue un join in record correlati da ogni flusso, arricchisce il risultato e calcola una media in tempo reale. I risultati vengono archiviati per analisi aggiuntive.

GitHub logo Un'implementazione di riferimento per questa architettura è disponibile in GitHub.

Architettura

Diagram showing a reference architecture for stream processing with Azure Databricks.

Scaricare un file di Visio di questa architettura.

Workflow

Questa architettura è costituita dai componenti seguenti:

Origini dati. In questa architettura sono presenti due origini dati che generano flussi di dati in tempo reale. Il primo flusso contiene le informazioni sulla corsa e il secondo contiene le informazioni sui costi delle corse. L'architettura di riferimento include un generatore di dati simulato che legge dati da un set di file statici ed esegue il push dei dati in Hub eventi. Le origini dati in un'applicazione reale corrisponderebbero a dispositivi installati nei taxi.

Hub eventi di Azure. Hub eventi è un servizio di inserimento di eventi. Questa architettura usa due istanze di Hub eventi, una per ogni origine dati. Ogni origine dati invia un flusso di dati all'istanza associata di Hub eventi.

Azure Databricks. Databricks è una piattaforma di analisi basata su Apache Spark ottimizzata per la piattaforma dei servizi cloud di Microsoft Azure. Databricks viene usata per la correlazione dei dati su corse e tariffe dei taxi, nonché per migliorare i dati correlati con i dati sul quartiere archiviati nel file System di Databricks.

Azure Cosmos DB. L'output di un processo di Azure Databricks è una serie di record scritti in Azure Cosmos DB per Apache Cassandra. Azure Cosmos DB per Apache Cassandra viene usato perché supporta la modellazione dei dati delle serie temporali.

Azure Log Analytics. I dati del log applicazioni raccolti da Monitoraggio di Azure vengono archiviati in un'area di lavoro Log Analytics. Le query di Log Analytics permettono di analizzare e visualizzare le metriche e ispezionare i messaggi di log allo scopo di identificare i problemi all'interno dell'applicazione.

Alternative

  • Collegamento a Synapse è la soluzione preferita da Microsoft per l'analisi sui dati di Azure Cosmos DB.

Dettagli dello scenario

Scenario: una società di taxi raccoglie dati su ogni corsa. Per questo scenario si presuppone che siano presenti due dispositivi diversi che inviano dati. Il taxi ha un contatore che invia informazioni su ogni corsa, ovvero la durata, la distanza e le posizioni di ritiro e consegna. Un dispositivo separato accetta i pagamenti dai clienti e invia dati sui prezzi delle corse. Per individuare le tendenze dell'utenza, la società di taxi vuole calcolare la mancia media per miglia guidate, in tempo reale, per ogni quartiere.

Potenziali casi d'uso

Questa soluzione è ottimizzata per il settore della vendita al dettaglio.

Inserimento dati

Per simulare un'origine dati, questa architettura di riferimento usa il set di dati New York City Taxi Data[1]. Questo set di dati contiene dati sulle corse in taxi a New York City in un periodo di quattro anni (2010 - 2013). Contiene due tipi di record: i dati relativi alle corse e i dati relativi ai costi delle corse. I dati sulle corse includono durata delle corse, distanza di viaggio e posizione di ritiro e consegna. I dati relativi ai costi della corsa includono gli importi relativi a costo di base, imposte e mancia. I campi comuni in entrambi i tipi di record includono il numero di taxi, il numero di licenza e l'ID del fornitore. Questi tre campi identificano in modo univoco un taxi e un tassista. I dati vengono archiviati in formato CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Il generatore di dati è un'applicazione .NET Core che legge i record e li invia a Hub eventi di Azure. Il generatore invia i dati relativi alle corse in formato JSON e i dati relativi ai costi in formato CSV.

Hub eventi usa partizioni per segmentare i dati. Le partizioni consentono a un consumer di leggere ogni partizione in parallelo. Quando si inviano dati a Hub eventi, è possibile specificare in modo esplicito la chiave di partizione. In caso contrario, i record vengono assegnati alle partizioni in modalità round-robin.

In questo scenario i dati relativi alle corse e i dati relativi ai costi devono avere lo stesso ID di partizione per un taxi specifico. Ciò consente a Databricks di applicare un certo livello di parallelismo durante la correlazione dei due flussi. Un record nella partizione n dei dati relativi alle corse corrisponderà a un record nella partizione n dei dati relativi ai costi.

Diagram of stream processing with Azure Databricks and Event Hubs.

Scaricare un file di Visio di questa architettura.

Nel generatore di dati il modello di dati comune per entrambi i tipi di record ha una proprietà PartitionKey che corrisponde alla concatenazione di Medallion, HackLicense e VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Questa proprietà viene usata per fornire una chiave di partizione esplicita durante l'invio a Hub eventi:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Hub eventi di

La capacità di elaborazione di Hub eventi viene misurata in unità elaborate. È possibile ridimensionare automaticamente un hub eventi abilitando l'aumento automatico, che ridimensiona automaticamente le unità elaborate in base al traffico, fino a un limite massimo configurato.

Elaborazione dei flussi

In Azure Databricks, viene eseguita l'elaborazione dei dati da un processo. Il processo viene assegnato a e viene eseguito in un cluster. Il processo può essere codice personalizzato scritto in Java o un notebook Spark.

In questa architettura di riferimento, il processo è un file di archivio Java con classi scritte in Java e Scala. Quando si specifica il file di archivio Java per un processo di Databricks, la classe viene specificata per l'esecuzione da parte del cluster Databricks. In questo caso, il principale metodo della classe com.microsoft.pnp.TaxiCabReader contiene la logica di elaborazione dati.

Lettura del flusso dalle due istanze dell'hub eventi

La logica di elaborazione dei dati usa lo streaming strutturato Spark per leggere dalle due istanze dell'hub eventi di Azure:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Arricchimento dei dati con le informazioni sul quartiere

I dati sulla corsa includono le coordinate di latitudine e longitudine dei punti di partenza e arrivo. Benché siano utili, queste coordinate non sono facilmente analizzabili. Di conseguenza, questi dati vengono arricchiti con i dati sul quartiere, letti da un file di forma.

Il formato di file di forma è binario e non facilmente analizzato, ma la libreria GeoTools fornisce strumenti per i dati geospaziali che usano il formato di file di forma. Questa libreria viene usata nella classe com.microsoft.pnp.GeoFinder per determinare il nome del quartiere in base alle coordinate di partenza e arrivo.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Unione di dati su corse e tariffe

Prima di tutto i dati su corse e tariffe vengono trasformati:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Quindi, i dati sulla corsa vengono aggiunti ai dati sulle tariffe:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Elaborazione dei dati e inserimento in Azure Cosmos DB

L'importo tariffario medio per ogni quartiere viene calcolato per un determinato intervallo di tempo:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Che viene quindi inserito in Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considerazioni

Queste considerazioni implementano i pilastri di Azure Well-Architected Framework, che è un set di set di principi guida che possono essere usati per migliorare la qualità di un carico di lavoro. Per altre informazioni, vedere Framework ben progettato di Microsoft Azure.

Sicurezza

La sicurezza offre garanzie contro attacchi intenzionali e l'abuso di dati e sistemi preziosi. Per altre informazioni, vedere Panoramica del pilastro della sicurezza.

L'accesso all'area di lavoro di Azure Databricks viene controllato tramite la console di amministrazione. La console di amministrazione include funzionalità per aggiungere utenti, gestire le autorizzazioni utente e impostare il single sign-on. La console permette anche di impostare il controllo di accesso ad aree di lavoro, cluster, processi e tabelle.

Gestione dei segreti

Azure Databricks include un archivio segreto che viene usato per archiviare i segreti, tra cui le stringhe di connessione, le chiavi di accesso, i nomi utente e le password. I segreti all'interno dell'archivio segreto di Azure Databricks vengono partizionati per ambiti:

databricks secrets create-scope --scope "azure-databricks-job"

I segreti vengono aggiunti a livello ambito:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Nota

È possibile usare un ambito di cui è stato eseguito il backup in Azure Key Vault invece dell'ambito nativo di Azure Databricks. Per altre informazioni, vedere Azure Key Vault-backed scopes ( Ambiti di cui è stato eseguito il backup in Azure Key Vault).

Nel codice, i segreti sono accessibili grazie alle utilità dei segreti di Azure Databricks.

Monitoraggio

Azure Databricks si basa su Apache Spark, e entrambi usano log4j come libreria standard per la registrazione. Oltre alla registrazione predefinita fornita da Apache Spark, è possibile implementare la registrazione in Azure Log Analytics seguendo l'articolo Monitoraggio di Azure Databricks.

Dal momento che la classe com.microsoft.pnp.TaxiCabReader elabora i messaggi relativi a corse e tariffe, è possibile che il formato di uno dei due non sia valido. In un ambiente di produzione, è importante analizzare questi messaggi in formato non valido per identificare un problema con le origini dati in modo da risolverlo rapidamente per evitare la perdita di dati. La classe com.microsoft.pnp.TaxiCabReader registra un accumulatore Apache Spark che tiene traccia del numero di record su tariffe e corse in formato non valido:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark usa la libreria Dropwizard per inviare metriche e alcuni dei campi metrici nativi di Dropwizard non sono compatibili con Azure Log Analytics. Di conseguenza, questa architettura di riferimento include un sink e un reporter di Dropwizard personalizzati. Formatta le metriche nel formato previsto da Azure Log Analytics. Quando Apache Spark riporta le metriche, vengono inviate anche le metriche personalizzate per i dati di corse e tariffe in formato non valido.

Di seguito sono riportate query di esempio che è possibile usare nell'area di lavoro Log Analytics di Azure per monitorare l'esecuzione del processo di streaming. L'argomento ago(1d) in ogni query restituirà tutti i record generati nell'ultimo giorno e può essere modificato per visualizzare un periodo di tempo diverso.

Eccezioni registrate durante l'esecuzione di query di flusso

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Accumulo di dati su tariffe e corse in formato non valido

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Esecuzione del processo nel tempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Per ulteriori informazioni, vedere Monitoraggio di Azure Databricks.

DevOps

  • Creare gruppi di risorse separati per gli ambienti di produzione, sviluppo e test. L'uso di gruppi di risorse separati semplifica la gestione delle distribuzioni, l'eliminazione delle distribuzioni di test e l'assegnazione dei diritti di accesso.

  • Usare il modello di Azure Resource Manager per distribuire le risorse di Azure che seguono l'infrastruttura come processo di codice (IaC). Con i modelli, l'automazione delle distribuzioni con Azure DevOps Services o altre soluzioni CI/CD è più semplice.

  • Inserire ogni carico di lavoro in un modello di distribuzione separato e archiviare le risorse nei sistemi di controllo del codice sorgente. È possibile distribuire i modelli insieme o singolarmente come parte di un processo CI/CD, semplificando il processo di automazione.

    In questa architettura, Hub eventi di Azure, Log Analytics e Azure Cosmos DB vengono identificati come un singolo carico di lavoro. Queste risorse sono incluse in un singolo modello di Resource Manager.

  • Prendere in considerazione la gestione temporanea dei carichi di lavoro. Eseguire la distribuzione in varie fasi ed eseguire controlli di convalida in ogni fase prima di passare alla fase successiva. In questo modo è possibile eseguire il push degli aggiornamenti negli ambienti di produzione in modo altamente controllato e ridurre al minimo i problemi di distribuzione imprevisti.

    In questa architettura sono presenti più fasi di distribuzione. Prendere in considerazione la creazione di una pipeline di Azure DevOps e l'aggiunta di tali fasi. Ecco alcuni esempi di fasi che è possibile automatizzare:

    • Avviare un cluster Databricks
    • Configurare l'interfaccia della riga di comando di Databricks
    • Installare Gli strumenti Scala
    • Aggiungere i segreti di Databricks

    Valutare anche la possibilità di scrivere test di integrazione automatizzati per migliorare la qualità e l'affidabilità del codice databricks e del ciclo di vita.

  • Prendere in considerazione l'uso di Monitoraggio di Azure per analizzare le prestazioni della pipeline di elaborazione del flusso. Per ulteriori informazioni, vedere Monitoraggio di Azure Databricks.

Per altre informazioni, vedere la sezione DevOps in Microsoft Azure Well-Architected Framework.

Ottimizzazione dei costi

L'ottimizzazione dei costi riguarda l'analisi dei modi per ridurre le spese non necessarie e migliorare l'efficienza operativa. Per altre informazioni, vedere Panoramica del pilastro di ottimizzazione dei costi.

Usare il calcolatore dei prezzi di Azure per stimare i costi. Ecco alcune considerazioni per i servizi usati in questa architettura di riferimento.

Hub eventi di

Questa architettura di riferimento distribuisce Hub eventi nel livello Standard . Il modello di determinazione dei prezzi si basa su unità elaborate, eventi in ingresso ed eventi di acquisizione. Un evento in ingresso è un'unità di dati da 64 KB o di dimensioni inferiori. I messaggi di dimensioni maggiori sono fatturati in multipli di 64 KB. È possibile specificare le unità elaborate tramite le API di gestione di hub eventi o di portale di Azure.

Se sono necessari più giorni di conservazione, prendere in considerazione il livello Dedicato . Questo livello offre distribuzioni a tenant singolo con requisiti più impegnativi. Questa offerta crea un cluster basato su unità di capacità (CU) non vincolate dalle unità elaborate.

Il livello Standard viene fatturato anche in base agli eventi in ingresso e alle unità elaborate.

Per informazioni sui prezzi di Hub eventi, vedere i prezzi di Hub eventi.

Azure Databricks

Azure Databricks offre due livelli Standard e Premium ognuno supporta tre carichi di lavoro. Questa architettura di riferimento distribuisce l'area di lavoro di Azure Databricks nel livello Premium .

Ingegneria dei dati e Ingegneria dei dati carichi di lavoro Light sono destinati ai data engineer per compilare ed eseguire processi. Il carico di lavoro Analisi dei dati è destinato ai data scientist per esplorare, visualizzare, modificare e condividere dati e informazioni dettagliate in modo interattivo.

Azure Databricks offre molti modelli di prezzi.

  • Piano con pagamento in base al consumo

    Viene addebitato il provisioning delle macchine virtuali (VM) nei cluster e nelle unità di databricks (DBU) in base all'istanza di macchina virtuale selezionata. Un'unità Databricks è un'unità di capacità del processo, fatturata in base all'utilizzo al secondo. Il consumo di unità Databricks dipende dalle dimensioni e dal tipo di istanza che esegue Azure Databricks. I prezzi dipendono dal carico di lavoro e dal livello selezionati.

  • Piano di pre-acquisto

    Si esegue il commit in unità DBU (Azure Databricks) come unità di commit di Databricks (DBCU) per uno o tre anni. Rispetto al modello con pagamento in base al consumo, è possibile risparmiare fino al 37%.

Per altre informazioni, vedere Prezzi di Azure Databricks.

Azure Cosmos DB

In questa architettura, una serie di record viene scritta in Azure Cosmos DB dal processo di Azure Databricks. Viene addebitata la capacità che si riserva, espressa in Unità richiesta al secondo (UR/sec), usata per eseguire operazioni di inserimento. L'unità per la fatturazione è di 100 UR/sec all'ora. Ad esempio, il costo di scrittura di elementi da 100 KB è di 50 UR/sec.

Per le operazioni di scrittura, effettuare il provisioning di capacità sufficiente per supportare il numero di scritture necessarie al secondo. È possibile aumentare la velocità effettiva con provisioning usando il portale o l'interfaccia della riga di comando di Azure prima di eseguire operazioni di scrittura e quindi ridurre la velocità effettiva al termine di tali operazioni. La velocità effettiva per il periodo di scrittura è la velocità effettiva minima necessaria per i dati specificati e la velocità effettiva necessaria per l'operazione di inserimento presupponendo che non sia in esecuzione alcun altro carico di lavoro.

Analisi dei costi di esempio

Si supponga di configurare un valore di velocità effettiva di 1.000 UR/sec in un contenitore. Viene distribuito per 24 ore per 30 giorni, un totale di 720 ore.

Il contenitore viene fatturato a 10 unità di 100 UR/sec all'ora per ogni ora. 10 unità a $ 0,008 (per 100 UR/sec all'ora) vengono addebitati $ 0,08 all'ora.

Per 720 ore o 7.200 unità (di 100 UR), vengono fatturati $ 57,60 per il mese.

Archiviazione vengono fatturati anche per ogni GB usato per i dati e l'indice archiviati. Per altre informazioni, vedere Modello di prezzi di Azure Cosmos DB.

Usare il calcolatore della capacità di Azure Cosmos DB per ottenere una stima rapida del costo del carico di lavoro.

Per altre informazioni, vedere la sezione sui costi in Microsoft Azure Well-Architected Framework.

Distribuire lo scenario

Per distribuire ed eseguire l'implementazione di riferimento, seguire la procedura illustrata nel file README in GitHub.

Passaggi successivi