Considerações de produção para o Streaming Estruturado
Este artigo contém recomendações para agendar cargas de trabalho de Fluxo Estruturado usando trabalhos no Azure Databricks.
O Databricks recomenda sempre fazer o seguinte:
- Remova o código desnecessário de notebooks que retornariam resultados, como
display
ecount
. - Não execute cargas de trabalho de Fluxo Estruturado usando computação para todas as finalidades. Sempre agende fluxos como trabalhos usando a computação de trabalhos.
- Agendar trabalhos usando o modo
Continuous
. - Não habilite o dimensionamento automático para computação para trabalhos de Fluxo Estruturado.
Algumas cargas de trabalho se beneficiam do seguinte:
- Configurar o repositório de estado do RocksDB no Azure Databricks
- Ponto de verificação de estado assíncrono para consultas com estado
- O que é o acompanhamento de progresso assíncrono?
O Azure Databricks introduziu as Tabelas Dinâmicas Delta para reduzir as complexidades do gerenciamento da infraestrutura de produção para cargas de trabalho de Streaming Estruturado. O Databricks recomenda o uso das Tabelas Dinâmicas Delta para novos pipelines de Fluxo Estruturado. ConsulteO que é o Delta Live Tables?.
Observação
O dimensionamento automático de computação tem limitação ao reduzir o tamanho do cluster para cargas de Fluxo Estruturado. O Databricks recomenda usar o Delta Live Tables com o Dimensionamento Automático Aprimorado para cargas de trabalho de fluxo. Consulte Otimizar a utilização do cluster de pipelines do Delta Live Tables com dimensionamento automático aprimorado.
Projetar cargas de trabalho de streaming para esperar falhas
A Databricks recomenda sempre configurar os trabalhos de streaming para reiniciar automaticamente em caso de falha. Algumas funcionalidades, incluindo a evolução do esquema, pressupõem que as cargas de trabalho do Structured Streaming sejam configuradas para tentar novamente automaticamente. Confira Configurar trabalhos do Fluxo Estruturado para reiniciar as consultas do fluxo em caso de falha.
Algumas operações como foreachBatch
fornecer pelo menos uma vez, em vez de exatamente uma vez. Para essas operações, você deve fazer com que seu pipeline de processamento seja idempotente. Confira Usar foreachBatch para gravar em coletores de dados arbitrários.
Observação
Quando uma consulta é reiniciada, o microlote planejado durante a execução anterior é processado. Se o trabalho falhou devido a um erro de falta de memória ou você cancelou manualmente um trabalho devido a um microlote superdimensionado, talvez seja necessário escalar verticalmente a computação para processar o microlote com êxito.
Se você alterar as configurações entre as execuções, essas configurações se aplicarão ao primeiro novo lote planejado. Consulte Recuperar-se após alterações em uma consulta do Fluxo Estruturado.
Quando um trabalho é repetido?
Você pode agendar várias tarefas como parte de um trabalho do Azure Databricks. Ao configurar um trabalho usando o gatilho contínuo, você não pode definir dependências entre tarefas.
Você pode optar por planejar vários fluxos em um único trabalho usando uma das seguintes abordagens:
- Várias tarefas: defina um trabalho com várias tarefas que executam cargas de trabalho de streaming usando o gatilho contínuo.
- Várias consultas: defina várias consultas de streaming no código-fonte para uma única tarefa.
Você também pode combinar essas estratégias. A tabela a seguir compara essas abordagens.
Várias tarefas | Várias consultas | |
---|---|---|
Como a computação é compartilhada? | O Databricks recomenda a implantação de trabalhos com o tamanho adequado para cada tarefa de streaming. Opcionalmente, você pode compartilhar a computação entre tarefas. | Todas as consultas compartilham a mesma computação. Você pode atribuir opcionalmente consultas a pools de agendadores. |
Como as novas tentativas são tratadas? | Todas as tarefas devem falhar antes que o trabalho seja repetido. | A tarefa será repetida se alguma consulta falhar. |
Configurar trabalhos do Fluxo Estruturado para reiniciar as consultas do fluxo em caso de falha
O Databricks recomenda configurar todas as cargas de trabalho de streaming usando o gatilho contínuo. Consulte Executar trabalhos continuamente.
O gatilho contínuo fornece o seguinte comportamento por padrão:
- Impede mais de uma execução simultânea do trabalho.
- Inicia uma nova execução quando uma execução anterior falha.
- Usa recuo exponencial para novas tentativas.
O Databricks recomenda sempre usar a computação de trabalhos em vez da computação para todas as finalidades ao agendar fluxos de trabalho. Em caso de falha e repetição do trabalho, novos recursos de computação são implantados.
Observação
Você não precisa usar streamingQuery.awaitTermination()
ou spark.streams.awaitAnyTermination()
. Os trabalhos impedem que uma execução seja concluída automaticamente quando uma consulta de streaming estiver ativa.
Usaros pools de agendador para várias consultas de streaming
Você pode configurar pools de agendamento para atribuir capacidade computacional a consultas ao executar várias consultas de streaming do mesmo código-fonte.
Por padrão, todas as consultas iniciadas em um bloco de anotações são executadas no mesmo pool de agendamento justo. Os trabalhos do Apache Spark gerados por gatilhos de todas as consultas de streaming em um notebook são executados um após o outro na ordem PEPS (primeiro a entrar, primeiro a sair). Isso pode causar atrasos desnecessários nas consultas, pois eles não compartilham com eficiência os recursos do cluster.
Os pools de agendador permitem declarar quais consultas de Streaming Estruturado compartilham recursos de computação.
O exemplo a seguir atribui query1
a um pool dedicado, e query2
e query3
compartilham um pool de agendador.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Observação
A configuração da propriedade local deve estar na mesma célula do notebook em que você inicia a consulta de streaming.
Confira a documentação do Agendador justo do Apache para obter mais detalhes.