Partilhar via


Utilize o ForEachBatch para escrever em destinos de dados arbitrários em pipelines

Importante

A foreach_batch_sink API está em Visualização Pública.

O sumidouro ForEachBatch permite-lhe processar um fluxo como uma série de micro-lotes. Cada lote pode ser processado em Python com lógica personalizada semelhante à foreachBatchdo Apache Spark Structured Streaming. Com o sumidouro Lakeflow Spark Declarative Pipelines (SDP) ForEachBatch, pode transformar, fundir ou escrever dados em streaming para um ou mais alvos que não suportam nativamente escritas em streaming. Esta página guia-o na configuração de um sink ForEachBatch, fornece exemplos e discute as principais considerações.

O sumidouro ForEachBatch fornece a seguinte funcionalidade:

  • Lógica personalizada para cada micro-batch: ForEachBatch é um sink de streaming flexível. Pode aplicar ações arbitrárias (como fundir numa tabela externa, escrever para múltiplos destinos ou realizar upserts) com código Python.
  • Suporte total de atualização: Os pipelines gerem checkpoints por fluxo, por isso os checkpoints são reiniciados automaticamente quando realiza uma atualização completa do pipeline. Com o ForEachBatch sink, és responsável por gerir o reset de dados a jusante quando isso acontece.
  • Suporte ao Unity Catalog: O sink ForEachBatch suporta todas as funcionalidades do Unity Catalog, como ler ou escrever em volumes ou tabelas do Unity Catalog.
  • Gestão limitada: O pipeline não acompanha os dados que são escritos a partir de um destino ForEachBatch, e por isso não tem a capacidade de limpar esses dados. É responsável por qualquer gestão de dados subsequente.
  • Entradas do registo de eventos: O registo de eventos do pipeline regista a criação e utilização de cada sink ForEachBatch. Se a sua função Python não for serializável, verá uma entrada de aviso no registo de eventos com sugestões adicionais.

Observação

  • O ForEachBatch sink foi concebido para consultas de streaming, como append_flow. Não se destina a pipelines em lote nem a AutoCDC semânticas.
  • O sumidouro ForEachBatch descrito nesta página é para oleodutos. O Apache Spark Structured Streaming também suporta foreachBatch. Para informações sobre o Structured Streaming foreachBatch, consulte Use foreachBatch para escrever em sumidouros de dados arbitrários.

Quando usar um lavatório ForEachBatch

Use um destino ForEachBatch sempre que o seu pipeline necessitar de funcionalidades que não estejam disponíveis através dos formatos de sink incorporados, como delta, ou kafka. Os casos de uso típicos incluem:

  • Mesclar ou inserir/atualizar numa tabela Delta Lake: Execute lógica de mesclagem personalizada para cada micro-batch (por exemplo, gerenciar registos atualizados).
  • Escrita para múltiplos destinos ou destinos não suportados: Escreva a saída de cada lote em múltiplas tabelas ou sistemas de armazenamento externos que não suportem gravações em streaming (como certos sinks JDBC).
  • Aplicar lógica personalizada ou transformações: Manipular dados diretamente em Python (por exemplo, usando bibliotecas especializadas ou transformações avançadas).

Para obter informações sobre os recetores incorporados ou sobre a criação de recetores personalizados com Python, veja Recetores em Pipelines Declarativos Lakeflow Spark.

Sintaxe

Use a @dp.foreach_batch_sink() decoração para gerar um pia ForEachBatch. Pode então referenciar isto como a target na sua definição de fluxo, por exemplo em @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Parâmetro Description
name Opcional. Um nome único para identificar o sumidouro dentro do oleoduto. Por defeito, o nome da UDF é usado quando não está incluído.
batch_handler Esta é a função definida pelo utilizador (UDF) que será chamada para cada micro-batch.
df Spark DataFrame contendo dados para o micro-lote atual.
batch_id O ID inteiro do micro-lote. O Spark incrementa este ID para cada intervalo de disparo.
Um batch_id de 0 representa o início de uma transmissão, ou o início de uma atualização completa. O foreach_batch_sink código deve gerir corretamente uma atualização total das fontes de dados a jusante. Consulte a secção seguinte para mais informações.

Atualização completa

Como o ForEachBatch utiliza uma consulta de streaming, o pipeline rastreia o diretório de checkpoints de cada fluxo. Em atualização completa:

  • O diretório do checkpoint é restabelecido.
  • A sua função sink (foreach_batch_sink UDF) deteta o início de um novo batch_id ciclo a partir de 0.
  • Os dados no seu sistema de destino não são automaticamente limpos pelo pipeline (porque o pipeline não sabe onde os seus dados são escritos). Se precisar de um cenário em branco, deve eliminar ou truncar manualmente as tabelas ou locais externos afetados pelo processo ForEachBatch.

Utilização das funcionalidades do Unity Catalog

Todas as capacidades existentes do Catálogo Unity no Spark Structured Streaming foreach_batch_sink continuam disponíveis.

Isto inclui escrever em tabelas geridas ou externas do Unity Catalog. Podes escrever micro-batches em tabelas geridas pelo Unity Catalog ou externas exatamente como farias em qualquer trabalho de Streaming Estruturado do Apache Spark.

Entradas do registo de eventos

Quando cria um sumidouro ForEachBatch, um SinkDefinition evento com "format": "foreachBatch" é adicionado ao registo de eventos do pipeline.

Isto permite-lhe acompanhar a utilização dos sumidouros ForEachBatch e ver avisos sobre o seu lavabo.

Utilização com Databricks Connect

Se a função que fornece não for serializável (um requisito importante para o Databricks Connect), o registo de eventos inclui uma WARN entrada que recomenda que simplifique ou refatore o seu código caso seja necessário suporte ao Databricks Connect.

Por exemplo, se utilizar dbutils para obter parâmetros dentro de um UDF ForEachBatch, pode primeiro obter o argumento antes de o utilizar no UDF:

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Melhores práticas

  1. Mantenha a sua função ForEachBatch concisa: Evite threading, dependências pesadas de bibliotecas ou grandes manipulações de dados em memória. A lógica complexa ou a lógica com estado pode levar a erros de serialização ou a gargalos de desempenho.
  2. Monitore a sua pasta de checkpoints: Para consultas de streaming, o SDP gere os checkpoints por fluxo, não por destino. Se tiver vários fluxos na sua pipeline, cada fluxo tem o seu próprio diretório de checkpoint.
  3. Valide dependências externas: Se depender de sistemas ou bibliotecas externas, verifique se estão instalados em todos os nós do cluster ou no seu contentor.
  4. Tenha atenção ao Databricks Connect: Caso o seu ambiente venha a migrar para o Databricks Connect no futuro, assegure-se de que o seu código é serializável e não depende de nada dentro do UDF foreach_batch_sink.

Limitações

  • Sem manutenção para ForEachBatch: Como o seu código Python personalizado pode escrever dados em qualquer lugar, o pipeline não pode limpar ou rastrear esses dados. Deve gerir as suas próprias políticas de gestão ou retenção de dados para os destinos onde escreve.
  • Métricas em micro-batch: Os pipelines recolhem métricas de streaming, mas alguns cenários podem causar métricas incompletas ou anómalas ao usar o ForEachBatch. Isto deve-se à flexibilidade subjacente do ForEachBatch, que dificulta o acompanhamento do fluxo de dados e das linhas para o sistema.
  • Suporte à escrita para múltiplos destinos sem múltiplas leituras: Alguns clientes podem usar o ForEachBatch para ler de uma fonte uma vez e depois escrever para múltiplos destinos. Para tal, deve incluir df.persist ou df.cache dentro da sua função ForEachBatch. Usando estas opções, o Azure Databricks tentará preparar os dados apenas uma vez. Sem estas opções, a sua consulta resultará em múltiplas leituras. Isto não está incluído nos seguintes exemplos de código.
  • Usar com Databricks Connect: Se o seu pipeline correr no Databricks Connect, foreachBatch as funções definidas pelo utilizador (UDF) devem ser serializáveis e não podem usar dbutils. O pipeline gera avisos se detectar um UDF não serializável, mas não falha o pipeline.
  • Lógica não serializável: O código que referencia objetos locais, classes ou recursos não serializáveis pode falhar em contextos do Databricks Connect. Use módulos Python puros e confirme que referências (por exemplo, dbutils) não são usadas, se o Databricks Connect é um requisito.

Examples

Exemplo de sintaxe básica

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Utilização de dados de exemplo para um pipeline simples

Este exemplo utiliza o exemplo do NYC Taxi. Assume que o administrador do seu espaço de trabalho ativou o catálogo Databricks Public Datasets. Para o lavadouro, muda my_catalog.my_schema para um catálogo e esquema a que tenhas acesso.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Escrever para múltiplos destinos

Este exemplo escreve para múltiplos destinos. Demonstra-se o uso de txnVersion e txnAppId para tornar as operações de escrita em tabelas Delta Lake idempotentes. Para mais detalhes, veja Tabela idempotente escreve em foreachBatch.

Suponha que estamos a escrever para duas tabelas, table_a e table_b, e suponha que dentro de um lote, a escrita para table_a tem sucesso enquanto a escrita para table_b falha. Quando o lote é reexecutado, o par (txnVersion, txnAppId) permitirá que Delta ignore a escrita duplicada em table_a, e apenas escreva o lote em table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Usando spark.sql()

Podes usar spark.sql() no teu lavatório ForEachBatch, como no exemplo seguinte.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Perguntas frequentes (FAQ)

Posso usar dbutils na minha pia ForEachBatch?

Se planeia correr o seu pipeline num ambiente Connect que não seja Databricks, dbutils pode funcionar. No entanto, se usar Databricks Connect, dbutils não está acessível dentro da sua foreachBatch função. O pipeline pode levantar alertas se detetar a utilização de dbutils de forma a ajudar a evitar falhas.

Posso usar múltiplos fluxos de dados com um único destino ForEachBatch?

Yes. Pode definir múltiplos fluxos (com @dp.append_flow) que têm como alvo o mesmo nome de sumidouro, mas cada um mantém os seus próprios checkpoints.

O pipeline trata da retenção de dados ou da limpeza para o meu alvo?

Não. Como o sumidouro ForEachBatch pode escrever em qualquer local ou sistema arbitrário, o pipeline não pode gerir ou eliminar automaticamente dados nesse destino. Deve tratar destas operações como parte do seu código personalizado ou processos externos.

Como posso resolver erros de serialização ou falhas na minha função ForEachBatch?

Veja os registos dos controladores do seu cluster ou dos eventos do pipeline. Para questões de serialização relacionadas com o Spark Connect, verifique se a sua função depende apenas de objetos Python serializáveis e não faz referência a objetos não permitidos (como handles de ficheiros abertos ou dbutils).