Поделиться через


Рекомендации по структурированной потоковой передаче в рабочей среде

В этой статье содержатся рекомендации по планированию структурированных рабочих нагрузок потоковой передачи с помощью заданий в Azure Databricks.

Databricks рекомендует всегда делать следующее:

  • Удалите ненужный код из записных книжек, которые возвращают результаты, такие как display и count.
  • Не запускайте структурированные рабочие нагрузки потоковой передачи с помощью вычислений всех назначений. Всегда планировать потоки в качестве заданий с помощью вычислений заданий.
  • Планирование заданий с помощью Continuous режима.
  • Не включите автоматическое масштабирование для вычислений для структурированных заданий потоковой передачи.

Некоторые рабочие нагрузки пользуются следующими преимуществами:

Azure Databricks предоставляет разностные динамические таблицы, которые уменьшают сложность управления рабочей инфраструктурой для рабочих нагрузок структурированной потоковой передачи. Databricks рекомендует использовать разностные динамические таблицы для новых структурированных конвейеров потоковой передачи. См. раздел "Что такое разностные динамические таблицы?".

Примечание.

Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует для потоковых рабочих нагрузок использовать разностные динамические таблицы (Delta Live Tables) с расширенным автомасштабированием. См. статью "Оптимизация использования кластеров конвейеров Delta Live Tables с расширенным автомасштабированием".

Разработка рабочих нагрузок потоковой передачи для ожидания сбоя

Databricks рекомендует всегда настраивать задания потоковой передачи для автоматического перезапуска при сбое. Некоторые функции, включая эволюцию схемы, предполагают, что структурированные рабочие нагрузки потоковой передачи настроены автоматически. Сведения о настройке заданий структурированной потоковой передачи для перезапуска запросов потоковой передачи при сбое.

Некоторые операции, такие как foreachBatch предоставление по крайней мере один раз, а не точно один раз гарантий. Для этих операций следует сделать, что конвейер обработки является идемпотентным. См. раздел Использование foreachBatch для записи в произвольные приемники данных.

Примечание.

При перезапуске запроса микропакет, запланированный во время предыдущих процессов выполнения. Если задание завершилось сбоем из-за ошибки вне памяти или вы вручную отменили задание из-за чрезмерного микропакета, может потребоваться увеличить масштаб вычислений, чтобы успешно обработать микропакет.

При изменении конфигураций между запусками эти конфигурации применяются к первому запланированному пакету. См. раздел "Восстановление после изменений" в запросе структурированной потоковой передачи.

Когда выполняется повторная попытка задания?

Вы можете запланировать несколько задач в рамках задания Azure Databricks. При настройке задания с помощью непрерывного триггера нельзя задать зависимости между задачами.

Можно запланировать несколько потоков в одном задании, используя один из следующих подходов:

  • Несколько задач. Определение задания с несколькими задачами, выполняющими рабочие нагрузки потоковой передачи с помощью непрерывного триггера.
  • Несколько запросов: определение нескольких потоковых запросов в исходном коде для одной задачи.

Вы также можете объединить эти стратегии. В следующей таблице сравниваются эти подходы.

Множественные задачи Несколько запросов
Как используется общий доступ к вычислительным ресурсам? Databricks рекомендует развертывать задания с соответствующим размером для каждой задачи потоковой передачи. Вы также можете совместно использовать вычислительные ресурсы между задачами. Все запросы используют одинаковые вычислительные ресурсы. Необязательные запросы можно назначать пулам планировщика.
Как обрабатываются повторные попытки? Все задачи должны завершиться ошибкой до повторных попыток задания. Задача повторяется, если любой запрос завершается ошибкой.

Настройка заданий структурированной потоковой передачи для перезапуска запросов потоковой передачи при сбое

Databricks рекомендует настроить все рабочие нагрузки потоковой передачи с помощью непрерывного триггера. См . статью "Непрерывное выполнение заданий".

Непрерывный триггер обеспечивает следующее поведение по умолчанию:

  • Запрещает несколько одновременных запусков задания.
  • Запускает новый запуск при сбое предыдущего запуска.
  • Использует экспоненциальную обратную передачу для повторных попыток.

Databricks рекомендует всегда использовать вычисления заданий вместо вычислений всех назначений при планировании рабочих процессов. При сбое задания и повторных попытках развертываются новые вычислительные ресурсы.

Примечание.

Вам не нужно использовать streamingQuery.awaitTermination() или spark.streams.awaitAnyTermination(). Задания автоматически не допускают выполнение, если потоковый запрос активен.

Использование пулов планировщика для нескольких потоковых запросов

Пулы расписаний можно настроить для назначения вычислительной емкости запросам при выполнении нескольких потоковых запросов из одного исходного кода.

По умолчанию все запросы, запущенные в записной книжке, выполняются в одном и том же пуле планирования. Задания Apache Spark, созданные триггерами из всех запросов потоковой передачи в записной книжке, выполняются друг за другом в порядке fiFO. Это может привести к ненужным задержкам в запросах, так как они не обеспечивают эффективное совместное использование кластерных ресурсов.

Пулы планировщиков позволяют объявлять, какие запросы структурированной потоковой передачи совместно используют вычислительные ресурсы.

В следующем примере выполняется назначение query1 выделенному пулу, а query2 и query3 совместно используют пул планировщика.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Примечание.

Настройка локального свойства должна находиться в той же ячейке записной книжки, где вы запускаете запрос потоковой передачи.

Дополнительные сведения см. в документации по планировщику Fair Scheduler Apache.