Compartir vía


append_flow

El @dp.append_flow decorador crea flujos de anexión o reposición para las tablas de canalización. La función debe devolver un dataframe de streaming de Apache Spark. Ver Carga y procesa datos de manera incremental con los flujos de canalizaciones declarativas en Spark de Lakeflow.

Los flujos de inserción pueden tener como destino tablas de streaming o sumideros.

Syntax

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parámetros

Parámetro Tipo Description
function function Obligatorio. Función que devuelve un DataFrame de streaming de Apache Spark desde una consulta definida por el usuario.
target str Obligatorio. Nombre de la tabla o receptor que es el destino del flujo de anexión.
name str Nombre del flujo. Si no se proporciona, el valor predeterminado es el nombre de la función.
once bool Opcionalmente, defina el flujo como un flujo de un solo uso, como un reposición. El uso de once=True cambia el flujo de dos maneras:
  • El valor de retorno. streaming-query. debe ser un dataframe por lotes en este caso, no un dataframe de streaming.
  • El flujo se ejecuta una vez de forma predeterminada. Si la canalización se actualiza por completo, el flujo ONCE se ejecuta nuevamente para recrear los datos.
comment str Descripción del flujo.
spark_conf dict Lista de configuraciones de Spark para la ejecución de esta consulta

Examples

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))