Потоки REPLACE WHERE для автономных потоковых таблиц

Important

Потоки REPLACE WHERE для автономных потоковых таблиц находятся на стадии бета-тестирования.

На этой странице описывается, как использовать потоки REPLACE WHERE для повторной компиляции и перезаписи целевого подмножества автономной потоковой таблицы без повторной обработки всей истории таблиц. Потоки REPLACE WHERE поддерживают обработку поздно поступающих данных, повторную обработку в вышестоящих системах, изменения схемы и дозагрузку исторических данных.

С помощью потока REPLACE WHERE вы определяете предикат в целевой таблице. Все строки, соответствующие предикату, удаляются и заменяются повторной оценкой исходного запроса для того же диапазона предиката. Строки, которые не соответствуют предикату, остаются незамеченными.

Требования

Потоки REPLACE WHERE имеют следующие требования:

Когда следует использовать потоки REPLACE WHERE

Используйте потоки REPLACE WHERE для следующих сценариев:

  • Добавочная пакетная обработка без семантики потоковой передачи. Обработка новых строк в пакетах без управления понятиями потоковой передачи, такими как подложки.
  • Выборочная повторная обработка: пересчитывайте только те строки, которые соответствуют предикату, оставляя все остальные строки без изменений.
  • Сценарии, выходящие за рамки стандартных материализованных возможностей представления:
    • Целевые таблицы с более длительным сроком хранения, чем у источника
    • Предотвращение повторной компиляции при изменении таблицы измерений
    • Эволюция схемы без повторной компиляции всей истории

Создание потока REPLACE WHERE

Используйте FLOW REPLACE WHEREусловие в одной строке с CREATE OR REFRESH STREAMING TABLE:

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Во время обновления удаляются все строки в целевой таблице, соответствующие предикату, исходный запрос повторно компилируется для того же диапазона предиката, а новые результаты вставляются. В этом примере все строки за последние 7 дней удаляются из orders_enriched и заново вычисляются с помощью исходного запроса.

Вам не нужно добавлять предикат в исходный запрос. Механизм конвейера автоматически применяет это при чтении данных из источника.

Note

BY NAME является обязательным. Он гарантирует соответствие столбцов по имени, а не по позиции.

Заполнение исторических данных

Чтобы выполнить обратное заполнение, выполните инструкции DML непосредственно в целевой таблице:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Поведение полного обновления

Полное обновление потока REPLACE WHERE повторно выполняет исходный запрос с использованием только текущего предиката. Строки, вставленные операторами DML за пределами текущего диапазона предиката, окончательно удаляются.

Предупреждение

Полное обновление очищает все существующие данные и повторно выполняет поток с помощью только определенного предиката. Если конвейер работает в течение года с предикатом 7 дней, полное обновление приводит к тому, что таблица содержит только последние 7 дней данных. Все старые строки окончательно удаляются.

REFRESH STREAMING TABLE orders_enriched FULL;

Чтобы предотвратить полное обновление таблицы, задайте для свойства pipelines.reset.allowedfalseтаблицы значение :

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

Добавочное обновление

Потоки REPLACE WHERE используют инкрементное обновление, когда это возможно, обрабатывая только те исходные данные, которые изменились с момента последнего обновления, вместо пересчёта всего окна замены целиком. Добавочное обновление требует бессерверных вычислений.

Когда применяется инкрементальное обновление

Все из перечисленного ниже должно быть истинным:

  • Конвейер выполняется на бессерверных вычислениях.
  • Поддерживается структура запроса. Сведения о поддерживаемом наборе операторов см. в инкрементальном обновлении.
  • Предикат ссылается на базовые столбцы из исходной таблицы. Предикаты по производным значениям, таким как результаты агрегатных или оконных функций, нельзя перенести на источник, из-за чего становится недоступным инкрементное обновление.
  • Внешний DML не изменил строки в текущем окне замены. DML, изменяющий строки за пределами текущего окна, не затрагивается.
  • Текущее окно замены не включает строки, исключенные из предыдущего предиката. Если вы расширяете предикат так, чтобы он охватывал диапазон, который ранее не обрабатывался, это обновление выполняется с полным пересчётом. Последующие обновления имеют право на добавочное обновление снова.
  • Предикат детерминирован. Предикаты, использующие недетерминированные функции, такие как rand(), отключают инкрементное обновление. Темпоральные функции, такие как current_date() разрешены.

Первое обновление любого потока всегда является полным вычислением. Если какое-либо условие не выполнено, это обновление возвращается к полной повторной компиляции текущего окна замены.

Рекомендации по инкрементальному обновлению

Следуйте этим рекомендациям, чтобы потоки REPLACE WHERE оставались допустимыми для добавочного обновления.

Используйте подвижную нижнюю границу

Предикаты с перемещением нижней границы остаются допустимыми для добавочного обновления на неопределенный срок.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Перемещаемая верхняя граница, например date BETWEEN date_add(current_date(), -7) AND current_date(), может сдвинуть окно, чтобы включить ранее исключенные строки, активировав обратный возврат к полной повторной компиляции.

Включить столбец предикатов в GROUP BY

При агрегации включите столбец условия в GROUP BY, чтобы механизм мог протолкнуть условие ниже операции агрегации.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Если столбец предиката отсутствует в GROUP BY, предикат нельзя протолкнуть ниже агрегации, и источник сканируется полностью.

Включить столбец предиката в ключи соединения

Включите столбец предиката в условие соединения, чтобы механизм мог отсечь все соединяемые источники.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Если присоединённая таблица не содержит столбец предиката, при каждом обновлении она полностью сканируется.

Диагностировать переход к полному пересчёту

Когда обновление переходит к полному пересчёту, причина указывается в событии planning_information для потока. См. статью "Мониторинг журналов событий конвейера". В следующей таблице перечислены причины, сообщаемые в событии:

Причина Значение
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Внешняя операция DML изменила строки в текущем окне замены.
REPLACE_WHERE_NOT_DETERMINISTIC Предикат использует недетерминированные выражения.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Предыдущее обновление использовало недетерминированный предикат.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Предикат нельзя отправить в любой источник, текущее окно содержит строки, не обработанные предыдущим предикатом, или выполнение использует переопределение предиката.

Примеры

В следующих примерах показаны распространенные шаблоны потока REPLACE WHERE .

Пример 1. Сохранение статистических статистических данных из источника ограниченного хранения

Этот пример сохраняет ежедневные статистические данные неограниченное время даже после того, как необработанные данные выходят из исходной таблицы (3-дневное хранение):

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Пример 2. Предотвращение повторной компиляции при изменении таблицы измерений

В этом примере строки фактов сохраняются без изменений при изменении атрибутов измерения:

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Если регион пользователя изменяется, перекомпьютируются только последние строки. Исторические строки сохраняют значение региона во время их записи.

Пример 3. Добавление новой метрики без повторной компиляции полной истории

В этом примере показано, как изменить определение таблицы и заполучить только целевой диапазон:

  1. Определите начальную таблицу:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. Обновите запрос, чтобы добавить uniq_users:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Строки старше 7-дневного окна содержат NULL для uniq_users.

Пример 4: Выполните итерации на небольшом временном диапазоне перед дозаполнением всей истории

В этом примере показано, как проверить логику запроса в небольшом окне данных перед обработкой полного исторического диапазона.

Начните с короткого окна для проверки метрик и итерации бизнес-логики с более низкими затратами на вычисления:

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Короткий интервал пересчитывает только последние 7 дней при каждом обновлении, поэтому корректируйте запрос столько раз, сколько необходимо, прежде чем запускать полный пересчёт за весь исторический период.

После завершения запроса используйте DML, чтобы заполнить полный исторический диапазон:

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;