Usar o feed de dados de alterações do Delta Lake no Azure Databricks

Observação

O feed de dados de alteração permite que o Azure Databricks acompanhe as alterações no nível de linha entre as versões de uma tabela Delta. Quando ele está habilitado em uma tabela Delta, o runtime registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados de linha com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.

Você pode ler os eventos de alteração em consultas em lote usando o SQL do Spark, o Apache Spark DataFrames e o Streaming Estruturado.

Importante

O feed de dados de alterações funciona em conjunto com o histórico de tabelas para fornecer informações de alteração. Como a clonagem de uma tabela Delta cria um histórico separado, o feed de dados de alterações em tabelas clonadas não corresponde ao da tabela original.

Casos de uso

O feed de dados de alterações não está habilitado por padrão. Os casos de uso a seguir deverão orientar você ao habilitar o feed de dados de alterações.

  • Tabelas Silver e Gold: aprimorar o desempenho do Delta Lake processando apenas as alterações no nível de linha após as operações iniciais MERGE, UPDATE ou DELETE para acelerar e simplificar as operações de ETL e ELT.
  • Exibições materializadas: crie exibições agregadas e atualizadas de informações para uso em BI e análise sem precisar reprocessar as tabelas subjacentes completas, atualizando somente os locais em que as alterações foram feitas.
  • Transmissão de alterações: envie um feed de dados de alterações para sistemas downstream, como o Kafka ou o RDBMS, que podem usá-lo para processamento incremental nas fases posteriores dos pipelines de dados.
  • Tabela de trilha de auditoria: capturar o feed de dados de alterações como uma tabela Delta fornece armazenamento perpétuo e funcionalidade de consulta eficiente para ver todas as alterações ao longo do tempo, incluindo quando ocorrem exclusões e quais atualizações foram feitas.

Habilitar o feed de dados de alterações

Habilite explicitamente a opção de feed de dados de alterações usando um dos seguintes métodos:

  • Nova tabela: defina a propriedade de tabela delta.enableChangeDataFeed = true no comando CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabela existente: defina a propriedade de tabela delta.enableChangeDataFeed = true no comando ALTER TABLE.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Todas as novas tabelas:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Somente as alterações feitas após a habilitação do feed de dados de alterações são registradas. As alterações anteriores em uma tabela não são capturadas.

Alterar o armazenamento de dados

O Azure Databricks registra os dados de alterações para operações UPDATE, DELETE e MERGE na pasta _change_data no diretório da tabela. Algumas operações, como operações somente inserção e exclusões de partição completas, não geram dados no diretório _change_data porque o Azure Databricks pode calcular com eficiência o feed de dados de alterações diretamente no log de transações.

Os arquivos na pasta _change_data seguem a política de retenção da tabela. Portanto, se você executar o comando VACUUM, os dados do feed de dados de alterações também serão excluídos.

Ler as alterações em consultas em lote

Você pode fornecer a versão ou o carimbo de data/hora para o início e o término. As versões inicial e final e os carimbos de data/hora são inclusivos nas consultas. Para ler as alterações de uma versão inicial específica para a última versão da tabela, especifique apenas a versão inicial ou o carimbo de data/hora.

Especifique uma versão como um inteiro e um carimbo de data/hora como uma cadeia de caracteres no formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

Se você fornecer uma versão inferior ou um carimbo de data/hora anterior àquele que registrou os eventos de alteração, ou seja, quando o feed de dados de alterações foi habilitado, um erro será gerado indicando que o feed de dados de alterações não foi habilitado.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Ler as alterações em consultas de streaming

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Para obter os dados de alterações durante a leitura da tabela, defina a opção readChangeFeed como true. A startingVersion ou o startingTimestamp são opcionais e, se não são fornecidos, o fluxo retorna o instantâneo mais recente da tabela no momento do streaming como um INSERT e as alterações futuras como dados de alterações. Há suporte para opções como limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex na leitura de dados de alterações.

Observação

A limitação de taxa pode ser atômica para versões diferentes da versão inicial do instantâneo. Ou seja, toda a versão de confirmação será limitada por taxa ou a confirmação inteira será retornada.

Por padrão, se um usuário passar uma versão ou um carimbo de data/hora que exceda a última confirmação em uma tabela, o erro timestampGreaterThanLatestCommit será gerado. No Databricks Runtime 11.3 LTS e acima, o feed de dados de alterações poderá lidar com o caso de versão fora do intervalo se o usuário definir a configuração a seguir como true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se você fornecer uma versão inicial maior que a última confirmação em uma tabela ou um carimbo de data/hora de início mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada, um resultado de leitura vazio será retornado.

Se você fornecer uma versão final maior que a última confirmação em uma tabela ou um carimbo de data/hora de término mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada no modo de leitura em lote, todas as alterações entre a versão inicial e a última confirmação serão retornadas.

Qual é o esquema do feed de dados de alterações?

Quando você lê do feed de dados de alterações de uma tabela, o esquema para a versão mais recente da tabela é usado.

Observação

Há suporte total para a maioria das operações de alteração e evolução do esquema. Tabela com mapeamento de coluna habilitado não dá suporte a todos os casos de uso e demonstra um comportamento diferente. Confira Limitações do feed de dados de alterações para tabelas com mapeamento de coluna habilitado.

Além das colunas de dados do esquema da tabela Delta, o feed de dados de alterações contém colunas de metadados que identificam o tipo de evento de alteração:

Nome da coluna Type Valores
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version long O log Delta ou a versão da tabela que contém a alteração.
_commit_timestamp Timestamp O carimbo de data/hora associado de quando a confirmação foi criada.

(1)preimage é o valor antes da atualização, postimage é o valor após a atualização.

Observação

Você não poderá habilitar o feed de dados de alterações em uma tabela se o esquema contiver colunas com os mesmos nomes que essas colunas adicionadas. Renomeie as colunas na tabela para resolver esse conflito antes de tentar habilitar o feed de dados de alterações.

Limitações do feed de dados de alterações para tabelas com mapeamento de coluna habilitado

Com o mapeamento de coluna habilitado em uma tabela Delta, você pode remover ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes. Com o mapeamento de coluna habilitado, o feed de dados de alterações tem limitações após a execução de alterações de esquema não aditivas, como renomear ou descartar uma coluna, alterar o tipo de dados ou alterações de nulidade.

Importante

  • Não é possível ler o feed de dados de alterações para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica de lote.
  • No Databricks Runtime 12.2 LTS e versões anteriores, tabelas com mapeamento de coluna habilitado que tiveram alterações de esquema não aditivas não dão suporte a leituras de streaming no feed de dados de alterações. Confira Streaming com mapeamento de coluna e alterações de esquema.
  • No Databricks Runtime 11.3 LTS e versões anteriores, você não pode ler o feed de dados de alterações para tabelas com mapeamento de coluna habilitado que tenham passado por renomeação ou remoção de colunas.

No Databricks Runtime 12.2 LTS e versões superiores, você pode executar leituras em lote no feed de dados de alterações para tabelas com mapeamento de coluna habilitado que tiveram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falharão se o intervalo de versão especificado abranger uma alteração de esquema não aditiva.

Perguntas frequentes (FAQ)

Qual é a sobrecarga de habilitar o feed de dados de alterações?

Não há nenhum impacto significativo. Os registros de dados de alterações são gerados em linha durante o processo de execução da consulta e geralmente são muito menores do que o tamanho total dos arquivos reescritos.

Qual é a política de retenção para os registros de alteração?

Os registros de alteração seguem a mesma política de retenção das versões de tabela desatualizadas e serão limpos por meio de VACUUM se estiverem fora do período de retenção especificado.

Quando novos registros ficam disponíveis no feed de dados de alterações?

Os dados de alterações são confirmados com a transação do Delta Lake e serão disponibilizados ao mesmo tempo que os novos dados ficam disponíveis na tabela.

Exemplo de notebook: propagar alterações com o feed de dados de alteração Delta

Esse notebook mostra como propagar as alterações feitas em uma tabela Silver de número absoluto de vacinações para uma tabela Gold de taxas de vacinação.

Alterar o notebook do feed de dados

Obter notebook