read_pulsar функция потоковой передачи табличного значения

Область применения:check marked yes Databricks SQL check marked yes Databricks Runtime 14.1 и более поздних версий

Важно!

Эта функция предоставляется в режиме общедоступной предварительной версии.

Возвращает таблицу с записями, считываемыми из Pulsar.

Эта табличная функция поддерживает только потоковую передачу, а не пакетный запрос.

Синтаксис

read_pulsar ( { option_key => option_value } [, ...] )

Аргументы

Для этой функции требуется вызов именованного параметра для ключей параметров.

Параметры serviceUrl и topic обязательны.

Ниже приведены краткие описания аргументов. Дополнительные описания см . в структурированной документации по потоковой передаче Pulsar .

Параметр Тип По умолчанию. Description
serviceUrl STRING Обязательный Универсальный код ресурса (URI) службы Pulsar.
topic STRING Обязательный Раздел, из который следует прочитать.
predefinedSubscription STRING нет Предопределенное имя подписки, используемое соединителем для отслеживания хода выполнения приложения Spark.
subscriptionPrefix STRING нет Префикс, используемый соединителем для создания случайной подписки для отслеживания хода выполнения приложения Spark.
pollTimeoutMs LONG 120000 Время ожидания для чтения сообщений из Pulsar в миллисекундах.
failOnDataLoss BOOLEAN true Определяет, следует ли завершать запрос при потере данных (например, разделы удаляются или сообщения удаляются из-за политики хранения).
startingOffsets STRING latest Начальная точка при запуске запроса ( самая ранняя, последняя или строка JSON), указывающая определенное смещение. Если последняя версия, средство чтения считывает самые новые записи после запуска. Если раньше, средство чтения считывает с самого раннего смещения. Пользователь также может указать строку JSON, указывающую определенное смещение.
startingTime STRING нет При указании источник Pulsar считывает сообщения, начиная с позиции указанного начального времени.

Следующие аргументы используются для проверки подлинности клиента pulsar:

Параметр Тип По умолчанию. Description
pulsarClientAuthPluginClassName STRING нет Имя подключаемого модуля проверки подлинности.
pulsarClientAuthParams STRING нет Параметры подключаемого модуля проверки подлинности.
pulsarClientUseKeyStoreTls STRING нет Следует ли использовать KeyStore для проверки подлинности tls.
pulsarClientTlsTrustStoreType STRING нет Тип файла TrustStore для проверки подлинности tls.
pulsarClientTlsTrustStorePath STRING нет Путь к файлу TrustStore для проверки подлинности tls.
pulsarClientTlsTrustStorePassword STRING нет Пароль TrustStore для проверки подлинности tls.

Эти аргументы используются для настройки и проверки подлинности управления приемом пульсара, конфигурация администратора pulsar требуется только в том случае, если включен контроль допуска (если задан параметр maxBytesPerTrigger).

Параметр Тип По умолчанию. Description
maxBytesPerTrigger BIGINT нет Обратимое ограничение максимального числа байтов, которые мы хотим обработать на микробатч. Если это указано, необходимо также указать admin.url.
adminUrl STRING нет Конфигурация Pulsar serviceHttpUrl. Требуется только при указании maxBytesPerTrigger.
pulsar Администратор AuthPlugin STRING нет Имя подключаемого модуля проверки подлинности.
pulsar Администратор AuthParams STRING нет Параметры подключаемого модуля проверки подлинности.
pulsarClientUseKeyStoreTls STRING нет Следует ли использовать KeyStore для проверки подлинности tls.
pulsar Администратор TlsTrustStoreType STRING нет Тип файла TrustStore для проверки подлинности tls.
pulsar Администратор TlsTrustStorePath STRING нет Путь к файлу TrustStore для проверки подлинности tls.
pulsar Администратор TlsTrustStorePassword STRING нет Пароль TrustStore для проверки подлинности tls.

Возвраты

Таблица записей пульсара со следующей схемой.

  • __key STRING NOT NULL: ключ сообщения Pulsar.

  • value BINARY NOT NULL: значение сообщения Pulsar.

    Примечание. Для разделов с схемой Avro или JSON вместо загрузки содержимого в поле двоичного значения содержимое будет развернуто, чтобы сохранить имена полей и типы полей раздела Pulsar.

  • __topic STRING NOT NULL: имя раздела Pulsar.

  • __messageId BINARY NOT NULL: идентификатор сообщения Pulsar.

  • __publishTime TIMESTAMP NOT NULL: время публикации сообщения Pulsar.

  • __eventTime TIMESTAMP NOT NULL: время события сообщения Pulsar.

  • __messageProperties MAP<STRING, STRING>: свойства сообщения Pulsar.

Примеры

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.