Partilhar via


Recuperar de falhas de consulta do Streaming Estruturado com fluxos de trabalho

O Streaming Estruturado fornece tolerância a falhas e consistência de dados para consultas de streaming; usando fluxos de trabalho do Azure Databricks, você pode configurar facilmente suas consultas de Streaming Estruturado para reiniciar automaticamente em caso de falha. Ao habilitar o ponto de verificação para uma consulta de streaming, você pode reiniciar a consulta após uma falha. A consulta reiniciada continua de onde a falha parou.

Habilitar o ponto de verificação para consultas de Streaming Estruturado

O Databricks recomenda que você sempre especifique a opção de um caminho de armazenamento em nuvem antes de iniciar a checkpointLocation consulta. Por exemplo:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Este local de ponto de verificação preserva todas as informações essenciais que identificam uma consulta. Cada consulta deve ter um local de ponto de verificação diferente. Várias consultas nunca devem ter o mesmo local. Para obter mais informações, consulte o Guia de programação de streaming estruturado.

Nota

Embora checkpointLocation seja necessário para a maioria dos tipos de coletores de saída, alguns coletores, como o coletor de memória, podem gerar automaticamente um local de ponto de verificação temporário quando você não fornece checkpointLocation. Esses locais de pontos de verificação temporários não garantem tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. Evite possíveis armadilhas especificando sempre um checkpointLocationarquivo .

Configurar trabalhos de Streaming Estruturado para reiniciar consultas de streaming em caso de falha

Você pode criar um trabalho do Azure Databricks com o bloco de anotações ou JAR que tem suas consultas de streaming e configurá-lo para:

  • Use sempre um novo cluster.
  • Tente sempre novamente em caso de falha.

A reinicialização automática em caso de falha de trabalho é especialmente importante ao configurar cargas de trabalho de streaming com evolução de esquema. A evolução do esquema funciona no Azure Databricks gerando um erro esperado quando uma alteração de esquema é detetada e, em seguida, processando corretamente os dados usando o novo esquema quando o trabalho é reiniciado. O Databricks recomenda sempre configurar tarefas de streaming que contenham consultas com evolução de esquema para reiniciar automaticamente nos fluxos de trabalho do Databricks.

Os trabalhos têm integração total com APIs de Streaming Estruturado e podem monitorar todas as consultas de streaming ativas em uma execução. Essa configuração garante que, se qualquer parte da consulta falhar, os trabalhos encerrarão automaticamente a execução (junto com todas as outras consultas) e iniciarão uma nova execução em um novo cluster. Isso executa novamente o bloco de anotações ou o código JAR e reinicia todas as consultas novamente. Esta é a forma mais segura de regressar a um bom estado.

Nota

  • A falha em qualquer uma das consultas de streaming ativas faz com que a execução ativa falhe e encerre todas as outras consultas de streaming.
  • Você não precisa usar streamingQuery.awaitTermination() ou spark.streams.awaitAnyTermination() no final do seu notebook. Os trabalhos impedem automaticamente que uma execução seja concluída quando uma consulta de streaming está ativa.
  • O Databricks recomenda o uso de trabalhos em vez de e dbutils.notebook.run() ao orquestrar blocos de anotações de %run Streaming Estruturado. Consulte Executar um bloco de notas Databricks a partir de outro bloco de notas.

Segue-se um exemplo de uma configuração de trabalho recomendada.

  • Cluster: defina isso sempre para usar um novo cluster e usar a versão mais recente do Spark (ou pelo menos a versão 2.1). As consultas iniciadas no Spark 2.1 e superior são recuperáveis após consultas e atualizações de versão do Spark.
  • Notificações: defina isso se quiser receber notificações por e-mail sobre falhas.
  • Horário: Não defina um horário.
  • Tempo limite: não defina um tempo limite. As consultas de streaming são executadas por um tempo indefinidamente longo.
  • Máximo de execuções simultâneas: definido como 1. Deve haver apenas uma instância de cada consulta ativa simultaneamente.
  • Tentativas: Defina como Ilimitado.

Consulte Criar e executar trabalhos do Azure Databricks para entender essas configurações.

Recuperar após alterações em uma consulta de Streaming Estruturado

Há limitações sobre quais alterações em uma consulta de streaming são permitidas entre reinicializações do mesmo local de ponto de verificação. Aqui estão algumas alterações que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:

  • O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito está bem definida depende da consulta e da alteração.
  • O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
  • sdf representa um DataFrame/Dataset de streaming gerado com sparkSession.readStream.

Tipos de alterações em consultas de Streaming Estruturado

  • Alterações no número ou tipo (ou seja, fonte diferente) de fontes de entrada: Isso não é permitido.
  • Alterações nos parâmetros das fontes de entrada: se isso é permitido e se a semântica da alteração está bem definida depende da fonte e da consulta. Eis alguns exemplos.
    • É permitida a adição, supressão e modificação dos limites tarifários:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      para

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Alterações em artigos e arquivos inscritos geralmente não são permitidas, pois os resultados são imprevisíveis: spark.readStream.format("kafka").option("subscribe", "article") até spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Consulte Alterando intervalos de gatilho entre execuções.
  • Alterações no tipo de dissipador de saída: Alterações entre algumas combinações específicas de dissipadores são permitidas. Isto tem de ser verificado caso a caso. Eis alguns exemplos.
    • É permitido o coletor de arquivos para a pia de Kafka. Kafka verá apenas os novos dados.
    • Kafka sink to file sink não é permitido.
    • Kafka pia mudou para foreach, ou vice-versa é permitido.
  • Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração está bem definida depende do coletor e da consulta. Eis alguns exemplos.
    • Não são permitidas alterações no diretório de saída de um coletor de arquivos: sdf.writeStream.format("parquet").option("path", "/somePath") até sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Alterações no tópico de saída são permitidas: sdf.writeStream.format("kafka").option("topic", "topic1") para sdf.writeStream.format("kafka").option("topic", "topic2")
    • Alterações no coletor foreach definido pelo usuário (ou seja, o ForeachWriter código) são permitidas, mas a semântica da alteração depende do código.
  • Alterações nas operações de projeção/filtro/mapa: Alguns casos são permitidos. Por exemplo:
    • É permitida a adição/supressão de filtros: sdf.selectExpr("a") a sdf.where(...).selectExpr("a").filter(...).
    • Alterações nas projeções com o mesmo esquema de saída são permitidas: sdf.selectExpr("stringColumn AS json").writeStream a sdf.select(to_json(...).as("json")).writeStream.
    • Alterações em projeções com esquema de saída diferente são condicionalmente permitidas: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream é permitido somente se o coletor de saída permitir que o esquema mude de "a" para "b".
  • Alterações em operações com monitoração de estado: algumas operações em consultas de streaming precisam manter dados de estado para atualizar continuamente o resultado. O Streaming Estruturado verifica automaticamente os dados de estado para armazenamento tolerante a falhas (por exemplo, DBFS, armazenamento de Blob do Azure) e os restaura após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo nas reinicializações. Isso significa que quaisquer alterações (ou seja, adições, exclusões ou modificações de esquema) nas operações com monitoração de estado de uma consulta de streaming não são permitidas entre reinicializações. Aqui está a lista de operações com monitoração de estado cujo esquema não deve ser alterado entre reinicializações para garantir a recuperação de estado:
    • Agregação de streaming: Por exemplo, sdf.groupBy("a").agg(...). Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados.
    • Desduplicação de streaming: Por exemplo, sdf.dropDuplicates("a"). Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados.
    • Junção stream-stream: Por exemplo, (ou seja, sdf1.join(sdf2, ...) ambas as entradas são geradas com sparkSession.readStream). Não são permitidas alterações no esquema ou nas colunas de equijunção. Não são permitidas alterações no tipo de junção (exterior ou interior). Outras alterações na condição de junção são mal definidas.
    • Operação com estado arbitrária: Por exemplo, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração dentro da função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser oferecer suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexas em bytes usando um esquema de codificação/decodificação que ofereça suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, poderá alterar o esquema Avro-state-schema entre as reinicializações da consulta, pois isso restaura o estado binário.