Pontos de verificação de streaming estruturado

Pontos de verificação e registos de antecipação de escrita trabalham juntos para fornecer garantias de processamento para cargas de trabalho de Transmissão Estruturada. O ponto de verificação rastreia as informações que identificam a consulta, incluindo informações de estado e registros processados. Quando se apaga os arquivos num diretório de checkpoint ou se altera para um novo local de checkpoint, a próxima execução da consulta inicia-se do zero.

Um diretório de checkpoints contém o seguinte:

  • Deslocamentos: Os deslocamentos de origem processados em cada micro-lote. Isto permite que a consulta retome exatamente de onde ficou sem reprocessar os dados.
  • Commits: Um registo de quais micro-lotes foram enviados para o lava-loiça, permitindo uma semântica exata.
  • Estado: Para consultas com estado (agregações, joins fluxo-fluxo, deduplicação e operadores com estado personalizados como transformWithState), o ponto de controlo armazena metadados sobre o operador com estado, o esquema de estados e o conteúdo armazenado por estados do checkpoint gerido pelo fornecedor do armazenamento de estado.
  • Metadados: O ID único da consulta usado para identificar a consulta. As definições de configuração são armazenadas como parte do log de deslocamento.

Cada consulta deve ter um local de ponto de verificação diferente. Várias consultas não devem em hipótese alguma compartilhar a mesma localização.

Nota

Este artigo aborda os checkpoints de Streaming Estruturado para consultas em streaming. Para informações sobre a utilização DataFrame.checkpoint() dos volumes do Unity Catalog para truncar planos de execução de DataFrames não em streaming, consulte Checkpoints de DataFrame em volumes.

Habilitar o ponto de verificação para consultas de Streaming Estruturado

Você deve especificar a checkpointLocation opção antes de executar uma consulta de streaming, como no exemplo a seguir:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Nota

Alguns destinos, como a saída de display() nos notebooks e o destino memory, geram automaticamente um local de ponto de verificação temporário se você omitir essa opção. Esses locais de ponto de verificação temporários não garantem tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. A Databricks recomenda sempre especificar um local de ponto de verificação para esses coletores.

Recuperar após alterações em uma consulta de Streaming Estruturado

Há limitações sobre quais alterações em uma consulta de streaming são permitidas entre reinicializações do mesmo local de ponto de verificação.

As alterações que geralmente exigem um novo checkpoint incluem o número ou tipo de fontes de entrada, tópicos de Kafka subscritos ou caminhos do Auto Loader, tipos de operações com estado, esquema de estado e tipo de sumidouro de saída.

Alterações geralmente seguras incluem adicionar ou remover filtros, alterar limites de taxa, intervalos de disparo e atualizar a lógica de funções definida pelo utilizador ( mapGroupsWithState embora a semântica possa mudar).

A secção seguinte descreve alterações que não são permitidas ou que o efeito da alteração não está bem definido, onde:

  • O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito está bem definida depende da consulta e da alteração.
  • O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
  • sdf representa um DataFrame/Dataset de streaming gerado com sparkSession.readStream.

Tipos de alterações em consultas de Streaming Estruturado

  • Alterações no número ou tipo de fontes de entrada: Isto não é permitido por defeito porque o Structured Streaming identifica as fontes pela sua posição no plano de consulta. Se ativares a nomenclatura das fontes, podes reordenar as fontes existentes e adicionar novas fontes sem teres de começar a partir de um ponto de controlo novo. Veja Alterar fontes de streaming com evolução da fonte.

  • Alterações nos parâmetros das fontes de entrada: Se isto é permitido e se a semântica da alteração está bem definida depende da fonte e da consulta, incluindo controlos de admissão como maxFilesPerTrigger ou maxOffsetsPerTrigger. Eis alguns exemplos:

    • É permitida a adição, supressão e modificação dos limites tarifários:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      para

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      

      Para mais detalhes, consulte Configurar o tamanho do lote de Structured Streaming no Azure Databricks

    • Alterações em artigos e arquivos inscritos geralmente não são permitidas, pois os resultados são imprevisíveis: spark.readStream.format("kafka").option("subscribe", "article") até spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Consulte Alterar intervalos de gatilho entre execuções.

  • Alterações no tipo de dissipador de saída: Alterações entre algumas combinações específicas de dissipadores são permitidas. Isto tem de ser verificado caso a caso. Eis alguns exemplos.

    • É permitido o uso de um sink de ficheiro para um sink de Kafka. Kafka verá apenas os novos dados.
    • Kafka sink para file sink não é permitido.
    • Kafka destino mudou para foreach, ou vice-versa, é permitido.
  • Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração está bem definida depende do coletor e da consulta. Eis alguns exemplos.

    • Não são permitidas alterações no diretório de saída de um coletor de arquivos: sdf.writeStream.format("parquet").option("path", "/somePath") até sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Alterações no tópico de saída são permitidas: sdf.writeStream.format("kafka").option("topic", "topic1") para sdf.writeStream.format("kafka").option("topic", "topic2")
    • Alterações no sumidouro foreach definido pelo usuário (ou seja, o código ForeachWriter) são permitidas, mas a semântica das alterações depende do código.
  • Alterações nas operações de projeção/filtro/tipo mapa: Alguns casos são permitidos. Por exemplo:

    • É permitida a adição/supressão de filtros: sdf.selectExpr("a") a sdf.where(...).selectExpr("a").filter(...).
    • Alterações nas projeções com o mesmo esquema de saída são permitidas: sdf.selectExpr("stringColumn AS json").writeStream a sdf.select(to_json(...).as("json")).writeStream.
    • Alterações em projeções com esquema de saída diferente são condicionalmente permitidas: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream é permitido somente se o coletor de saída permitir que o esquema mude de "a" para "b".
  • Alterações em operações com estado: algumas operações em consultas em streaming precisam manter dados de estado para atualizar continuamente os resultados. O Streaming Estruturado verifica automaticamente os dados de estado para armazenamento tolerante a falhas (por exemplo, DBFS, armazenamento de Blob do Azure) e os restaura após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo nas reinicializações. Isso significa que quaisquer alterações (ou seja, adições, exclusões ou modificações de esquema) nas operações stateful de uma consulta de streaming não são permitidas entre reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre reinicios para garantir a recuperação do estado:

    • Agregação de streaming: Por exemplo, sdf.groupBy("a").agg(...). Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados.
    • Desduplicação de streaming: Por exemplo, sdf.dropDuplicates("a"). Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados.
    • Junção stream-stream: Por exemplo, (ou seja, sdf1.join(sdf2, ...) ambas as entradas são geradas com sparkSession.readStream). Não são permitidas alterações no esquema ou nas colunas de equijunção. Não são permitidas alterações no tipo de junção (exterior ou interior). Outras alterações na condição de junção são mal definidas.
    • Operação com estado arbitrária: Por exemplo, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração dentro da função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser oferecer suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexas em bytes usando um esquema de codificação/decodificação que ofereça suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, poderá alterar o esquema Avro-state-schema entre as reinicializações da consulta, pois isso restaura o estado binário.

Importante

Os operadores com estado dropDuplicates() e dropDuplicatesWithinWatermark() podem falhar ao reiniciar devido a uma verificação de compatibilidade do esquema de estado ao mudar entre modos de acesso computacional.

É permitido mudar entre modos de acesso dedicado e sem isolamento. É permitido mudar entre modos de acesso padrão e serverless. Não tente alternar entre outras combinações de modos de acesso.

Para evitar este erro, não altere o modo de acesso de computação para consultas de streaming que contenham estes operadores.

Alterar as fontes de streaming com a evolução das fontes

Por defeito, o Structured Streaming identifica as fontes pela sua posição no plano de consulta, como 0, 1, 2, etc. Qualquer alteração no número ou ordem das fontes de entrada quebra a compatibilidade dos checkpoints e requer um checkpoint novo. A evolução das fontes permite-lhe atribuir nomes estáveis, definidos pelo utilizador, a cada fonte de transmissão em fluxo, para que possa reordenar, adicionar ou remover fontes de uma consulta sem perder o estado do ponto de verificação.

A evolução de código-fonte requer Databricks Runtime 18.2 e superiores.

Configuração necessária

Para permitir a evolução da fonte, defina duas configurações do Spark:

  • spark.sql.streaming.queryEvolution.enableSourceEvolution: Quando true, todas as fontes de streaming na consulta devem ser explicitamente nomeadas usando a .name() API. A predefinição é false.
  • spark.sql.streaming.offsetLog.formatVersion: Deve ser definido para 2 para usar o formato de rastreio de deslocamento baseado no nome. A predefinição é 1.

Defina ambas as configurações antes de definir a consulta de streaming:

spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

Regras de nomenclatura

  • Os nomes devem conter apenas caracteres alfanuméricos e sublinhados ([a-zA-Z0-9_]+).
  • Cada nome de origem deve ser único dentro de uma consulta.
  • Quando a evolução da fonte está ativada, cada fonte de streaming deve ter um nome. Fontes não identificadas causam um UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT erro.

Reordenar, adicionar e remover fontes

As seguintes alterações são seguras ao longo de reinícios da consulta com o mesmo ponto de verificação:

  • Reordenar as fontes: Reiniciar a consulta com uma ordem diferente das fontes. Cada fonte retoma a partir do seu último offset confirmado, com base no respetivo nome, e não altera o estado do ponto de controlo.
  • Adicionar novas fontes: Reiniciar a consulta com uma nova fonte. As novas origens são processadas desde o início e as origens existentes continuam a partir dos seus últimos offsets.
  • Remover fontes: Reiniciar a consulta sem a fonte. A fonte é permanentemente removida do ponto de controlo. Uma fonte removida não pode ser adicionada novamente com o mesmo nome.

Example

Utilize .name() em DataStreamReader antes de invocar .load() ou .table():

Python

orders_us = (spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")
)

orders_eu = (spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")
)

all_orders = orders_us.union(orders_eu)

Scala

val ordersUS = spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")

val ordersEU = spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")

val allOrders = ordersUS.union(ordersEU)

Limitações

  • A nomeação da fonte requer um novo ponto de controlo. Não podes ativar a evolução de fonte com um checkpoint existente que use o formato de log offset V1.
  • Depois de uma atualização para o formato log offset V2, não podes voltar ao V1. Ver Configuração necessária.
  • Os nomes das fontes são permanentes. Para renomear uma fonte, remova-a e depois adicione-a com um novo nome. O código-fonte renomeado processa desde o início.