Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Important
La update_flow API está en versión preliminar pública.
Use el @dp.update_flow decorador para crear un flujo de actualización. Actualice los flujos de escritura en receptores mediante el modo de salida de actualización, emitiendo solo las filas que cambiaron en cada lote. A diferencia de los flujos de anexión, admiten agregaciones con estado sin necesidad de una marca de agua.
Los flujos de actualización solo pueden dirigirse a receptores. No se admiten tablas delta.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parámetro | Tipo | Description |
|---|---|---|
| function | function |
Required. Función que devuelve un DataFrame de streaming de Apache Spark desde una consulta definida por el usuario. |
target |
str |
Required. Nombre del receptor en el que escribe este flujo. |
name |
str |
Nombre del flujo. Si no se proporciona, el valor predeterminado es el nombre de la función. |
comment |
str |
Descripción del flujo. |
spark_conf |
dict |
Un dict de configuraciones de Spark para la ejecución de esta consulta. Estas configuraciones invalidan los confs establecidos para el destino, la canalización o el clúster. |
import_checkpoint |
str |
Ruta de acceso de punto de control externo que se va a importar antes de iniciar el flujo. Se importa solo una vez, cuando el directorio de punto de control del flujo aún no existe. |
Ejemplos
Agregación a un receptor de Kafka
Escriba resultados de agregación con estado en un receptor de Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Modo en tiempo real
Use spark_conf para configurar un flujo de actualización para el modo en tiempo real:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitaciones
- Los receptores de tabla delta no se admiten como destinos para los flujos de actualización.