Usar foreachBatch para gravar em coletores de dados arbitrários

Este artigo discute o uso de foreachBatch com Structured Streaming para gravar a saída de uma consulta de streaming nas fontes de dados que não têm um coletor de streaming existente.

O padrão do código streamingDF.writeStream.foreachBatch(...) permite que você aplique funções nos lote aos dados de saída de cada microlote da consulta de streaming. As funções usadas com foreachBatch usam dois parâmetros:

  • Um DataFrame que tem os dados de saída de um microlote.
  • A ID exclusiva do microlote.

Você deve usar foreachBatch para operações de mesclagem do Delta Lake no Streaming Estruturado. Confira Executar upsert de consultas de streaming usando foreachBatch.

Aplicar operações adicionais de DataFrame

Muitas operações de DataFrame e Dataset não têm suporte em DataFrames de streaming porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando foreachBatch(), você pode aplicar algumas dessas operações em cada saída de micro lote. Por exemplo, você pode usar foreachBath() e a operação SQL MERGE INTO para gravar a saída de agregações de streaming em uma tabela Delta no modo Atualização. Veja mais detalhes em MERGE INTO.

Importante

  • foreachBatch() fornece apenas garantias de gravação pelo menos uma vez. No entanto, você pode usar o batchId fornecido para a função como uma forma de eliminar a duplicação da saída e obter uma garantia exatamente uma vez. Em ambos os casos, você terá que usar a semântica de ponta a ponta por conta própria.
  • foreachBatch() não funciona com o modo de processamento contínuo, pois ele depende fundamentalmente da execução de uma consulta de streaming no micro lote. Se você gravar dados no modo contínuo, use foreach() em seu lugar.

Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir a operação adequada. Um exemplo é mostrado aqui:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0

No Databricks Runtime 14.0 e superior no cálculo configurado com modo de acesso partilhado, forEachBatch é executado num processo Python isolado separado no Apache Spark, em vez de no ambiente REPL. Ele é serializado e enviado ao Spark e não tem acesso a objetos globais spark durante a sessão.

Em todas as outras configurações de computação, foreachBatch é executado no mesmo Python REPL que executa o restante do seu código. Como resultado, a função não é serializada.

Ao usar o Databricks Runtime 14.0 e superior em computação configurada com modo de acesso compartilhado, você deve usar a variável sparkSession com escopo no DataFrame local ao usar foreachBatch em Python, como no exemplo de código a seguir:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

As seguintes alterações de comportamento se aplicam:

  • Você não pode acessar nenhuma variável global do Python de dentro da sua função.
  • print() comandos gravam a saída nos logs do driver.
  • Quaisquer arquivos, módulos ou objetos referenciados na função devem ser serializáveis e disponíveis no Spark.

Reutilizar fontes de dados do lote existentes

Usando foreachBatch(), você pode usar os gravadores de dados nos lotes existentes nos coletores de dados que podem não ter suporte do Streaming Estruturado. Veja alguns exemplos:

Muitas outras fontes de dados em lotes podem ser usadas em foreachBatch(). Confira Conectar-se a fontes de dados.

Gravar em vários locais

Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomendará o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.

Usar foreachBatch para gravar em vários coletores serializa a execução das gravações de streaming, o que pode aumentar a latência para cada microlote.

Se você usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações de tabela idempotentes em foreachBatch.