Поделиться через


Центры событий Azure

Центры событий Azure — это служба обработки данных телеметрии с высокой степенью масштабируемости. Она собирает, преобразовывает и хранит миллионы событий. Эта распределенная платформа для потоковой передачи данных с низкой задержкой и настраиваемым временем хранения позволяет принимать большие объемы данных телеметрии в облаке, а также считывать данные из нескольких приложений на основе семантики публикации и подписки.

В этой статье описано, как использовать структурированную потоковую передачу с использованием Центров событий Azure и кластеров Azure Databricks.

Примечание.

Центры событий Azure предоставляет конечную точку, совместимую с Apache Kafka, которую можно использовать с помощью Структурированный соединитель Stream Kafka, доступный в Databricks Runtime, для обработки сообщений из Центры событий Azure. Databricks рекомендует использовать соединитель Структурированной потоковой передачи Kafka для обработки сообщений из Центры событий Azure.

Требования

Информацию о поддержке текущих выпусков см. в разделе с описанием последних выпусков в файле README проекта Spark Connector Центров событий Azure.

  1. Создайте библиотеку в рабочей области Azure Databricks, используя координату com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 Maven.

    Примечание.

    Этот соединитель регулярно обновляется, и может быть доступна более новая версия. Мы рекомендуем вам получить последнюю версию соединителя из репозитория Maven.

  2. Установите созданную библиотеку в кластере.

Схема

Схема записей:

Column Тип
body binary
partition строка
offset строка
sequenceNumber длинный
enqueuedTime TIMESTAMP
publisher строка
partitionKey строка
properties map[string,json]

body всегда предоставляется в виде массива байтов. Используйте cast("string") для явной десериализации столбца body.

Настройка

В этом разделе обсуждаются параметры конфигурации, необходимые для работы с Центрами событий.

Подробные инструкции по настройке структурированной потоковой передачи с использованием Центров событий Azure см. в статье Руководство по интеграции структурированной потоковой передачи и Центров событий Azure от корпорации Майкрософт.

Подробные инструкции по использованию структурированной потоковой передачи см. в статье "Потоковая передача" в Azure Databricks.

Connection string

Строка подключения к Центрам событий требуется для подключения к службе Центров событий. Строку подключения для экземпляра Центров событий можно получить на портале Azure или с помощью ConnectionStringBuilder в библиотеке.

Портал Azure

Когда вы получаете строку подключения с портала Azure, в ней может быть или не быть ключ EntityPath. Необходимо учесть следующие моменты.

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Чтобы подключиться к Центрам событий, требуется присутствие EntityPath. Если в строке подключения этого нет, не беспокойтесь. Этот вопрос будет решен за вас:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Кроме того, можно использовать ConnectionStringBuilder для создания строки подключения.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Вся конфигурация, связанная с Центрами событий, определяется в EventHubsConf. Чтобы создать EventHubsConf, необходимо передать строку подключения.

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Дополнительные сведения о получении допустимой строки подключения см. в разделе Строка подключения.

Полный список конфигураций см. в разделе EventHubsConf. Вот набор конфигураций, которые помогут вам приступить к работе:

Вариант Значение По умолчанию Тип запроса Description
consumerGroup Строка $Default Потоковая передача и пакетная обработка Группа потребителей — это представление всего концентратора событий. Группы потребителей обеспечивают каждому из нескольких потребляющих приложений отдельное представление потока событий, а также возможность считывания потока независимо друг от друга в своем темпе и с собственными смещениями. Дополнительные сведения см. в документации Майкрософт.
startingPosition EventPosition Начало потоковой передачи Потоковая передача и пакетная обработка Начальная позиция для задания структурированной потоковой передачи. Дополнительные сведения о порядке, в котором считываются параметры, см. в разделе startingPositions.
maxEventsPerTrigger длинный partitionCount

1000-
Запрос потоковой передачи Ограничение скорости для максимального количества событий, обрабатываемых за интервал триггера. Указанное общее количество событий будет пропорционально распределено между секциями разного объема.

Для каждого варианта существует соответствующий параметр в EventHubsConf. Например:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf позволяет пользователям указывать начальную (и конечную) позиции в классеEventPosition. EventPosition определяет позицию события в секции концентратора событий. Позицией может быть время постановки в очередь, смещение, порядковый номер, а также начало или конец потока.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Если вы хотите выполнить запуск (или остановку) в определенной позиции, просто создайте правильный класс EventPosition и настройте его в EventHubsConf.

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)