Потоковая передача из Apache Pulsar
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
В Databricks Runtime 14.1 и более поздних версиях можно использовать структурированную потоковую передачу для потоковой передачи данных из Apache Pulsar в Azure Databricks.
Структурированная потоковая передача обеспечивает точно один раз семантику обработки для чтения данных из источников Pulsar.
Пример синтаксиса
Ниже приведен базовый пример использования структурированной потоковой передачи для чтения из Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Для указания разделов всегда необходимо указать service.url
один из следующих вариантов:
topic
topics
topicsPattern
Полный список параметров см. в разделе "Настройка параметров для потоковой передачи Pulsar".
Проверка подлинности в Pulsar
Azure Databricks поддерживает проверку подлинности truststore и хранилища ключей в Pulsar. Databricks рекомендует использовать секреты при хранении сведений о конфигурации.
Во время настройки потока можно задать следующие параметры:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Если поток использует a PulsarAdmin
, также задайте следующее:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
В следующем примере демонстрируется настройка параметров проверки подлинности:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Схема Pulsar
Схема записей, считываемых из Pulsar, зависит от того, как разделы кодируют свои схемы.
- Для разделов с схемой Avro или JSON имена полей и типы полей сохраняются в результирующем кадре данных Spark.
- Для разделов без схемы или с простым типом данных в Pulsar полезные данные загружаются в
value
столбец. - Если средство чтения настроено для чтения нескольких разделов с различными схемами, установите для
allowDifferentTopicSchemas
загрузки необработанного содержимогоvalue
в столбец.
Записи Pulsar имеют следующие поля метаданных:
Column | Тип |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Настройка параметров потоковой передачи Pulsar
Все параметры настраиваются как часть структурированной потоковой передачи с помощью .option("<optionName>", "<optionValue>")
синтаксиса. Можно также настроить проверку подлинности с помощью параметров. См . проверку подлинности в Pulsar.
В следующей таблице описаны необходимые конфигурации для Pulsar. Необходимо указать только один из параметров topic
topics
или topicsPattern
.
Вариант | Значение по умолчанию | Description |
---|---|---|
service.url |
ничего | Конфигурация Pulsar для службы Pulsar serviceUrl . |
topic |
ничего | Строка имени раздела для использования темы. |
topics |
ничего | Разделенный запятыми список разделов, которые нужно использовать. |
topicsPattern |
ничего | Строка регулярных выражений Java, соответствующая темам для использования. |
В следующей таблице описаны другие параметры, поддерживаемые для Pulsar:
Вариант | Значение по умолчанию | Description |
---|---|---|
predefinedSubscription |
ничего | Предопределенное имя подписки, используемое соединителем для отслеживания хода выполнения приложения Spark. |
subscriptionPrefix |
ничего | Префикс, используемый соединителем для создания случайной подписки для отслеживания хода выполнения приложения Spark. |
pollTimeoutMs |
120000 | Время ожидания для чтения сообщений из Pulsar в миллисекундах. |
waitingForNonExistedTopic |
false |
Следует ли соединителю ожидать создания нужных разделов. |
failOnDataLoss |
true |
Определяет, следует ли завершать запрос при потере данных (например, разделы удаляются или сообщения удаляются из-за политики хранения). |
allowDifferentTopicSchemas |
false |
Если считываются несколько разделов с разными схемами, используйте этот параметр, чтобы отключить автоматическую десериализацию значений раздела на основе схем. При этом true возвращаются только необработанные значения. |
startingOffsets |
latest |
Если latest средство чтения считывает самые новые записи после запуска. Если earliest средство чтения считывает с самого раннего смещения. Пользователь также может указать строку JSON, указывающую определенное смещение. |
maxBytesPerTrigger |
ничего | Обратимое ограничение максимального числа байтов, которые мы хотим обработать на микробатч. Если это указано, admin.url необходимо также указать. |
admin.url |
ничего | Конфигурация Pulsar serviceHttpUrl . Требуется только при maxBytesPerTrigger указании. |
Вы также можете указать любые конфигурации клиента, администратора и читателя Pulsar, используя следующие шаблоны:
Расписание | Ссылка на параметры конигурации |
---|---|
pulsar.client.* |
Конфигурация клиента Pulsar |
pulsar.admin.* |
Конфигурация администратора Pulsar |
pulsar.reader.* |
Конфигурация средства чтения Pulsar |
Создание начальных смещения JSON
Можно вручную создать идентификатор сообщения, чтобы указать определенное смещение и передать его в формате JSON в startingOffsets
параметр. В следующем примере кода показан этот синтаксис:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()