Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
На этой странице содержатся рекомендации по планированию структурированных рабочих нагрузок потоковой передачи с помощью заданий на Azure Databricks.
Databricks рекомендует всегда настраивать следующие параметры:
- Удалите ненужный код из записных книжек, который возвращает результаты, такие как
displayиcount. - Не запускайте структурированные рабочие нагрузки потоковой передачи на универсальных вычислительных ресурсах. Всегда планировать потоки в качестве заданий с помощью вычислений заданий.
- Планирование заданий с помощью
Continuousрежима. Это относится к функции планирования заданий Azure Databricks, а не к интервалу срабатывания Структурированной потоковой передачи. - Не включите автомасштабирование для вычислений для заданий структурированной потоковой передачи.
Некоторые рабочие нагрузки пользуются следующими преимуществами:
- Настройка хранилища состояний RocksDB в Azure Databricks
- Асинхронная контрольная точка состояния для запросов с сохранением состояния
- Асинхронное отслеживание хода выполнения
Azure Databricks представила декларативные конвейеры Spark Lakeflow для уменьшения сложности управления рабочей инфраструктурой для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать декларативные конвейеры Lakeflow Spark для новых структурированных конвейеров потоковой передачи. См. Lakeflow Spark: декларативные конвейеры.
Примечание.
Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать декларативные конвейеры Lakeflow Spark с расширенным автомасштабированием для потоковых рабочих нагрузок. См. статью "Оптимизация использования кластеров Декларативных конвейеров Spark Lakeflow с помощью автомасштабирования".
:::note Бессерверные вычисления
На бессерверных вычислительных ресурсах поддерживаются только Trigger.AvailableNow() и Trigger.Once(). Databricks рекомендует Trigger.AvailableNow().
Для непрерывной потоковой передачи на бессерверных вычислениях используйте режим конвейера: потоковый или непрерывный в непрерывном режиме.
См. ограничения потоковой передачи.
:::
Проектирование нагрузок стриминга с расчетом на сбои
Databricks рекомендует всегда настраивать задания потоковой передачи для автоматического перезапуска при сбое. Для некоторых возможностей, включая эволюцию схемы, требуется, чтобы рабочие нагрузки структурированной потоковой передачи были настроены на автоматический повтор. См. настройку структурированных потоковых заданий для перезапуска потоковых запросов при сбое.
Некоторые операции, такие как foreachBatch, предоставляют гарантию выполнения по крайней мере один раз, а не точно один раз. Для этих операций убедитесь, что конвейер процессинга идемпотентен. См. раздел Использование foreachBatch для записи в произвольные приемники данных.
Примечание.
При перезапуске запроса обрабатывается микропакет, запланированный во время предыдущего выполнения. Если задание завершилось сбоем из-за ошибки нехватки памяти или вы вручную отменили задание из-за слишком большого микро-пакета, может потребоваться увеличить масштаб вычислений, чтобы успешно обработать микро-пакет.
При изменении конфигураций между запусками эти конфигурации применяются к первому запланированному пакету. См. раздел «Восстановление после изменений в запросе структурированного потокового вещания».
Когда происходит повторная попытка задания?
Можно запланировать несколько задач в рамках задания Azure Databricks. При настройке задания с помощью непрерывного триггера нельзя задать зависимости между задачами.
Можно запланировать несколько потоков в одном задании, используя один из следующих подходов:
- Несколько задач: Определите задание с несколькими задачами, выполняющими рабочие нагрузки потоковой передачи с помощью непрерывного триггера.
- Несколько запросов: определение нескольких потоковых запросов в исходном коде для одной задачи.
Вы также можете объединить эти стратегии. В следующей таблице сравниваются эти подходы.
| Стратегия | Множественные задачи | Несколько запросов |
|---|---|---|
| Как используется общий доступ к вычислительным ресурсам? | Databricks рекомендует развертывать вычислительные ресурсы, соответствующие каждой задаче потоковой обработки. Вы также можете совместно использовать вычислительные ресурсы между задачами. | Все запросы используют одинаковые вычислительные ресурсы. При необходимости можно назначать запросы пулам планировщика. |
| Как обрабатываются повторные попытки? | Все задачи должны завершиться сбоем перед повторными попытками выполнения задания. | Задача повторяется, если любой запрос завершается ошибкой. |
Настройка заданий структурированной потоковой передачи для перезапуска запросов потоковой передачи при сбое
Databricks рекомендует настраивать все потоковые рабочие нагрузки с помощью непрерывного триггера. См . статью "Непрерывное выполнение заданий".
По умолчанию непрерывный триггер имеет следующее поведение:
- Предотвращает более одного одновременного выполнения задачи.
- Запускает новый запуск при сбое предыдущего запуска.
- Использует экспоненциальную задержку для повторных попыток.
Databricks рекомендует всегда использовать вычислительные ресурсы для заданий вместо универсальных вычислений при планировании рабочих процессов. При сбое задания и повторных попытках развертываются новые вычислительные ресурсы.
Примечание.
Databricks рекомендует не использовать streamingQuery.awaitTermination() или spark.streams.awaitAnyTermination(). См. раздел "Когда следует использовать awaitTermination()".
Когда следует использовать awaitTermination()
streamingQuery.awaitTermination() и spark.streams.awaitAnyTermination() блокируйте текущий поток до завершения потокового запроса. Использование этих функций зависит от вашей среды выполнения.
Для заданий Databricks не используйте streamingQuery.awaitTermination() или spark.streams.awaitAnyTermination(). Эти функции не нужны, так как служба заданий автоматически предотвращает завершение выполнения, когда активен запрос потоковой передачи. Обе функции блокируют заполнение ячеек записной книжки и не позволяют службе заданий отслеживать потоковый запрос, что нарушает метрики невыполненной работы и уведомления о задании.
Используйте awaitTermination() в следующих случаях:
| Сценарий использования | Поведение |
|---|---|
| Интерактивные тетради для универсальных вычислений |
awaitTermination() поддерживает работу ячейки, позволяет наблюдать за состоянием запроса и обеспечивает отображение сбоев в выходных данных записной книжки. |
| Локальные среды и среды разработки | При локальном запуске программы Spark процесс завершается после завершения основного потока. Вызов awaitTermination() для поддержания работоспособности программы до завершения или сбоя потокового запроса. |
| Распространение сбоя на драйвер | Без awaitTermination()этого сбой потокового запроса в контексте, отличном от задания, может не распространяться в вызывающий поток. Запрос может завершиться сбоем без уведомления, что затрудняет обнаружение и диагностику проблемы. Вызов awaitTermination() повторно генерирует исключение запроса в драйвере. |
Использование пулов планировщика для нескольких потоковых запросов
Пулы планировщиков можно настроить для назначения вычислительных ресурсов запросам при выполнении нескольких потоковых запросов из одного исходного кода.
По умолчанию все запросы, запущенные в ноутбуке, выполняются в том же пуле планирования. Задания Apache Spark, созданные триггерами из всех потоковых запросов в записной книжке, выполняются последовательно в порядке FIFO. Это может привести к ненужным задержкам в запросах, так как они не обеспечивают эффективное совместное использование кластерных ресурсов.
Пулы планировщиков позволяют объявлять, какие запросы структурированной потоковой передачи совместно используют вычислительные ресурсы.
В следующем примере query1 назначается выделенному пулу, тогда как query2 и query3 используют общий пул планировщика.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Примечание.
Конфигурация локального свойства должна находиться в той же ячейке записной книжки, где запускается запрос потоковой передачи.
Дополнительные сведения о пулах планировщика Apache Fair см. в документации по планировщику Apache Fair.