Compartir a través de


añadir_flujo

El @dlt.append_flow decorador crea flujos de anexión o reposición para las tablas de canalizaciones declarativas de Lakeflow. La función debe devolver un dataframe de streaming de Apache Spark. Consulte Carga y procesamiento de datos de forma incremental con flujos de canalizaciones declarativas de Lakeflow.

Los flujos de anexión pueden estar dirigidos a tablas o receptores de streaming.

Sintaxis

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

Parámetros

Parámetro Tipo Descripción
función function Obligatorio. Función que devuelve un DataFrame de streaming de Apache Spark desde una consulta definida por el usuario.
target str Obligatorio. Nombre de la tabla o receptor que es el destino del flujo de anexión.
name str El nombre del flujo. Si no se proporciona, el valor predeterminado es el nombre de la función.
comment str Descripción del flujo.
spark_conf dict Lista de configuraciones de Spark para la ejecución de esta consulta

Ejemplos

import dlt

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))