Потоковая обработка с помощью Apache Kafka и Azure Databricks

В этой статье описывается, как использовать Apache Kafka в качестве источника или приемника при выполнении рабочих нагрузок структурированной потоковой передачи в Azure Databricks.

Дополнительные сведения о Kafka см. в документации по Kafka.

Чтение данных из Kafka

Ниже приведен пример потокового чтения из Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks также поддерживает пакетную семантику чтения для источников данных Kafka, как показано в следующем примере:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Для добавочной пакетной загрузки Databricks рекомендует использовать Kafka с Trigger.AvailableNow. См. инструкции по настройке добавочной пакетной обработки.

В Databricks Runtime 13.1 и более поздних версиях Azure Databricks предоставляет функцию SQL для чтения данных Kafka. Потоковая передача с помощью SQL поддерживается только в разностных динамических таблицах или с таблицами потоковой передачи в Databricks SQL. См . read_kafka табличное значение функции.

Настройка средства чтения структурированной потоковой передачи Kafka

Azure Databricks предоставляет kafka ключевое слово в виде формата данных для настройки подключений к Kafka 0.10+.

Ниже приведены наиболее распространенные конфигурации для Kafka:

Есть несколько способов указания разделов для подписки. Вам нужно указать только один из следующих параметров:

Вариант значение Описание
подписка Список разделов, разделенных запятыми. Список разделов для подписки.
subscribePattern Строка регулярного выражения Java. Шаблон, используемый для подписки на разделы.
назначить Строка в формате JSON {"topicA":[0,1],"topic":[2,4]}. Конкретные разделы topicPartitions для использования.

Другие важные конфигурации:

Вариант Значение Значение по умолчанию Description
kafka.bootstrap.servers Список host:port, разделенных запятыми. empty [Обязательно] Конфигурация Kafka bootstrap.servers. Если вы обнаружите, что данных из Kafka нет, сначала проверьте список адресов брокера. Если список адресов брокера составлен неправильно, могут возникнуть ошибки. Это обусловлено тем, что клиент Kafka предполагает, что брокеры будут доступны в конечном итоге и в случае ошибок сети повторные попытки можно будет выполнять без ограничений.
failOnDataLoss true или false. true [Необязательно] Указывает, следует ли завершать запрос, если возможна потеря данных. Запросы могут постоянно завершаться сбоем считывания данных из Kafka во многих сценариях, таких как удаление разделов, усечение разделов перед обработкой и т. д. Мы пытаемся с применением консервативного подхода определить, могут ли данные быть потеряны. Иногда это может привести к ложным сигналам. Установите для этого параметра значение false, если он не работает должным образом или необходимо продолжить выполнение запроса, несмотря на потерю данных.
minPartitions Целое число >= 0, 0 = отключено. 0 (отключено) [Необязательно] Минимальное число разделов для считывания из Kafka. С помощью minPartitions параметра Spark можно настроить произвольный минимум секций для чтения из Kafka. Обычно в Spark реализовано сопоставление разделов Kafka topicPartitions Kafka с разделами Spark, использующими данные из Kafka, в соотношении1-1. Если для параметра minPartitions задано значение, превышающее число topicPartitions Kafka, Spark разбивает большие разделы Kafka на меньшие части. Этот параметр можно установить для ускорения обработки в периоды пиковых нагрузок, неравномерного распределения данных, а также по мере отставания потока. Это происходит за счет инициализации потребителей Kafka для каждого триггера, который может повлиять на производительность, если при подключении к Kafka используется SSL.
kafka.group.id Идентификатор группы потребителей Kafka. не задано [Необязательно] Идентификатор группы для использования при чтении из Kafka. Следует использовать с осторожностью. По умолчанию каждый запрос формирует уникальный идентификатор группы для чтения данных. Это гарантирует, что каждый запрос будет иметь собственную группу потребителей, которая не испытывает помех от любого другого потребителя, и, следовательно, может считать все разделы соответствующих подписок. В некоторых сценариях (например, при авторизации на основе группы Kafka) возможно, потребуется использовать определенные идентификаторы авторизованных групп для чтения данных. При необходимости можно задать идентификатор группы. Но это нужно сделать с особой осторожностью, так как это может привести к непредвиденному поведению.

* Параллельно выполняющиеся запросы (как пакетной обработки, так и потоковой передачи) с одним и тем же идентификатором группы, скорее всего, препятствуют друг другу, поэтому для каждого запроса считывается только часть данных.
* Кроме того, это может произойти при запуске или перезапуске запросов в ходе быстрого выполнения. Чтобы избежать таких проблем, задайте для конфигурации потребителя Kafka session.timeout.ms очень малое значение.
startingOffsets первое, последнее latest [Необязательно] Начальная точка при запуске запроса. Это "первое" из начальных смещений или строка JSON, указывающая начальное смещение для каждого TopicPartition. В json, значение -2 в качестве смещения можно использовать для ссылки на первое, а -1 на последнее. Примечание. Для пакетных запросов последнее смещение (заданное неявно или с использованием -1 в json) запрещено. При использовании потоковых запросов это применимо только при запуске нового запроса, а возобновление всегда будет запускаться с места прерывания запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время.

Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.

Схема записей Kafka

Схема записей Kafka:

Column Тип
key binary
значение binary
topic строка
секцию INT
offset длинный
TIMESTAMP длинный
timestampType INT

key и value всегда десериализуются как массивы байтов с ByteArrayDeserializer. Используйте операции DataFrame (например cast("string")), чтобы явно десериализировать ключи и значения.

Запись данных в Kafka

Ниже приведен пример потоковой записи в Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks также поддерживает семантику пакетной записи в приемники данных Kafka, как показано в следующем примере:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Настройка модуля записи структурированной потоковой передачи Kafka

Внимание

Databricks Runtime 13.1 и более поздних версий kafka-clients включает более новую версию библиотеки, которая позволяет идемпотентной записи по умолчанию. Если приемник Kafka использует версию 2.8.0 или ниже с настроенными списками управления доступом, но без IDEMPOTENT_WRITE включения, запись завершается ошибкой org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Чтобы устранить эту ошибку, обновите ее до Kafka версии 2.8.0 или более поздней, или установив параметр .option(“kafka.enable.idempotence”, “false”) при настройке модуля записи структурированной потоковой передачи.

Схема, предоставленная DataStreamWriter, взаимодействует с приемником Kafka. Можно использовать следующие поля:

Имя столбца Обязательно или необязательно Тип
key необязательно STRING или BINARY
value обязательно STRING или BINARY
headers необязательно ARRAY
topic необязательный (игнорируется, если topic задан параметр записи) STRING
partition необязательно INT

Ниже приведены распространенные параметры при записи в Kafka:

Вариант Значение Значение по умолчанию Description
kafka.boostrap.servers Список разделенных запятыми <host:port> ничего [Обязательно] Конфигурация Kafka bootstrap.servers.
topic STRING не задано [Необязательно] Задает раздел для записи всех строк. Этот параметр переопределяет любой столбец раздела, который существует в данных.
includeHeaders BOOLEAN false [Необязательно] Следует ли включать заголовки Kafka в строку.

Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.

Получение метрик Kafka

С помощью метрик avgOffsetsBehindLatest, maxOffsetsBehindLatest и minOffsetsBehindLatest вы можете получить сведения о среднем, минимальном и максимальном числе смещений, произошедших для запроса потоковой передачи до последнего доступного смещения, для всех разделов, на которые вы подписаны. См. статью Чтение метрик в интерактивном режиме.

Примечание.

Доступно в Databricks Runtime 9.1 и более поздних версий.

Получите предполагаемое общее количество байт, которые процесс запроса не обработал в разделах, на которые вы подписаны, проверив значение estimatedTotalBytesBehindLatest. Эта оценка основана на пакетах, которые были обработаны за последние 300 секунд. Период, на котором основана оценка, можно изменить, задав для параметра bytesEstimateWindowLength другое значение. Например, чтобы задать для него значение 10 минут, выполните следующий код:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Подключение Azure Databricks к Kafka с помощью SSL

Чтобы включить SSL-соединения с Kafka, следуйте инструкциям из раздела документации по Confluent Шифрование и аутентификация с использованием SSL. Вы можете указать описанные здесь конфигурации с префиксом с kafka. как параметры. Например, вы можете указать расположение хранилища доверия в свойстве kafka.ssl.truststore.location.

Databricks рекомендует:

В следующем примере используются расположения хранилища объектов и секреты Databricks для включения SSL-подключения:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Подключение Kafka в HDInsight к Azure Databricks

  1. Создайте кластер Kafka HDInsight.

    Инструкции см. в статье Подключение к Kafka в HDInsight с помощью виртуальной сети Azure.

  2. Настройте брокеры Kafka для объявления правильного адреса.

    Следуйте инструкциям из раздела Настройка Kafka для объявления IP-адресов. Если вы самостоятельно управляете Kafka на виртуальных машинах Azure, убедитесь, что для конфигурации advertised.listeners брокеров задан внутренний IP-адрес узлов.

  3. Создайте кластер Azure Databricks.

  4. Установите пиринг между кластером Kafka и кластером Azure Databricks.

    Следуйте инструкциям из раздела пиринг между одноранговыми виртуальными сетями.

Проверка подлинности субъекта-службы с помощью идентификатора Microsoft Entra (ранее Azure Active Directory) и Центры событий Azure

Azure Databricks поддерживает проверку подлинности заданий Spark со службами Центров событий. Эта проверка подлинности выполняется с помощью OAuth с идентификатором Microsoft Entra (ранее — Azure Active Directory).

AAD Authentication diagram

Azure Databricks поддерживает проверку подлинности Идентификатора Microsoft Entra с идентификатором клиента и секретом в следующих вычислительных средах:

  • Databricks Runtime 12.2 LTS и более поздних версий для вычислений, настроенных в режиме доступа к одному пользователю.
  • Databricks Runtime 14.3 LTS и более поздних версий для вычислений, настроенных в режиме общего доступа.
  • Конвейеры Delta Live Tables, настроенные без каталога Unity.

Azure Databricks не поддерживает проверку подлинности идентификатора Microsoft Entra с сертификатом в любой вычислительной среде или в конвейерах Delta Live Tables, настроенных с помощью каталога Unity.

Эта проверка подлинности не работает в общих кластерах или в разностных динамических таблицах каталога Unity.

Настройка структурированной потоковой передачи Kafka Подключение or

Чтобы выполнить проверку подлинности с помощью идентификатора Microsoft Entra, вам потребуется следующее:

  • Идентификатор клиента. Это можно найти на вкладке служб идентификатора Microsoft Entra ID .

  • Идентификатор клиента (также известный как идентификатор приложения).

  • Секрет клиента. После этого необходимо добавить его в качестве секрета в рабочую область Databricks. Чтобы добавить этот секрет, см. раздел "Управление секретами".

  • Раздел EventHubs. Список разделов в разделе "Центры событий" можно найти в разделе "Сущности" на определенной странице пространства имен Центров событий. Чтобы работать с несколькими разделами, можно задать роль IAM на уровне Центров событий.

  • Сервер EventHubs. Это можно найти на странице обзора определенного пространства имен Центров событий:

    Event Hubs namespace

Кроме того, чтобы использовать идентификатор Записи, необходимо сообщить Kafka использовать механизм OAuth SASL (SASL является универсальным протоколом, и OAuth является типом SASL "механизм"):

  • kafka.security.protocol должно быть SASL_SSL
  • kafka.sasl.mechanism должно быть OAUTHBEARER
  • kafka.sasl.login.callback.handler.class должно быть полным именем класса Java со значением kafkashaded обработчика обратного вызова для входа класса Kafka. См. следующий пример для точного класса.

Пример

Далее рассмотрим работающий пример:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Обработка потенциальных ошибок

  • Параметры потоковой передачи не поддерживаются.

    Если вы пытаетесь использовать этот механизм проверки подлинности в конвейере Delta Live Tables, настроенном с помощью каталога Unity, может появиться следующая ошибка:

    Unsupported streaming error

    Чтобы устранить эту ошибку, используйте поддерживаемую конфигурацию вычислений. Ознакомьтесь с проверкой подлинности субъекта-службы с помощью идентификатора Microsoft Entra (ранее Azure Active Directory) и Центры событий Azure.

  • Не удалось создать новый KafkaAdminClientобъект.

    Это внутренняя ошибка, вызываемая Kafka, если какие-либо из следующих параметров проверки подлинности неверны:

    • Идентификатор клиента (также известный как идентификатор приложения)
    • Идентификатор клиента
    • Сервер EventHubs

    Чтобы устранить ошибку, убедитесь, что значения верны для этих параметров.

    Кроме того, эта ошибка может появиться при изменении параметров конфигурации, предоставленных по умолчанию в примере (которые вы попросили не изменять), например kafka.security.protocol.

  • Возвращаемые записи отсутствуют

    Если вы пытаетесь отобразить или обработать кадр данных, но не получаете результаты, вы увидите следующее в пользовательском интерфейсе.

    No results message

    Это сообщение означает, что проверка подлинности прошла успешно, но EventHubs не возвращала никаких данных. Некоторые возможные причины (хотя и не являются исчерпывающими) являются:

    • Вы указали неправильный раздел EventHubs .
    • Для параметра конфигурации Kafka по умолчанию используется latestпараметр конфигурации startingOffsets Kafka, и вы пока не получаете никаких данных через раздел. Вы можете startingOffsetstoearliest приступить к чтению данных, начиная с самых ранних смещениях Kafka.