Поделиться через


Оптимизация обработки с отслеживанием состояния с помощью водяных знаков

Для эффективного управления данными, хранящимися в состоянии, используйте вотермарки при обработке потоков с сохранением состояния в декларативных пайплайнах Lakeflow Spark, включая агрегации, соединения и дедупликацию. В этой статье описывается, как использовать водяные знаки в запросах вашего конвейера, и приводятся примеры рекомендуемых операций.

Замечание

Чтобы запросы, выполняющие агрегирование, обрабатывались добавочно и не полностью перекомпилировались с каждым обновлением, необходимо использовать подложки.

Что такое водяной знак?

В потоковой обработке водяной знак — это функция Apache Spark, которая может определять временной порог для обработки данных при выполнении операций с состоянием, таких как агрегации. Поступающие данные обрабатываются до достижения порогового значения, после чего временное окно, определенное пороговым значением, закрывается. Водяные знаки можно использовать для предотвращения проблем во время обработки запросов, главным образом при обработке больших наборов данных или длительной обработки. Эти проблемы могут включать высокую задержку при создании результатов и даже ошибок вне памяти (OOM) из-за объема данных, хранящихся в состоянии во время обработки. Так как потоковые данные по сути неупорядочены, водяные знаки также поддерживают корректный расчет таких операций, как агрегирование временных окон.

Дополнительные сведения об использовании водяных знаков в потоковой обработке см. в разделе "Водяные знаки в Apache Spark Структурированная потоковая передача и применение водяных знаков для управления пороговыми значениями обработки данных.

Как определить подложку?

Вы определяете подложку, указывая поле метки времени и значение, представляющее пороговое значение времени для получения поздних данных . Данные считаются поздними, если они поступают после определенного порогового значения времени. Например, если пороговое значение определяется как 10 минут, записи, поступающие после 10-минутного порогового значения, могут быть удалены.

Так как записи, поступающие после определенного порогового значения, могут быть удалены, важно выбрать пороговое значение, соответствующее требованиям задержки и правильности. Выбор меньшего порогового значения приводит к тому, что записи создаются раньше, но также означает, что поздние записи, скорее всего, будут удалены. Более большое пороговое значение означает более длительное ожидание, но, возможно, больше полноты данных. Из-за большего размера состояния больший порог также может потребовать дополнительных вычислительных ресурсов. Так как пороговое значение зависит от требований к данным и обработке, тестирование и мониторинг обработки важно определить оптимальное пороговое значение.

Вы используете функцию withWatermark() в Python для определения водяного знака. В SQL используйте оператор WATERMARK для установки водяного знака:

Питон

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Использование временных меток с объединениями потоков

Для объединения потоков необходимо определить метку времени на обеих сторонах объединения и указать временной интервал. Поскольку каждый источник объединения имеет неполное представление данных, предложение интервала времени необходимо для указания стриминговому движку, когда дальнейшие совпадения не могут быть выполнены. Предложение интервала времени должно использовать те же поля, которые используются для определения водяных знаков.

Потому что в некоторых потоках могут потребоваться разные пороговые значения для водяных знаков, потоки не обязаны иметь одинаковые пороговые значения. Чтобы избежать потери данных, стриминговый модуль поддерживает одну глобальную метку времени на основе самого медленного потока.

В следующем примере объединяются поток показов рекламы и поток щелчков пользователей по рекламе. В этом примере щелчок должен быть сделан в течение 3 минут после показа. После прохождения 3-минутного интервала времени строки из текущего состояния, которые больше не могут быть сопоставлены, удаляются.

Питон

from pyspark import pipelines as dp

dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Выполнение оконных агрегаций с watermark-ами

Распространенная операция с отслеживанием состояния потоковых данных — это агрегирование с окном. Оконные агрегаты похожи на сгруппированные агрегаты, за исключением того, что агрегированные значения возвращаются для набора строк, входящих в определенное окно.

Окно можно определить как определенную длину, и операцию агрегирования можно выполнить во всех строках, входящих в это окно. Spark Streaming поддерживает три типа окон:

  • Скользящие (фиксированные) окна: ряд интервалов времени фиксированного размера, не перекрывающихся и смежных. Входная запись принадлежит только одному окну.
  • Скользящие окна: как и переворачивающиеся окна, скользящие окна имеют фиксированный размер, но окна могут перекрываться, и запись может попасть в несколько окон.

Когда данные поступают после окончания окна плюс длина метки времени, новые данные не принимаются для окна, результат агрегирования создается, а состояние окна сбрасывается.

В следующем примере вычисляется сумма впечатлений каждые 5 минут с помощью фиксированного окна. В этом примере предложение select использует псевдоним impressions_window, а затем само окно определяется как часть GROUP BY предложения. Окно должно основываться на том же столбце метки времени, что и подложка, clickTimestamp столбц в этом примере.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Аналогичный пример в Python для вычисления прибыли за фиксированные временные окна, повторяющиеся каждый час:

from pyspark import pipelines as dp

@dp.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Дедупликация записей потоковой передачи

Структурированная потоковая передача имеет гарантии обработки ровно один раз, но не обеспечивает автоматическое удаление повторяющихся записей из источников данных. Например, так как многие очереди сообщений имеют по крайней мере один раз гарантии, при чтении из одной из этих очередей сообщений следует ожидать повторяющиеся записи. Функцию dropDuplicatesWithinWatermark() можно использовать для отмены повторяющихся записей в любом указанном поле, удаляя дубликаты из потока, даже если некоторые поля отличаются (например, время события или время прибытия). Чтобы использовать функцию dropDuplicatesWithinWatermark(), необходимо указать водяной знак. Все повторяющиеся данные, поступающие в диапазон времени, указанный временной меткой, отбрасываются.

Упорядоченные данные важны, так как неупорядоченные данные приводят к неправильным скачкам значения водяного знака. Затем, когда старые данные поступают, они считаются полученными с опозданием и отбрасываются. Используйте опцию withEventTimeOrder для обработки начального моментального снимка в порядке на основе метки времени, указанной в водяном знаке. Параметр withEventTimeOrder можно объявить в коде, определяющем набор данных или в параметрах конвейера с помощью spark.databricks.delta.withEventTimeOrder.enabled. Рассмотрим пример.

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Замечание

Этот withEventTimeOrder параметр поддерживается только в Python.

В следующем примере данные обрабатываются в порядке clickTimestamp, а записи, поступающие в течение 5 секунд друг от друга и содержащие повторяющиеся столбцы userId и clickAdId, удаляются.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Оптимизация конфигурации конвейера для обработки данных с сохранением состояния

Чтобы предотвратить проблемы с рабочей средой и чрезмерную задержку, Databricks рекомендует включить управление состоянием на основе RocksDB для обработки потоков с отслеживанием состояния, особенно если обработка требует сохранения большого количества промежуточного состояния.

Бессерверные конвейеры автоматически управляют конфигурациями хранилища состояний.

Вы можете включить управление состоянием на основе RocksDB, задав следующую конфигурацию перед развертыванием конвейера:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Дополнительные сведения о хранилище состояний RocksDB, включая рекомендации по настройке RocksDB, см. в статье "Настройка хранилища состояний RocksDB в Azure Databricks".