Nota
O acceso a esta páxina require autorización. Pode tentar iniciar sesión ou modificar os directorios.
O acceso a esta páxina require autorización. Pode tentar modificar os directorios.
Important
Los flujos de REPLACE WHERE para tablas de streaming autónomas están en fase Beta.
En esta página se describe cómo usar flujos REPLACE WHERE para volver a calcular y sobrescribir un subconjunto de destino de una tabla de streaming independiente sin volver a procesar todo el historial de tablas. Los flujos REPLACE WHERE controlan los datos de llegada tardía, el reprocesamiento ascendente, la evolución del esquema y los rerrellenos.
Con un flujo REPLACE WHERE , se define un predicado en la tabla de destino. Todas las filas que coinciden con el predicado se eliminan y se sustituyen volviendo a evaluar la consulta de origen para ese mismo rango del predicado. Las filas que no coinciden con el predicado se dejan intactas.
Requirements
Los flujos REPLACE WHERE tienen los siguientes requisitos:
- La tabla de transmisión debe usar el canal
PREVIEW. Consultechannelen Configuraciones de canalización. - Databricks recomienda Unity Catalog y la computación sin servidor. La actualización incremental solo se admite en el proceso de cálculo sin servidor.
Cuándo usar flujos REPLACE WHERE
Use flujos REPLACE WHERE para los escenarios siguientes:
- Procesamiento por lotes incremental sin semántica de streaming: procese nuevas filas en lotes sin administrar conceptos de streaming como marcas de agua.
- Reprocesamiento selectivo: recompute solo las filas que coinciden con un predicado mientras dejan sin modificar todas las demás filas.
-
Escenarios más allá de las funcionalidades de vista materializadas estándar:
- Tablas de destino con retención más larga que el origen
- Impedir la recomputación cuando cambia una tabla de dimensiones
- Evolución del esquema sin volver a calcular todo el historial
Crear un flujo REPLACE WHERE
Use la cláusula FLOW REPLACE WHERE en línea con CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Durante la actualización, se eliminan todas las filas de la tabla de destino que coinciden con el predicado, se vuelve a calcular la consulta de origen para ese mismo intervalo de predicado y se insertan los nuevos resultados. En este ejemplo, todas las filas de los últimos 7 días se eliminan de orders_enriched y se recalculan usando la consulta de origen.
No es necesario agregar el predicado a la consulta de origen. El motor de canalización lo aplica automáticamente al leer desde el origen.
Note
BY NAME es obligatorio. Garantiza que las columnas coincidan por nombre en lugar de por posición.
Reposición de datos históricos
Para realizar reposición, ejecute instrucciones DML directamente en la tabla de destino:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Comportamiento de actualización completa
Una actualización completa de un flujo REPLACE WHERE vuelve a ejecutar la consulta de origen con solo el predicado actual. Las filas insertadas por sentencias DML fuera del rango del predicado actual se eliminan permanentemente.
Advertencia
Una actualización completa borra todos los datos existentes y vuelve a ejecutar el flujo con solo su predicado definido. Si una canalización ha estado ejecutándose durante un año con un predicado de 7 días, una actualización completa hace que la tabla contenga solo los datos de los últimos 7 días. Todas las filas anteriores se eliminan permanentemente.
REFRESH STREAMING TABLE orders_enriched FULL;
Para evitar actualizaciones completas en una tabla, establezca la propiedad de tabla pipelines.reset.allowed en false:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Actualización incremental
Los flujos REPLACE WHERE usan la actualización incremental siempre que sea posible, reprocesando solo los datos de origen que han cambiado desde la última actualización en lugar de volver a calcular toda la ventana de reemplazo. La actualización incremental requiere un proceso sin servidor.
Cuando se aplica la actualización incremental
Todo lo siguiente debe ser verdadero:
- La canalización se ejecuta en computación sin servidor.
- Se admite la forma de consulta. Consulte Actualización incremental para el conjunto de operadores admitidos.
- El predicado hace referencia a columnas base de una tabla de origen. Los predicados sobre valores derivados, como los resultados de funciones de agregado o de ventana, no se pueden aplicar en el origen, lo que desactiva la actualización incremental.
- Ninguna fila ha sido modificada por DML externo en la ventana de reemplazo actual. DML que modifica filas fuera de la ventana actual no se ve afectada.
- La ventana de reemplazo actual no incluye filas excluidas del predicado anterior. Si se amplía el predicado para abarcar un intervalo que no se había procesado previamente, esa actualización recurre de nuevo a la recomputación completa. Las actualizaciones posteriores son aptas para la actualización incremental de nuevo.
- El predicado es determinista. Los predicados que usan funciones no deterministas, como
rand(), deshabilitan la actualización incremental. Se permiten funciones temporales comocurrent_date().
La primera actualización de cualquier flujo siempre es un cálculo completo. Si no se cumple alguna condición, esa actualización vuelve a la recomputación completa de la ventana de reemplazo actual.
Procedimientos recomendados para la actualización incremental
Siga estas instrucciones para que los flujos REPLACE WHERE sigan siendo aptos para la actualización incremental.
Usar un límite inferior móvil
Los predicados con un límite inferior móvil siguen siendo aptos para la actualización incremental indefinidamente.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Un límite superior móvil, como date BETWEEN date_add(current_date(), -7) AND current_date(), puede desplazar la ventana para incluir filas que antes quedaban excluidas, lo que provoca un recurso puntual al recálculo completo.
Incluir la columna de predicado en GROUP BY
Al realizar una agregación, incluya la columna del predicado en GROUP BY para que el motor pueda empujar el predicado por debajo de la agregación.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Si falta la columna del predicado en GROUP BY, el predicado no se puede desplazar por debajo de la agregación y la fuente se escanea por completo.
Incluir la columna de predicado en claves de combinación
Incluya la columna del predicado en la condición de unión para que el motor pueda descartar todas las fuentes unidas.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Si una tabla combinada no expone la columna de predicado, esa tabla se examina en su totalidad en cada actualización.
Diagnosticar el recurso a la recomputación completa
Cuando una actualización recurre a una recomputación completa, el motivo se informa en el evento planning_information del flujo. Consulte Supervisión de los registros de eventos de canalización. En la tabla siguiente se enumeran los motivos notificados en el evento:
| Motivo | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Una instrucción DML externa modificó filas en la ventana de reemplazo actual. |
REPLACE_WHERE_NOT_DETERMINISTIC |
El predicado usa expresiones no deterministas. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
La actualización anterior usó un predicado no determinista. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
El predicado no se puede aplicar a ningún origen, la ventana actual contiene filas no procesadas por el predicado anterior o la ejecución usa una anulación del predicado. |
Ejemplos
En los ejemplos siguientes se muestran patrones de flujo REPLACE WHERE comunes.
Ejemplo 1: Mantener agregados históricos de un origen de retención limitado
Este ejemplo mantiene agregados diarios de forma indefinida, incluso después de que los datos brutos caduquen en la tabla de origen (retención de 3 días):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Ejemplo 2: Evitar la recomputación cuando cambia una tabla de dimensiones
En este ejemplo se mantienen las filas de hechos históricas sin cambios cuando cambian los atributos de dimensión:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Si cambia la región de un usuario, solo se vuelven a calcular las filas recientes. Las filas históricas conservan el valor de región en el momento en que se escribieron.
Ejemplo 3: Agregar una nueva métrica sin volver a calcular el historial completo
En este ejemplo se muestra cómo evolucionar una definición de tabla y rellenar solo un intervalo de destino:
Defina la tabla inicial:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Actualice la consulta para agregar
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Las filas anteriores a la ventana de 7 días contienen
NULLparauniq_users.
Ejemplo 4: Iteración en una ventana pequeña antes de rellenar el historial completo
En este ejemplo se muestra cómo validar la lógica de consulta en una ventana de datos pequeña antes de procesar el intervalo histórico completo.
Comience con una ventana corta para validar las métricas e iterar en la lógica de negocios con menores costos de proceso:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Una ventana corta recalcula solo los últimos 7 días en cada actualización, por lo que revise la consulta tantas veces como sea necesario antes de lanzar una ejecución histórica completa.
Una vez finalizada la consulta, use DML para rellenar el intervalo histórico completo:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;