Compartilhar via


Usar ForEachBatch para gravar em coletores de dados arbitrários em pipelines

Importante

A foreach_batch_sink API está em Visualização Pública.

O coletor ForEachBatch permite que você processe um fluxo como uma série de microlotes. Cada lote pode ser processado em Python com lógica personalizada semelhante à do foreachBatchApache Spark Structured Streaming. Com o coletor do Lakeflow Spark Declarative Pipelines (SDP) ForEachBatch, você pode transformar, mesclar ou gravar dados de streaming em um ou mais destinos que não dão suporte nativo a gravações de streaming. Esta página orienta você pela configuração de um coletor ForEachBatch, fornece exemplos e discute as principais considerações.

O coletor ForEachBatch fornece a seguinte funcionalidade:

  • Lógica personalizada para cada microlote: ForEachBatch é um coletor de streaming flexível. Você pode aplicar ações arbitrárias (como mesclar em uma tabela externa, gravar em vários destinos ou executar upserts) com código Python.
  • Suporte completo à atualização: os pipelines gerenciam pontos de verificação por fluxo, portanto, os pontos de verificação são redefinidos automaticamente quando você executa uma atualização completa do pipeline. Com o coletor ForEachBatch, você é responsável por gerenciar a redefinição de dados downstream quando isso acontece.
  • Suporte ao Catálogo do Unity: o coletor ForEachBatch dá suporte a todos os recursos do Catálogo do Unity, como ler ou gravar em volumes ou tabelas do Catálogo do Unity.
  • Limpeza limitada: o pipeline não rastreia quais dados são gravados de um coletor ForEachBatch, portanto, não é possível limpar esses dados. Você é responsável por qualquer gerenciamento de dados downstream.
  • Entradas de log de eventos: o log de eventos do pipeline registra a criação e o uso de cada coletor ForEachBatch. Se sua função Python não for serializável, você verá uma entrada de aviso no log de eventos com sugestões adicionais.

Observação

  • O coletor ForEachBatch foi projetado para consultas de streaming, como append_flow. Ele não se destina exclusivamente a pipelines em lote ou a semânticas AutoCDC.
  • O coletor ForEachBatch descrito nesta página é para pipelines. Apache Spark Structured Streaming também oferece suporte a foreachBatch. Para obter informações sobre o Streaming foreachBatchEstruturado, consulte Use foreachBatch para gravar em coletores de dados arbitrários.

Quando usar um coletor ForEachBatch

Use um coletor ForEachBatch sempre que o pipeline exigir funcionalidade que não esteja disponível por meio de um formato de coletor interno, como delta, ou kafka. Casos de uso típicos incluem:

  • Mesclar ou fazer upserting em uma tabela Delta Lake: Executa a lógica de mesclagem personalizada para cada microlote (por exemplo, lidando com registros atualizados).
  • Gravação em vários destinos ou sem suporte: grave a saída de cada lote em várias tabelas ou sistemas de armazenamento externos que não dão suporte a gravações de streaming (como determinados coletores JDBC).
  • Aplicando lógicas ou transformações personalizadas: manipule dados diretamente no Python (por exemplo, usando bibliotecas especializadas ou transformações avançadas).

Para obter informações sobre os coletores internos ou a criação de coletores personalizados com Python, consulte Sinks in Lakeflow Spark Declarative Pipelines.

Sintaxe

Use a @dp.foreach_batch_sink() decoração para gerar um coletor ForEachBatch. Em seguida, você pode referenciar isso como um target na sua definição de fluxo, por exemplo, em @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Parâmetro Description
name Optional. Um nome exclusivo para identificar o coletor dentro do canal de processamento. O padrão é o nome da UDF, caso não seja incluído.
batch_handler Essa é a UDF (função definida pelo usuário) que será chamada para cada microlote.
Df DataFrame do Spark que contém dados para o microlote atual.
batch_id O ID inteiro do micro-lote. O Spark incrementa essa ID para cada intervalo de gatilho.
Um batch_id de 0 representa o início de um fluxo ou o início de um refresh completo. O foreach_batch_sink código deve lidar corretamente com uma atualização completa para fontes de dados downstream. Consulte a próxima seção para obter mais informações.

Atualização completa

Como ForEachBatch usa uma consulta de streaming, o pipeline rastreia o diretório de ponto de verificação para cada fluxo. Durante atualização completa:

  • O diretório de ponto de verificação é reinicializado.
  • Sua função de sink (foreach_batch_sink UDF) detecta um novo ciclo batch_id começando do 0.
  • Os dados em seu sistema de destino não são limpos automaticamente pelo pipeline (porque o pipeline não sabe onde seus dados são gravados). Se você precisar de um cenário de ardósia limpa, deverá remover ou truncar manualmente as tabelas ou locais externos que o coletor ForEachBatch preenche.

Usando recursos do Catálogo do Unity

Todos os recursos existentes do Unity Catalog no Spark Streaming Estruturado permanecem disponíveis.

Isso inclui escrever em tabelas gerenciadas ou externas do Unity Catalog. Você pode gravar micro-batches em tabelas gerenciadas ou externas do Unity Catalog exatamente como faria em qualquer trabalho de Structured Streaming do Apache Spark.

Entradas de log de eventos

Quando você cria um coletor ForEachBatch, um evento SinkDefinition, com "format": "foreachBatch", é adicionado ao log de eventos do pipeline.

Isso permite que você acompanhe o uso dos sinks ForEachBatch e veja avisos sobre o seu sink.

Utilizando o Databricks Connect

Se a função fornecida não for serializável (um requisito importante para o Databricks Connect), o log de eventos incluirá uma WARN entrada recomendando que você simplifique ou refatore seu código se o suporte do Databricks Connect for necessário.

Por exemplo, se você usar dbutils para obter parâmetros em um UDF ForEachBatch, poderá obter o argumento antes de usá-lo na UDF:

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Práticas recomendadas

  1. Mantenha sua função ForEachBatch concisa: evite threading, dependências de biblioteca pesadas ou grandes manipulações de dados na memória. Lógica complexa ou lógica com manutenção de estado pode levar a falhas de serialização ou gargalos de desempenho.
  2. Monitore sua pasta de ponto de verificação: para consultas de streaming, o SDP gerencia pontos de verificação por fluxo, não por coletor. Se você tiver vários fluxos em seu pipeline, cada fluxo terá seu próprio diretório de ponto de verificação.
  3. Validar dependências externas: se você depender de sistemas ou bibliotecas externas, verifique se elas estão instaladas em todos os nós de cluster ou em seu contêiner.
  4. Fique atento ao Databricks Connect: se o ambiente puder ser transferido para o Databricks Connect no futuro, verifique se o código é serializável e não depende de dbutils dentro da foreach_batch_sink UDF.

Limitações

  • Nenhuma limpeza para ForEachBatch: como seu código Python personalizado pode gravar dados em qualquer lugar, o pipeline não pode limpar ou acompanhar esses dados. Você deve lidar com suas próprias políticas de gerenciamento de dados ou retenção para os destinos para os quais você escreve.
  • Métricas em microlote: Os Pipelines coletam métricas de streaming, mas alguns cenários podem resultar em métricas incompletas ou incomuns ao usar o ForEachBatch. Isso ocorre devido à flexibilidade subjacente do ForEachBatch, o que dificulta o controle de fluxo de dados e linhas para o sistema.
  • Suporte à gravação em múltiplos destinos sem múltiplas leituras: alguns clientes utilizam ForEachBatch para realizar a leitura de uma fonte uma única vez e gravar em diversos destinos. Para fazer isso, você deve incluir df.persist ou df.cache dentro de sua função ForEachBatch. Usando essas opções, o Azure Databricks tentará preparar os dados apenas uma única vez. Sem essas opções, sua consulta resultará em várias leituras. Isso não está incluído nos exemplos de código a seguir.
  • Usando com Databricks Connect: Se o pipeline for executado no Databricks Connect, as funções definidas pelo usuário (UDF) deverão ser serializáveis e não poderão usar dbutils. O pipeline gera avisos se detectar uma UDF (Função Definida pelo Usuário) não serializável, mas não ocasiona falha no pipeline.
  • Lógica não serializável: o código que faz referência a objetos locais, classes ou recursos não selecionáveis pode ser interrompido nos contextos do Databricks Connect. Use módulos Python puros e confirme se as referências (por exemplo, dbutils) não serão usadas se o Databricks Connect é um requisito.

Exemplos

Exemplo de sintaxe básica

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Usando dados de exemplo para um pipeline simples

Este exemplo usa o exemplo de Táxi de NYC. Ele pressupõe que o administrador do workspace habilitou o catálogo de conjuntos de dados públicos do Databricks. Para o coletor, altere my_catalog.my_schema para um catálogo e um esquema ao qual você tem acesso.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Escrevendo para múltiplos destinos

Este exemplo escreve em vários destinos. Ele demonstra como usar txnVersion e txnAppId para tornar escritas idempotentes em tabelas Delta Lake. Para obter detalhes, consulte gravações de tabela Idempotente em foreachBatch.

Suponha que estejamos gravando em duas tabelas, table_a e table_b, e suponhamos que dentro de um lote a gravação em table_a seja bem-sucedida enquanto a gravação em table_b falha. Quando o lote for executado novamente, o par (txnVersion, txnAppId) permitirá que o Delta ignore a gravação duplicada de table_a e escreva apenas o lote em table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Usando spark.sql()

Você pode usar spark.sql() no coletor ForEachBatch, como no exemplo a seguir.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Perguntas frequentes (FAQ)

Posso usar dbutils no meu coletor ForEachBatch?

Se você planeja executar o pipeline em um ambiente que não seja do Databricks Connect, dbutils pode funcionar. No entanto, se você usar o Databricks Connect, dbutils não estará acessível em sua foreachBatch função. O pipeline poderá gerar avisos se detectar o uso de dbutils para ajudá-lo a evitar interrupções.

Posso usar vários fluxos com um único coletor ForEachBatch?

Sim. Você pode definir vários fluxos (com @dp.append_flow) que todos visam o mesmo nome do coletor, mas cada um deles mantém seus próprios pontos de verificação.

O pipeline gerencia a retenção ou a limpeza de dados para o meu alvo?

Não. Como o coletor ForEachBatch pode gravar em qualquer local ou sistema arbitrário, o pipeline não pode gerenciar ou excluir dados automaticamente nesse destino. Você deve lidar com essas operações como parte de seu código personalizado ou processos externos.

Como solucionar problemas de erros ou falhas de serialização na minha função ForEachBatch?

Verifique os logs do driver de cluster ou os logs de eventos do pipeline. Para problemas de serialização relacionados ao Spark Connect, verifique se sua função depende apenas de objetos Python serializáveis e não faz referência a objetos não permitidos (como identificadores de arquivo abertos ou dbutils).