Azure Event Hubs
Azure Event Hubs est un service d’ingestion de données de télémétrie à très grande échelle qui collecte, transforme et stocke des millions d’événements. En tant que plateforme de streaming distribuée, ce service propose une faible latence et une durée de conservation configurable, ce qui vous permet d’ingérer des quantités massives de données de télémétrie dans le cloud et de lire les données de plusieurs applications en utilisant une sémantique publication-abonnement.
Cet article explique comment utiliser une diffusion en continu structurée avec des clusters Azure Event Hubs et Azure Databricks.
Remarque
Azure Event Hubs fournit un point de terminaison compatible avec Apache Kafka que vous pouvez utiliser avec le connecteur flux structuré Kafka, disponible dans Databricks Runtime, pour traiter les messages provenant de Azure Event Hubs. Databricks recommande d’utiliser le connecteur Kafka Structured Streaming pour traiter les messages provenant de Azure Event Hubs.
Spécifications
Pour la prise en charge de la version actuelle, consultez « Latest Releases » dans le fichier readme du projet de connecteur Spark pour Azure Event Hubs.
Créez une bibliothèque dans votre espace de travail Azure Databricks à l’aide de la coordonnée Maven
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
.Notes
Ce connecteur étant mis à jour régulièrement, il se peut qu’une version plus récente soit disponible. Nous vous recommandons d’extraire le dernier connecteur du référentiel Maven
Installez la bibliothèque créée dans votre cluster.
schéma
Le schéma des enregistrements est le suivant :
Colonne | Type |
---|---|
body |
binary |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
string |
properties |
map[string,json] |
Le body
est toujours fourni sous le forme d’un tableau d’octets. Utilisez cast("string")
pour désérialiser explicitement la colonne body
.
Configuration
Cette section décrit les paramètres de configuration que vous devez utiliser avec Event Hubs.
Pour obtenir des instructions détaillées sur la configuration de la diffusion en continu structurée avec Azure Event Hubs, consultez le Guide d’intégration de la diffusion en continu structurée et d’Azure Event Hubs élaboré par Microsoft.
Pour obtenir des instructions détaillées sur l’utilisation de Structured Streaming, consultez Diffusion sur Azure Databricks.
Chaîne de connexion
Une chaîne de connexion Event Hubs est requise pour se connecter au service Event Hubs. Vous pouvez obtenir la chaîne de connexion pour votre instance Event Hubs à partir du portail Azure ou en utilisant le ConnectionStringBuilder
dans la bibliothèque.
Portail Azure
Lorsque vous récupérez la chaîne de connexion à partir du portail Azure, elle peut avoir ou non la clé EntityPath
. Vous devez :
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Pour que vous puissiez vous connecter à votre Event Hubs, un EntityPath
doit être présent. Si votre chaîne de connexion n’en a pas, ne vous inquiétez pas.
La ressource suivante s’en occupera :
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
Vous pouvez également utiliser le ConnectionStringBuilder
pour créer votre chaîne de connexion.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Toute la configuration relative à Event Hubs se produit dans votre EventHubsConf
. Pour créer un EventHubsConf
, vous devez passer une chaîne de connexion :
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Pour plus d’informations sur l’obtention d’une chaîne de connexion valide, consultez Chaîne de connexion.
Pour obtenir la liste complète des configurations, consultez EventHubsConf. Voici un sous-ensemble de configurations pour commencer :
Option | Valeur | Default | Type de requête | Description |
---|---|---|---|---|
consumerGroup |
String | “$Default” | Diffusion en continu et traitement par lots | Un groupe de consommateurs est un affichage d’un hub d’événements entier. Les groupes de consommateurs permettent à plusieurs applications consommatrices d'avoir chacune une vue distincte du flux d'événements et de lire le flux indépendamment à leur propre rythme et avec leurs propres décalages. Pour plus d’informations, consultez la documentation Microsoft. |
startingPosition |
EventPosition | Début du flux | Diffusion en continu et traitement par lots | Position de départ de votre travail de diffusion en continu structurée. Pour plus d’informations sur l’ordre dans lequel les options sont lues, consultez startingPositions. |
maxEventsPerTrigger |
long | partitionCount 1000- |
Requête de diffusion en continu | Limite de débit sur le nombre maximal d’événements traités par intervalle de déclencheur. Le nombre total d’événements spécifié sera réparti de manière proportionnelle entre des partitions de volume différent. |
Pour chaque option, il existe un paramètre correspondant dans EventHubsConf
. Par exemple :
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
permet aux utilisateurs de spécifier des positions de début (et de fin) avec la classe EventPosition
. EventPosition
définit la position d’un événement dans une partition de hub d’événements. La position peut être une heure mise en file d’attente, un décalage, un numéro de séquence, le début du flux ou la fin du flux.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Si vous souhaitez commencer (ou finir) à un emplacement spécifique, créez simplement la EventPosition
correcte et définissez-la dans votre EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)