Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
На этой странице описываются концепции водяных меток и приводятся рекомендации по их использованию в распространённых операциях потоковой обработки с сохранением состояния.
Потоковые запросы накапливают данные о состоянии с течением времени. Водяные метки автоматически удаляют устаревшие данные состояния, чтобы предотвратить ошибки памяти и увеличение задержки обработки.
Что такое водяной знак?
Во время обработки структурированная потоковая передача сохраняет состояние в микропакетах. Потоковые запросы используют состояние для инкрементального обновления результатов вместо пересчёта всего после каждого микропакета. Пороговые значения определяют порог, при котором запрос прекращает обработку объекта состояния.
Распространенные примеры государственных органов включают:
- Агрегации по временному окну.
- Уникальные ключи в соединении между двумя потоками данных.
Чтобы задать водяную метку для потокового DataFrame, укажите поле метки времени и порог допустимого опоздания. По мере поступления новых данных диспетчер состояний отслеживает самую последнюю метку времени в указанном поле и обрабатывает только записи в пороге задержки.
Запросы всегда обрабатывают записи, поступающие в пределах порогового значения. Запросы могут по-прежнему обрабатывать записи, поступающие за пределы порогового значения, но это не гарантируется.
В следующем примере применяется пороговое значение 10 минут к окну подсчета.
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
В этом примере:
- Столбец
event_timeиспользуется для определения 10-минутного водяного знака и 5-минутного скользящего окна. - Для каждого наблюдаемого
idподсчитывается количество в каждом неперекрывающемся 5-минутном интервале. - Информация о состоянии сохраняется для каждого подсчёта, пока конец окна не станет на 10 минут старше последнего наблюдаемого
event_time.
Внимание
При выполнении операции groupBy() и window() ссылайтесь на столбцы по имени, "<colName>" или col("<colName>"), чтобы сохранить маркер времени события. В Scala также можно использовать $colName.
Как водяные знаки влияют на время обработки и пропускную способность?
Режимы вывода определяют, когда запрос с водяными знаками записывает данные в приёмник. Водяные знаки важны для контроля пропускной способности в потоковой передаче с отслеживанием состояния, так как они снижают общий объем сведений о состоянии в памяти. Не все режимы вывода поддерживаются для всех операций с отслеживанием состояния. См. Водяные знаки и режим вывода для оконных агрегаций.
Выбор длительности водяного знака сопряжён с компромиссами:
- Более короткие интервалы водяных знаков снижают задержку выполнения запросов, так как запросам требуется хранить меньше информации о состоянии и записывать результаты после завершения каждого интервала водяного знака. Однако короткие водяные метки плохо переносят запаздывающие данные.
- Более длинные подложки имеют высокий уровень допустимости для поздних данных. Однако длинные водяные метки увеличивают задержку запросов, поскольку запросам приходится хранить больше информации о состоянии и ждать записи результатов до истечения более длительного интервала водяной метки.
Водяные знаки и выходной режим для оконных агрегатов
В следующей таблице показано поведение обработки запросов с агрегированием по метке времени и водяному знаку:
| Режим вывода | Поведение |
|---|---|
| Добавление | Запрос записывает строки в целевую таблицу после прохождения порогового значения водяного знака. Все записи задерживаются в соответствии с порогом опоздания. Старое состояние агрегирования удаляется после прохождения порогового значения. |
| Обновить | Запрос записывает строки в целевую таблицу по мере вычисления результатов, и запрос может обновлять и перезаписывать строки по мере поступления новых данных. Старое состояние агрегирования удаляется после прохождения порогового значения. |
| Завершено | Состояние агрегации не удаляется. Запрос перезаписывает целевую таблицу для каждого триггера. |
Водяные знаки и режимы вывода для соединений между потоками данных
Соединения между несколькими потоками поддерживают только режим добавления. Запросы записывают соответствующие записи для каждого пакета.
Для внутренних объединений Databricks рекомендует задавать порог watermark для каждого источника потоковых данных, чтобы запрос мог удалять информацию о состоянии для устаревших записей. Без подложки структурированная потоковая передача пытается объединить каждый ключ с обеих сторон соединения на каждом триггере, что может повлиять на производительность.
Для внешних соединений нанесение водяных знаков является обязательным. Если запись не совпадает, запрос записывает значение NULL для этого ключа. Так как соединения поддерживают только режим добавления, несовпаденные записи не записываются до тех пор, пока порог задержки не пройдет.
Управляйте порогом поздних данных с помощью политики нескольких водяных меток
Для нескольких структурированных входных данных потоковой передачи можно задать несколько подложек для контроля пороговых значений допустимости для поздних данных. Водяные знаки позволяют управлять информацией о состоянии и задержкой.
Потоковый запрос может иметь несколько входных потоков, которые объединены или соединены вместе. Для операций с отслеживанием состояния для каждого входного потока может потребоваться другое пороговое значение для поздней допустимости данных. Укажите эти пороги с помощью withWatermark("eventTime", delay) для каждого входного потока. Ниже приведен пример запроса с соединениями "поток — поток".
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
При выполнении запроса с операциями с сохранением состояния Structured Streaming отдельно отслеживает максимальное время событий для каждого входного потока, вычисляет водяные знаки на основе соответствующей задержки и определяет единый глобальный водяной знак. По умолчанию структурированная потоковая передача использует минимальное значение в качестве глобального водяного знака. Если один поток отстаёт от остальных, минимальная глобальная водяная метка не даёт запросу по ошибке пометить данные как опоздавшие. Например, это может произойти, когда один из потоков перестает получать данные из-за сбоев вышестоящего потока. Глобальный водяной знак безопасно перемещается в темпе самого медленного потока и задерживает выходные данные запроса при необходимости.
Чтобы уменьшить задержку, установите для spark.sql.streaming.multipleWatermarkPolicy значение max (по умолчанию — min), чтобы использовать водяную метку самого быстрого потока в качестве глобальной водяной метки. Но эта конфигурация удаляет данные из самых медленных потоков. Databricks рекомендует применять эту конфигурацию с осторожностью.
Применение водяных знаков к различным операциям
Операция distinct отслеживает каждую уникальную запись в состоянии. Без водяного знака состояние растет бесконечно и может вызвать проблемы с памятью. Укажите водяной знак в поле метки времени для привязки состояния и удаления старых записей после прохождения порогового значения.
В следующем примере к операции применяется водяной знак distinct:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
В этом примере потоковый запрос удаляет дублирующиеся записи, поступающие в течение 1 часа с момента последнего наблюдаемого eventTime. Запрос удаляет сведения о состоянии дедупликации после прохождения порогового значения.
Внимание
Чтобы удалить дубликаты в определенных столбцах вместо всех столбцов, используйте dropDuplicates() или dropDuplicatesWithinWatermark() вместо distinct. См. раздел «Удалить дубликаты в водяном знаке».
Удаление дубликатов в пределах водяного знака
В Databricks Runtime версии 13.3 LTS и выше можно использовать уникальный идентификатор, чтобы удалять дубликаты записей в пределах порога водяного знака.
Структурированная потоковая передача гарантирует точно один раз обработку, но не дедупликации записей из источников данных. Используется dropDuplicatesWithinWatermark для удаления дубликатов в любом поле, даже если поля отличаются между повторяющимися записями, например временем события или временем прибытия.
При использовании dropDuplicatesWithinWatermark запросы всегда удаляют дубликаты записей, поступающих в пределах порога водяного знака. Запросы также могут удалять дубликаты записей, поступающих за пределами этого порога, однако это не гарантируется. Чтобы гарантировать, что запросы удаляют все дубликаты, задайте пороговое значение watermark больше максимальной разницы между метками времени дублирующихся событий.
Для использования метода dropDuplicatesWithinWatermark необходимо указать водяной знак:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Примеры вариантов использования
В следующих примерах показаны варианты использования расширенных окон:
Используйте кувыркающиеся окна для расчета почасовых итогов продаж
Кувыркающиеся окна имеют фиксированный размер и неперекрывающиеся интервалы. Каждая входная строка принадлежит ровно одному окну. Используйте переворачивающиеся окна для вычисления дискретных агрегатов периода времени, таких как суммарные суммы почасового объема продаж:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
В этом примере:
-
window("timestamp", "1 hour")группирует заказы по непересекающимся часовым интервалам, например с 5 до 6 утра и с 6 до 7 утра. -
withWatermark("timestamp", "1 hour")хранит агрегированное значение для каждого окна в состоянии до тех пор, пока метка времени окончания окна не станет на 1 час старше максимальной метки времени заказа.
Использование скользящих окон для вычисления скользящих агрегатов
Скользящие окна имеют фиксированный размер, а интервалы могут перекрываться. Одна строка может принадлежать нескольким окнам. Используйте скользящие окна для вычисления скользящих агрегатов, таких как объём продаж за скользящий 6-часовой период:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
В этом примере:
-
window("timestamp", "6 hours", slideDuration="1 hour")группирует заказы по 6-часовым интервалам, которые сдвигаются на 1 час, например с 5:00 до 11:00 и с 6:00 до 12:00. -
withWatermark("timestamp", "1 hour")сохраняет агрегированное значение каждого окна в состоянии до тех пор, пока метка времени окончания окна не станет на 1 час старше максимальной метки времени заказа. -
slideDurationдолжно быть меньше или равно значениюwindowDuration.
Использование окон сеансов для проверки активности пользователя
Окна сеансов не имеют фиксированного размера. Окно открывается при поступлении строки и закрывается по истечении интервала, в течение которого не поступают новые строки. Используйте окна сеансов для агрегирования активности между длительными периодами простоя, например представления страниц пользователя в течение 30-минутного периода:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
В этом примере:
-
session_window("timestamp", gapDuration="30 minutes")открывает окно при поступлении первого представления страницы. Каждое последующее представление страницы, которое поступает в течение 30 минут, расширяет окно. Если представление страницы не поступает в течение 30 минут, окно закрывается, а следующее представление страницы запускает новое окно. -
withWatermark("timestamp", "1 hour")хранит агрегированное значение каждого сеанса в состоянии до тех пор, пока метка времени окончания окна не станет на 1 час старше максимальной метки времени просмотра страницы. - Аргумент
timeColumnдляwindow()иsession_window()должен быть илиTimestampTypeTimestampNTZType. - Используется
current_timestamp()для определения окон на основе времени обработки, а не времени события. - Длительность окна можно задать в микросекундах до дней. Длительности в месяц и более не поддерживаются.
- Используйте режим вывода
completeс оконными агрегатами, чтобы сохранять всё состояние окон неограниченно долго. Используйтеappendрежим вывода с соответствующим подложкой для ограничения роста состояния и предотвращения проблем с памятью для больших наборов данных. Дополнительные сведения о работе режима вывода см. в разделе Водяные знаки и режим вывода для оконных агрегатов.