Потоковая обработка с помощью Apache Kafka и Azure Databricks
В этой статье описывается, как использовать Apache Kafka в качестве источника или приемника при выполнении рабочих нагрузок структурированной потоковой передачи в Azure Databricks.
Дополнительные сведения о Kafka см. в документации по Kafka.
Чтение данных из Kafka
Ниже приведен пример потокового чтения из Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks также поддерживает пакетную семантику чтения для источников данных Kafka, как показано в следующем примере:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Для добавочной пакетной загрузки Databricks рекомендует использовать Kafka с Trigger.AvailableNow
. См. инструкции по настройке добавочной пакетной обработки.
В Databricks Runtime 13.3 LTS и более поздних версиях Azure Databricks предоставляет функцию SQL для чтения данных Kafka. Потоковая передача с помощью SQL поддерживается только в разностных динамических таблицах или с таблицами потоковой передачи в Databricks SQL. См . read_kafka табличное значение функции.
Настройка средства чтения структурированной потоковой передачи Kafka
Azure Databricks предоставляет ключевое kafka
слово в качестве формата данных для настройки подключений к Kafka 0.10+.
Ниже приведены наиболее распространенные конфигурации для Kafka:
Есть несколько способов указания разделов для подписки. Вам нужно указать только один из следующих параметров:
Вариант | значение | Описание |
---|---|---|
подписка | Список разделов, разделенных запятыми. | Список разделов для подписки. |
subscribePattern | Строка регулярного выражения Java. | Шаблон, используемый для подписки на разделы. |
назначить | Строка в формате JSON {"topicA":[0,1],"topic":[2,4]} . |
Конкретные разделы topicPartitions для использования. |
Другие важные конфигурации:
Вариант | Значение | Значение по умолчанию | Description |
---|---|---|---|
kafka.bootstrap.servers | Список host:port, разделенных запятыми. | empty | [Обязательно] Конфигурация Kafka bootstrap.servers . Если вы обнаружите, что данных из Kafka нет, сначала проверьте список адресов брокера. Если список адресов брокера составлен неправильно, могут возникнуть ошибки. Это обусловлено тем, что клиент Kafka предполагает, что брокеры будут доступны в конечном итоге и в случае ошибок сети повторные попытки можно будет выполнять без ограничений. |
failOnDataLoss | true или false . |
true |
[Необязательно] Указывает, следует ли завершать запрос, если возможна потеря данных. Запросы могут постоянно завершаться сбоем считывания данных из Kafka во многих сценариях, таких как удаление разделов, усечение разделов перед обработкой и т. д. Мы пытаемся с применением консервативного подхода определить, могут ли данные быть потеряны. Иногда это может привести к ложным сигналам. Установите для этого параметра значение false , если он не работает должным образом или необходимо продолжить выполнение запроса, несмотря на потерю данных. |
minPartitions | Целое число >= 0, 0 = отключено. | 0 (отключено) | [Необязательно] Минимальное число разделов для считывания из Kafka. С помощью minPartitions параметра Spark можно настроить произвольный минимум секций для чтения из Kafka. Обычно в Spark реализовано сопоставление разделов Kafka topicPartitions Kafka с разделами Spark, использующими данные из Kafka, в соотношении1-1. Если для параметра minPartitions задано значение, превышающее число topicPartitions Kafka, Spark разбивает большие разделы Kafka на меньшие части. Этот параметр можно установить для ускорения обработки в периоды пиковых нагрузок, неравномерного распределения данных, а также по мере отставания потока. Это происходит за счет инициализации потребителей Kafka для каждого триггера, который может повлиять на производительность, если при подключении к Kafka используется SSL. |
kafka.group.id | Идентификатор группы потребителей Kafka. | не задано | [Необязательно] Идентификатор группы для использования при чтении из Kafka. Следует использовать с осторожностью. По умолчанию каждый запрос формирует уникальный идентификатор группы для чтения данных. Это гарантирует, что каждый запрос будет иметь собственную группу потребителей, которая не испытывает помех от любого другого потребителя, и, следовательно, может считать все разделы соответствующих подписок. В некоторых сценариях (например, при авторизации на основе группы Kafka) возможно, потребуется использовать определенные идентификаторы авторизованных групп для чтения данных. При необходимости можно задать идентификатор группы. Но это нужно сделать с особой осторожностью, так как это может привести к непредвиденному поведению. — Одновременный запуск запросов (как пакет, так и потоковая передача) с одинаковым идентификатором группы, скорее всего, вмешивается друг в друга, что приводит к тому, что каждый запрос считывает только часть данных. — Это также может произойти при запуске или перезапуске запросов в быстром успешном выполнении. Чтобы избежать таких проблем, задайте для конфигурации потребителя Kafka session.timeout.ms очень малое значение. |
startingOffsets | первое, последнее | latest | [Необязательно] Начальная точка при запуске запроса. Это "первое" из начальных смещений или строка JSON, указывающая начальное смещение для каждого TopicPartition. В json, значение -2 в качестве смещения можно использовать для ссылки на первое, а -1 на последнее. Примечание. Для пакетных запросов последнее смещение (заданное неявно или с использованием -1 в json) запрещено. При использовании потоковых запросов это применимо только при запуске нового запроса, а возобновление всегда будет запускаться с места прерывания запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время. |
Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.
Схема записей Kafka
Схема записей Kafka:
Column | Тип |
---|---|
key | binary |
значение | binary |
topic | строка |
секцию | INT |
offset | длинный |
TIMESTAMP | длинный |
timestampType | INT |
key
и value
всегда десериализуются как массивы байтов с ByteArrayDeserializer
. Используйте операции DataFrame (например cast("string")
), чтобы явно десериализировать ключи и значения.
Запись данных в Kafka
Ниже приведен пример потоковой записи в Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks также поддерживает семантику пакетной записи в приемники данных Kafka, как показано в следующем примере:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Настройка модуля записи структурированной потоковой передачи Kafka
Внимание
Databricks Runtime 13.3 LTS и выше включает более новую версию библиотеки kafka-clients
, которая позволяет идемпотентной записи по умолчанию. Если приемник Kafka использует версию 2.8.0 или ниже с настроенными списками управления доступом, но без IDEMPOTENT_WRITE
включения запись завершается ошибкой org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Чтобы устранить эту ошибку, обновите ее до Kafka версии 2.8.0 или более поздней, или установив параметр .option(“kafka.enable.idempotence”, “false”)
при настройке модуля записи структурированной потоковой передачи.
Схема, предоставленная DataStreamWriter, взаимодействует с приемником Kafka. Можно использовать следующие поля:
Имя столбца | Обязательно или необязательно | Тип |
---|---|---|
key |
необязательно | STRING или BINARY |
value |
обязательно | STRING или BINARY |
headers |
необязательно | ARRAY |
topic |
необязательный (игнорируется, если topic задан параметр записи) |
STRING |
partition |
необязательно | INT |
Ниже приведены распространенные параметры при записи в Kafka:
Вариант | Значение | Значение по умолчанию | Description |
---|---|---|---|
kafka.boostrap.servers |
Список разделенных запятыми <host:port> |
ничего | [Обязательно] Конфигурация Kafka bootstrap.servers . |
topic |
STRING |
не задано | [Необязательно] Задает раздел для записи всех строк. Этот параметр переопределяет любой столбец раздела, который существует в данных. |
includeHeaders |
BOOLEAN |
false |
[Необязательно] Следует ли включать заголовки Kafka в строку. |
Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.
Получение метрик Kafka
С помощью метрик avgOffsetsBehindLatest
, maxOffsetsBehindLatest
и minOffsetsBehindLatest
вы можете получить сведения о среднем, минимальном и максимальном числе смещений, произошедших для запроса потоковой передачи до последнего доступного смещения, для всех разделов, на которые вы подписаны. См. статью Чтение метрик в интерактивном режиме.
Примечание.
Доступно в Databricks Runtime 9.1 и более поздних версий.
Получите предполагаемое общее количество байт, которые процесс запроса не обработал в разделах, на которые вы подписаны, проверив значение estimatedTotalBytesBehindLatest
. Эта оценка основана на пакетах, которые были обработаны за последние 300 секунд. Период, на котором основана оценка, можно изменить, задав для параметра bytesEstimateWindowLength
другое значение. Например, чтобы задать для него значение 10 минут, выполните следующий код:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
Подключение Azure Databricks к Kafka с помощью SSL
Чтобы включить SSL-соединения с Kafka, следуйте инструкциям из раздела документации по Confluent Шифрование и аутентификация с использованием SSL. Вы можете указать описанные здесь конфигурации с префиксом с kafka.
как параметры. Например, вы можете указать расположение хранилища доверия в свойстве kafka.ssl.truststore.location
.
Databricks рекомендует:
- Храните сертификаты в облачном хранилище объектов. Доступ к сертификатам можно ограничить только кластерами, которые могут получить доступ к Kafka. См . сведения об управлении данными с помощью каталога Unity.
- Храните пароли сертификатов как секреты в области секретов.
В следующем примере используются расположения хранилища объектов и секреты Databricks для включения SSL-подключения:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Подключение Kafka в HDInsight к Azure Databricks
Создайте кластер Kafka HDInsight.
Инструкции см. в статье Подключение к Kafka в HDInsight с помощью виртуальной сети Azure.
Настройте брокеры Kafka для объявления правильного адреса.
Следуйте инструкциям из раздела Настройка Kafka для объявления IP-адресов. Если вы самостоятельно управляете Kafka на виртуальных машинах Azure, убедитесь, что для конфигурации
advertised.listeners
брокеров задан внутренний IP-адрес узлов.Создайте кластер Azure Databricks.
Установите пиринг между кластером Kafka и кластером Azure Databricks.
Следуйте инструкциям из раздела пиринг между одноранговыми виртуальными сетями.
Проверка подлинности субъекта-службы с помощью идентификатора Microsoft Entra и Центры событий Azure
Azure Databricks поддерживает проверку подлинности заданий Spark со службами Центров событий. Эта проверка подлинности выполняется с помощью OAuth с идентификатором Microsoft Entra.
Azure Databricks поддерживает проверку подлинности Идентификатора Microsoft Entra с идентификатором клиента и секретом в следующих вычислительных средах:
- Databricks Runtime 12.2 LTS и более поздних версий для вычислений, настроенных в режиме доступа к одному пользователю.
- Databricks Runtime 14.3 LTS и более поздних версий для вычислений, настроенных в режиме общего доступа.
- Конвейеры Delta Live Tables, настроенные без каталога Unity.
Azure Databricks не поддерживает проверку подлинности идентификатора Microsoft Entra с сертификатом в любой вычислительной среде или в конвейерах Delta Live Tables, настроенных с помощью каталога Unity.
Эта проверка подлинности не работает в общих кластерах или в разностных динамических таблицах каталога Unity.
Настройка соединителя Структурированной потоковой передачи Kafka
Чтобы выполнить проверку подлинности с помощью идентификатора Microsoft Entra, вам потребуется следующее:
Идентификатор клиента. Это можно найти на вкладке служб идентификатора Microsoft Entra ID .
Идентификатор клиента (также известный как идентификатор приложения).
Секрет клиента. После этого необходимо добавить его в качестве секрета в рабочую область Databricks. Чтобы добавить этот секрет, см. раздел "Управление секретами".
Раздел EventHubs. Список разделов в разделе "Центры событий" можно найти в разделе "Сущности" на определенной странице пространства имен Центров событий. Чтобы работать с несколькими разделами, можно задать роль IAM на уровне Центров событий.
Сервер EventHubs. Это можно найти на странице обзора определенного пространства имен Центров событий:
Кроме того, чтобы использовать идентификатор Записи, необходимо сообщить Kafka использовать механизм OAuth SASL (SASL является универсальным протоколом, и OAuth является типом SASL "механизм"):
kafka.security.protocol
должно бытьSASL_SSL
kafka.sasl.mechanism
должно бытьOAUTHBEARER
kafka.sasl.login.callback.handler.class
должно быть полным именем класса Java со значениемkafkashaded
обработчика обратного вызова для входа класса Kafka. См. следующий пример для точного класса.
Пример
Далее рассмотрим работающий пример:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Обработка потенциальных ошибок
Параметры потоковой передачи не поддерживаются.
Если вы пытаетесь использовать этот механизм проверки подлинности в конвейере Delta Live Tables, настроенном с помощью каталога Unity, может появиться следующая ошибка:
Чтобы устранить эту ошибку, используйте поддерживаемую конфигурацию вычислений. См. проверку подлинности субъекта-службы с помощью идентификатора Microsoft Entra и Центры событий Azure.
Не удалось создать новый
KafkaAdminClient
объект.Это внутренняя ошибка, вызываемая Kafka, если какие-либо из следующих параметров проверки подлинности неверны:
- Идентификатор клиента (также известный как идентификатор приложения)
- Идентификатор клиента
- Сервер EventHubs
Чтобы устранить ошибку, убедитесь, что значения верны для этих параметров.
Кроме того, эта ошибка может появиться при изменении параметров конфигурации, предоставленных по умолчанию в примере (которые вы попросили не изменять), например
kafka.security.protocol
.Возвращаемые записи отсутствуют
Если вы пытаетесь отобразить или обработать кадр данных, но не получаете результаты, вы увидите следующее в пользовательском интерфейсе.
Это сообщение означает, что проверка подлинности прошла успешно, но EventHubs не возвращала никаких данных. Некоторые возможные причины (хотя и не являются исчерпывающими) являются:
- Вы указали неправильный раздел EventHubs .
- Для параметра конфигурации Kafka по умолчанию используется
latest
параметр конфигурацииstartingOffsets
Kafka, и вы пока не получаете никаких данных через раздел. Вы можетеstartingOffsetstoearliest
приступить к чтению данных, начиная с самых ранних смещениях Kafka.