Use foreachBatch para gravar em coletores de dados arbitrários
Este artigo discute o uso foreachBatch
com Streaming estruturado para gravar a saída de uma consulta de streaming em fontes de dados que não têm um coletor de streaming existente.
O padrão streamingDF.writeStream.foreachBatch(...)
de código permite que você aplique funções de lote aos dados de saída de cada microlote da consulta de streaming. Funções usadas com foreachBatch
tomar dois parâmetros:
- Um DataFrame que tem os dados de saída de um microlote.
- A ID exclusiva do microlote.
Você deve usar foreachBatch
para operações de mesclagem Delta Lake no Structured Streaming. Consulte Upsert de consultas de streaming usando foreachBatch.
Aplicar operações adicionais do DataFrame
Muitas operações DataFrame e Dataset não são suportadas no streaming de DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando foreachBatch()
você pode aplicar algumas dessas operações em cada saída de microlote. Por exemplo, você pode usar foreachBath()
e a operação SQL MERGE INTO
para gravar a saída de agregações de streaming em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.
Importante
foreachBatch()
fornece apenas garantias de escrita pelo menos uma vez. No entanto, você pode usar obatchId
fornecido para a função como forma de desduplicar a saída e obter uma garantia exata uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.foreachBatch()
não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução em microlote de uma consulta de streaming. Se você gravar dados no modo contínuo, useforeach()
em vez disso.
Um dataframe vazio pode ser invocado e foreachBatch()
o código do usuário precisa ser resiliente para permitir a operação adequada. Pode ver um exemplo aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Alterações de comportamento para foreachBatch
no Databricks Runtime 14.0
No Databricks Runtime 14.0 e superior na computação configurada com o modo de acesso compartilhado, as seguintes alterações de comportamento se aplicam:
print()
comandos gravam a saída nos logs do driver.- Não é possível acessar o
dbutils.widgets
submódulo dentro da função. - Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.
Reutilizar fontes de dados em lote existentes
Usando foreachBatch()
o , você pode usar gravadores de dados em lote existentes para coletores de dados que podem não ter suporte a Streaming Estruturado. Eis alguns exemplos:
Muitas outras fontes de dados em lote podem ser usadas a partir de foreachBatch()
. Consulte Conectar-se a fontes de dados.
Gravar em vários locais
Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomenda o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.
O uso foreachBatch
para gravar em vários coletores serializa a execução de gravações de streaming, o que pode aumentar a latência para cada microlote.
Se você usar foreachBatch
para gravar em várias tabelas Delta, consulte Idempotent tabela escreve em foreachBatch.