Hubs de eventos do Azure
Os Hubs de Eventos do Azure são um serviço de ingestão de telemetria de hiperescala que coleta, transforma e armazena milhões de eventos. Como uma plataforma de streaming distribuída, ele oferece baixa latência e retenção de tempo configurável, permitindo que você ingresse grandes quantidades de telemetria na nuvem e leia os dados de diversos aplicativos usando semântica de publicação/assinatura.
Este artigo explica como usar o Fluxo Estruturado com clusters dos Hubs de Eventos do Azure e dados do Azure Databricks.
Observação
Os Hubs de Eventos do Azure fornecem um ponto de extremidade compatível com o Apache Kafka que você pode usar com o Conector Kafka de streaming estruturado, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. O Databricks recomenda usar o conector Kafka de fluxo estruturado para processar mensagens de Hubs de Eventos do Azure.
Requisitos
Para suporte à versão atual, consulte "Versões mais recentes" no arquivo leia-me do projeto Hubs de Eventos do Azure Spark Connector.
Crie uma biblioteca em seu workspace no Azure Databricks usando a coordenada Maven
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
.Observação
Esse conector é atualizado regularmente, e uma versão mais recente pode estar disponível: recomendamos que você busque o conector mais recente no repositório do Maven
Instale a biblioteca criada em seu cluster.
Esquema
O esquema dos registros é:
Coluna | Type |
---|---|
body |
binary |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
string |
properties |
map[string,json] |
O body
é sempre fornecido como uma matriz de byte. Use cast("string")
para desserializar explicitamente a coluna body
.
Configuração
Esta seção discute as definições de configuração que você precisa para trabalhar com os Hubs de Eventos.
Para obter diretrizes detalhadas sobre como configurar o Fluxo Estruturado com Hubs de Eventos do Azure, consulte o Guia de Integração dos Hubs de Eventos do Azure e Fluxo Estruturado desenvolvido pela Microsoft.
Para obter diretrizes detalhadas sobre como usar o Fluxo Estruturado, confira Fluxo Estruturado no Azure Databricks.
Cadeia de conexão
Uma cadeia de conexão dos Hubs de Eventos é necessária para se conectar ao serviço hubs de eventos. Você pode obter a cadeia de conexão para sua instância dos Hubs de Eventos do portal do Azure ou usando o ConnectionStringBuilder
na biblioteca.
Portal do Azure
Quando você obter a cadeia de conexão do portal do Azure, ela poderá ou não ter a chave EntityPath
. Considerar:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Para se conectar aos seus EventHubs, um EntityPath
deve estar presente. Se a cadeia de conexão não tiver uma, não se preocupe.
Isso tomará conta dele:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
Como alternativa, você pode usar o ConnectionStringBuilder
para fazer sua cadeia de conexão.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Todas as configurações relacionadas aos Hubs de Eventos ocorrem em seu EventHubsConf
. Para criar um EventHubsConf
, você deve passar uma cadeia de conexão:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Consulte Cadeia de conexão para obter mais informações sobre como obter uma cadeia de conexão válida.
Para ver uma lista completa de configurações, consulte EventHubsConf. Aqui está um subconjunto de configurações para você começar:
Opção | Valor | Padrão | Tipo de consulta | Descrição |
---|---|---|---|---|
consumerGroup |
String | “$Default” | Streaming e lote | Um grupo de consumidores é uma exibição de um hub de eventos inteiro. Os grupos de consumidores permitem que vários aplicativos de consumo tenham um modo de exibição separado do fluxo de eventos e leiam o fluxo de forma independente em seu próprio ritmo e com seus próprios deslocamentos. Mais informações estão disponíveis na documentação da Microsoft. |
startingPosition |
EventPosition | Início da transmissão | Streaming e lote | A posição inicial para seu trabalho de Fluxo Estruturado. Consulte startingPositions para obter informações sobre a ordem em que as opções são lidas. |
maxEventsPerTrigger |
long | partitionCount * 1000 |
Consulta de streaming | Limite de taxa no número máximo de eventos processados por intervalo de gatilho. O número total de eventos especificado será dividido proporcionalmente entre partições de volume diferente. |
Para cada opção, existe uma configuração correspondente em EventHubsConf
. Por exemplo:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
permite que os usuários especifiquem posições inicial (e final) com a classe EventPosition
. EventPosition
define a posição de um evento em uma partição do Hub de Eventos. A posição pode ser um tempo, deslocamento, número de sequência, o início do fluxo ou o final do fluxo.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Se você quiser iniciar (ou terminar) em uma posição específica, basta criar o EventPosition
correto e defini-lo em seu EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)