Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Запрос структурированной потоковой передачи с отслеживанием состояния требует добавочных обновлений сведений о промежуточном состоянии, тогда как запрос структурированной потоковой передачи без отслеживания состояния отслеживает только сведения о том, какие записи были обработаны из источника в приемник.
Операции с отслеживанием состояния включают агрегирование данных в потоках, потоковую dropDuplicates
, соединения потоков и пользовательские приложения с отслеживанием состояния.
Промежуточная информация о состоянии, необходимая для запросов структурированной потоковой передачи с отслеживанием состояния, может привести к непредвиденным задержкам и проблемам в производстве при неправильной настройке.
В Databricks Runtime 13.3 LTS и более поздних версиях можно включить контрольную точку журнала изменений с использованием RocksDB для сокращения длительности контрольной точки и сквозной задержки в вычислениях для структурированных потоковых нагрузок. Databricks рекомендует включать контрольные точки журнала изменений для всех запросов структурированных потоков с отслеживанием состояния. См. раздел "Включить контрольную точку журнала изменений".
Оптимизация запросов в рамках Stateful Structured Streaming
Управление сведениями о промежуточном состоянии для запросов структурированной потоковой передачи с отслеживанием состояния позволяет предотвратить непредвиденные задержки и проблемы в рабочей среде.
Databricks рекомендует следующее:
- Используйте оптимизированные для вычислений экземпляры в качестве работников.
- Установите количество секций перетасовки в 1–2 раза от числа ядер в кластере.
- Задайте для конфигурации
spark.sql.streaming.noDataMicroBatches.enabled
значениеfalse
в SparkSession. Это предотвращает обработку пустых микропакетов потоковым движком. Обратите внимание, что настройка этой конфигурацииfalse
может привести к тому, что операции с отслеживанием состояния, которые используют водяные знаки или ожидание времени обработки, не будут выводить данные до тех пор, пока не поступят новые данные, вместо немедленного вывода.
Databricks рекомендует использовать RocksDB с контрольными точками журнала изменений для управления состоянием потоков. См. статью Настройка хранилища состояний RocksDB в Azure Databricks.
Примечание.
Схему управления состоянием нельзя изменить между перезапусками запросов. Если запрос был запущен с помощью управления по умолчанию, необходимо перезапустить его с нуля с новым расположением контрольной точки, чтобы изменить хранилище состояний.
Работа с несколькими операторами с отслеживанием состояния в структурированной потоковой передаче
В Databricks Runtime 13.3 LTS и более поздних версиях Azure Databricks предлагает расширенную поддержку операторов с сохранением состояния в рабочих нагрузках структурированной потоковой передачи. Теперь можно объединять несколько операторов с сохранением состояния, что означает, что вы можете передавать выходные данные одной операции, такой как оконная агрегация, в другую операцию с сохранением состояния, такую как объединение.
В Databricks Runtime 16.2 и более поздних версиях можно использовать transformWithState
в рабочих нагрузках с несколькими операторами отслеживания состояния. См. Создание пользовательского стейтфул приложения.
В следующих примерах показано несколько шаблонов, которые можно использовать.
Внимание
При работе с несколькими операторами с отслеживанием состояния существуют следующие ограничения:
- Устаревшие пользовательские операторы состояния (
FlatMapGroupWithState
иapplyInPandasWithState
) не поддерживаются. - Поддерживается только режим добавления в вывод.
Агрегирование связанных временных окон
Питон
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
язык программирования Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Агрегирование временного окна в двух разных потоках с последующим соединением окон между потоками
Питон
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
язык программирования Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Соединение потоков по временным интервалам, за которым следует агрегирование по временным окнам
Питон
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
язык программирования Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Перебалансирование состояния для структурированной потоковой передачи
Включение перебалансировки состояния по умолчанию доступно для всех потоковых рабочих нагрузок в декларативных конвейерах Lakeflow. В Databricks Runtime 11.3 LTS и более поздних версиях можно задать следующий параметр конфигурации в конфигурации кластера Spark, чтобы включить перебалансирование состояния:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Перебалансировка состояния приносит пользу конвейерам управления состоянием в структурированной потоковой передаче, которые проходят через изменения размера кластера. Безгосударственные операции потоковой передачи не получают преимущества вне зависимости от изменения размеров кластера.
Примечание.
Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать декларативные конвейеры Lakeflow с расширенным автомасштабированием для потоковых рабочих нагрузок. См. статью "Оптимизация использования кластеров декларативных конвейеров Lakeflow с помощью автомасштабирования".
События изменения размера кластера приводят к перебалансировке состояния. Микропакеты могут иметь более высокую задержку во время перебалансировки событий, так как состояние загружается из облачного хранилища в новые исполнители.