Поделиться через


Потоковая передача из Apache Pulsar

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

В Databricks Runtime 14.1 и более поздних версиях можно использовать структурированную потоковую передачу для потоковой передачи данных из Apache Pulsar в Azure Databricks.

Структурированная потоковая передача обеспечивает точно один раз семантику обработки для чтения данных из источников Pulsar.

Пример синтаксиса

Ниже приведен базовый пример использования структурированной потоковой передачи для чтения из Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Для указания разделов всегда необходимо указать service.url один из следующих вариантов:

  • topic
  • topics
  • topicsPattern

Полный список параметров см. в разделе "Настройка параметров для потоковой передачи Pulsar".

Проверка подлинности в Pulsar

Azure Databricks поддерживает проверку подлинности truststore и хранилища ключей в Pulsar. Databricks рекомендует использовать секреты при хранении сведений о конфигурации.

Во время настройки потока можно задать следующие параметры:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Если поток использует a PulsarAdmin, также задайте следующее:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

В следующем примере демонстрируется настройка параметров проверки подлинности:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Схема Pulsar

Схема записей, считываемых из Pulsar, зависит от того, как разделы кодируют свои схемы.

  • Для разделов с схемой Avro или JSON имена полей и типы полей сохраняются в результирующем кадре данных Spark.
  • Для разделов без схемы или с простым типом данных в Pulsar полезные данные загружаются в value столбец.
  • Если средство чтения настроено для чтения нескольких разделов с различными схемами, установите для allowDifferentTopicSchemas загрузки необработанного содержимого value в столбец.

Записи Pulsar имеют следующие поля метаданных:

Column Тип
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Настройка параметров потоковой передачи Pulsar

Все параметры настраиваются как часть структурированной потоковой передачи с помощью .option("<optionName>", "<optionValue>") синтаксиса. Можно также настроить проверку подлинности с помощью параметров. См . проверку подлинности в Pulsar.

В следующей таблице описаны необходимые конфигурации для Pulsar. Необходимо указать только один из параметров topictopics или topicsPattern.

Вариант Значение по умолчанию Description
service.url ничего Конфигурация Pulsar для службы Pulsar serviceUrl .
topic ничего Строка имени раздела для использования темы.
topics ничего Разделенный запятыми список разделов, которые нужно использовать.
topicsPattern ничего Строка регулярных выражений Java, соответствующая темам для использования.

В следующей таблице описаны другие параметры, поддерживаемые для Pulsar:

Вариант Значение по умолчанию Description
predefinedSubscription ничего Предопределенное имя подписки, используемое соединителем для отслеживания хода выполнения приложения Spark.
subscriptionPrefix ничего Префикс, используемый соединителем для создания случайной подписки для отслеживания хода выполнения приложения Spark.
pollTimeoutMs 120000 Время ожидания для чтения сообщений из Pulsar в миллисекундах.
waitingForNonExistedTopic false Следует ли соединителю ожидать создания нужных разделов.
failOnDataLoss true Определяет, следует ли завершать запрос при потере данных (например, разделы удаляются или сообщения удаляются из-за политики хранения).
allowDifferentTopicSchemas false Если считываются несколько разделов с разными схемами, используйте этот параметр, чтобы отключить автоматическую десериализацию значений раздела на основе схем. При этом trueвозвращаются только необработанные значения.
startingOffsets latest Если latestсредство чтения считывает самые новые записи после запуска. Если earliestсредство чтения считывает с самого раннего смещения. Пользователь также может указать строку JSON, указывающую определенное смещение.
maxBytesPerTrigger ничего Обратимое ограничение максимального числа байтов, которые мы хотим обработать на микробатч. Если это указано, admin.url необходимо также указать.
admin.url ничего Конфигурация Pulsar serviceHttpUrl . Требуется только при maxBytesPerTrigger указании.

Вы также можете указать любые конфигурации клиента, администратора и читателя Pulsar, используя следующие шаблоны:

Расписание Ссылка на параметры конигурации
pulsar.client.* Конфигурация клиента Pulsar
pulsar.admin.* Конфигурация администратора Pulsar
pulsar.reader.* Конфигурация средства чтения Pulsar

Создание начальных смещения JSON

Можно вручную создать идентификатор сообщения, чтобы указать определенное смещение и передать его в формате JSON в startingOffsets параметр. В следующем примере кода показан этот синтаксис:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()