Hubs de Eventos do Azure
Os Hubs de Eventos do Azure são um serviço de ingestão de telemetria de hiperescala que coleta, transforma e armazena milhões de eventos. Enquanto plataforma de transmissão em fluxo distribuída, este serviço garante baixa latência e um tempo de retenção configurável, o que lhe permite introduzir vastas quantidades de telemetria na cloud e ler dados de várias aplicações através de semântica de publicação/subscrição.
Este artigo explica como usar o Streaming Estruturado com Hubs de Eventos do Azure e clusters do Azure Databricks.
Nota
Os Hubs de Eventos do Azure fornecem um ponto de extremidade compatível com o Apache Kafka que você pode usar com o conector Kafka de Streaming Estruturado, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. O Databricks recomenda usar o conector Kafka de Streaming Estruturado para processar mensagens dos Hubs de Eventos do Azure.
Requisitos
Para obter suporte à versão atual, consulte "Versões mais recentes" no arquivo Leiame do projeto do Azure Event Hubs Spark Connector.
Crie uma biblioteca em seu espaço de trabalho do Azure Databricks usando a coordenada
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
Maven .Nota
Este conector é atualizado regularmente e uma versão mais recente pode estar disponível: recomendamos que você puxe o conector mais recente do repositório Maven
Instale a biblioteca criada no cluster.
Esquema
O esquema dos registros é:
Column | Tipo |
---|---|
body |
binário |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
carimbo de data/hora |
publisher |
string |
partitionKey |
string |
properties |
mapa[string,json] |
O body
é sempre fornecido como uma matriz de bytes. Use cast("string")
para desserializar explicitamente a body
coluna.
Configuração
Esta seção discute as definições de configuração necessárias para trabalhar com Hubs de Eventos.
Para obter orientações detalhadas sobre como configurar o Streaming Estruturado com Hubs de Eventos do Azure, consulte o Guia de Integração de Streaming Estruturado e Hubs de Eventos do Azure desenvolvido pela Microsoft.
Para obter orientações detalhadas sobre como usar o Streaming Estruturado, consulte Streaming no Azure Databricks.
Connection string
Uma cadeia de conexão de Hubs de Eventos é necessária para se conectar ao serviço Hubs de Eventos. Você pode obter a cadeia de conexão para sua instância de Hubs de Eventos no portal do Azure ou usando o ConnectionStringBuilder
na biblioteca.
Portal do Azure
Quando você obtém a cadeia de conexão do portal do Azure, ela pode ou não ter a EntityPath
chave. Considere:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Para se conectar aos seus EventHubs, é necessário EntityPath
estar presente. Se a sua cadeia de conexão não tiver uma, não se preocupe.
Isso cuidará disso:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
Como alternativa, você pode usar o ConnectionStringBuilder
para criar sua cadeia de conexão.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Toda a configuração relacionada aos Hubs de Eventos acontece no seu EventHubsConf
. Para criar um EventHubsConf
, você deve passar uma cadeia de conexão:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Consulte Cadeia de conexão para obter mais informações sobre como obter uma cadeia de conexão válida.
Para obter uma lista completa de configurações, consulte EventHubsConf. Aqui está um subconjunto de configurações para você começar:
Opção | Value | Default | Tipo de consulta | Description |
---|---|---|---|---|
consumerGroup |
String | "$Default" | Streaming e lote | Um grupo de consumidores é uma visão de um hub de eventos inteiro. Os grupos de consumidores ativam várias aplicações de consumo e cada uma tem uma vista separada do fluxo de eventos e lê o fluxo de forma independente ao seu próprio ritmo e com os seus próprios desvios. Mais informações estão disponíveis na documentação da Microsoft. |
startingPosition |
EventPosition | Início do fluxo | Streaming e lote | A posição inicial para o seu trabalho de Streaming Estruturado. Consulte startingPositions para obter informações sobre a ordem em que as opções são lidas. |
maxEventsPerTrigger |
long | partitionCount - 1000 |
Consulta de streaming | Limite de taxa para o número máximo de eventos processados por intervalo de gatilho. O número total especificado de eventos será dividido proporcionalmente em partições de volume diferente. |
Para cada opção, existe uma configuração correspondente em EventHubsConf
. Por exemplo:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
permite que os usuários especifiquem posições iniciais (e finais) com a EventPosition
classe. EventPosition
define a posição de um evento em uma partição do Hub de Eventos. A posição pode ser um tempo enfileirado, deslocamento, número de sequência, o início do fluxo ou o final do fluxo.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Se você gostaria de começar (ou terminar) em uma posição específica, basta criar o correto EventPosition
e defini-lo em seu EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)