Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Important
Потоки REPLACE WHERE для автономных потоковых таблиц находятся на стадии бета-тестирования.
На этой странице описывается, как использовать потоки REPLACE WHERE для повторной компиляции и перезаписи целевого подмножества автономной потоковой таблицы без повторной обработки всей истории таблиц. Потоки REPLACE WHERE поддерживают обработку поздно поступающих данных, повторную обработку в вышестоящих системах, изменения схемы и дозагрузку исторических данных.
С помощью потока REPLACE WHERE вы определяете предикат в целевой таблице. Все строки, соответствующие предикату, удаляются и заменяются повторной оценкой исходного запроса для того же диапазона предиката. Строки, которые не соответствуют предикату, остаются незамеченными.
Требования
Потоки REPLACE WHERE имеют следующие требования:
- Потоковая таблица должна использовать
PREVIEWканал. Смchannelв конфигурациях конвейера. - Databricks рекомендует каталог Unity и бессерверные вычисления. Добавочное обновление поддерживается только для бессерверных вычислений.
Когда следует использовать потоки REPLACE WHERE
Используйте потоки REPLACE WHERE для следующих сценариев:
- Добавочная пакетная обработка без семантики потоковой передачи. Обработка новых строк в пакетах без управления понятиями потоковой передачи, такими как подложки.
- Выборочная повторная обработка: пересчитывайте только те строки, которые соответствуют предикату, оставляя все остальные строки без изменений.
-
Сценарии, выходящие за рамки стандартных материализованных возможностей представления:
- Целевые таблицы с более длительным сроком хранения, чем у источника
- Предотвращение повторной компиляции при изменении таблицы измерений
- Эволюция схемы без повторной компиляции всей истории
Создание потока REPLACE WHERE
Используйте FLOW REPLACE WHEREусловие в одной строке с CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Во время обновления удаляются все строки в целевой таблице, соответствующие предикату, исходный запрос повторно компилируется для того же диапазона предиката, а новые результаты вставляются. В этом примере все строки за последние 7 дней удаляются из orders_enriched и заново вычисляются с помощью исходного запроса.
Вам не нужно добавлять предикат в исходный запрос. Механизм конвейера автоматически применяет это при чтении данных из источника.
Note
BY NAME является обязательным. Он гарантирует соответствие столбцов по имени, а не по позиции.
Заполнение исторических данных
Чтобы выполнить обратное заполнение, выполните инструкции DML непосредственно в целевой таблице:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Поведение полного обновления
Полное обновление потока REPLACE WHERE повторно выполняет исходный запрос с использованием только текущего предиката. Строки, вставленные операторами DML за пределами текущего диапазона предиката, окончательно удаляются.
Предупреждение
Полное обновление очищает все существующие данные и повторно выполняет поток с помощью только определенного предиката. Если конвейер работает в течение года с предикатом 7 дней, полное обновление приводит к тому, что таблица содержит только последние 7 дней данных. Все старые строки окончательно удаляются.
REFRESH STREAMING TABLE orders_enriched FULL;
Чтобы предотвратить полное обновление таблицы, задайте для свойства pipelines.reset.allowedfalseтаблицы значение :
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Добавочное обновление
Потоки REPLACE WHERE используют инкрементное обновление, когда это возможно, обрабатывая только те исходные данные, которые изменились с момента последнего обновления, вместо пересчёта всего окна замены целиком. Добавочное обновление требует бессерверных вычислений.
Когда применяется инкрементальное обновление
Все из перечисленного ниже должно быть истинным:
- Конвейер выполняется на бессерверных вычислениях.
- Поддерживается структура запроса. Сведения о поддерживаемом наборе операторов см. в инкрементальном обновлении.
- Предикат ссылается на базовые столбцы из исходной таблицы. Предикаты по производным значениям, таким как результаты агрегатных или оконных функций, нельзя перенести на источник, из-за чего становится недоступным инкрементное обновление.
- Внешний DML не изменил строки в текущем окне замены. DML, изменяющий строки за пределами текущего окна, не затрагивается.
- Текущее окно замены не включает строки, исключенные из предыдущего предиката. Если вы расширяете предикат так, чтобы он охватывал диапазон, который ранее не обрабатывался, это обновление выполняется с полным пересчётом. Последующие обновления имеют право на добавочное обновление снова.
- Предикат детерминирован. Предикаты, использующие недетерминированные функции, такие как
rand(), отключают инкрементное обновление. Темпоральные функции, такие какcurrent_date()разрешены.
Первое обновление любого потока всегда является полным вычислением. Если какое-либо условие не выполнено, это обновление возвращается к полной повторной компиляции текущего окна замены.
Рекомендации по инкрементальному обновлению
Следуйте этим рекомендациям, чтобы потоки REPLACE WHERE оставались допустимыми для добавочного обновления.
Используйте подвижную нижнюю границу
Предикаты с перемещением нижней границы остаются допустимыми для добавочного обновления на неопределенный срок.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Перемещаемая верхняя граница, например date BETWEEN date_add(current_date(), -7) AND current_date(), может сдвинуть окно, чтобы включить ранее исключенные строки, активировав обратный возврат к полной повторной компиляции.
Включить столбец предикатов в GROUP BY
При агрегации включите столбец условия в GROUP BY, чтобы механизм мог протолкнуть условие ниже операции агрегации.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Если столбец предиката отсутствует в GROUP BY, предикат нельзя протолкнуть ниже агрегации, и источник сканируется полностью.
Включить столбец предиката в ключи соединения
Включите столбец предиката в условие соединения, чтобы механизм мог отсечь все соединяемые источники.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Если присоединённая таблица не содержит столбец предиката, при каждом обновлении она полностью сканируется.
Диагностировать переход к полному пересчёту
Когда обновление переходит к полному пересчёту, причина указывается в событии planning_information для потока. См. статью "Мониторинг журналов событий конвейера". В следующей таблице перечислены причины, сообщаемые в событии:
| Причина | Значение |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Внешняя операция DML изменила строки в текущем окне замены. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Предикат использует недетерминированные выражения. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Предыдущее обновление использовало недетерминированный предикат. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Предикат нельзя отправить в любой источник, текущее окно содержит строки, не обработанные предыдущим предикатом, или выполнение использует переопределение предиката. |
Примеры
В следующих примерах показаны распространенные шаблоны потока REPLACE WHERE .
Пример 1. Сохранение статистических статистических данных из источника ограниченного хранения
Этот пример сохраняет ежедневные статистические данные неограниченное время даже после того, как необработанные данные выходят из исходной таблицы (3-дневное хранение):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Пример 2. Предотвращение повторной компиляции при изменении таблицы измерений
В этом примере строки фактов сохраняются без изменений при изменении атрибутов измерения:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Если регион пользователя изменяется, перекомпьютируются только последние строки. Исторические строки сохраняют значение региона во время их записи.
Пример 3. Добавление новой метрики без повторной компиляции полной истории
В этом примере показано, как изменить определение таблицы и заполучить только целевой диапазон:
Определите начальную таблицу:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Обновите запрос, чтобы добавить
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Строки старше 7-дневного окна содержат
NULLдляuniq_users.
Пример 4: Выполните итерации на небольшом временном диапазоне перед дозаполнением всей истории
В этом примере показано, как проверить логику запроса в небольшом окне данных перед обработкой полного исторического диапазона.
Начните с короткого окна для проверки метрик и итерации бизнес-логики с более низкими затратами на вычисления:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Короткий интервал пересчитывает только последние 7 дней при каждом обновлении, поэтому корректируйте запрос столько раз, сколько необходимо, прежде чем запускать полный пересчёт за весь исторический период.
После завершения запроса используйте DML, чтобы заполнить полный исторический диапазон:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;