Поделиться через


Подписка на Google Pub/Sub

Azure Databricks предоставляет встроенный соединитель для подписки на Google Pub/Sub в Databricks Runtime 13.3 LTS и выше. Этот соединитель предоставляет точно один раз семантику обработки записей от подписчика.

Примечание.

Pub/Sub может публиковать повторяющиеся записи, и записи могут поступать подписчику вне порядка. Для обработки повторяющихся и устаревших записей необходимо написать код Azure Databricks.

Пример синтаксиса

В следующем примере кода показан базовый синтаксис для настройки структурированной потоковой передачи из Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Дополнительные параметры конфигурации см. в разделе "Настройка параметров потоковой передачи pub/sub".

Настройка доступа к Pub/Sub

Databricks рекомендует использовать секреты при предоставлении параметров авторизации. Для авторизации подключения требуются следующие параметры:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

В следующей таблице описаны роли, необходимые для настроенных учетных данных:

Роли Обязательно или необязательно Использование
roles/pubsub.viewer или roles/viewer Обязательное поле Проверьте, существует ли подписка и получить подписку
roles/pubsub.subscriber Обязательное поле Получение данных из подписки
roles/pubsub.editor или roles/editor Необязательно Включает создание подписки, если она не существует, а также позволяет использовать deleteSubscriptionOnStreamStop ее для удаления подписок при завершении потока.

Схема Pub/Sub

Схема потока соответствует записям, которые извлекаются из Pub/Sub, как описано в следующей таблице:

Поле Тип
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Настройка параметров для чтения потоковой передачи pub/sub

В следующей таблице описаны параметры, поддерживаемые для Pub/Sub. Все параметры настраиваются как часть структурированной потоковой передачи с помощью .option("<optionName>", "<optionValue>") синтаксиса.

Примечание.

Некоторые параметры конфигурации Pub/Sub используют концепцию получения вместо микропакетов. Это отражает сведения о внутренней реализации, а параметры работают аналогично совместному выполнению в других соединителях структурированной потоковой передачи, за исключением того, что записи извлекаются и обрабатываются.

Вариант Значение по умолчанию Description
numFetchPartitions При инициализации потока задается значение 1 половины числа исполнителей, присутствующих в потоке. Количество параллельных задач Spark, которые извлекает записи из подписки.
deleteSubscriptionOnStreamStop false Если trueподписка, переданная потоку, удаляется при завершении задания потоковой передачи.
maxBytesPerTrigger ничего Обратимое ограничение для обработки размера пакета во время каждого запускаемого микропакета.
maxRecordsPerFetch 1000 Количество записей для получения каждой задачи перед обработкой записей.
maxFetchPeriod 10 seconds Длительность времени для каждой задачи, извлекаемой перед обработкой записей. Databricks рекомендует использовать значение по умолчанию.

Добавочная семантика пакетной обработки для Pub/Sub

Можно использовать Trigger.AvailableNow для использования доступных записей из источников Pub/Sub добавочного пакета.

Azure Databricks записывает метку времени при начале чтения с параметром Trigger.AvailableNow . Записи, обработанные пакетом, включают все ранее извлекаемые данные и все недавно опубликованные записи с меткой времени меньше, чем метка времени начала записанного потока.

См. инструкции по настройке добавочной пакетной обработки.

Мониторинг метрик потоковой передачи

Структурированные метрики хода выполнения потоковой передачи сообщают о количестве записей, извлекаемых и готовых к обработке, размере записей, которые будут возвращены и готовы к обработке, и количеству повторяющихся записей, которые отображаются с момента запуска потока. Ниже приведен пример этих метрик:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Ограничения

Спекулятивное выполнение (spark.speculation) не поддерживается в Pub/Sub.