Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Pontos de verificação e registos de antecipação de escrita trabalham juntos para fornecer garantias de processamento para cargas de trabalho de Transmissão Estruturada. O ponto de verificação rastreia as informações que identificam a consulta, incluindo informações de estado e registros processados. Quando se apaga os arquivos num diretório de checkpoint ou se altera para um novo local de checkpoint, a próxima execução da consulta inicia-se do zero.
Cada consulta deve ter um local de ponto de verificação diferente. Várias consultas não devem em hipótese alguma compartilhar a mesma localização.
Habilitar o ponto de verificação para consultas de Streaming Estruturado
Você deve especificar a checkpointLocation opção antes de executar uma consulta de streaming, como no exemplo a seguir:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Nota
Alguns destinos, como a saída de display() nos notebooks e o destino memory, geram automaticamente um local de ponto de verificação temporário se você omitir essa opção. Esses locais de ponto de verificação temporários não garantem tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. A Databricks recomenda sempre especificar um local de ponto de verificação para esses coletores.
Recuperar após alterações em uma consulta de Streaming Estruturado
Há limitações sobre quais alterações em uma consulta de streaming são permitidas entre reinicializações do mesmo local de ponto de verificação. Aqui estão algumas alterações que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:
- O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito está bem definida depende da consulta e da alteração.
- O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
-
sdfrepresenta um DataFrame/Dataset de streaming gerado comsparkSession.readStream.
Tipos de alterações em consultas de Streaming Estruturado
Alterações no número ou tipo (ou seja, fonte diferente) de fontes de entrada: Isso não é permitido.
Alterações nos parâmetros das fontes de entrada: Se isto é permitido e se a semântica da alteração está bem definida depende da fonte e da consulta, incluindo controlos de admissão como
maxFilesPerTriggeroumaxOffsetsPerTrigger. Eis alguns exemplos.É permitida a adição, supressão e modificação dos limites tarifários:
spark.readStream.format("kafka").option("subscribe", "article")para
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)Alterações em artigos e arquivos inscritos geralmente não são permitidas, pois os resultados são imprevisíveis:
spark.readStream.format("kafka").option("subscribe", "article")atéspark.readStream.format("kafka").option("subscribe", "newarticle")
Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Consulte Alterar intervalos de gatilho entre execuções.
Alterações no tipo de dissipador de saída: Alterações entre algumas combinações específicas de dissipadores são permitidas. Isto tem de ser verificado caso a caso. Eis alguns exemplos.
- É permitido o uso de um sink de ficheiro para um sink de Kafka. Kafka verá apenas os novos dados.
- Kafka sink para file sink não é permitido.
- Kafka destino mudou para foreach, ou vice-versa, é permitido.
Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração está bem definida depende do coletor e da consulta. Eis alguns exemplos.
- Não são permitidas alterações no diretório de saída de um coletor de arquivos:
sdf.writeStream.format("parquet").option("path", "/somePath")atésdf.writeStream.format("parquet").option("path", "/anotherPath") - Alterações no tópico de saída são permitidas:
sdf.writeStream.format("kafka").option("topic", "topic1")parasdf.writeStream.format("kafka").option("topic", "topic2") - Alterações no sumidouro foreach definido pelo usuário (ou seja, o código
ForeachWriter) são permitidas, mas a semântica das alterações depende do código.
- Não são permitidas alterações no diretório de saída de um coletor de arquivos:
Alterações nas operações de projeção/filtro/tipo mapa: Alguns casos são permitidos. Por exemplo:
- É permitida a adição/supressão de filtros:
sdf.selectExpr("a")asdf.where(...).selectExpr("a").filter(...). - Alterações nas projeções com o mesmo esquema de saída são permitidas:
sdf.selectExpr("stringColumn AS json").writeStreamasdf.select(to_json(...).as("json")).writeStream. - Alterações em projeções com esquema de saída diferente são condicionalmente permitidas:
sdf.selectExpr("a").writeStreamtosdf.selectExpr("b").writeStreamé permitido somente se o coletor de saída permitir que o esquema mude de"a"para"b".
- É permitida a adição/supressão de filtros:
Alterações em operações com estado: algumas operações em consultas em streaming precisam manter dados de estado para atualizar continuamente os resultados. O Streaming Estruturado verifica automaticamente os dados de estado para armazenamento tolerante a falhas (por exemplo, DBFS, armazenamento de Blob do Azure) e os restaura após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo nas reinicializações. Isso significa que quaisquer alterações (ou seja, adições, exclusões ou modificações de esquema) nas operações stateful de uma consulta de streaming não são permitidas entre reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre reinicios para garantir a recuperação do estado:
-
Agregação de streaming: Por exemplo,
sdf.groupBy("a").agg(...). Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados. -
Desduplicação de streaming: Por exemplo,
sdf.dropDuplicates("a"). Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados. -
Junção stream-stream: Por exemplo, (ou seja,
sdf1.join(sdf2, ...)ambas as entradas são geradas comsparkSession.readStream). Não são permitidas alterações no esquema ou nas colunas de equijunção. Não são permitidas alterações no tipo de junção (exterior ou interior). Outras alterações na condição de junção são mal definidas. -
Operação com estado arbitrária: Por exemplo,
sdf.groupByKey(...).mapGroupsWithState(...)ousdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração dentro da função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser oferecer suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexas em bytes usando um esquema de codificação/decodificação que ofereça suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, poderá alterar o esquema Avro-state-schema entre as reinicializações da consulta, pois isso restaura o estado binário.
-
Agregação de streaming: Por exemplo,
Importante
Os operadores com estado dropDuplicates() e dropDuplicatesWithinWatermark() podem falhar ao reiniciar devido a uma verificação de compatibilidade do esquema de estado ao mudar entre modos de acesso computacional.
É permitido mudar entre modos de acesso dedicado e sem isolamento. É permitido mudar entre modos de acesso padrão e serverless. Não tente alternar entre outras combinações de modos de acesso.
Para evitar este erro, não altere o modo de acesso de computação para consultas de streaming que contenham estes operadores.