Azure Event Hubs
Azure Event Hubs is een telemetrieopnameservice op hyperschaal die miljoenen gebeurtenissen verzamelt, transformeert en opslaat. Omdat het een gedistribueerd streamingplatform is, biedt het u een lage latentie en configureerbare retentietijd, waardoor u enorme hoeveelheden telemetriegegevens in de cloud kunt opnemen en u de gegevens kunt lezen vanuit diverse toepassingen met behulp van publicatie-abonnementsemantiek.
In dit artikel wordt uitgelegd hoe u Structured Streaming gebruikt met Azure Event Hubs- en Azure Databricks-clusters.
Notitie
Azure Event Hubs biedt een eindpunt dat compatibel is met Apache Kafka die u kunt gebruiken met de Structured Streaming Kafka-connector, beschikbaar in Databricks Runtime, om berichten van Azure Event Hubs te verwerken. Databricks raadt aan om de Structured Streaming Kafka-connector te gebruiken om berichten van Azure Event Hubs te verwerken.
Vereisten
Zie 'Nieuwste releases' in het Leesmij-bestand van het Azure Event Hubs Spark-Verbinding maken or-project voor de huidige release.
Maak een bibliotheek in uw Azure Databricks-werkruimte met behulp van de Maven-coördinaat
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
.Notitie
Deze connector wordt regelmatig bijgewerkt en er is mogelijk een recentere versie beschikbaar: we raden u aan de meest recente connector op te halen uit de Maven-opslagplaats
Installeer de gemaakte bibliotheek in uw cluster.
Schema
Het schema van de records is:
Column | Type |
---|---|
body |
binair |
partition |
tekenreeks |
offset |
tekenreeks |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
tekenreeks |
partitionKey |
tekenreeks |
properties |
map[string,json] |
De body
waarde wordt altijd geleverd als een bytematrix. Gebruik cast("string")
dit om de kolom expliciet deserialiseren body
.
Configuratie
In deze sectie worden de configuratie-instellingen besproken die u nodig hebt om te werken met Event Hubs.
Zie de integratiehandleiding voor gestructureerde streaming en Azure Event Hubs die door Microsoft zijn ontwikkeld voor gedetailleerde richtlijnen voor het configureren van Structured Streaming met Azure Event Hubs.
Zie Streaming op Azure Databricks voor gedetailleerde richtlijnen over het gebruik van Structured Streaming.
Connection string
Een Event Hubs-verbindingsreeks is vereist om verbinding te maken met de Event Hubs-service. U kunt de verbindingsreeks voor uw Event Hubs-exemplaar ophalen vanuit Azure Portal of met behulp van de ConnectionStringBuilder
bibliotheek.
Azure Portal
Wanneer u de verbindingsreeks uit Azure Portal krijgt, heeft deze mogelijk of niet de EntityPath
sleutel. Overweeg het volgende:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Als u verbinding wilt maken met uw EventHubs, moet er een EntityPath
aanwezig zijn. Als uw verbindingsreeks geen verbindingsreeks heeft, maakt u zich geen zorgen.
Dit zorgt ervoor:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
Verbinding maken ionStringBuilder
U kunt ook de ConnectionStringBuilder
verbindingsreeks maken.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Alle configuraties met betrekking tot Event Hubs vindt plaats in uw EventHubsConf
. Als u een EventHubsConf
wilt maken, moet u een verbindingsreeks doorgeven:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Zie Verbinding maken ion String voor meer informatie over het verkrijgen van een geldige verbindingsreeks.
Zie EventHubsConf voor een volledige lijst met configuraties. Hier volgt een subset van configuraties om u op weg te helpen:
Optie | Weergegeven als | Standaardinstelling | Querytype | Beschrijving |
---|---|---|---|---|
consumerGroup |
String | "$Default" | Streaming en batch | Een consumentengroep is een weergave van een hele Event Hub. Consumergroepen maken het mogelijk dat meerdere consumerende toepassingen beschikken over een afzonderlijke weergave van de gebeurtenisstroom. De toepassingen kunnen de stroom onafhankelijk, in hun eigen tempo en met hun eigen offsets lezen. Meer informatie vindt u in de Microsoft-documentatie. |
startingPosition |
EventPosition | Begin van de stream | Streaming en batch | De beginpositie voor uw Structured Streaming-taak. Zie startingPositions voor informatie over de volgorde waarin de opties worden gelezen. |
maxEventsPerTrigger |
long | partitionCount * 1000 |
Streamingquery | Frequentielimiet voor het maximum aantal gebeurtenissen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal gebeurtenissen wordt proportioneel verdeeld over partities van verschillende volumes. |
Voor elke optie bestaat er een bijbehorende instelling in EventHubsConf
. Voorbeeld:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
staat gebruikers toe om beginposities (en eindposities) met de EventPosition
klasse op te geven. EventPosition
definieert de positie van een gebeurtenis in een Event Hub-partitie. De positie kan een enqueued tijd, offset, volgnummer, het begin van de stream of het einde van de stream zijn.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Als u wilt beginnen (of eindigen) op een specifieke positie, maakt u gewoon de juiste EventPosition
en stelt u deze in uw EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
Feedback
https://aka.ms/ContentUserFeedback.
Binnenkort beschikbaar: In de loop van 2024 zullen we GitHub-problemen geleidelijk uitfaseren als het feedbackmechanisme voor inhoud en deze vervangen door een nieuw feedbacksysteem. Zie voor meer informatie:Feedback verzenden en weergeven voor