Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Deklarativní potrubí Lakeflow Spark (SDP) zjednodušují zachycování změn dat (CDC) pomocí AUTO CDC a AUTO CDC FROM SNAPSHOT API.
Poznámka:
Rozhraní AUTO CDC API nahrazují APPLY CHANGES rozhraní API a mají stejnou syntaxi. Rozhraní APPLY CHANGES API jsou stále dostupná, ale Databricks doporučuje namísto nich používat rozhraní AUTO CDC API.
Rozhraní, které používáte, závisí na zdroji změn dat:
- Použijte
AUTO CDCke zpracování změn z kanálu změnových dat (CDF). - Ke zpracování změn ve snímcích databáze použijte
AUTO CDC FROM SNAPSHOT(Public Preview a je k dispozici pouze pro Python).
Dříve se MERGE INTO tento příkaz běžně používal ke zpracování záznamů CDC v Azure Databricks. Může však MERGE INTO vést k nesprávným výsledkům z důvodu záznamů mimo posloupnosti nebo vyžaduje složitou logiku pro opakované řazení záznamů.
Rozhraní AUTO CDC API je podporováno v pipeline rozhraních SQL a Python. Rozhraní AUTO CDC FROM SNAPSHOT API je podporované v rozhraní Pythonu. Deklarativní AUTO CDC kanály Apache Sparku nepodporují rozhraní API.
AUTO CDC i AUTO CDC FROM SNAPSHOT podporují aktualizaci tabulek pomocí SCD typu 1 a typu 2.
- Pomocí scd typu 1 aktualizujte záznamy přímo. Historie se neuchovává pro aktualizované záznamy.
- Pomocí scd typu 2 zachovejte historii záznamů, a to buď u všech aktualizací, nebo aktualizací v zadané sadě sloupců.
Syntaxe a další odkazy najdete v AUTO CDC pro kanály (SQL),AUTO CDC pro kanály (Python) a AUTO CDC FROM SNAPSHOT pro kanály (Python).
Poznámka:
Tento článek popisuje, jak aktualizovat tabulky v kanálech v souvislosti se změnami ve zdrojových datech. Informace o záznamu a dotazování informací o změnách na úrovni řádků pro tabulky Delta najdete v tématu Použití datového kanálu změn Delta Lake v Azure Databricks.
Požadavky
Pokud chcete používat rozhraní API CDC, musí být váš kanál nakonfigurovaný tak, aby používal bezserverový protokol SDP nebo SDPPro nebo Advancededice.
Jak se implementuje CDC pomocí rozhraní API AUTO CDC?
Díky automatickému zpracování záznamů mimo pořadí zajišťuje Auto CDC API správné zpracování záznamů CDC a eliminuje potřebu vyvíjet složitou logiku pro zpracování záznamů mimo pořadí. Ve zdrojových datech musíte zadat sloupec, na kterém se mají sekvencovat záznamy, které rozhraní API interpretují jako monotonicky rostoucí reprezentaci správného řazení zdrojových dat. Datové pipeline automaticky zpracovávají data, která přicházejí v nesprávném pořadí. U změn typu SCD 2 pipeliny šíří odpovídající hodnoty sekvencování do sloupců __START_AT a __END_AT cílové tabulky. V každé hodnotě sekvencování by měla existovat jedna samostatná aktualizace klíče a hodnoty sekvencování NULL nejsou podporovány.
K provedení zpracování CDC s AUTO CDC nejprve vytvoříte streamovací tabulku a pak použijete AUTO CDC ... INTO příkaz v SQL nebo create_auto_cdc_flow() funkci v Pythonu k určení zdroje, klíčů a sekvencování změn. Pokud chcete vytvořit cílovou streamovací tabulku, použijte CREATE OR REFRESH STREAMING TABLE příkaz v SQL nebo funkci v Pythonu create_streaming_table() . Podívejte se na příklady zpracování typu SCD typu 1 a typu 2 .
Podrobnosti o syntaxi najdete v referenčních informacích k SQL kanálům nebo v referenčních informacích k Pythonu.
Jak je CDC implementováno s rozhraním AUTO CDC FROM SNAPSHOT API?
Důležité
Rozhraní AUTO CDC FROM SNAPSHOT API je ve verzi Public Preview.
AUTO CDC FROM SNAPSHOT je deklarativní rozhraní API, které efektivně určuje změny ve zdrojových datech porovnáním řady snímků v daném pořadí a pak spustí zpracování potřebné ke zpracování záznamů ve snímcích CDC.
AUTO CDC FROM SNAPSHOT je podporováno pouze rozhraním kanálů Pythonu.
AUTO CDC FROM SNAPSHOT podporuje ingestování snímků z více zdrojových typů:
- K příjmu snímků z existující tabulky nebo zobrazení použijte periodické příjímání snímků.
AUTO CDC FROM SNAPSHOTmá jednoduché zjednodušené rozhraní, které podporuje pravidelné ingestování snímků z existujícího databázového objektu. S každou aktualizací datového kanálu se zpracovává nový snímek a čas zpracování se použije jako verze snímku. Při spuštění kanálu v průběžném režimu se ingestuje několik snímků s každou aktualizací kanálu v období určeném nastavením intervalu triggeruAUTO CDC FROM SNAPSHOTpro tok, který obsahuje zpracování. - K zpracování souborů obsahujících snímky databáze, jako jsou snímky vygenerované z databáze Oracle nebo MySQL nebo datového skladu, použijte historický příjem snímků.
Pokud chcete provádět zpracování CDC z libovolného zdrojového typu pomocí AUTO CDC FROM SNAPSHOT, nejprve vytvoříte streamovací tabulku a pak pomocí create_auto_cdc_from_snapshot_flow() funkce v Pythonu určíte snímek, klíče a další argumenty potřebné k implementaci zpracování. Podívejte se na příklady pravidelného příjmu snímků a historického příjmu snímků .
Snímky předané do rozhraní API musí být ve vzestupném pořadí podle verze. Pokud SDP zjistí snímek mimo pořadí, vyvolá se chyba.
Podrobnosti o syntaxi najdete v referenčních informacích k Pythonu v kanálech.
Použití více sloupců pro sekvencování
Můžete je sekvencovat podle více sloupců (například časové razítko a ID pro rozlišení shod), k jejich zkombinování můžete použít strukturu: se řadí nejprve podle prvního pole struktury a v případě shody je zohledněno druhé pole, atd.
Příklad v SQL:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Příklad v Pythonu:
sequence_by = struct("timestamp_col", "id_col")
Omezení
Sloupec použitý pro sekvencování musí být řaditelný datový typ.
Příklad: Zpracování SCD typu 1 a SCD typu 2 se zdrojovými daty CDF
Následující části obsahují příklady dotazů typu SCD typu 1 a 2, které aktualizují cílové tabulky na základě zdrojových událostí z datového kanálu změn, které:
- Vytvoří nové záznamy uživatelů.
- Odstraní záznam uživatele.
- Aktualizuje záznamy uživatelů. V příkladu SCD typu 1 dorazí poslední
UPDATEoperace příliš pozdě a jsou vyřazeny z cílové tabulky, což demonstruje zpracování událostí mimo pořadí.
Následující příklady předpokládají znalost konfigurace a aktualizace kanálů. Viz kurz: Vytvoření kanálu ETL pomocí zachytávání dat změn
Abyste mohli tyto příklady spustit, musíte začít vytvořením ukázkové datové sady. Viz Generování testovacích dat.
Tady jsou vstupní záznamy pro tyto příklady:
| userId | název | město | operace | číslo sekvence |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lilie | Cancun | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Pokud odkomentujete poslední řádek v ukázkových datech, vloží se následující záznam, který určuje, kam se mají záznamy zkrátit:
| userId | název | město | operace | číslo sekvence |
|---|---|---|---|---|
| null | null | null | ZKRÁTIT | 3 |
Poznámka:
Všechny následující příklady zahrnují možnosti pro určení obou DELETETRUNCATE operací, ale každý z nich je volitelný.
Zpracování aktualizací typu SCD typu 1
Následující příklad ukazuje zpracování aktualizací typu SCD 1:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Po spuštění příkladu typu SCD 1 obsahuje cílová tabulka následující záznamy:
| userId | název | město |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lilie | Cancun |
Po spuštění příkladu SCD typu 1 s dalším TRUNCATE záznamem jsou záznamy 124 a 126 zkráceny kvůli operaci TRUNCATE v sequenceNum=3 a cílová tabulka obsahuje následující záznam:
| userId | název | město |
|---|---|---|
| 125 | Mercedes | Guadalajara |
Zpracování aktualizací typu SCD typu 2
Následující příklad ukazuje zpracování aktualizací typu 2 SCD:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Po spuštění příkladu typu SCD 2 obsahuje cílová tabulka následující záznamy:
| userId | název | město | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | null |
| 126 | Lilie | Cancun | 2 | null |
Dotaz TYPU 2 SCD může také určit podmnožinu výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Změny v jiných sloupcích se aktualizují místo generování nových záznamů historie. Následující příklad ukazuje vyloučení city sloupce ze sledování:
Následující příklad ukazuje použití historie sledování s SCD typu 2:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Po spuštění tohoto příkladu bez dalšího TRUNCATE záznamu obsahuje cílová tabulka následující záznamy:
| userId | název | město | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Guadalajara | 2 | null |
| 126 | Lilie | Cancun | 2 | null |
Generování testovacích dat
Níže uvedený kód slouží k vygenerování ukázkové datové sady pro použití v ukázkových dotazech, které jsou přítomné v tomto kurzu. Za předpokladu, že máte správné přihlašovací údaje k vytvoření nového schématu a vytvoření nové tabulky, můžete tyto příkazy spustit pomocí poznámkového bloku nebo Databricks SQL. Následující kód není určen ke spuštění jako součást definice kanálu:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Příklad: Pravidelné zpracování snímků
Následující příklad ukazuje zpracování SCD typu 2, které zpracovává snímky tabulky uložené v mycatalog.myschema.mytable. Výsledky zpracování se zapisují do tabulky s názvem target.
mycatalog.myschema.mytable záznamy v časovém razítku 2024-01-01 00:00:00
| Key | Hodnota |
|---|---|
| 1 | a1 |
| 2 | a2 |
mycatalog.myschema.mytable záznamy v časovém razítku 2024-01-01 12:00:00
| Key | Hodnota |
|---|---|
| 2 | b2 |
| 3 | a3 |
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Po zpracování snímků obsahuje cílová tabulka následující záznamy:
| Key | Hodnota | __START_AT | __END_AT |
|---|---|---|---|
| 1 | a1 | 01.01.2024 00:00:00 | 2024-01-01 12:00:00 |
| 2 | a2 | 01.01.2024 0:00 | 2024-01-01 12:00:00 |
| 2 | b2 | 2024-01-01 12:00:00 | null |
| 3 | a3 | 2024-01-01 12:00:00 | null |
Příklad: Historické zpracování snímků
Následující příklad ukazuje zpracování typu SCD 2, které aktualizuje cílovou tabulku na základě zdrojových událostí ze dvou snímků uložených v systému cloudového úložiště:
Snímek v timestamp, uložený v /<PATH>/filename1.csv
| Key | Sledovací sloupec | SloupecBezSledování |
|---|---|---|
| 1 | a1 | b1 |
| 2 | a2 | b2 |
| 4 | a4 | b4 |
Snímek v timestamp + 5, uložený v /<PATH>/filename2.csv
| Key | Sledovací sloupec | SloupecBezSledování |
|---|---|---|
| 2 | a2_new | b2 |
| 3 | a3 | b3 |
| 4 | a4 | b4_new |
Následující příklad kódu ukazuje zpracování aktualizací SCD typu 2 pomocí těchto snímků:
from pyspark import pipelines as dp
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dp.create_streaming_live_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Po zpracování snímků obsahuje cílová tabulka následující záznamy:
| Key | Sledovací sloupec | SloupecBezSledování | __START_AT | __END_AT |
|---|---|---|---|---|
| 1 | a1 | b1 | 1 | 2 |
| 2 | a2 | b2 | 1 | 2 |
| 2 | a2_new | b2 | 2 | null |
| 3 | a3 | b3 | 2 | null |
| 4 | a4 | b4_new | 1 | null |
Přidání, změna nebo odstranění dat v cílové streamovací tabulce
Pokud kanál publikuje tabulky do katalogu Unity, můžete použít příkazy jazyka DML ( Data Manipulat Language ), včetně příkazů vložení, aktualizace, odstranění a sloučení, a upravit cílové tabulky streamování vytvořené příkazy AUTO CDC ... INTO .
Poznámka:
- Příkazy DML, které upravují schéma tabulky streamované tabulky, nejsou podporovány. Ujistěte se, že se příkazy DML nepokoušnou vyvíjet schéma tabulky.
- Příkazy DML, které aktualizují streamovací tabulku, lze spustit pouze ve sdíleném clusteru Unity Catalog nebo v SQL warehouse s použitím Databricks Runtime 13.3 LTS a vyšší.
- Vzhledem k tomu, že streamování vyžaduje pouze připojované zdroje dat, vyžaduje-li vaše zpracování streamování ze zdrojové streamovací tabulky, která obsahuje změny (například prostřednictvím příkazů DML), nastavte příznak skipChangeCommits při čtení zdrojové streamovací tabulky. Při nastavení
skipChangeCommitsbudou transakce, které odstraňují nebo upravují záznamy ve zdrojové tabulce, ignorovány. Pokud vaše zpracování nevyžaduje streamovací tabulku, můžete jako cílovou tabulku použít materializované zobrazení (které nemá omezení pouze pro přidání).
Vzhledem k tomu, že deklarativní kanály Sparku Lakeflow používají zadaný SEQUENCE BY sloupec a šíří odpovídající hodnoty sekvencování do __START_AT sloupců __END_AT cílové tabulky (pro SCD typu 2), je nutné zajistit, aby příkazy DML používaly platné hodnoty pro tyto sloupce, aby zachovaly správné pořadí záznamů. Podívejte se, jak je CDC implementováno pomocí rozhraní API AUTO CDC?.
Další informace o použití příkazů DML se streamovanými tabulkami najdete v tématu Přidání, změna nebo odstranění dat v streamované tabulce.
Následující příklad vloží aktivní záznam s počáteční sekvencí 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Přečíst datový vstup změn z cílové tabulky AUTO CDC
Ve službě Databricks Runtime 15.2 a novějších můžete číst datový proud změn z tabulky streamování, na kterou jsou směrovány dotazy AUTO CDC nebo AUTO CDC FROM SNAPSHOT, stejným způsobem jako čtete datový proud změn z jiných tabulek Delta. Pro čtení datového kanálu změn z cílové tabulky streamování jsou potřeba následující:
- Cílová streamovací tabulka musí být publikovaná v katalogu Unity. Podívejte se na Použití katalogu Unity s kanály.
- Pokud chcete číst datový kanál změn z cílové streamovací tabulky, musíte použít Databricks Runtime 15.2 nebo vyšší. Pokud chcete číst datový kanál změn v jiném potrubí, musí být potrubí nakonfigurované tak, aby používalo Databricks Runtime 15.2 nebo vyšší.
Datový kanál změn si přečtete z cílové streamovací tabulky vytvořené v deklarativních kanálech Sparku Lakeflow stejným způsobem jako čtení datového kanálu změn z jiných tabulek Delta. Další informace o používání funkce datového kanálu Delta Lake pro změny, včetně příkladů v Pythonu a SQL, najdete v článku Použití Delta Lake pro datový kanál změn na Azure Databricks.
Poznámka:
Záznam datového kanálu změn obsahuje metadata identifikující typ události změny. Když se záznam aktualizuje v tabulce, metadata přidružených záznamů změn obvykle obsahují _change_type hodnoty nastavené na update_preimage a update_postimage události.
Tyto hodnoty se ale liší, pokud jsou provedeny aktualizace cílové tabulky streamování, které zahrnují změnu hodnot primárního klíče _change_type. Pokud změny zahrnují aktualizace primárních klíčů, pole metadat _change_type jsou nastavena na insert a delete události. Změny primárních klíčů mohou nastat, když se provádí ruční aktualizace jednoho z klíčových polí prostřednictvím příkazu UPDATE nebo MERGE. U tabulek typu SCD 2 mohou nastat změny, když se pole __start_at změní, aby odráželo dřívější hodnotu počáteční sekvence.
Dotaz AUTO CDC určuje hodnoty primárního klíče, které se liší pro zpracování typu SCD 1 a SCD typu 2:
- Pro zpracování SCD typu 1 a rozhraní Pythonu pro kanály je hodnota parametru
keysve funkcicreate_auto_cdc_flow()primárním klíčem. Pro rozhraní SQL je primární klíč sloupce definovanéKEYSklauzulí vAUTO CDC ... INTOpříkazu. - Pro SCD typu 2 je
keysprimárním klíčem parametr neboKEYSklauzule plus návratová hodnota zcoalesce(__START_AT, __END_AT)operace, kde__START_ATa__END_ATjsou odpovídající sloupce z cílové streamovací tabulky.
Získání dat o záznamech zpracovaných CDC dotazem v pipelinech
Poznámka:
Následující metriky jsou zachyceny pouze AUTO CDC dotazy, nikoli AUTO CDC FROM SNAPSHOT dotazy.
Následující metriky jsou získávány pomocí dotazů AUTO CDC.
-
num_upserted_rows: Počet výstupních řádků přenesených do datové sady během aktualizace. -
num_deleted_rows: Počet existujících výstupních řádků odstraněných z datové sady během aktualizace.
Metrika num_output_rows, výstup pro toky bez CDC, se pro dotazy nezachytává AUTO CDC.
Jaké datové objekty se používají ke zpracování CDC v datovém toku?
Poznámka:
- Tyto datové struktury se vztahují pouze na
AUTO CDCzpracování, nikoliAUTO CDC FROM SNAPSHOTzpracování. - Tyto datové struktury platí pouze v případech, kdy je cílová tabulka publikovaná v metastoru Hive. Pokud pipeline publikuje do katalogu Unity, jsou interní podkladové tabulky pro uživatele nepřístupné.
Když deklarujete cílovou tabulku v metastoru Hive, vytvoří se dvě datové struktury:
- Zobrazení s názvem přiřazeným k cílové tabulce.
- Interní zálohovací tabulka používaná kanálem ke správě zpracování CDC. Tato tabulka je pojmenovaná tak, že se předsadí
__apply_changes_storage_na název cílové tabulky.
Pokud například deklarujete cílovou tabulku s názvem dp_cdc_target, zobrazí se v metastoru zobrazení s názvem dp_cdc_target a tabulka s názvem __apply_changes_storage_dp_cdc_target . Vytvoření zobrazení umožňuje deklarativním kanálům Lakeflow Spark vyfiltrovat dodatečné informace (například náhrobky a verze) potřebné k zpracování dat mimo pořadí. Pokud chcete zobrazit zpracovávaná data, zadejte dotaz na cílové zobrazení. Vzhledem k tomu, že schéma __apply_changes_storage_ tabulky se může změnit tak, aby podporovalo budoucí funkce nebo vylepšení, neměli byste se dotazovat na tabulku pro použití v produkčním prostředí. Pokud do tabulky přidáte data ručně, předpokládá se, že záznamy přicházejí před dalšími změnami, protože sloupce verze chybí.