Azure Event Hubs

Azure Event Hubs is een hyperschaalservice voor telemetrieopname die miljoenen gebeurtenissen verzamelt, transformeert en opslaat. Als gedistribueerd streamingplatform biedt het u een lage latentie en configureerbare tijdretentie, waardoor u enorme hoeveelheden telemetrie in de cloud kunt invoeren en de gegevens van meerdere toepassingen kunt lezen met behulp van de semantiek publiceren/abonneren.

In dit artikel wordt uitgelegd hoe u Structured Streaming gebruikt met Azure Event Hubs- en Azure Databricks-clusters.

Vereisten

Zie 'Nieuwste releases' in het leesmij-bestand van het project Azure Event Hubs Spark Connector voor ondersteuning van de huidige release.

  1. Maak een bibliotheek in uw Azure Databricks-werkruimte met behulp van de Maven-coördinaat com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Notitie

    Deze connector wordt regelmatig bijgewerkt en er is mogelijk een recentere versie beschikbaar: u wordt aangeraden de meest recente connector op te halen uit de Maven-opslagplaats

  2. Installeer de gemaakte bibliotheek in uw cluster.

Schema

Het schema van de records is:

Kolom Type
body binair
partitie tekenreeks
offset tekenreeks
sequenceNumber long
enqueuedTime tijdstempel
publisher tekenreeks
partitionKey tekenreeks
properties map[tekenreeks,json]

De body wordt altijd opgegeven als een bytematrix. Gebruik cast("string") om de body kolom expliciet te deserialiseren.

Quick Start

Laten we beginnen met een kort voorbeeld: WordCount. Het volgende notebook is alles wat u nodig hebt om WordCount uit te voeren met behulp van Gestructureerd streamen met Azure Event Hubs.

WordCount Azure Event Hubs met structured streaming-notebook

Notebook downloaden

Configuratie

In deze sectie worden de configuratie-instellingen besproken die u nodig hebt om met Event Hubs te werken.

Zie de Integratiehandleiding voor gestructureerd streamen en Azure Event Hubs die zijn ontwikkeld door Microsoft voor gedetailleerde richtlijnen voor het configureren van gestructureerd streamen met Azure Event Hubs.

Zie Wat is Apache Spark Structured Streaming? voor gedetailleerde richtlijnen over het gebruik van Structured Streaming.

Verbindingsreeks

Er is een Event Hubs-connection string vereist om verbinding te maken met de Event Hubs-service. U kunt de connection string voor uw Event Hubs-exemplaar ophalen vanuit de Azure Portal of met behulp van de ConnectionStringBuilder in de bibliotheek.

Azure Portal

Wanneer u de connection string van de Azure Portal krijgt, kan deze de EntityPath sleutel al dan niet bevatten. Overweeg het volgende:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Als u verbinding wilt maken met uw EventHubs, moet een EntityPath aanwezig zijn. Als uw connection string er geen heeft, hoeft u zich geen zorgen te maken. Dit zorgt ervoor:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

U kunt ook de ConnectionStringBuilder gebruiken om uw connection string te maken.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Alle configuratie met betrekking tot Event Hubs vindt plaats in uw EventHubsConf. Als u een EventHubsConfwilt maken, moet u een connection string doorgeven:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Zie Verbindingsreeks voor meer informatie over het verkrijgen van een geldige connection string.

Zie EventHubsConf voor een volledige lijst met configuraties. Hier volgt een subset met configuraties om u op weg te helpen:

Optie Waarde Standaard Querytype Beschrijving
consumerGroup Tekenreeks "$Default" Streamen en batchgewijs Een consumentengroep is een weergave van een hele Event Hub. Consumergroepen maken het mogelijk dat meerdere consumerende toepassingen beschikken over een afzonderlijke weergave van de gebeurtenisstroom. De toepassingen kunnen de stroom onafhankelijk, in hun eigen tempo en met hun eigen offsets lezen. Meer informatie vindt u in de Microsoft-documentatie.
startingPosition EventPosition Begin van de stream Streamen en batchgewijs De beginpositie voor uw Structured Streaming-taak. Zie startingPositions voor informatie over de volgorde waarin opties worden gelezen.
maxEventsPerTrigger long partitionCount

* 1000
Streamingquery Frequentielimiet voor het maximum aantal gebeurtenissen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal gebeurtenissen wordt proportioneel verdeeld over partities van verschillende volumes.

Voor elke optie bestaat er een bijbehorende instelling in EventHubsConf. Bijvoorbeeld:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf stelt gebruikers in staat om begin- (en eindposities) op te geven met de EventPosition klasse. EventPosition definieert de positie van een gebeurtenis in een Event Hub-partitie. De positie kan een onderbroken tijd, offset, volgnummer, het begin van de stroom of het einde van de stroom zijn.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Als u wilt beginnen (of eindigen) op een specifieke positie, maakt u gewoon de juiste EventPosition en stelt u deze in uw EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Gestructureerd streamen van productie met Azure Event Hubs

Wanneer u streamingquery's in productie uitvoert, wilt u waarschijnlijk meer robuustheid en uptime garanderen dan wanneer u gewoon een notebook aan een cluster koppelt en uw streamingquery's interactief uitvoert. Importeer en voer het volgende notebook uit voor een demonstratie van het configureren en uitvoeren van Gestructureerd streamen in productie met Azure Event Hubs en Azure Databricks.

Zie Production considerations for Structured Streaming (Productieoverwegingen voor gestructureerd streamen) voor meer informatie.

Gestructureerd streamen van productie met Azure Event Hubs notebook

Notebook downloaden