Compartilhar via


Pontos de verificação de Streaming Estruturado

Os pontos de verificação e os logs de registro antecipado trabalham juntos para fornecer garantias de processamento para aplicações de streaming estruturado. O ponto de verificação rastreia as informações que identificam a consulta, incluindo informações de estado e registros processados. Quando você exclui os arquivos em um diretório de ponto de verificação ou altera para um novo local de ponto de verificação, a próxima execução da consulta começa do zero.

Cada consulta deve ter um local de ponto de verificação diferente. Múltiplas consultas nunca devem compartilhar o mesmo local.

Habilitar pontos de verificação para consultas de Streaming Estruturado

Você deve especificar a opção checkpointLocation antes de executar uma consulta de streaming, como no exemplo a seguir:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Observação

Alguns destinos, como a saída para display() em notebooks e o destino memory, geram automaticamente um local de ponto de verificação temporário se você omitir essa configuração. Os locais de ponto de verificação temporários não garantem nenhuma tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. A Databricks recomenda sempre especificar um local de ponto de verificação para esses sinks.

Recuperação após alterações em uma consulta de Streaming Estruturado

Há limitações sobre quais alterações são permitidas em uma consulta de streaming entre as reinicializações do mesmo local de ponto de verificação. Aqui estão algumas alterações que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:

  • O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito é bem definida depende da consulta e da alteração.
  • O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
  • sdf representa um DataFrame de streaming/Conjunto de dados gerado com sparkSession.readStream.

Tipos de alterações em consultas do Fluxo Estruturado

  • Alterações no número ou tipo (ou seja, fonte diferente) de fontes de entrada: isso não é permitido.

  • Alterações nos parâmetros das fontes de entrada: se isso é permitido e se a semântica da alteração é bem definida depende da origem e da consulta, incluindo controles de admissão como maxFilesPerTrigger ou maxOffsetsPerTrigger. Veja alguns exemplos.

    • Adição, exclusão e modificação de limites de taxa são permitidos:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      para

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • As alterações nos artigos e arquivos assinados geralmente não são permitidas, pois os resultados são imprevisíveis: spark.readStream.format("kafka").option("subscribe", "article") para spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Veja Alterar intervalos de gatilho entre execuções.

  • Alterações no tipo de coletor de saída: são permitidas alterações entre algumas combinações específicas de coletores. Isso precisa ser verificado em uma base caso a caso. Veja alguns exemplos.

    • É permitido usar o destino de arquivo para o destino Kafka. O Kafka verá apenas os novos dados.
    • O coletor de Kafka para o coletor de arquivos não é permitido.
    • O coletor Kafka pode ser alterado para "foreach" ou vice-versa.
  • Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração é bem definida depende do coletor e da consulta. Veja alguns exemplos.

    • As alterações no diretório de saída de um coletor de arquivos não são permitidas: sdf.writeStream.format("parquet").option("path", "/somePath") para sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • É permitido alterar o tópico de saída: sdf.writeStream.format("kafka").option("topic", "topic1") para sdf.writeStream.format("kafka").option("topic", "topic2")
    • Alterações no coletor foreach definido pelo usuário (ou seja, o código ForeachWriter) são permitidas, mas a semântica da alteração depende do próprio código.
  • Alterações em operações tipo projeção/filtro/mapa: em alguns casos, são permitidas. Por exemplo:

    • Adição/exclusão de filtros é permitido: sdf.selectExpr("a") para sdf.where(...).selectExpr("a").filter(...).
    • As alterações nas projeções com o mesmo esquema de saída são permitidas: sdf.selectExpr("stringColumn AS json").writeStream para sdf.select(to_json(...).as("json")).writeStream.
    • As alterações nas projeções com um esquema de saída diferente são condicionalmente permitidas: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream é permitida somente se o coletor de saída permitir que o esquema mude de "a" para "b".
  • Alterações em operações com estado: algumas operações em consultas de streaming precisam manter dados de estado para atualizar continuamente o resultado. O Fluxo estruturado automaticamente verifica os dados de estado no armazenamento tolerante a falhas (por exemplo, DBFS, Armazenamento de BLOBs do Azure) e restaura-os após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo entre as reinicializações. Isso significa que quaisquer alterações (isto é, adições, exclusões ou modificações de esquema) para as operações com estado de uma consulta de streaming não são permitidas entre as reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre reinicializações para garantir a recuperação de estado:

    • Agregação de streaming: por exemplo, sdf.groupBy("a").agg(...). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Eliminação de duplicação de streaming: por exemplo, sdf.dropDuplicates("a"). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Junção de fluxo-fluxo: por exemplo, sdf1.join(sdf2, ...) (ou seja, ambas as entradas são geradas com sparkSession.readStream ). Alterações no esquema ou nas colunas de equijunção não são permitidas. Não são permitidas alterações no tipo de junção (externa ou interna). Outras alterações na condição de junção são mal definidas.
    • Operação com estado arbitrário: por exemplo, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração na função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser dar suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexo em bytes usando um esquema de codificação/decodificação que dá suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, será possível alterar o esquema de estado Avro entre as reinicializações de consulta, pois isso restaura o estado binário.

Importante

Os operadores dropDuplicates() e dropDuplicatesWithinWatermark() com estado podem falhar ao reiniciar devido a uma verificação de compatibilidade do esquema de estado ao alterar entre os modos de acesso de computação.

A alteração entre modos de acesso dedicados e sem isolamento é permitida. A alteração entre os modos de acesso padrão e sem servidor é permitida. Não tente alterar entre outras combinações de modo de acesso.

Para evitar esse erro, não altere o modo de acesso de computação para consultas de streaming que contêm esses operadores.