Compartir a través de


Selección de un modo de salida para Structured Streaming

En este artículo se describe cómo seleccionar un modo de salida para el streaming con estado. Solo las secuencias con estado que contengan agregaciones requieren una configuración del modo de salida.

Las combinaciones solo admiten el modo de salida anexado y el modo de salida no afecta a la desduplicación. Los operadores con estado arbitrario mapGroupsWithState y flatMapGroupsWithState emiten registros mediante su propia lógica personalizada, por lo que el modo de salida de la secuencia no afecta a su comportamiento.

Para el streaming sin estado, todos los modos de salida se comportan igual.

Para configurar el modo de salida correctamente, debe comprender el streaming con estado, las referencias y los desencadenadores. Vea los artículos siguientes:

¿Qué es el modo de salida?

El modo de salida de una consulta de Structured Streaming determina qué registros emiten los operadores de la consulta durante cada desencadenador. Los tres tipos de registros que se pueden emitir son:

  • Registra que el procesamiento futuro no cambia.
  • Registros que han cambiado desde el último desencadenador.
  • Todos los registros de la tabla de estado.

Saber qué tipos de registros emitir es importante para los operadores con estado, ya que una fila determinada generada por un operador con estado podría cambiar de desencadenador a desencadenador. Por ejemplo, como un operador de agregación de streaming recibe más filas para una ventana determinada, los valores de agregación de esa ventana pueden cambiar entre desencadenadores.

En el caso de los operadores sin estado, la distinción entre los tipos de registro no afecta al comportamiento del operador. Los registros que emite un operador sin estado durante un desencadenador siempre son los registros de origen procesados durante ese desencadenador.

Modos de salida disponibles

Hay tres modos de salida que indican a un operador qué registros emitir durante un desencadenador determinado:

Modo de salida Descripción
Modo de anexión (valor predeterminado) De forma predeterminada, las consultas de streaming se ejecutan en modo de anexión. En este modo, los operadores solo emiten filas que no cambian en desencadenadores futuros. Los operadores con estado usan la referencia para determinar cuándo ocurre esto.
Modo de actualización En el modo de actualización, los operadores emiten todas las filas que cambiaron durante el desencadenador, incluso si el registro emitido podría cambiar en un desencadenador posterior.
Modo completo El modo completo solo funciona con agregaciones de streaming. En modo completo, todas las filas resultantes producidas por el operador se emiten de bajada.

Consideraciones de producción

Para muchas operaciones de streaming con estado, debe elegir entre los modos de anexión y actualización. En las secciones siguientes se describen las consideraciones que podrían determinar su decisión.

Nota:

El modo completo tiene algunas aplicaciones, pero puede funcionar mal a medida que se escalan los datos. Databricks recomienda usar vistas materializadas para obtener garantías semánticas asociadas al modo completo con el procesamiento incremental para muchas operaciones con estado. Consulte Uso de vistas materializadas en Databricks SQL.

Semántica de aplicaciones

La semántica de la aplicación describe cómo las aplicaciones de nivel inferior usan los datos de streaming.

Si los servicios de bajada deben realizar una sola acción para cada escritura de bajada, use el modo de anexión en la mayoría de los casos. Por ejemplo, si tiene un servicio de notificación de bajada que envía notificaciones para cada registro nuevo escrito en el receptor, el modo de anexión garantiza que cada registro solo se escriba una vez. El modo de actualización escribe el registro cada vez que cambia la información de estado, lo que daría lugar a numerosas actualizaciones.

Si los servicios de bajada necesitan resultados nuevos, el modo de actualización garantiza que el receptor permanezca lo más actualizado posible. Algunos ejemplos incluyen un modelo de aprendizaje automático que lee características en tiempo real o un panel de análisis que realiza el seguimiento de agregados en tiempo real.

Compatibilidad de operador y receptor

Structured Streaming no admite todas las operaciones disponibles en Apache Spark y algunas operaciones de streaming no se admiten en todos los modos de salida. Para más información sobre las limitaciones del operador, consulte los documentos de streaming de software de código abierto.

No todos los receptores admiten todos los modos de salida. Delta Lake, que respalda todas las tablas administradas por Unity Catalog, y Kafka admiten todos los modos de salida. Para más información sobre la compatibilidad del receptor, consulte los documentos de streaming de software de código abierto.

Latencia y costo

El modo de salida afecta a cuánto tiempo debe transcurrir antes de escribir un registro, y la frecuencia y la cantidad de datos escritos pueden afectar a los costos asociados a las canalizaciones de streaming.

El modo de anexión obliga a los operadores con estado a emitir resultados solo después de que se finalicen los resultados con estado, lo que es al menos tan largo como el retraso de la referencia. Un retraso de referencia de 1 hour en el modo de salida de anexión significa que los registros tienen al menos un retraso de 1 hora antes de emitirse hacia abajo.

El modo de actualización da como resultado una escritura por desencadenador por valor agregado. Si el receptor cobra por escritura por registro, esto puede ser costoso si los registros se actualizan muchas veces antes de que pase el retraso de la referencia.

Ejemplos de configuración

En los ejemplos de código siguientes se muestra cómo configurar el modo de salida para las actualizaciones de streaming en tablas de Unity Catalog:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Consulte los documentos de software de código abierto para PySpark DataStreamWriter.outputMode o Scala DataStreamWriter.outputMode.

Ejemplo de modos de streaming y salida con estado

El ejemplo siguiente está diseñado para ayudarle a razonar sobre cómo interactúa el modo de salida con las referencias para el streaming con estado.

Considere una agregación de streaming que calcula los ingresos totales generados cada hora en un almacén con un retraso de referencia de 15 minutos. El primer microlote procesa los siguientes registros:

  • 15 USD a las 2:40 p. m.
  • 10 USD a las 2:30 p. m.
  • 30 USD a las 3:10 p. m.

En este momento, la referencia del motor es de 2:55 p.m. porque resta 15 minutos (el retraso) del tiempo máximo visto (3:10 p. m.). El operador de agregación de streaming tiene lo siguiente en su estado:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 30 USD

En la tabla siguiente se describe lo que sucedería en cada modo de salida:

Modo de salida Resultado y motivo
Anexar El operador de agregación de streaming no emite nada de bajada. Esto se debe a que ambas ventanas pueden cambiar a medida que aparezcan nuevos valores con un desencadenador posterior: la referencia de 2:55 p.m. indica que los registros después de las 2:55 p. m. podrían llegar y esos registros podrían caer en la ventana de [2pm, 3pm] o en la ventana de [3pm, 4pm].
Actualizar El operador emite ambos registros, porque ambos registros recibieron actualizaciones.
Completo El operador emite todos los registros.

Supongamos que la secuencia recibe un registro más:

  • 20 USD a las 3:20 p. m.

La referencia se actualiza a las 3:05 p.m. porque el motor resta 15 minutos de las 3:20 p. m. En este punto, el operador de agregación de streaming tiene lo siguiente en su estado:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 50 USD

En la tabla siguiente se describe lo que sucedería en cada modo de salida:

Modo de salida Resultado y motivo
Anexar El operador de agregación de streaming observa que la referencia de las 3:05 p. m. es mayor que el final de la ventana [2pm, 3pm]. Por la definición de la referencia, esa ventana ya no puede cambiar, por lo que emite la ventana [2pm, 3pm].
Actualizar El operador de agregación de streaming emite la ventana [3pm, 4pm] porque el valor de estado ha cambiado de 30 a 50 USD.
Completo El operador emite todos los registros.

A continuación se resume cómo se comportan los operadores con estado en cada modo de anexión:

  • En el modo de anexión, escriba registros una vez después del retraso de la referencia.
  • En el modo de actualización, escriba registros que han cambiado desde el desencadenador anterior.
  • En modo completo, escriba todos los registros producidos por el operador con estado.