Aplicar marcas de agua para controlar los umbrales de procesamiento de datos

En este artículo se presentan los conceptos básicos de la marca de agua y se proporcionan recomendaciones para usar marcas de agua en operaciones comunes de streaming con estado. Debe aplicar marcas de agua a las operaciones de streaming con estado para evitar expandir infinitamente la cantidad de datos que se mantienen en el estado, lo que podría generar problemas de memoria y aumentar las latencias de procesamiento durante las operaciones de streaming de larga ejecución.

¿Qué es una marca de agua?

Structured Streaming usa marcas de agua para controlar el umbral de cuánto tiempo se siguen procesando las actualizaciones de una entidad de estado determinada. Entre los ejemplos comunes de entidades de estado se incluyen:

  • Agregaciones a lo largo de una ventana de tiempo.
  • Claves únicas en una combinación entre dos flujos.

Al declarar una marca de agua, se especifica un campo de marca de tiempo y un umbral de marca de agua en un DataFrame de streaming. A medida que llegan nuevos datos, el administrador de estado realiza un seguimiento de la marca de tiempo más reciente en el campo especificado y procesa todos los registros dentro del umbral de demora.

En el ejemplo siguiente se aplica un umbral de marca de agua de 10 minutos a un recuento con ventanas:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

En este ejemplo:

  • La columna event_time se usa para definir una marca de agua de 10 minutos y una ventana de saltos de tamaño constante de 5 minutos.
  • Se recopila un recuento para cada id observado para cada ventana de 5 minutos no superpuesta.
  • La información de estado se mantiene para cada recuento hasta que el final de la ventana es de 10 minutos más antiguo que el último event_time observado.

Importante

Los umbrales de marca de agua garantizan que los registros que llegan dentro del umbral especificado se procesen según la semántica de la consulta definida. Es posible que los registros que lleguen más tarde del umbral especificado se procesen igualmente mediante métricas de consulta, pero no se garantiza.

¿Cómo afectan las marcas de agua al tiempo de procesamiento y al rendimiento?

Las marcas de agua interactúan con los modos de salida para controlar cuándo se escriben los datos en el receptor. Dado que las marcas de agua reducen la cantidad total de información de estado que se va a procesar, el uso eficaz de las marcas de agua es esencial para un rendimiento eficaz de streaming con estado.

Nota:

No todos los modos de salida son compatibles con todas las operaciones con estado.

Marcas de agua y modo de salida para agregaciones con ventanas

En la tabla siguiente se detalla el procesamiento de consultas con agregación en una marca de tiempo con una marca de agua definida:

Modo de salida Comportamiento
Append Las filas se escriben en la tabla de destino una vez que se ha superado el umbral de marca de agua. Todas las escrituras se retrasan en función del umbral de demora. El estado de agregación anterior se anula una vez que se ha superado el umbral.
Actualizar Las filas se escriben en la tabla de destino a medida que se calculan los resultados, y se pueden actualizar y sobrescribir a medida que llegan nuevos datos. El estado de agregación anterior se anula una vez que se ha superado el umbral.
Operación completada El estado de agregación no se anula. La tabla de destino se sobrescribe con cada desencadenador.

Marcas de agua y salida para combinaciones de secuencia a secuencia

Las combinaciones entre varias secuencias solo admiten el modo de anexión, y los registros coincidentes se escriben en cada lote en que se descubren. Para las combinaciones internas, Databricks recomienda establecer un umbral de marca de agua en cada origen de datos de streaming. Esto permite descartar la información de estado de los registros antiguos. Sin marcas de agua, Structured Streaming intenta combinar cada clave desde ambos lados de la combinación con cada desencadenador.

Structured Streaming tiene semántica especial para admitir combinaciones externas. La marca de agua es obligatoria para las combinaciones externas, ya que indica cuándo se debe escribir una clave con un valor NULL después de no coincidir. Tenga en cuenta que, aunque las combinaciones externas pueden ser útiles para registrar registros que nunca coinciden durante el procesamiento de datos, dado que las combinaciones solo escriben en tablas como operaciones de anexión, estos datos que faltan no se registran hasta después de que se haya superado el umbral de demora.

Controlar el umbral de datos tardíos con una directiva de marca de agua múltiple en Structured Streaming

Al trabajar con varias entradas de Structured Streaming, es posible establecer varias marcas de agua con las que controlar los umbrales de tolerancia de los datos de llegada tardía. Configurar marcas de agua le permitirá controlar la información de estado y afectará a la latencia.

Una consulta de streaming puede tener varios flujos de entrada que se unen o se combinan. Cada uno de los flujos de entrada puede tener un umbral diferente de datos en tiempo de retraso que debe tolerarse para las operaciones con estado. Especifique estos umbrales mediante el uso del elemento withWatermarks("eventTime", delay) en cada uno de los flujos de entrada. A continuación se muestra una consulta de ejemplo en la que se usan combinaciones de flujos de datos.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Durante la ejecución de la consulta, Structured Streaming realiza un seguimiento individual del tiempo máximo de evento que se observa en cada flujo de entrada, calcula las marcas de agua en función del retraso correspondiente y, a partir de estas, elige una marca de agua única y global que se usará en las operaciones con estado. De forma predeterminada, el mínimo se elige como marca de agua global, ya que garantiza que ningún dato se elimina accidentalmente como demasiado tarde si uno de los flujos está detrás de los restantes (por ejemplo, uno de los flujos deja de recibir datos debido a errores ascendentes). Es decir, la marca de agua global se moverá de forma segura al ritmo del flujo más lento y la salida de la consulta se retrasará de forma correspondiente a este ritmo.

Si quiere obtener resultados más rápidamente, puede establecer la directiva de varias marcas de agua de forma que esta elija el valor máximo como marca de agua global. Para ello, deberá establecer la configuración spark.sql.streaming.multipleWatermarkPolicy de SQL con el valor max (el valor predeterminado es min). Esto permite que la marca de agua global se mueva al ritmo del flujo más rápido. Sin embargo, el uso de esta configuración resulta en pérdidas de datos de las secuencias más lentas. Por consiguiente, Databricks recomienda usar esta configuración prudentemente.

Anular duplicados en la marca de agua

En Databricks Runtime 13.3 LTS y versiones posteriores, puede quitar los registros duplicados en un umbral de marca de agua mediante un identificador único.

Structured Streaming proporciona garantías de procesamiento exactamente una vez, pero no quita automáticamente los registros duplicados de los orígenes de datos. Puede usar dropDuplicatesWithinWatermark para quitar los registros duplicados en cualquier campo especificado, lo que le permite quitar los duplicados de una secuencia incluso aunque algunos campos difieran (como la hora del evento o la hora de llegada).

Se garantiza la anulación de los registros duplicados que lleguen dentro de la marca de agua especificada. Esta garantía es estricta en una sola dirección y es posible que también se anulen los registros duplicados que lleguen fuera del umbral especificado. Para quitar todos los duplicados, debe establecer un umbral de retraso de la marca de agua superior a las diferencias máximas de marca de tiempo entre los eventos duplicados.

Debe especificar una marca de agua para usar el método dropDuplicatesWithinWatermark, tal como se muestra en el ejemplo siguiente:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")