update_flow

Importante

A update_flow API está em Visualização Pública.

Use o @dp.update_flow decorador para criar um fluxo de atualizações. Os fluxos de atualização escrevem para os sinks usando o modo de saída de atualização, emitindo apenas as linhas que mudaram em cada lote. Ao contrário dos fluxos anexos, suportam agregações com estado sem necessidade de marca de água.

Os fluxos de atualização só podem atingir os sumidouros. As tabelas Delta não são suportadas.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parâmetro Tipo Description
função function Required. Uma função que retorna um DataFrame de streaming Apache Spark de uma consulta definida pelo usuário.
target str Required. O nome do sumidouro onde este fluxo escreve.
name str O nome do fluxo. Se não for fornecido, o padrão será o nome da função.
comment str Uma descrição para o fluxo.
spark_conf dict Um ditado das configurações do Spark para a execução desta consulta. Estas configurações sobrepõem as conferências definidas para o destino, pipeline ou cluster.
import_checkpoint str Um caminho de checkpoint externo para importar antes de iniciar o fluxo. Importado apenas uma vez, quando o diretório de checkpoint do fluxo ainda não existe.

Exemplos

Agregação a um sumidouro de Kafka

Escreva resultados de agregação com estado num sumidouro de Kafka:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Modo em tempo real

Use spark_conf para configurar um fluxo de atualização para o modo em tempo real:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitações

  • Os sumidouros de tabela Delta não são suportados como alvos para fluxos de atualização.