Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
El @dlt.append_flow
decorador crea flujos de anexión o reposición para las tablas de canalizaciones declarativas de Lakeflow. La función debe devolver un dataframe de streaming de Apache Spark. Consulte Carga y procesamiento de datos de forma incremental con flujos de canalizaciones declarativas de Lakeflow.
Los flujos de anexión pueden estar dirigidos a tablas o receptores de streaming.
Sintaxis
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
Parámetros
Parámetro | Tipo | Descripción |
---|---|---|
función | function |
Obligatorio. Función que devuelve un DataFrame de streaming de Apache Spark desde una consulta definida por el usuario. |
target |
str |
Obligatorio. Nombre de la tabla o receptor que es el destino del flujo de anexión. |
name |
str |
El nombre del flujo. Si no se proporciona, el valor predeterminado es el nombre de la función. |
comment |
str |
Descripción del flujo. |
spark_conf |
dict |
Lista de configuraciones de Spark para la ejecución de esta consulta |
Ejemplos
import dlt
# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Create a Kafka sink
dlt.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))