Azure Event Hubs
Azure Event Hubs es un servicio de ingesta de datos de telemetría a hiperescala que recopila, transforma y almacena millones de eventos. Como plataforma de streaming distribuida, ofrece retención de tiempo configurable y baja latencia, lo que permite ingresar grandes cantidades de datos de telemetría en la nube y leer los datos en varias aplicaciones mediante semántica de publicación o suscripción.
En este artículo se explica cómo usar el streaming estructurado con Azure Event Hubs y clústeres de Azure Databricks.
Nota:
Azure Event Hubs proporciona un punto de conexión compatible con Apache Kafka que puede usar con el conector de Kafka de Structured Streaming, disponible en Databricks Runtime, para procesar mensajes de Azure Event Hubs. Databricks recomienda usar el conector de Kafka de Structured Streaming para procesar mensajes de Azure Event Hubs.
Requisitos
Para compatibilidad con la versión actual, consulte "Versiones más recientes" en el archivo léame del proyecto del conector de Spark para Azure Event Hubs.
Cree una biblioteca en el área de trabajo de Azure Databricks mediante la coordenada
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
de Maven.Nota:
El conector se actualiza con regularidad y puede haber disponible una versión más reciente: le recomendamos que extraiga el conector más reciente del repositorio de Maven.
Instale la biblioteca creada en el clúster.
Schema
El esquema de los registros es:
Columna | Tipo |
---|---|
body |
binary |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
string |
properties |
map[string,json] |
body
siempre se proporciona como matriz de bytes. Use cast("string")
para deserializar explícitamente la columna body
.
Configuración
En esta sección se explican las opciones de configuración necesarias para trabajar con Event Hubs.
Para instrucciones detalladas sobre cómo configurar el streaming estructurado con Azure Event Hubs, consulte la guía de integración del streaming estructurado y Azure Event Hubs desarrollada por Microsoft.
Para obtener instrucciones detalladas sobre el uso de Structured Streaming, consulte Streaming en Azure Databricks.
Cadena de conexión
Se necesita una cadena de conexión de Event Hubs para la conexión al servicio Event Hubs. Puede obtenerla para su instancia de Event Hubs en Azure Portal o si usa ConnectionStringBuilder
en la biblioteca.
Azure portal
Si obtiene la cadena de conexión de Azure Portal, la clave de EntityPath
puede que no esté. Considere:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Para conectarse a Event Hubs, debe haber EntityPath
. Si la cadena de conexión no lo tiene, no se preocupe.
Con esto se consigue:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
Como alternativa, puede usar ConnectionStringBuilder
para crear la cadena de conexión.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Toda la configuración relativa a Event Hubs se produce en EventHubsConf
. Para crear EventHubsConf
, debe pasar una cadena de conexión:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Consulte Cadena de conexión para más información sobre cómo obtener una cadena de conexión válida.
Para una lista exhaustiva de las configuraciones, consulte EventHubsConf. Este es un subconjunto de configuraciones para empezar:
Opción | Value | Valor predeterminado | Tipo de consulta | Descripción |
---|---|---|---|---|
consumerGroup |
String | “$Default” | Streaming y lote | Un grupo de consumidores es una vista de un centro de eventos completo. Los grupos de consumidores habilitan varias aplicaciones consumidoras para que cada una tenga una vista separada del flujo de eventos y para que lean el flujo de forma independiente a su propio ritmo y con sus propios desplazamientos. Más información disponible en la documentación de Microsoft. |
startingPosition |
EventPosition | Inicio de la secuencia | Streaming y lote | Posición inicial del trabajo del streaming estructurado. Consulte startingPositions para información sobre el orden de lectura de las opciones. |
maxEventsPerTrigger |
long | partitionCount 1000- |
Consulta de streaming | Número máximo de eventos que se procesan por intervalo del desencadenador. El número total especificado de eventos se dividirá proporcionalmente entre las particiones de volumen distinto. |
Para cada opción, existe una configuración correspondiente en EventHubsConf
. Por ejemplo:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
permite a los usuarios especificar posiciones iniciales (y finales) con la clase EventPosition
. EventPosition
define la posición de un evento en una partición del centro de eventos. La posición puede ser un tiempo de puesta en cola, un desplazamiento, un número de secuencia, o el inicio o el final de la transmisión.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Si desea iniciar (o finalizar) en una posición específica, basta con crear el EventPosition
correcto y establecerlo en EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)