Usar foreachBatch para gravar em coletores de dados arbitrários
Este artigo discute o uso de foreachBatch
com Structured Streaming para gravar a saída de uma consulta de streaming nas fontes de dados que não têm um coletor de streaming existente.
O padrão do código streamingDF.writeStream.foreachBatch(...)
permite que você aplique funções nos lote aos dados de saída de cada microlote da consulta de streaming. As funções usadas com foreachBatch
usam dois parâmetros:
- Um DataFrame que tem os dados de saída de um microlote.
- A ID exclusiva do microlote.
Você deve usar foreachBatch
para operações de mesclagem do Delta Lake no Streaming Estruturado. Confira Executar upsert de consultas de streaming usando foreachBatch.
Aplicar operações adicionais de DataFrame
Muitas operações de DataFrame e Dataset não têm suporte em DataFrames de streaming porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando foreachBatch()
, você pode aplicar algumas dessas operações em cada saída de micro lote. Por exemplo, você pode usar foreachBath()
e a operação SQL MERGE INTO
para gravar a saída de agregações de streaming em uma tabela Delta no modo Atualização. Veja mais detalhes em MERGE INTO.
Importante
foreachBatch()
fornece apenas garantias de gravação pelo menos uma vez. No entanto, você pode usar obatchId
fornecido para a função como uma forma de eliminar a duplicação da saída e obter uma garantia exatamente uma vez. Em ambos os casos, você terá que usar a semântica de ponta a ponta por conta própria.foreachBatch()
não funciona com o modo de processamento contínuo, pois ele depende fundamentalmente da execução de uma consulta de streaming no micro lote. Se você gravar dados no modo contínuo, useforeach()
em seu lugar.
Um dataframe vazio pode ser invocado com foreachBatch()
e o código do usuário precisa ser resiliente para permitir a operação adequada. Um exemplo é mostrado aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Mudanças de comportamento para foreachBatch
no Databricks Runtime 14.0
No Databricks Runtime 14.0 e superior na computação configurada com o modo de acesso compartilhado, as seguintes alterações de comportamento se aplicam:
print()
comandos gravam a saída nos logs do driver.- Você não pode acessar o submódulo
dbutils.widgets
dentro da função. - Quaisquer arquivos, módulos ou objetos referenciados na função devem ser serializáveis e disponíveis no Spark.
Reutilizar fontes de dados do lote existentes
Usando foreachBatch()
, você pode usar os gravadores de dados nos lotes existentes nos coletores de dados que podem não ter suporte do Streaming Estruturado. Veja alguns exemplos:
Muitas outras fontes de dados em lotes podem ser usadas em foreachBatch()
. Confira Conectar-se a fontes de dados.
Gravar em vários locais
Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomendará o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.
Usar foreachBatch
para gravar em vários coletores serializa a execução das gravações de streaming, o que pode aumentar a latência para cada microlote.
Se você usar foreachBatch
para gravar em várias tabelas Delta, consulte Gravações de tabela idempotentes em foreachBatch.