Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье рассматривается выбор режима вывода для потоковой передачи данных с сохранением состояния. Только потоки с сохранением состояния, содержащие агрегаты, требуют настройки выходного режима.
Соединения поддерживают только режим добавления данных, а режим вывода не влияет на дедупликацию. Произвольные операторы mapGroupsWithState с отслеживанием состояния и flatMapGroupsWithState генерируют записи с помощью собственной пользовательской логики, поэтому выходной режим потока не влияет на их поведение.
Для стриминга без сохранения состояния все режимы вывода ведут себя одинаково.
Чтобы правильно настроить режим вывода, необходимо понимать потоковую передачу с отслеживанием состояния, подложки и триггеры. См. следующие статьи:
- Что такое потоковая передача с отслеживанием состояния?
- Применение водяных знаков для контроля пороговых значений обработки данных
- Настройка интервалов триггера структурированной потоковой передачи
Что такое режим вывода?
Выходной режим запроса структурированной потоковой передачи определяет, какие записи записывают операторы запроса во время каждого триггера. Три типа записей, которые могут быть сгенерированы:
- Фиксирует, что в будущем обработка не изменится.
- Записи, которые изменились с момента последнего триггера.
- Все записи в таблице состояний.
Зная, какие типы записей следует выдавать, важно для операторов с отслеживанием состояния, так как определенная строка, созданная оператором с отслеживанием состояния, может измениться с триггера на триггер. Например, когда оператор потокового агрегирования получает больше строк для определенного окна, значения агрегирования этого окна могут изменяться с каждым запуском триггера.
Для операторов без состояния различие между типами записей не влияет на поведение оператора. Записи, которые оператор без отслеживания состояния выдает во время триггера, всегда являются исходными записями, обрабатываемыми во время этого триггера.
Доступные режимы вывода
Существует три режима вывода, которые сообщают оператору, какие записи нужно выдавать при определенном триггере.
| Режим вывода | Описание |
|---|---|
| Режим добавления (по умолчанию) | По умолчанию потоковые запросы выполняются в режиме добавления. В этом режиме операторы выдают только строки, которые не изменяются в будущих триггерах. Операторы с состоянием используют метку времени, чтобы определить, когда это происходит. |
| Режим обновления | В режиме обновления операторы выдают все строки, которые изменились во время триггера, даже если выдаваемая запись может измениться в последующем триггере. |
| Полный режим | Полный режим работает только с потоковой агрегацией. В полном режиме все строки результата, создаваемые оператором, передаются вниз по потоку. |
Соображения по производству
Для многих операций потоковой передачи с сохранением состояния необходимо выбрать между режимами добавления и обновления. В следующих разделах изложены соображения, которые могут повлиять на ваше решение.
Примечание.
Полный режим имеет некоторые применения, но может плохо работать с увеличением объема данных. Databricks рекомендует использовать материализованные представления для получения семантических гарантий, связанных с полным режимом с добавочной обработкой для многих операций с отслеживанием состояния. См. материализованные представления.
Семантика приложения
Семантика приложения описывает, как подчиненные приложения используют потоковые данные.
Если нижестоящие службы должны выполнять одно действие для каждой записи данных, используйте режим добавления в большинстве случаев. Например, если у вас есть потоковая служба уведомлений, отправляющая уведомления для каждой новой записи, записанной в приемное устройство, режим добавления гарантирует, что каждая запись записывается только один раз. Режим обновления записывает запись при каждом изменении сведений о состоянии, что приведет к многочисленным обновлениям.
Если зависимым службам требуются свежие результаты, режим обновления гарантирует, что приемник остается как можно более актуальным. Примеры включают модель машинного обучения, считывающую признаки в режиме реального времени, или аналитическую панель, отслеживающую агрегаты в реальном времени.
Совместимость операторов и приемников
Структурированная потоковая передача не поддерживает все операции, доступные в Apache Spark, и некоторые операции потоковой передачи не поддерживаются во всех режимах вывода. Дополнительные сведения об ограничениях операторов см. в документации по потоковой передаче OSS.
Не все приемники поддерживают все режимы вывода. Kafka поддерживает все режимы вывода. Delta Lake, который поддерживает все управляемые таблицы каталога Unity, поддерживает режимы добавления и завершения, но не режим обновления. Для поведения, аналогичного режиму обновления с приемниками Delta Lake, смотрите раздел "Слияние в потоковой передаче".
Дополнительные сведения о совместимости приемников см. в документации по потоковой передаче OSS.
Задержка и стоимость
Режим вывода влияет на то, сколько времени необходимо пройти перед записью, а частота и объем записанных данных может повлиять на затраты, связанные с конвейерами потоковой передачи.
Режим добавления заставляет операторов с сохранением состояния выдавать результаты только после окончательного завершения подсчета, время которого как минимум равно вашей задержке водяного знака. Задержка водяного 1 hour знака в режиме добавления данных означает, что записи имеют по крайней мере 1-часовую задержку, прежде чем они будут переданы дальше по потоку.
Режим обновления приводит к одной записи на триггер для каждого агрегатного значения. Если с вас взимается плата за каждую запись, это может быть дорого, если записи обновляются много раз до истечения задержки времени водяного знака.
Примеры конфигураций
В следующих примерах кода показано, как настроить выходной режим для потоковой передачи обновлений в таблицы каталога Unity:
Питон
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
язык программирования Scala
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
См. документы OSS для PySpark DataStreamWriter.outputMode или Scala DataStreamWriter.outputMode.
Пример режимов потоковой передачи и вывода с отслеживанием состояния
В следующем примере показано, как режим вывода взаимодействует с водяными знаками для отслеживания состояния в потоковой обработке данных.
Рассмотрим потоковую агрегацию, которая вычисляет общий доход, полученный каждый час в магазине с задержкой времени по водяному знаку в 15 минут. Первый микробатч обрабатывает следующие записи:
- $ 15 в 2:40 вечера
- $ 10 в 2:30 вечера
- $ 30 в 3:10 вечера
На этом этапе временная метка двигателя составляет 14:55, так как вычитается 15 минут (задержка) из максимального времени (15:10). Оператор агрегирования потоковой передачи имеет следующее состояние:
-
[2pm, 3pm]: $25 -
[3pm, 4pm]: $30
В следующей таблице описывается, что произойдет в каждом выходном режиме:
| Режим вывода | Результат и причина |
|---|---|
| Добавление | Оператор агрегирования потоковой передачи не передает ничего дальше по потоку. Это связано с тем, что оба окна могут измениться из-за новых значений, появляющихся с последующим срабатыванием: водяной знак, отмечающий 14:55, указывает на то, что записи после 14:55 ещё могут поступать, и эти записи могут относиться к окну [2pm, 3pm] или окну [3pm, 4pm]. |
| Обновить | Оператор выдает обе записи, так как обе записи получили обновления. |
| Завершено | Оператор выдает все записи. |
Теперь предположим, что поток получает еще одну запись:
- $ 20 в 3:20 вечера
Водяной знак обновляется до 3:05 после полудня, так как двигатель вычитает 15 минут из 3:20 после полудня. На этом этапе оператор агрегирования потоковой передачи имеет следующее состояние:
-
[2pm, 3pm]: $25 -
[3pm, 4pm]: $50
В следующей таблице описывается, что произойдет в каждом выходном режиме:
| Режим вывода | Результат и причина |
|---|---|
| Добавление | Оператор агрегирования потоковой передачи наблюдает, что водяной знак 3:05 после полудня больше, чем конец окна [2pm, 3pm]. Согласно определению водяного знака, это окно больше не может изменяться, поэтому оно выдает окно [2pm, 3pm]. |
| Обновить | Оператор агрегирования потоковой передачи выдает [3pm, 4pm] окно, так как значение состояния изменилось с $30 до $50. |
| Завершено | Оператор выдает все записи. |
Ниже приведены сведения о том, как работают операторы с отслеживанием состояния в каждом режиме добавления.
- В режиме добавления записей запись выполняется один раз после задержки водяного знака.
- В режиме обновления записывайте записи, которые изменились с момента предыдущего триггера.
- В полном режиме записывает все записи, которые когда-либо создаются оператором с отслеживанием состояния.