update_flow

Important

La update_flow API está en versión preliminar pública.

Use el @dp.update_flow decorador para crear un flujo de actualización. Actualice los flujos de escritura en receptores mediante el modo de salida de actualización, emitiendo solo las filas que cambiaron en cada lote. A diferencia de los flujos de anexión, admiten agregaciones con estado sin necesidad de una marca de agua.

Los flujos de actualización solo pueden dirigirse a receptores. No se admiten tablas delta.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parámetro Tipo Description
function function Required. Función que devuelve un DataFrame de streaming de Apache Spark desde una consulta definida por el usuario.
target str Required. Nombre del receptor en el que escribe este flujo.
name str Nombre del flujo. Si no se proporciona, el valor predeterminado es el nombre de la función.
comment str Descripción del flujo.
spark_conf dict Un dict de configuraciones de Spark para la ejecución de esta consulta. Estas configuraciones invalidan los confs establecidos para el destino, la canalización o el clúster.
import_checkpoint str Ruta de acceso de punto de control externo que se va a importar antes de iniciar el flujo. Se importa solo una vez, cuando el directorio de punto de control del flujo aún no existe.

Ejemplos

Agregación a un receptor de Kafka

Escriba resultados de agregación con estado en un receptor de Kafka:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Modo en tiempo real

Use spark_conf para configurar un flujo de actualización para el modo en tiempo real:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitaciones

  • Los receptores de tabla delta no se admiten como destinos para los flujos de actualización.