Поделиться через


Применение подложек для контроля пороговых значений обработки данных

В этой статье приведены основные понятия водяного знака и рекомендации по использованию подложек в общих операциях потоковой передачи с отслеживанием состояния. Необходимо применять подложки к операциям потоковой передачи с отслеживанием состояния, чтобы избежать бесконечного расширения объема данных, хранящихся в состоянии, что может привести к проблемам с памятью и увеличить задержку обработки во время длительных операций потоковой передачи.

Что такое водяной знак?

Структурированная потоковая передача использует подложки для управления пороговым значением для продолжения обработки обновлений для заданной сущности состояния. Ниже приведены распространенные примеры сущностей состояния:

  • Агрегаты в течение периода времени.
  • Уникальные ключи в соединении между двумя потоками.

При объявлении водяного знака необходимо указать поле метки времени и пороговое значение подложки для потокового кадра данных. По мере поступления новых данных диспетчер состояний отслеживает самую последнюю метку времени в указанном поле и обрабатывает все записи в пороге задержки.

В следующем примере применяется пороговое значение 10 минут к окну счетчика:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

В этом примере:

  • Столбец event_time используется для определения 10-минутного водяного знака и 5-минутного переворачивающегося окна.
  • Счетчик собирается для каждого id наблюдаемого для каждого не перекрывающегося 5-минутного окна.
  • Сведения о состоянии сохраняются для каждого счетчика до конца окна до 10 минут старше последнего наблюдаемого event_time.

Внимание

Пороговые значения водяного знака гарантируют, что записи, поступающие в указанное пороговое значение, обрабатываются в соответствии с семантикой определенного запроса. Записи, поступающие за пределы указанного порогового значения, могут по-прежнему обрабатываться с помощью метрик запросов, но это не гарантируется.

Как водяной знак влияет на время обработки и пропускную способность?

Подложки взаимодействуют с режимами вывода для управления записью данных в приемник. Поскольку подложки сокращают общий объем сведений о состоянии, эффективное использование подложек важно для эффективной пропускной способности потоковой передачи с отслеживанием состояния.

Примечание.

Не все режимы вывода поддерживаются для всех операций с отслеживанием состояния.

Подложки и выходной режим для оконных агрегатов

Следующая таблица содержит сведения об обработке запросов с агрегированием по метке времени с определенным подложкой:

Режим вывода Поведение
Добавление Строки записываются в целевую таблицу после прохождения порогового значения водяного знака. Все записи задерживаются на основе порога задержки. Старое состояние агрегирования удаляется после прохождения порогового значения.
Обновить Строки записываются в целевую таблицу по мере вычисления результатов и могут быть обновлены и перезаписаны по мере поступления новых данных. Старое состояние агрегирования удаляется после прохождения порогового значения.
Завершено Состояние агрегирования не удаляется. Целевая таблица перезаписывается с каждым триггером.

Подложки и выходные данные для соединений потокового потока

Соединения между несколькими потоками поддерживают только режим добавления, а соответствующие записи записываются в каждом пакете, который они обнаруживают. Для внутренних соединений Databricks рекомендует задать пороговое значение подложки для каждого источника данных потоковой передачи. Это позволяет отсовести сведения о состоянии карта для старых записей. Без подложки структурированная потоковая передача пытается присоединить каждый ключ с обеих сторон соединения с каждым триггером.

Структурированная потоковая передача имеет специальную семантику для поддержки внешних соединений. Подложка является обязательной для внешних соединений, так как указывает, когда ключ должен быть записан со значением NULL после завершения несоответствен. Обратите внимание, что в то время как внешние соединения могут быть полезны для записи записей, которые никогда не совпадают во время обработки данных, так как соединения записываются только в таблицы в виде операций с добавлением, эти отсутствующие данные не записываются до тех пор, пока порог задержки не прошел.

Управление пороговым значением поздних данных с помощью нескольких политик водяного знака в структурированной потоковой передаче

При работе с несколькими входными данными структурированной потоковой передачи можно задать несколько пределов, чтобы управлять допустимыми пороговыми значениями для данных, поступающих с задержкой. Настройка пределов позволяет управлять сведениями о состоянии и влиять на задержку.

Потоковый запрос может иметь несколько входных потоков, которые объединяются или присоединяются. Каждый входной поток может иметь разные пороговые значения для запоздавших данных, которые нужно допускать для операций с отслеживанием состояния. Эти пороговые значения указываются с помощью withWatermarks("eventTime", delay) для каждого входного потока. Ниже приведен пример запроса с соединениями "поток — поток".

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

При выполнении запроса структурированная потоковая передача по отдельности отслеживает максимальное время события в каждом входном потоке, вычисляет пределы на основе соответствующей задержки и выбирает один глобальный предел, который можно использовать для операций с отслеживанием состояния. По умолчанию в качестве глобального предела выбирается минимум, так как он гарантирует, что данные не будут случайно отброшены, если один из потоков находится за другими (например, один из потоков прекращает получать данные из-за вышестоящего сбоя). Иными словами, глобальный предел будет безопасно сдвигаться в темпе самого медленного потока, а выходные данные запроса будут отложены соответствующим образом.

Если вы хотите ускорить получение результатов, можно настроить политику использования нескольких пределов, чтобы выбрать максимальное значение в качестве глобального предела, задав для конфигурации SQL spark.sql.streaming.multipleWatermarkPolicy значение max (по умолчанию используется min). Это позволяет перемещать глобальный водяной знак со скоростью самого быстрого потока. Но эта конфигурация удаляет данные из самых медленных потоков. Из-за этого Databricks рекомендует использовать эту конфигурацию разумно.

Удаление дубликатов в подложку

В Databricks Runtime 13.3 LTS и более поздних версиях можно дедупликировать записи в пороговом значении водяного знака с помощью уникального идентификатора.

Структурированная потоковая передача обеспечивает точно один раз гарантии обработки, но не выполняет автоматическое дедупликация записей из источников данных. Можно использовать dropDuplicatesWithinWatermark для дедупликации записей в любом указанном поле, что позволяет удалять дубликаты из потока, даже если некоторые поля отличаются (например, время события или время прибытия).

Повторяющиеся записи, поступающие в указанный водяной знак, гарантированно будут удалены. Эта гарантия является строгой только в одном направлении, и повторяющиеся записи, поступающие за пределы указанного порогового значения, также могут быть удалены. Необходимо задать порог задержки водяного знака дольше, чем максимальные различия меток времени между повторяющимися событиями, чтобы удалить все повторяющиеся.

Для использования dropDuplicatesWithinWatermark метода необходимо указать водяной знак, как показано в следующем примере:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")