Применение водяных знаков для контроля пороговых значений обработки данных

На этой странице описаны основные понятия водяного знака и рекомендации по использованию подложки в общих операциях потоковой передачи с отслеживанием состояния. Необходимо применять водяные знаки к потоковым операциям с отслеживанием состояния, чтобы избежать бесконечного расширения объема данных, что может привести к проблемам с памятью или увеличить задержку обработки во время длительных операций.

Что такое водяной знак?

Структурированная потоковая передача использует водяные знаки для управления порогом продолжительности обработки обновлений для конкретной сущности состояния. Распространенные примеры государственных органов включают:

  • Агрегации по временному окну.
  • Уникальные ключи в соединении между двумя потоками данных.

При объявлении водяного знака необходимо указать поле метки времени и пороговое значение подложки для потокового кадра данных. По мере поступления новых данных менеджер состояний отслеживает новейшую метку времени в указанном поле и обрабатывает все записи в пределах допустимой задержки.

В следующем примере применяется пороговое значение 10 минут к окну подсчета.

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

В этом примере:

  • Столбец event_time используется для определения 10-минутного водяного знака и 5-минутного скользящего окна.
  • Количество собирается для каждого id, наблюдаемого в каждом 5-минутном интервале без перекрытия.
  • Сведения о состоянии сохраняются для каждого счетчика до тех пор, пока конец окна не станет на 10 минут позже последнего наблюдаемого event_time.

Внимание

Пороговые значения гарантируют, что записи, поступающие в пределах указанного порога, обрабатываются в соответствии с семантикой определенного запроса. Записи, поступающие за пределы указанного порогового значения, возможно, все еще могут обрабатываться с использованием метрик запросов, но это не гарантируется.

Внимание

Обычно столбец называют полем DataFrame. Однако в операции groupBy() и window() это может привести к ссылке на столбец без маркера времени события. Используйте "<colName>", col("<colName>")или $colName в Scala, чтобы избежать этого.

Как метки времени влияют на время обработки и пропускную способность?

Подложки взаимодействуют с режимами вывода для управления записью данных в приемник. Поскольку водяные знаки сокращают общий объем сведений о состоянии, эффективное использование водяных знаков важно для эффективной пропускной способности потоковой передачи с управлением состоянием.

Примечание.

Не все режимы вывода поддерживаются для всех операций с отслеживанием состояния.

Водяные знаки и выходной режим для оконных агрегатов

Следующая таблица подробно описывает обработку запросов с агрегированием по метке времени с заданным водяным знаком.

Режим вывода Поведение
Добавление Строки записываются в целевую таблицу после превышения порогового значения временной метки. Все записи задерживаются в соответствии с порогом опоздания. Старое состояние агрегирования удаляется после прохождения порогового значения.
Обновить Строки записываются в целевую таблицу по мере вычисления результатов и могут быть обновлены и перезаписаны по мере поступления новых данных. Старое состояние агрегирования удаляется после прохождения порогового значения.
Завершено Состояние агрегации не удаляется. Целевая таблица перезаписывается с каждым триггером.

Временные метки и выходные данные для соединений потоков

Соединения между несколькими потоками поддерживают только режим добавления, и соответствующие записи записываются в каждом пакете по мере их обнаружения. Для внутренних соединений Databricks рекомендует задать пороговое значение подложки для каждого источника данных потоковой передачи. Это позволяет удалять сведения о состоянии для старых записей. Без водяных знаков структурированная потоковая обработка пытается соединить каждый ключ с каждой стороны соединения с каждым триггером.

Структурированная потоковая передача имеет специальную семантику для поддержки внешних соединений. Водяные знаки являются обязательными для внешних соединений, так как указывают, когда ключ должен быть записан с null-значением после того, как остается без совпадения. Хотя внешние соединения могут быть полезны для записи записей, которые никогда не совпадают во время обработки данных, так как соединения записываются только в таблицы в виде операций с добавлением, эти отсутствующие данные не записываются до тех пор, пока порог задержки не прошел.

управлять пороговым значением поздних данных с помощью нескольких политик водяного знака в структурированной потоковой передаче

При работе с несколькими входами структурированной потоковой передачи можно установить несколько водяных знаков для контроля порогов терпимости для поздних поступающих данных. Настройка водяных знаков позволяет управлять информацией о состоянии и влиять на задержку.

Потоковый запрос может иметь несколько входных потоков, которые объединены или соединены вместе. Каждый входной поток может иметь разные пороговые значения для запоздавших данных, которые нужно допускать для операций с отслеживанием состояния. Эти пороговые значения указываются с помощью withWatermarks("eventTime", delay) для каждого входного потока. Ниже приведен пример запроса с соединениями "поток — поток".

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

При выполнении запроса Structured Streaming по отдельности отслеживает максимальное время события, наблюдаемое в каждом входном потоке, вычисляет водяные знаки с учётом соответствующей задержки и выбирает один глобальный водяной знак для использования в операциях с сохранением состояния. По умолчанию минимальное значение устанавливается в качестве глобального водяного знака, так как он предотвращает случайное удаление данных из-за позднего поступления, если один из потоков начинает отставать от других (например, один из потоков перестает получать данные из-за сбоев в верхнем потоке). Другими словами, глобальный водяной знак безопасно перемещается в темпе самого медленного потока, и выходные данные запроса задерживаются соответствующим образом.

Если вы хотите получить результаты быстрее, можно задать политику нескольких водяных знаков, чтобы выбрать максимальное значение в качестве глобального водяного знака, задав конфигурацию SQL spark.sql.streaming.multipleWatermarkPolicy в значение max (по умолчанию используется min). Это позволяет глобальному маркеру двигаться в темпе самого быстрого потока. Но эта конфигурация удаляет данные из самых медленных потоков. Databricks рекомендует использовать эту конфигурацию разумно.

Применение водяных знаков к различным операциям

Операция distinct — это оператор с отслеживанием состояния, который требует watermarks, чтобы предотвратить неограниченный рост состояния. Без подложки структурированная потоковая передача пытается отслеживать каждую уникальную запись на неопределенный срок, что может привести к проблемам с памятью или увеличению задержки обработки.

При применении distinct к кадру потоковой передачи данных необходимо указать подложку в поле метки времени. Подложка определяет, как долго диспетчер состояний сохраняет записи для дедупликации. После прохождения порогового значения водяного знака старые записи удаляются из состояния.

В следующем примере к операции применяется водяной знак distinct:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

В этом примере повторяющиеся записи, поступающие в течение 1 часа после последнего наблюдаемого eventTime события, удаляются из потока. Сведения о состоянии дедупликации удаляются после прохождения порогового значения.

Внимание

Если необходимо удалить дубликаты по определенным столбцам, а не по всем, используйте dropDuplicates() или dropDuplicatesWithinWatermark() вместо distinct. подробности приведены в следующем разделе.

Удаление дубликатов в пределах водяного знака

В Databricks Runtime 13.3 LTS или более поздней версии можно устранять дублирование данных в пределах порога водяного знака с помощью уникального идентификатора.

Структурированная потоковая передача обеспечивает гарантии обработки с точностью до одного раза, но не осуществляет автоматическую дедупликацию записей из источников данных. Вы можете использовать dropDuplicatesWithinWatermark для дедупликации записей в любом указанном поле, что позволяет удалять дубликаты из потока, даже если некоторые поля отличаются (например, время события или время прибытия).

Повторяющиеся записи, поступающие в указанный предел, гарантированно будут удалены. Эта гарантия является строгой только в одном направлении, и повторяющиеся записи, поступающие за пределы указанного порогового значения, также могут быть удалены. Необходимо задать порог задержки водяного знака дольше, чем максимальные различия меток времени между повторяющимися событиями, чтобы удалить все повторяющиеся.

Чтобы использовать метод dropDuplicatesWithinWatermark, необходимо указать водяной знак, как показано в следующем примере:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))