Partilhar via


Hubs de Eventos do Azure

Os Hubs de Eventos do Azure são um serviço de ingestão de telemetria de hiperescala que coleta, transforma e armazena milhões de eventos. Enquanto plataforma de transmissão em fluxo distribuída, este serviço garante baixa latência e um tempo de retenção configurável, o que lhe permite introduzir vastas quantidades de telemetria na cloud e ler dados de várias aplicações através de semântica de publicação/subscrição.

Este artigo explica como usar o Streaming Estruturado com Hubs de Eventos do Azure e clusters do Azure Databricks.

Nota

Os Hubs de Eventos do Azure fornecem um ponto de extremidade compatível com o Apache Kafka que você pode usar com o conector Kafka de Streaming Estruturado, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. O Databricks recomenda usar o conector Kafka de Streaming Estruturado para processar mensagens dos Hubs de Eventos do Azure.

Requisitos

Para obter suporte à versão atual, consulte "Versões mais recentes" no arquivo Leiame do projeto do Azure Event Hubs Spark Connector.

  1. Crie uma biblioteca em seu espaço de trabalho do Azure Databricks usando a coordenada com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17Maven .

    Nota

    Este conector é atualizado regularmente e uma versão mais recente pode estar disponível: recomendamos que você puxe o conector mais recente do repositório Maven

  2. Instale a biblioteca criada no cluster.

Esquema

O esquema dos registros é:

Column Tipo
body binário
partition string
offset string
sequenceNumber long
enqueuedTime carimbo de data/hora
publisher string
partitionKey string
properties mapa[string,json]

O body é sempre fornecido como uma matriz de bytes. Use cast("string") para desserializar explicitamente a body coluna.

Configuração

Esta seção discute as definições de configuração necessárias para trabalhar com Hubs de Eventos.

Para obter orientações detalhadas sobre como configurar o Streaming Estruturado com Hubs de Eventos do Azure, consulte o Guia de Integração de Streaming Estruturado e Hubs de Eventos do Azure desenvolvido pela Microsoft.

Para obter orientações detalhadas sobre como usar o Streaming Estruturado, consulte Streaming no Azure Databricks.

Connection string

Uma cadeia de conexão de Hubs de Eventos é necessária para se conectar ao serviço Hubs de Eventos. Você pode obter a cadeia de conexão para sua instância de Hubs de Eventos no portal do Azure ou usando o ConnectionStringBuilder na biblioteca.

Portal do Azure

Quando você obtém a cadeia de conexão do portal do Azure, ela pode ou não ter a EntityPath chave. Considere:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Para se conectar aos seus EventHubs, é necessário EntityPath estar presente. Se a sua cadeia de conexão não tiver uma, não se preocupe. Isso cuidará disso:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Como alternativa, você pode usar o ConnectionStringBuilder para criar sua cadeia de conexão.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Toda a configuração relacionada aos Hubs de Eventos acontece no seu EventHubsConf. Para criar um EventHubsConf, você deve passar uma cadeia de conexão:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Consulte Cadeia de conexão para obter mais informações sobre como obter uma cadeia de conexão válida.

Para obter uma lista completa de configurações, consulte EventHubsConf. Aqui está um subconjunto de configurações para você começar:

Opção Value Default Tipo de consulta Description
consumerGroup String "$Default" Streaming e lote Um grupo de consumidores é uma visão de um hub de eventos inteiro. Os grupos de consumidores ativam várias aplicações de consumo e cada uma tem uma vista separada do fluxo de eventos e lê o fluxo de forma independente ao seu próprio ritmo e com os seus próprios desvios. Mais informações estão disponíveis na documentação da Microsoft.
startingPosition EventPosition Início do fluxo Streaming e lote A posição inicial para o seu trabalho de Streaming Estruturado. Consulte startingPositions para obter informações sobre a ordem em que as opções são lidas.
maxEventsPerTrigger long partitionCount

- 1000
Consulta de streaming Limite de taxa para o número máximo de eventos processados por intervalo de gatilho. O número total especificado de eventos será dividido proporcionalmente em partições de volume diferente.

Para cada opção, existe uma configuração correspondente em EventHubsConf. Por exemplo:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf permite que os usuários especifiquem posições iniciais (e finais) com a EventPosition classe. EventPosition define a posição de um evento em uma partição do Hub de Eventos. A posição pode ser um tempo enfileirado, deslocamento, número de sequência, o início do fluxo ou o final do fluxo.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Se você gostaria de começar (ou terminar) em uma posição específica, basta criar o correto EventPosition e defini-lo em seu EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)