Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Para administrar eficazmente los datos mantenidos en estado, use marcas de agua al realizar el procesamiento de flujos con estado en canalizaciones declarativas de Spark de Lakeflow, incluidas agregaciones, combinaciones y desduplicación. En este artículo se describe cómo usar marcas de agua en las consultas de canalización e incluye ejemplos de las operaciones recomendadas.
Nota:
Para asegurarse de que las consultas que realizan agregaciones se procesan de forma incremental y no se recalculen completamente cada vez que hay una actualización, debe usar marcas de agua.
¿Qué es una marca de agua?
En el procesamiento de flujos, una marca de agua es una característica de Apache Spark que puede definir un umbral basado en el tiempo para procesar datos al realizar operaciones con estado, como agregaciones. Los datos que llegan se procesan hasta que se alcanza el umbral, en cuyo momento se cierra el período de tiempo definido por el umbral. Las marcas de agua se pueden usar para evitar problemas durante el procesamiento de consultas, principalmente cuando se procesan conjuntos de datos más grandes o procesamiento de larga duración. Estos problemas pueden incluir una latencia alta en la generación de resultados e incluso errores fuera de memoria (OOM) debido a la cantidad de datos que se mantienen en estado durante el procesamiento. Dado que los datos de streaming son intrínsecamente desordenados, las marcas de agua también admiten el cálculo correcto de operaciones como agregaciones de ventana de tiempo.
Para más información sobre el uso de marcas de agua en el procesamiento de flujos, consulte Watermarking in Apache Spark Structured Streaming and Apply watermarks to control data processing thresholds (Aplicar marcas de agua en Apache Spark Structured Streaming y Aplicar marcas de agua para controlar los umbrales de procesamiento de datos).
¿Cómo se define una marca de agua?
Para definir una marca de agua, especifique un campo de marca de tiempo y un valor que represente el umbral de tiempo para que lleguen los datos tardíos. Los datos se consideran retrasados si llegan después del umbral de tiempo definido. Por ejemplo, si el umbral se define como 10 minutos, es posible que se quiten los registros que llegan después del umbral de 10 minutos.
Dado que los registros que llegan después del umbral definido se pueden quitar, es importante seleccionar un umbral que cumpla los requisitos de latencia frente a corrección. Si elige un umbral menor, los registros se emiten antes, pero también significa que es más probable que se quiten los registros retrasados. Un umbral mayor significa una espera más larga, pero posiblemente más integridad de los datos. Debido al tamaño de estado mayor, un umbral mayor también puede requerir recursos informáticos adicionales. Dado que el valor de umbral depende de los requisitos de procesamiento y datos, las pruebas y la supervisión del procesamiento son importantes para determinar un umbral óptimo.
Usas la función withWatermark() en Python para definir una marca de agua. En SQL, use la WATERMARK cláusula para definir una marca de agua:
Pitón
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Utilizar marcas de agua en uniones de flujo a flujo
Para las combinaciones de flujo-flujo, debe definir una marca de agua en ambos lados de la combinación y establecer una cláusula de intervalo de tiempo. Dado que cada origen de combinación tiene una vista incompleta de los datos, se requiere la cláusula time interval para indicar al motor de streaming cuando no se pueden realizar más coincidencias. La cláusula de intervalo de tiempo debe utilizar los mismos campos utilizados para definir las marcas de agua.
Dado que puede haber ocasiones en las que cada corriente requiera umbrales diferentes para las marcas de agua, las corrientes no necesitan tener los mismos umbrales. Para evitar la pérdida de datos, el motor de streaming mantiene un indicador de tiempo global basado en el flujo más lento.
En el ejemplo siguiente se combina un flujo de impresiones publicitarias y un flujo de clics de usuarios en los anuncios. En este ejemplo, se debe producir un clic en un plazo de 3 minutos a partir de la impresión. Una vez transcurrido el intervalo de tiempo de 3 minutos, se eliminan del estado las filas que ya no pueden ser emparejadas.
Pitón
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Realizar agregaciones en ventanas temporales con marcas de agua
Una operación común de mantenimiento de estado en datos de streaming es una agregación por ventanas. Las agregaciones con ventanas son similares a las agregaciones agrupadas, salvo que los valores agregados se devuelven para el conjunto de filas que forman parte de la ventana definida.
Una ventana se puede definir como una longitud determinada y una operación de agregación se puede realizar en todas las filas que forman parte de esa ventana. Spark Streaming admite tres tipos de ventanas:
- Ventanas deslizantes de tamaño fijo: una serie de intervalos de tiempo contiguos, no superpuestos y de tamaño fijo. Un registro de entrada pertenece solo a una sola ventana.
- Ventanas deslizantes: Al igual que las ventanas rodantes, las ventanas deslizantes son de tamaño fijo, pero se pueden superponer y un registro puede caer en varias ventanas.
Cuando los datos llegan más allá del final de la ventana más la longitud de la marca de agua, no se acepta ningún dato nuevo para la ventana, se emite el resultado de la agregación y se quita el estado de la ventana.
En el ejemplo siguiente se calcula una suma de impresiones cada 5 minutos mediante una ventana fija. En este ejemplo, la cláusula select usa el alias impressions_windowy, a continuación, la propia ventana se define como parte de la GROUP BY cláusula . La ventana debe basarse en la misma columna de marca de tiempo que la marca de agua, la columna clickTimestamp en este caso.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Un ejemplo similar en Python para calcular el beneficio a lo largo de las ventanas fijas por hora:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Desduplicar registros de streaming
Structured Streaming tiene garantías de procesamiento de manera exacta una vez, pero no desduplica automáticamente registros de orígenes de datos. Por ejemplo, dado que muchas colas de mensajes tienen garantías de entrega al menos una vez, se deben esperar registros duplicados al leer desde una de estas colas de mensajes. Puede usar la dropDuplicatesWithinWatermark() función para desduplicar registros en cualquier campo especificado, quitando duplicados de una secuencia incluso si algunos campos difieren (como la hora del evento o la hora de llegada). Debe especificar una marca de agua para usar la función dropDuplicatesWithinWatermark(). Se descartan todos los datos duplicados que llegan dentro del intervalo de tiempo especificado por la marca de agua.
Los datos ordenados son importantes porque los datos desordenados provocan que el valor de la marca de agua salte por delante incorrectamente. Luego, cuando llegan datos más antiguos, se considera retrasada y se descarta. Utilice la opción withEventTimeOrder para procesar la instantánea inicial en orden según el sello temporal especificado en la marca de agua. La withEventTimeOrder opción se puede declarar en el código que define el conjunto de datos o en la configuración de la canalización mediante spark.databricks.delta.withEventTimeOrder.enabled. Por ejemplo:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Nota:
La withEventTimeOrder opción solo se admite con Python.
En el ejemplo siguiente, los datos se procesan ordenados por clickTimestampy los registros que llegan en un plazo de 5 segundos entre sí que contienen columnas duplicadas userId y clickAdId se quitan.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optimización de la configuración de canalización para el procesamiento con estado
Para ayudar a evitar problemas de producción y latencia excesiva, Databricks recomienda habilitar la administración de estado basada en RocksDB para el procesamiento de flujos con estado, especialmente si el procesamiento requiere ahorrar una gran cantidad de estado intermedio.
Las canalizaciones sin servidor administran automáticamente las configuraciones del almacén de estado.
Puede habilitar la administración de estado basada en RocksDB estableciendo la siguiente configuración antes de implementar una canalización:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Para más información sobre el almacén de estado de RocksDB, incluidas las recomendaciones de configuración de RocksDB, consulte Configuración del almacén de estados de RocksDB en Azure Databricks.