Usar foreachBatch para gravar em coletores de dados arbitrários
Este artigo discute o uso de foreachBatch
com Structured Streaming para gravar a saída de uma consulta de streaming nas fontes de dados que não têm um coletor de streaming existente.
O padrão do código streamingDF.writeStream.foreachBatch(...)
permite que você aplique funções nos lote aos dados de saída de cada microlote da consulta de streaming. As funções usadas com foreachBatch
usam dois parâmetros:
- Um DataFrame que tem os dados de saída de um microlote.
- A ID exclusiva do microlote.
Você deve usar foreachBatch
para operações de mesclagem do Delta Lake no Streaming Estruturado. Confira Executar upsert de consultas de streaming usando foreachBatch.
Aplicar operações adicionais de DataFrame
Muitas operações de DataFrame e Dataset não têm suporte em DataFrames de streaming porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando foreachBatch()
, você pode aplicar algumas dessas operações em cada saída de micro lote. Por exemplo, você pode usar foreachBath()
e a operação SQL MERGE INTO
para gravar a saída de agregações de streaming em uma tabela Delta no modo Atualização. Veja mais detalhes em MERGE INTO.
Importante
foreachBatch()
fornece apenas garantias de gravação pelo menos uma vez. No entanto, você pode usar obatchId
fornecido para a função como uma forma de eliminar a duplicação da saída e obter uma garantia exatamente uma vez. Em ambos os casos, você terá que usar a semântica de ponta a ponta por conta própria.foreachBatch()
não funciona com o modo de processamento contínuo, pois ele depende fundamentalmente da execução de uma consulta de streaming no micro lote. Se você gravar dados no modo contínuo, useforeach()
em seu lugar.
Um dataframe vazio pode ser invocado com foreachBatch()
e o código do usuário precisa ser resiliente para permitir a operação adequada. Um exemplo é mostrado aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Mudanças de comportamento para foreachBatch
no Databricks Runtime 14.0
No Databricks Runtime 14.0 e superior no cálculo configurado com modo de acesso partilhado, forEachBatch
é executado num processo Python isolado separado no Apache Spark, em vez de no ambiente REPL. Ele é serializado e enviado ao Spark e não tem acesso a objetos globais spark
durante a sessão.
Em todas as outras configurações de computação, foreachBatch
é executado no mesmo Python REPL que executa o restante do seu código. Como resultado, a função não é serializada.
Ao usar o Databricks Runtime 14.0 e superior em computação configurada com modo de acesso compartilhado, você deve usar a variável sparkSession
com escopo no DataFrame local ao usar foreachBatch
em Python, como no exemplo de código a seguir:
def example_function(df, batch_id):
df.sparkSession.sql("<query>")
As seguintes alterações de comportamento se aplicam:
- Você não pode acessar nenhuma variável global do Python de dentro da sua função.
print()
comandos gravam a saída nos logs do driver.- Quaisquer arquivos, módulos ou objetos referenciados na função devem ser serializáveis e disponíveis no Spark.
Reutilizar fontes de dados do lote existentes
Usando foreachBatch()
, você pode usar os gravadores de dados nos lotes existentes nos coletores de dados que podem não ter suporte do Streaming Estruturado. Veja alguns exemplos:
Muitas outras fontes de dados em lotes podem ser usadas em foreachBatch()
. Confira Conectar-se a fontes de dados.
Gravar em vários locais
Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomendará o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.
Usar foreachBatch
para gravar em vários coletores serializa a execução das gravações de streaming, o que pode aumentar a latência para cada microlote.
Se você usar foreachBatch
para gravar em várias tabelas Delta, consulte Gravações de tabela idempotentes em foreachBatch.