Поделиться через


Асинхронная контрольная точка состояния для запросов с отслеживанием состояния

Примечание.

Доступно в Databricks Runtime 10.4 LTS и более поздних версиях.

Асинхронная контрольная точка состояния поддерживает гарантии точной однократной обработки для потоковых запросов, но может снизить общую задержку для некоторых рабочих нагрузок с отслеживанием состояния в структурированной потоковой передаче, которые сужаются из-за обновлений состояния. Это достигается путем начала обработки следующего микропакета сразу после завершения вычисления предыдущего микропакета, не ожидая завершения контрольной точки состояния. В следующей таблице сравниваются компромиссы между синхронными и асинхронными контрольными точками.

Характеристика Синхронная контрольная точка асинхронные контрольные точки.
Задержка Более высокая задержка для каждого микропакета. Снижение задержки, так как микропакеты данных могут перекрываться.
Перезагрузить Быстрое восстановление, так как требуется повторно выполнить только последний пакет. Более длительная задержка перезапуска, так как может потребоваться повторный запуск более чем одного микропакета.

Ниже приведены характеристики потоковых заданий, которые могут получить выгоду от асинхронных контрольных точек состояния:

  • Задание имеет одну или несколько операций с отслеживанием состояния (например, агрегирование, flatMapGroupsWithState, mapGroupsWithState, соединения "поток — поток").
  • Задержка контрольной точки состояния является одним из основных факторов, определяющих общую задержку при выполнении пакета. Эти сведения можно найти в событиях StreamingQueryProgress. Эти события также находятся в журналах log4j в драйвере Spark. Ниже приведен пример прогресса выполнения потокового запроса и способы определения влияния контрольных точек на общую задержку выполнения партии.
    • {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
      }
      
    • Анализ задержки контрольной точки состояния приведенного выше события выполнения запроса

      • Длительность пакета (durationMs.triggerDuration) составляет около 547 сек.
      • Задержка фиксации состояния хранилища (stateOperations[0].commitTimeMs) составляет около 3,186 сек. Задержка фиксации агрегируется по задачам, содержащим хранилище состояния. В данном случае имеется 64 таких задачи (stateOperators[0].numShufflePartitions).
      • На каждую задачу, содержащую оператор состояния, потребовалось в среднем 50 секунд (3186/64) для контрольной точки. Это дополнительная задержка, которая добавляется к длительности серии. При условии, что все 64 задачи выполняются параллельно, этап контрольной точки внес вклад около 9 % (50 сек/547 сек) в длительность обработки пакета. Если максимальное число одновременных задач меньше 64, процентное значение становится пропорционально больше.

Включение асинхронного отслеживания контрольных точек

Для асинхронной контрольной точки состояния необходимо использовать хранилище состояний на основе RocksDB. Задайте следующие конфигурации:


spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

Ограничения и требования для асинхронных контрольных точек

Примечание.

Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать декларативные конвейеры Lakeflow Spark с расширенным автомасштабированием для потоковых рабочих нагрузок. См. статью "Оптимизация использования кластеров Декларативных конвейеров Spark Lakeflow с помощью автомасштабирования".

  • Любой сбой в асинхронной контрольной точке в одном или нескольких хранилищах приведет к сбою запроса. В режиме синхронного отслеживания контрольных точек контрольная точка выполняется как часть задачи, а Spark повторяет выполнение задачи несколько раз, прежде чем отдать запрос. Этот механизм отсутствует в асинхронном отслеживании контрольных точек состояния. Databricks рекомендует использовать непрерывные задания для автоматического повтора при сбое задания. См . статью "Непрерывное выполнение заданий".
  • Асинхронное создание контрольных точек оптимально в тех случаях, когда расположения хранилищ состояния не изменяются в промежуток между выполнением микропакетов. Изменение размера кластера в сочетании с асинхронной контрольной точкой состояния может не работать, так как экземпляр хранилища состояний может повторно распространяться по мере добавления или удаления узлов в рамках события изменения размера кластера.
  • Асинхронное отслеживание контрольных точек состояния поддерживается только в реализации поставщика хранилища состояний RocksDB. Используемая по умолчанию реализация хранилища состояний в памяти не поддерживает ее.