Центры событий Azure
Центры событий Azure — это служба обработки данных телеметрии с высокой степенью масштабируемости. Она собирает, преобразовывает и хранит миллионы событий. Эта распределенная платформа для потоковой передачи данных с низкой задержкой и настраиваемым временем хранения позволяет принимать большие объемы данных телеметрии в облаке, а также считывать данные из нескольких приложений на основе семантики публикации и подписки.
В этой статье описано, как использовать структурированную потоковую передачу с использованием Центров событий Azure и кластеров Azure Databricks.
Примечание.
Центры событий Azure предоставляет конечную точку, совместимую с Apache Kafka, которую можно использовать с помощью Структурированный соединитель Stream Kafka, доступный в Databricks Runtime, для обработки сообщений из Центры событий Azure. Databricks рекомендует использовать соединитель Структурированной потоковой передачи Kafka для обработки сообщений из Центры событий Azure.
Требования
Информацию о поддержке текущих выпусков см. в разделе с описанием последних выпусков в файле README проекта Spark Connector Центров событий Azure.
Создайте библиотеку в рабочей области Azure Databricks, используя координату
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
Maven.Примечание.
Этот соединитель регулярно обновляется, и может быть доступна более новая версия. Мы рекомендуем вам получить последнюю версию соединителя из репозитория Maven.
Установите созданную библиотеку в кластере.
Схема
Схема записей:
Column | Тип |
---|---|
body |
binary |
partition |
строка |
offset |
строка |
sequenceNumber |
длинный |
enqueuedTime |
TIMESTAMP |
publisher |
строка |
partitionKey |
строка |
properties |
map[string,json] |
body
всегда предоставляется в виде массива байтов. Используйте cast("string")
для явной десериализации столбца body
.
Настройка
В этом разделе обсуждаются параметры конфигурации, необходимые для работы с Центрами событий.
Подробные инструкции по настройке структурированной потоковой передачи с использованием Центров событий Azure см. в статье Руководство по интеграции структурированной потоковой передачи и Центров событий Azure от корпорации Майкрософт.
Подробные инструкции по использованию структурированной потоковой передачи см. в статье "Потоковая передача" в Azure Databricks.
Connection string
Строка подключения к Центрам событий требуется для подключения к службе Центров событий. Строку подключения для экземпляра Центров событий можно получить на портале Azure или с помощью ConnectionStringBuilder
в библиотеке.
Портал Azure
Когда вы получаете строку подключения с портала Azure, в ней может быть или не быть ключ EntityPath
. Необходимо учесть следующие моменты.
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Чтобы подключиться к Центрам событий, требуется присутствие EntityPath
. Если в строке подключения этого нет, не беспокойтесь.
Этот вопрос будет решен за вас:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
Кроме того, можно использовать ConnectionStringBuilder
для создания строки подключения.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Вся конфигурация, связанная с Центрами событий, определяется в EventHubsConf
. Чтобы создать EventHubsConf
, необходимо передать строку подключения.
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Дополнительные сведения о получении допустимой строки подключения см. в разделе Строка подключения.
Полный список конфигураций см. в разделе EventHubsConf. Вот набор конфигураций, которые помогут вам приступить к работе:
Вариант | Значение | По умолчанию | Тип запроса | Description |
---|---|---|---|---|
consumerGroup |
Строка | $Default | Потоковая передача и пакетная обработка | Группа потребителей — это представление всего концентратора событий. Группы потребителей обеспечивают каждому из нескольких потребляющих приложений отдельное представление потока событий, а также возможность считывания потока независимо друг от друга в своем темпе и с собственными смещениями. Дополнительные сведения см. в документации Майкрософт. |
startingPosition |
EventPosition | Начало потоковой передачи | Потоковая передача и пакетная обработка | Начальная позиция для задания структурированной потоковой передачи. Дополнительные сведения о порядке, в котором считываются параметры, см. в разделе startingPositions. |
maxEventsPerTrigger |
длинный | partitionCount 1000- |
Запрос потоковой передачи | Ограничение скорости для максимального количества событий, обрабатываемых за интервал триггера. Указанное общее количество событий будет пропорционально распределено между секциями разного объема. |
Для каждого варианта существует соответствующий параметр в EventHubsConf
. Например:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
позволяет пользователям указывать начальную (и конечную) позиции в классеEventPosition
. EventPosition
определяет позицию события в секции концентратора событий. Позицией может быть время постановки в очередь, смещение, порядковый номер, а также начало или конец потока.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Если вы хотите выполнить запуск (или остановку) в определенной позиции, просто создайте правильный класс EventPosition
и настройте его в EventHubsConf
.
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)