Compartilhar via


Usar foreachBatch para gravar em coletores de dados arbitrários

Este artigo discute o uso de foreachBatch com Structured Streaming para gravar a saída de uma consulta de streaming nas fontes de dados que não têm um coletor de streaming existente.

O padrão do código streamingDF.writeStream.foreachBatch(...) permite que você aplique funções nos lote aos dados de saída de cada microlote da consulta de streaming. As funções usadas com foreachBatch usam dois parâmetros:

  • Um DataFrame que tem os dados de saída de um microlote.
  • A ID exclusiva do microlote.

Você deve usar foreachBatch para operações de mesclagem do Delta Lake no Streaming Estruturado. Confira Executar upsert de consultas de streaming usando foreachBatch.

Aplicar operações adicionais de DataFrame

Muitas operações de DataFrame e Dataset não têm suporte em DataFrames de streaming porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando foreachBatch(), você pode aplicar algumas dessas operações em cada saída de micro lote. Por exemplo, você pode usar foreachBath() e a operação SQL MERGE INTO para gravar a saída de agregações de streaming em uma tabela Delta no modo Atualização. Veja mais detalhes em MERGE INTO.

Importante

  • foreachBatch() fornece apenas garantias de gravação pelo menos uma vez. No entanto, você pode usar o batchId fornecido para a função como uma forma de eliminar a duplicação da saída e obter uma garantia exatamente uma vez. Em ambos os casos, você terá que usar a semântica de ponta a ponta por conta própria.
  • foreachBatch() não funciona com o modo de processamento contínuo, pois ele depende fundamentalmente da execução de uma consulta de streaming no micro lote. Se você gravar dados no modo contínuo, use foreach() em seu lugar.

Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir a operação adequada. Um exemplo é mostrado aqui:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0

No Databricks Runtime 14.0 e superior na computação configurada com o modo de acesso compartilhado, as seguintes alterações de comportamento se aplicam:

  • print() comandos gravam a saída nos logs do driver.
  • Você não pode acessar o submódulo dbutils.widgets dentro da função.
  • Quaisquer arquivos, módulos ou objetos referenciados na função devem ser serializáveis e disponíveis no Spark.

Reutilizar fontes de dados do lote existentes

Usando foreachBatch(), você pode usar os gravadores de dados nos lotes existentes nos coletores de dados que podem não ter suporte do Streaming Estruturado. Veja alguns exemplos:

Muitas outras fontes de dados em lotes podem ser usadas em foreachBatch(). Confira Conectar-se a fontes de dados.

Gravar em vários locais

Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomendará o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.

Usar foreachBatch para gravar em vários coletores serializa a execução das gravações de streaming, o que pode aumentar a latência para cada microlote.

Se você usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações de tabela idempotentes em foreachBatch.