Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Lakeflow Declarative Pipelines simplifica a captura de dados de alteração (CDC) com as APIs AUTO CDC
e AUTO CDC FROM SNAPSHOT
.
Observação
As AUTO CDC
APIs substituem as APPLY CHANGES
APIs e têm a mesma sintaxe. As APPLY CHANGES
APIs ainda estão disponíveis, mas o Databricks recomenda usar as AUTO CDC
APIs em seu lugar.
A interface que você usa depende da fonte de dados de alteração:
- Utilize
AUTO CDC
para processar alterações de um feed de dados de alteração (CDF). - Use
AUTO CDC FROM SNAPSHOT
(Pré-visualização pública, disponível apenas para Python) para processar alterações em instantâneos de bases de dados.
Anteriormente, a instrução MERGE INTO
era normalmente usada para processar registros CDC no Azure Databricks. No entanto, MERGE INTO
pode produzir resultados incorretos devido a registros fora de sequência ou requer lógica complexa para reordenar registros.
A AUTO CDC
API é suportada nas interfaces SQL e Python do Lakeflow Declarative Pipelines. A AUTO CDC FROM SNAPSHOT
API é suportada na interface Python do Lakeflow Declarative Pipelines.
Tanto o AUTO CDC
quanto o AUTO CDC FROM SNAPSHOT
suportam a atualização de tabelas usando SCD tipo 1 e tipo 2:
- Use o SCD tipo 1 para atualizar registros diretamente. O histórico não é retido para registros atualizados.
- Use o SCD tipo 2 para manter um histórico de registros, seja em todas as atualizações ou em atualizações de um conjunto especificado de colunas.
Para sintaxe e outras referências, consulte AUTO CDC for Lakeflow Declarative Pipelines SQL, AUTO CDC for Lakeflow Declarative Pipelines Python e AUTO CDC FROM SNAPSHOT for Lakeflow Declarative Pipelines Python.
Observação
Este artigo descreve como atualizar tabelas em seus pipelines declarativos Lakeflow com base em alterações nos dados de origem. Para saber como registrar e consultar informações de alteração ao nível da linha para tabelas Delta, veja Usar o feed de Mudança de Dados do Delta Lake no Azure Databricks.
Requerimentos
Para usar as APIs CDC, seu pipeline deve ser configurado para usar serverlessLakeflow Declarative Pipelines ou Lakeflow Declarative Pipelines Pro
ou Advanced
edições.
Como o CDC é implementado com a API AUTO CDC?
Ao lidar automaticamente com registros fora de sequência, a API AUTO CDC em Lakeflow Declarative Pipelines garante o processamento correto de registros CDC e elimina a necessidade de desenvolver uma lógica complexa para lidar com registros fora de sequência. Você deve especificar uma coluna nos dados de origem na qual sequenciar registros, que o Lakeflow Declarative Pipelines interpreta como uma representação monotonicamente crescente da ordenação adequada dos dados de origem. O Lakeflow Declarative Pipelines lida automaticamente com os dados que chegam fora de ordem. Para alterações de tipo 2 do SCD, o Lakeflow Declarative Pipelines propaga os valores de sequenciamento apropriados para as colunas __START_AT
e __END_AT
da tabela de destino. Deve haver uma atualização distinta por chave em cada valor de sequenciamento, e os valores de sequenciamento NULL não são suportados.
Para executar o processamento CDC com AUTO CDC
, primeiro crie uma tabela de streaming e, em seguida, use a instrução AUTO CDC ... INTO
em SQL ou a função create_auto_cdc_flow()
em Python para especificar a origem, as chaves e o sequenciamento para o feed de alterações. Para criar a tabela de streaming de destino, use a instrução CREATE OR REFRESH STREAMING TABLE
em SQL ou a função create_streaming_table()
em Python. Consulte os exemplos de processamento SCD tipo 1 e tipo 2.
Para obter detalhes de sintaxe, consulte a referência SQL do Lakeflow Declarative Pipelines ou a referência do Python.
Como o CDC é implementado com a API AUTO CDC FROM SNAPSHOT
?
AUTO CDC FROM SNAPSHOT
é uma API declarativa que determina eficientemente as alterações nos dados de origem ao comparar uma série de instantâneos ordenados e, em seguida, executa o processo necessário para o tratamento CDC dos registos nesses instantâneos.
AUTO CDC FROM SNAPSHOT
é suportado apenas pela interface Python do Lakeflow Declarative Pipelines.
AUTO CDC FROM SNAPSHOT
suporta a ingestão de snapshots de vários tipos de origem:
- Utilize a ingestão periódica de instantâneos para inserir instantâneos de uma tabela ou vista existente.
AUTO CDC FROM SNAPSHOT
tem uma interface simples e simplificada para suportar a ingestão periódica de instantâneos de um objeto de banco de dados existente. Uma nova imagem instantânea é criada com cada atualização do fluxo de trabalho, e o tempo de ingestão é usado como a versão da imagem instantânea. Quando um pipeline é executado no modo contínuo, várias capturas instantâneas são incorporadas com cada atualização do pipeline num período determinado pela configuração do intervalo de gatilho no fluxo que contém oAUTO CDC FROM SNAPSHOT
processamento. - Utilize a ingestão de instantâneos históricos para processar ficheiros que contenham instantâneos de bases de dados, como os gerados a partir de uma base de dados Oracle ou MySQL ou de um data warehouse.
Para realizar o processamento CDC de qualquer tipo de origem com AUTO CDC FROM SNAPSHOT
, primeiro crie uma tabela de streaming e, em seguida, utilize a função create_auto_cdc_from_snapshot_flow()
em Python para especificar o instantâneo, as chaves e outros argumentos necessários para implementar o processamento. Veja os exemplos de ingestão periódica de instantâneos e de ingestão de instantâneos históricos .
Os snapshots passados para a API devem estar em ordem crescente por versão. Se o Lakeflow Declarative Pipelines detetar um instantâneo fora de ordem, um erro será gerado.
Para obter detalhes de sintaxe, consulte a referência de Python dos Lakeflow Declarative Pipelines.
Usar várias colunas para sequenciamento
Você pode sequenciar por várias colunas (por exemplo, um carimbo de data/hora e um ID para desempatar), pode usar uma STRUCT para os combinar: ordena pelo primeiro campo da STRUCT e, em caso de empate, considera o segundo campo, e assim por diante.
Exemplo em SQL:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Exemplo em Python:
sequence_by = struct("timestamp_col", "id_col")
Limitações
A coluna usada para o sequenciamento deve ser um tipo de dados classificável.
Exemplo: processamento de SCD tipo 1 e tipo 2 com dados de origem CDF
As seções a seguir fornecem exemplos de consultas SCD tipo 1 e tipo 2 do Lakeflow Declarative Pipelines que atualizam tabelas de destino com base em eventos de origem de um feed de dados de alteração que:
- Cria novos registros de usuário.
- Exclui um registro de usuário.
- Atualiza os registros do usuário. No exemplo SCD tipo 1, as últimas operações
UPDATE
chegam atrasadas e são removidas da tabela de destino, demonstrando a manipulação de eventos fora de ordem.
Os exemplos a seguir pressupõem familiaridade com a configuração e atualização de Lakeflow Declarative Pipelines. Consulte Tutorial: Criar um pipeline ETL usando a captura de dados de alteração com Lakeflow Declarative Pipelines.
Para executar esses exemplos, você deve começar criando um conjunto de dados de exemplo. Veja Gerar dados de teste.
A seguir estão os registros de entrada para esses exemplos:
ID de Utilizador | Nome | cidade | operação | sequênciaNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lírio | Cancún | INSERT | 2 |
123 | null | null | SUPRIMIR | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Se você descomentar a linha final nos dados de exemplo, ele inserirá o seguinte registro que especifica onde os registros devem ser truncados:
ID de Utilizador | Nome | cidade | operação | sequênciaNum |
---|---|---|---|---|
null | null | null | TRUNCAR | 3 |
Observação
Todos os exemplos a seguir incluem opções para especificar operações DELETE
e TRUNCATE
, mas cada uma é opcional.
Processar atualizações do tipo 1 do SCD
O exemplo a seguir demonstra o processamento de atualizações do SCD tipo 1:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Depois de executar o exemplo SCD tipo 1, a tabela de destino contém os seguintes registros:
ID de Utilizador | Nome | cidade |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lírio | Cancún |
Depois de executar o exemplo SCD tipo 1 com o registo adicional TRUNCATE
, os registos 124
e 126
são truncados devido à operação TRUNCATE
em sequenceNum=3
, e a tabela de destino contém o seguinte registo:
ID de Utilizador | Nome | cidade |
---|---|---|
125 | Mercedes | Guadalajara |
Processar atualizações do tipo 2 do SCD
O exemplo a seguir demonstra o processamento de atualizações do tipo 2 do SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Depois de executar o exemplo SCD tipo 2, a tabela de destino contém os seguintes registros:
ID de Utilizador | Nome | cidade | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lírio | Cancún | 2 | null |
Uma consulta SCD tipo 2 também pode especificar um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. As alterações em outras colunas são atualizadas no local, em vez de gerar novos registros de histórico. O exemplo a seguir demonstra a exclusão da coluna city
do rastreamento:
O exemplo a seguir demonstra o uso do histórico de faixas com SCD tipo 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Depois de executar este exemplo sem o registro de TRUNCATE
adicional, a tabela de destino contém os seguintes registros:
ID de Utilizador | Nome | cidade | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lírio | Cancún | 2 | null |
Gerar dados de teste
O código abaixo é fornecido para gerar um conjunto de dados de exemplo para uso nas consultas de exemplo presentes neste tutorial. Supondo que você tenha as credenciais adequadas para criar um novo esquema e criar uma nova tabela, você pode executar essas instruções com um bloco de anotações ou Databricks SQL. O código a seguir não se destina a ser executado como parte do Lakeflow Declarative Pipelines:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Exemplo: processamento periódico de captura instantânea
O exemplo a seguir demonstra o processamento SCD tipo 2 que ingere instantâneos de uma tabela armazenada em mycatalog.myschema.mytable
. Os resultados do processamento são gravados em uma tabela chamada target
.
mycatalog.myschema.mytable
registos na marca temporal 2024-01-01 00:00:00
Chave | Valor |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
registos na marca temporal 2024-01-01 12:00:00
Chave | Valor |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Depois de processar os instantâneos, a tabela de destino contém os seguintes registros:
Chave | Valor | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 1 de janeiro de 2024, 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 1 de janeiro de 2024, 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
Exemplo: processamento de instantâneos históricos
O exemplo a seguir demonstra o processamento SCD tipo 2 que atualiza uma tabela de destino com base em eventos de origem de dois snapshots armazenados em um sistema de armazenamento em nuvem:
Snapshot em timestamp
, armazenado em /<PATH>/filename1.csv
Chave | Coluna de Acompanhamento | ColunaSemRastreamento |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Snapshot em timestamp + 5
, armazenado em /<PATH>/filename2.csv
Chave | Coluna de Acompanhamento | ColunaSemRastreamento |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
O exemplo de código a seguir demonstra o processamento de atualizações de tipo 2 do SCD com esses instantâneos.
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Depois de processar os instantâneos, a tabela de destino contém os seguintes registros:
Chave | Coluna de Acompanhamento | ColunaSemRastreamento | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | null |
Adicionar, alterar ou excluir dados em uma tabela de streaming de destino
Se o pipeline publicar tabelas no Unity Catalog, poderá usar instruções de linguagem de manipulação de dados ( DML), incluindo as instruções insert, update, delete e merge, para modificar as tabelas de streaming de destino criadas por instruções AUTO CDC ... INTO
.
Observação
- Não há suporte para instruções DML que modificam o esquema de tabela de uma tabela de streaming. Certifique-se de que suas instruções DML não tentem evoluir o esquema da tabela.
- As instruções DML que atualizam uma tabela de streaming podem ser executadas somente em um cluster compartilhado do Catálogo Unity ou em um armazém SQL usando o Databricks Runtime 13.3 LTS e superior.
- Como o streaming requer fontes de dados que apenas permitem acréscimos, se o seu processamento exigir streaming a partir de uma tabela de streaming de origem com alterações (por exemplo, por instruções DML), defina o sinalizador skipChangeCommits ao ler a tabela de origem no streaming. Quando
skipChangeCommits
é definido, as transações que excluem ou modificam registros na tabela de origem são ignoradas. Se o teu processamento não exigir uma tabela de streaming, podes usar uma vista materializada (que não possui a restrição de apenas inserção) como tabela de destino.
Como o Lakeflow Declarative Pipelines usa uma coluna especificada SEQUENCE BY
e propaga valores de sequenciamento apropriados para as __START_AT
colunas e __END_AT
da tabela de destino (para SCD tipo 2), você deve garantir que as instruções DML usem valores válidos para essas colunas para manter a ordenação adequada dos registros. Consulte Como o CDC é implementado com a API AUTO CDC?.
Para obter mais informações sobre como usar instruções DML com tabelas de streaming, consulte Adicionar, alterar ou excluir dados em uma tabela de streaming.
O exemplo a seguir insere um registro ativo com uma sequência inicial de 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Ler um feed de dados de alteração a partir de uma tabela alvo do AUTO CDC
No Databricks Runtime 15.2 e versões posteriores, você pode ler um feed de dados de alterações em uma tabela de streaming que serve como destino de consultas AUTO CDC
ou AUTO CDC FROM SNAPSHOT
, tal como lê um feed de dados de alteração em outras tabelas Delta. Os seguintes requisitos são necessários para aceder ao fluxo de dados de alterações de uma tabela de streaming de destino:
- A tabela de streaming de destino deve ser publicada no Catálogo Unity. Veja Utilize o Catálogo Unity com as suas Pipelines Declarativas Lakeflow.
- Para ler o feed de dados de alteração da tabela de streaming de destino, você deve usar o Databricks Runtime 15.2 ou superior. Para ler o feed de dados de alteração em um pipeline diferente, o pipeline deve ser configurado para usar o Databricks Runtime 15.2 ou superior.
Você lê o feed de dados de alteração de uma tabela de streaming de destino criada em Lakeflow Declarative Pipelines da mesma forma que lê um feed de dados de alteração de outras tabelas Delta. Para saber mais sobre como usar a funcionalidade de feed de dados de alteração Delta, incluindo exemplos em Python e SQL, consulte Usar feed de dados de alteração Delta Lake no Azure Databricks.
Observação
O registo de feed de dados de alteração inclui metadados identificando o tipo de evento de alteração. Quando um registo é atualizado numa tabela, os metadados para os registos de alteração associados geralmente incluem valores de _change_type
definidos como eventos de update_preimage
e update_postimage
.
No entanto, os valores _change_type
são diferentes se forem feitas atualizações na tabela de streaming de destino que incluam a alteração de valores de chave primária. Quando as alterações incluem atualizações de chaves primárias, os campos de metadados _change_type
são definidos como insert
e delete
eventos. As alterações nas chaves primárias podem ocorrer quando atualizações manuais são feitas em um dos campos-chave com uma instrução UPDATE
ou MERGE
ou, para tabelas SCD tipo 2, quando o campo __start_at
é alterado para refletir um valor de sequência inicial anterior.
A consulta AUTO CDC
determina os valores de chave primária, que diferem para processamento de SCD tipo 1 e SCD tipo 2:
- Para o processamento SCD tipo 1 e a interface Python Lakeflow Declarative Pipelines, a chave primária é o valor do parâmetro
keys
na funçãocreate_auto_cdc_flow()
. Para a interface SQL do Lakeflow Declarative Pipelines, a chave primária são as colunas definidas pelaKEYS
cláusula naAUTO CDC ... INTO
instrução. - Para SCD tipo 2, a chave primária é o parâmetro
keys
ou a cláusulaKEYS
mais o valor de retorno da operaçãocoalesce(__START_AT, __END_AT)
, onde__START_AT
e__END_AT
são as colunas correspondentes da tabela de streaming de destino.
Obter dados sobre registos processados por uma consulta CDC de Lakeflow Declarative Pipelines
Observação
As métricas a seguir são capturadas apenas por consultas AUTO CDC
e não por consultas AUTO CDC FROM SNAPSHOT
.
As métricas a seguir são capturadas por consultas AUTO CDC
:
-
num_upserted_rows
: O número de linhas de saída inseridas no conjunto de dados durante uma atualização. -
num_deleted_rows
: O número de linhas de saída existentes excluídas do conjunto de dados durante uma atualização.
A métrica num_output_rows
, resultado para fluxos não CDC, não é capturada para as consultas AUTO CDC
.
Quais objetos de dados são usados para o processamento CDC do Lakeflow Declarative Pipelines?
Observação
- Estas estruturas de dados aplicam-se apenas ao tratamento
AUTO CDC
e não ao tratamentoAUTO CDC FROM SNAPSHOT
. - Essas estruturas de dados se aplicam somente quando a tabela de destino é publicada no metastore do Hive. Se um pipeline for publicado no Unity Catalog, as tabelas de suporte internas ficarão inacessíveis aos utilizadores.
Quando você declara a tabela de destino no metastore do Hive, duas estruturas de dados são criadas:
- Um modo de exibição usando o nome atribuído à tabela de destino.
- Uma tabela de apoio interna usada pelo Lakeflow Declarative Pipelines para gerir o processamento de CDC. Esta tabela é nomeada ao adicionar
__apply_changes_storage_
ao nome da tabela de destino.
Por exemplo, se você declarar uma tabela de destino chamada dlt_cdc_target
, verá uma exibição chamada dlt_cdc_target
e uma tabela chamada __apply_changes_storage_dlt_cdc_target
no metastore. A criação de uma exibição permite que o Lakeflow Declarative Pipelines filtre as informações extras (por exemplo, lápides e versões) necessárias para lidar com dados fora de ordem. Para visualizar os dados processados, consulte a vista alvo. Como o esquema da tabela __apply_changes_storage_
pode ser alterado para oferecer suporte a recursos ou aprimoramentos futuros, você não deve consultar a tabela para uso em produção. Se você adicionar dados manualmente à tabela, presume-se que os registros venham antes de outras alterações porque as colunas de versão estão ausentes.