Optimización del procesamiento con estado en Delta Live Tables con marcas de agua
Para administrar eficazmente los datos mantenidos en estado, use marcas de agua al realizar el procesamiento de flujos con estado en Delta Live Tables, incluidas agregaciones, combinaciones y desduplicación. En este artículo se describe cómo usar marcas de agua en las consultas de Delta Live Tables e incluye ejemplos de las operaciones recomendadas.
Nota:
Para asegurarse de que las consultas que realizan agregaciones se procesan de forma incremental y no se vuelven a calcular completamente con cada actualización, debe usar marcas de agua.
¿Qué es una marca de agua?
En el procesamiento de flujos, una marca de agua es una característica de Apache Spark que puede definir un umbral basado en el tiempo para procesar datos al realizar operaciones con estado, como agregaciones. Los datos que llegan se procesan hasta que se alcanza el umbral, en cuyo momento se cierra el período de tiempo definido por el umbral. Las marcas de agua se pueden usar para evitar problemas durante el procesamiento de consultas, principalmente cuando se procesan conjuntos de datos más grandes o procesamiento de larga duración. Estos problemas pueden incluir una latencia alta en la generación de resultados e incluso errores fuera de memoria (OOM) debido a la cantidad de datos que se mantienen en estado durante el procesamiento. Dado que los datos de streaming son intrínsecamente desordenados, las marcas de agua también admiten el cálculo correcto de operaciones como agregaciones de período de tiempo.
Para más información sobre el uso de marcas de agua en el procesamiento de flujos, consulte Marca de agua en Apache Spark Structured Streaming y Aplicar marcas de agua para controlar los umbrales de procesamiento de datos.
¿Cómo se define una marca de agua?
Se define una marca de agua especificando un campo de marca de tiempo y un valor que representa el umbral de tiempo para que lleguen datos tardíos. Los datos se consideran retrasados si llegan después del umbral de tiempo definido. Por ejemplo, si el umbral se define como 10 minutos, es posible que se quiten los registros que llegan después del umbral de 10 minutos.
Dado que los registros que llegan después del umbral definido se pueden quitar, es importante seleccionar un umbral que cumpla los requisitos de latencia frente a corrección. Si elige un umbral menor, los registros se emiten antes, pero también significa que es más probable que se quiten los registros retrasados. Un umbral mayor significa una espera más larga, pero posiblemente más integridad de los datos. Debido al tamaño de estado mayor, un umbral mayor también puede requerir recursos informáticos adicionales. Dado que el valor de umbral depende de los requisitos de procesamiento y datos, las pruebas y la supervisión del procesamiento son importantes para determinar un umbral óptimo.
Use la función withWatermark()
en Python para definir una marca de agua. En SQL, use la cláusula WATERMARK
para definir una marca de agua:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Uso de marcas de agua con combinaciones de flujo-flujo
Para las uniones flujo-flujo, debe definir una marca de agua a ambos lados de la unión y una cláusula de intervalo de tiempo. Dado que cada fuente de unión tiene una visión incompleta de los datos, la cláusula de intervalo de tiempo es necesaria para indicar al motor de flujo cuándo no se pueden realizar más coincidencias. La cláusula de intervalo de tiempo debe utilizar los mismos campos utilizados para definir las marcas de agua.
Dado que puede haber ocasiones en las que cada flujo requiera umbrales diferentes para las marcas de agua, las secuencias no necesitan tener los mismos umbrales. Para evitar que falten datos, el motor de streaming mantiene una marca de agua global basada en la secuencia más lenta.
En el ejemplo siguiente se combina una secuencia de impresiones publicitarias y un flujo de clics de usuario en anuncios. En este ejemplo, se debe producir un clic en un plazo de 3 minutos a partir de la impresión. Una vez transcurrido el intervalo de tiempo de 3 minutos, se quitan las filas del estado que ya no se pueden coincidir.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(LIVE.bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Realizar agregaciones con ventanas con marcas de agua
Una operación con estado común en los datos de streaming es una agregación con ventanas. Las agregaciones con ventanas son similares a las agregaciones agrupadas, salvo que los valores agregados se devuelven para el conjunto de filas que forman parte de la ventana definida.
Una ventana se puede definir como una longitud determinada y una operación de agregación se puede realizar en todas las filas que forman parte de esa ventana. Spark Streaming admite tres tipos de ventanas:
- Ventanas de saltos de tamaño constante (fijas): Una serie de intervalos de tiempo de tamaño fijo, no solapados y contiguos. Un registro de entrada pertenece solo a una sola ventana.
- Ventanas deslizantes: similar a las ventanas de saltos de tamaño constante, las ventanas deslizantes tienen un tamaño fijo, pero las ventanas se pueden superponer y un registro puede caer en varias ventanas.
Cuando los datos llegan más allá del final de la ventana más la longitud de la marca de agua, no se acepta ningún dato nuevo para la ventana, se emite el resultado de la agregación y se quita el estado de la ventana.
En el ejemplo siguiente se calcula una suma de impresiones cada 5 minutos mediante una ventana fija. En este ejemplo, la cláusula de selección utiliza el alias impressions_window
y, a continuación, la propia ventana se define como parte de la cláusula GROUP BY
. La ventana debe basarse en la misma columna de marca de tiempo que la marca de agua, la columna clickTimestamp
de este ejemplo.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(LIVE.silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Un ejemplo similar en Python para calcular el beneficio a lo largo de las ventanas fijas por hora:
import dlt
@dlt.table()
def profit_by_hour():
return (
dlt.read_stream("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Desduplicar registros de streaming
Structured Streaming tiene garantías de procesamiento exactamente una vez, pero no quita automáticamente los registros duplicados de los orígenes de datos. Por ejemplo, dado que muchas colas de mensajes tienen al menos una garantía, se deben esperar registros duplicados al leer de una de estas colas de mensajes. Puede usar la función dropDuplicatesWithinWatermark()
para desduplicar registros en cualquier campo especificado, quitando duplicados de una secuencia incluso si algunos campos difieren (como la hora del evento o la hora de llegada). Debe especificar una marca de agua para usar la función dropDuplicatesWithinWatermark()
. Se quitan todos los datos duplicados que llegan dentro del intervalo de tiempo especificado por la marca de agua.
Los datos ordenados son importantes porque los datos desordenados provocan que el valor de la marca de agua salte por delante incorrectamente. A continuación, cuando llegan datos más antiguos, se considera tarde y se quita. Use la opción withEventTimeOrder
para procesar la instantánea inicial en orden en función de la marca de tiempo especificada en la marca de agua. La opción withEventTimeOrder
se puede declarar en el código que define el conjunto de datos o en la configuración de canalización mediante spark.databricks.delta.withEventTimeOrder.enabled
. Por ejemplo:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Nota:
La opción withEventTimeOrder
solo se admite con Python.
En el ejemplo siguiente, los datos se procesan ordenados por clickTimestamp
, y los registros que llegan en un plazo de 5 segundos entre sí que contienen userId
duplicados y columnas clickAdId
se quitan.
clicksDedupDf = (
spark.readStream
.option("withEventTimeOrder", "true")
.table(rawClicks)
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optimización de la configuración de canalización para el procesamiento con estado
Para ayudar a evitar problemas de producción y latencia excesiva, Databricks recomienda habilitar la administración de estado basada en RocksDB para el procesamiento de flujos con estado, especialmente si el procesamiento requiere ahorrar una gran cantidad de estado intermedio.
Las canalizaciones sin servidor administran automáticamente las configuraciones del almacén de estado.
Puede habilitar la administración de estado basada en RocksDB estableciendo la siguiente configuración antes de implementar una canalización:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Para más información sobre el almacén de estado de RocksDB, incluidas las recomendaciones de configuración de RocksDB, consulte Configuración del almacén de estados de RocksDB en Azure Databricks.