Hub eventi di Azure

Hub eventi di Azure è un servizio di inserimento di dati di telemetria su vastissima scala che raccoglie, trasforma e archivia milioni di eventi. In quanto piattaforma di streaming distribuita, offre bassa latenza e tempo di conservazione configurabile, permettendo all'utente di inserire quantità molto elevate di dati di telemetria nel cloud e leggere i dati da più applicazioni usando una semantica di pubblicazione-sottoscrizione.

Questo articolo illustra come usare Structured Streaming con cluster Hub eventi di Azure e Azure Databricks.

Nota

Hub eventi di Azure fornisce un endpoint compatibile con Apache Kafka che è possibile usare con Connettore Kafka structured Streaming, disponibile in Databricks Runtime, per elaborare i messaggi da Hub eventi di Azure. Databricks consiglia di usare il connettore Kafka Structured Streaming per elaborare i messaggi da Hub eventi di Azure.

Requisiti

Per il supporto della versione corrente, vedere "Versioni più recenti" nel file leggimi del progetto spark Connessione or Hub eventi di Azure.

  1. Creare una libreria nell'area di lavoro di Azure Databricks usando la coordinata com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17Maven .

    Nota

    Questo connettore viene aggiornato regolarmente e potrebbe essere disponibile una versione più recente: è consigliabile eseguire il pull del connettore più recente dal repository Maven

  2. Installare la libreria creata nel cluster.

Schema

Lo schema dei record è:

Column Type
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

Viene body sempre fornito come matrice di byte. Utilizzare cast("string") per deserializzare in modo esplicito la body colonna.

Guida introduttiva

Si inizierà con un esempio rapido: WordCount. Il notebook seguente è tutto ciò che serve per eseguire WordCount usando Structured Streaming con Hub eventi di Azure.

Hub eventi di Azure WordCount con il notebook Structured Streaming

Ottenere il notebook

Impostazione

Questa sezione illustra le impostazioni di configurazione necessarie per lavorare con Hub eventi.

Per indicazioni dettagliate sulla configurazione di Structured Streaming con Hub eventi di Azure, vedere Structured Streaming and Hub eventi di Azure Integration Guide sviluppato da Microsoft.

Per indicazioni dettagliate sull'uso di Structured Streaming, vedere Streaming in Azure Databricks.

Connection string

Per connettersi al servizio Hub eventi, è necessario un stringa di connessione di Hub eventi. È possibile ottenere il stringa di connessione per l'istanza di Hub eventi dal portale di Azure o usando nella ConnectionStringBuilder libreria.

Azure portal

Quando si ottiene il stringa di connessione dal portale di Azure, potrebbe avere o meno la EntityPath chiave. Tenere in considerazione:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Per connettersi a EventHubs, è necessario che sia presente un oggetto EntityPath . Se il stringa di connessione non ne ha uno, non preoccuparti. Questo si occuperà di esso:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Connessione ionStringBuilder

In alternativa, è possibile usare per ConnectionStringBuilder rendere il stringa di connessione.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Tutte le configurazioni relative a Hub eventi si verificano in EventHubsConf. Per creare un EventHubsConfoggetto , è necessario passare un stringa di connessione:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Per altre informazioni su come ottenere un stringa di connessione valido, vedere Connessione String.

Per un elenco completo delle configurazioni, vedere EventHubsConf. Ecco un subset di configurazioni per iniziare:

Opzione Valore Valori predefiniti Tipo di query Descrizione
consumerGroup Stringa "$Default" Streaming e batch Un gruppo di consumer è una visualizzazione di un intero hub eventi. I gruppi di consumer consentono a più applicazioni costose di avere una visualizzazione separata del flusso di eventi e di leggere il flusso in modo indipendente in base alle proprie esigenze e con i propri gli offset. Altre informazioni sono disponibili nella documentazione Microsoft.
startingPosition EventPosition Inizio del flusso Streaming e batch Posizione iniziale per il processo Structured Streaming. Per informazioni sull'ordine in cui vengono lette le opzioni, vedere startingPositions .
maxEventsPerTrigger long partitionCount

* 1000
Query di streaming Limite di frequenza per il numero massimo di eventi elaborati per intervallo di trigger. Il numero totale di eventi specificato verrà suddiviso proporzionalmente tra partizioni di volumi diversi.

Per ogni opzione esiste un'impostazione corrispondente in EventHubsConf. Ad esempio:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf consente agli utenti di specificare posizioni iniziale (e finale) con la EventPosition classe . EventPosition definisce la posizione di un evento in una partizione di Hub eventi. La posizione può essere un tempo accodato, un offset, un numero di sequenza, l'inizio del flusso o la fine del flusso.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Se si vuole iniziare (o terminare) in una posizione specifica, è sufficiente creare il codice corretto EventPosition e impostarlo in EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Streaming strutturato di produzione con Hub eventi di Azure

Quando si eseguono query di streaming nell'ambiente di produzione, è probabile che si vogliano garantire maggiore affidabilità e tempi di attività rispetto a quando si collega semplicemente un notebook a un cluster ed eseguire le query di streaming in modo interattivo. Importare ed eseguire il notebook seguente per una dimostrazione di come configurare ed eseguire Structured Streaming in produzione con Hub eventi di Azure e Azure Databricks.

Per altre informazioni, vedere Considerazioni sulla produzione per Structured Streaming.

Streaming strutturato di produzione con notebook Hub eventi di Azure

Ottenere il notebook