De APPLY CHANGES-API's: Het vastleggen van wijzigingsgegevens vereenvoudigen met Delta Live Tables
Delta Live Tables vereenvoudigt het vastleggen van wijzigingen van gegevens (CDC) met de APPLY CHANGES
en APPLY CHANGES FROM SNAPSHOT
API's. De interface die u gebruikt, is afhankelijk van de bron van wijzigingsgegevens:
- Gebruik
APPLY CHANGES
dit om wijzigingen te verwerken vanuit een CDF (Change Data Feed). - Gebruik
APPLY CHANGES FROM SNAPSHOT
(openbare preview) om wijzigingen in momentopnamen van databases te verwerken.
Voorheen werd de instructie vaak gebruikt voor het MERGE INTO
verwerken van CDC-records in Azure Databricks. MERGE INTO
Kan echter onjuiste resultaten produceren vanwege niet-opeenvolgende records of complexe logica vereist om records opnieuw te ordenen.
De APPLY CHANGES
API wordt ondersteund in de SQL- en Python-interfaces van Delta Live Tables. De APPLY CHANGES FROM SNAPSHOT
API wordt ondersteund in de Python-interface van Delta Live Tables.
Tabellen APPLY CHANGES
bijwerken met APPLY CHANGES FROM SNAPSHOT
SCD-type 1 en type 2 ondersteunen:
- Gebruik SCD-type 1 om records rechtstreeks bij te werken. Geschiedenis wordt niet bewaard voor bijgewerkte records.
- Gebruik SCD-type 2 om een geschiedenis van records te bewaren, op alle updates of op updates voor een opgegeven set kolommen.
Zie voor syntaxis en andere verwijzingen:
- Gegevens vastleggen vanuit een wijzigingenfeed wijzigen met Python in Delta Live Tables
- Gegevens vastleggen wijzigen met SQL in Delta Live Tables
Notitie
In dit artikel wordt beschreven hoe u tabellen in uw Delta Live Tables-pijplijn bijwerkt op basis van wijzigingen in brongegevens. Zie Delta Lake-wijzigingenfeed gebruiken in Azure Databricks voor informatie over het vastleggen en opvragen van gegevens op rijniveau voor Delta-tabellen.
Vereisten
Als u de CDC-API's wilt gebruiken, moet uw pijplijn zijn geconfigureerd voor het gebruik van serverloze DLT-pijplijnen of de Delta Live Tables Pro
of Advanced
edities.
Hoe wordt CDC geïmplementeerd met de APPLY CHANGES
API?
Door automatisch verouderde records te verwerken, zorgt de APPLY CHANGES
API in Delta Live Tables ervoor dat CDC-records correct worden verwerkt en hoeft u geen complexe logica te ontwikkelen voor het verwerken van out-of-sequence records. U moet een kolom opgeven in de brongegevens waarop records moeten worden gesequentieerd, wat Delta Live Tables interpreteert als een monotonische weergave van de juiste volgorde van de brongegevens. Delta Live Tables verwerkt automatisch gegevens die niet in orde zijn. Voor WIJZIGINGEN in SCD-type 2 worden in Delta Live-tabellen de juiste sequentiërende waarden doorgegeven aan de en kolommen van __START_AT
__END_AT
de doeltabel. Er moet één afzonderlijke update per sleutel zijn bij elke sequentiërende waarde en NULL-sequentiërende waarden worden niet ondersteund.
Als u CDC-verwerking wilt APPLY CHANGES
uitvoeren, maakt u eerst een streamingtabel en gebruikt u vervolgens de APPLY CHANGES INTO
instructie in SQL of de apply_changes()
functie in Python om de bron, sleutels en sequentiëren voor de wijzigingenfeed op te geven. Als u de doelstreamingtabel wilt maken, gebruikt u de CREATE OR REFRESH STREAMING TABLE
instructie in SQL of de create_streaming_table()
functie in Python. Zie de scd-type 1 en type 2 verwerkingsvoorbeelden .
Zie de SQL-verwijzing voor Delta Live Tables of Python voor syntaxisdetails.
Hoe wordt CDC geïmplementeerd met de APPLY CHANGES FROM SNAPSHOT
API?
Belangrijk
De APPLY CHANGES FROM SNAPSHOT
API bevindt zich in openbare preview.
APPLY CHANGES FROM SNAPSHOT
is een declaratieve API die efficiënt wijzigingen in brongegevens bepaalt door een reeks momentopnamen in volgorde te vergelijken en vervolgens de verwerking uit te voeren die nodig is voor CDC-verwerking van de records in de momentopnamen. APPLY CHANGES FROM SNAPSHOT
wordt alleen ondersteund door de Python-interface van Delta Live Tables.
APPLY CHANGES FROM SNAPSHOT
ondersteunt het opnemen van momentopnamen van meerdere brontypen:
- Gebruik periodieke opname van momentopnamen om momentopnamen op te nemen uit een bestaande tabel of weergave.
APPLY CHANGES FROM SNAPSHOT
heeft een eenvoudige, gestroomlijnde interface voor het periodiek opnemen van momentopnamen van een bestaand databaseobject. Er wordt een nieuwe momentopname opgenomen bij elke pijplijnupdate en de opnametijd wordt gebruikt als de momentopnameversie. Wanneer een pijplijn wordt uitgevoerd in de continue modus, worden meerdere momentopnamen opgenomen met elke pijplijnupdate op een periode die wordt bepaald door de instelling voor het triggerinterval voor de stroom die de VERWERKING VAN MOMENTOPNAMEN TOEPASSEN bevat. - Gebruik historische momentopnameopname voor het verwerken van bestanden met databasemomentopnamen, zoals momentopnamen die zijn gegenereerd op basis van een Oracle- of MySQL-database of een datawarehouse.
Als u CDC-verwerking wilt uitvoeren op basis van een brontype, APPLY CHANGES FROM SNAPSHOT
maakt u eerst een streamingtabel en gebruikt u vervolgens de apply_changes_from_snapshot()
functie in Python om de momentopname, sleutels en andere argumenten op te geven die nodig zijn om de verwerking te implementeren. Zie de voorbeelden van periodieke opname van momentopnamen en historische momentopnamen.
De momentopnamen die aan de API worden doorgegeven, moeten in oplopende volgorde zijn op basis van versie. Als Delta Live Tables een momentopname detecteert die niet in orde is, wordt er een fout gegenereerd.
Zie de Python-naslaginformatie over Delta Live Tables voor syntaxis.
Beperkingen
De kolom die wordt gebruikt voor sequentiëren, moet een sorteerbaar gegevenstype zijn.
Voorbeeld: SCD-type 1 en SCD-type 2 verwerken met CDF-brongegevens
De volgende secties bevatten voorbeelden van SCD-type 1 van Delta Live Tables en type 2-query's waarmee doeltabellen worden bijgewerkt op basis van brongebeurtenissen uit een wijzigingengegevensfeed die:
- Hiermee maakt u nieuwe gebruikersrecords.
- Hiermee verwijdert u een gebruikersrecord.
- Hiermee worden gebruikersrecords bijgewerkt. In het SCD-type 1-voorbeeld komen de laatste
UPDATE
bewerkingen te laat en worden ze verwijderd uit de doeltabel, waarin de verwerking van out-of-ordergebeurtenissen wordt gedemonstreerd.
In de volgende voorbeelden wordt ervan uitgegaan dat u bekend bent met het configureren en bijwerken van Pijplijnen voor Delta Live Tables. Zie zelfstudie: Uw eerste Delta Live Tables-pijplijn uitvoeren.
Als u deze voorbeelden wilt uitvoeren, moet u beginnen met het maken van een voorbeeldgegevensset. Zie Testgegevens genereren.
Hier volgen de invoerrecords voor deze voorbeelden:
userId | naam | plaats | schakelapparatuur optimaliseren | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lelie | Cancun | INSERT | 2 |
123 | Nul | Nul | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Als u de opmerking bij de laatste rij in de voorbeeldgegevens ongedaan maakt, wordt de volgende record ingevoegd die aangeeft waar records moeten worden afgekapt:
userId | naam | plaats | schakelapparatuur optimaliseren | sequenceNum |
---|---|---|---|---|
Nul | Nul | Nul | TRUNCATE | 3 |
Notitie
Alle volgende voorbeelden bevatten opties om zowel als DELETE
TRUNCATE
bewerkingen op te geven, maar elk is optioneel.
SCD-updates van type 1 verwerken
In het volgende voorbeeld ziet u hoe SCD type 1-updates worden verwerkt:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Nadat het SCD-type 1-voorbeeld is uitgevoerd, bevat de doeltabel de volgende records:
userId | naam | plaats |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lelie | Cancun |
Nadat u het SCD-voorbeeld 1 met de extra TRUNCATE
record hebt uitgevoerd, worden records 124
en 126
afgekapt vanwege de TRUNCATE
bewerking op sequenceNum=3
en bevat de doeltabel de volgende record:
userId | naam | plaats |
---|---|---|
125 | Mercedes | Guadalajara |
SCD-updates van type 2 verwerken
In het volgende voorbeeld ziet u hoe scd-type 2-updates worden verwerkt:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Nadat het SCD-type 2-voorbeeld is uitgevoerd, bevat de doeltabel de volgende records:
userId | naam | plaats | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | Nul |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | Nul |
126 | Lelie | Cancun | 2 | Nul |
Een SCD-query van het type 2 kan ook een subset van uitvoerkolommen opgeven die moeten worden bijgehouden voor de geschiedenis in de doeltabel. Wijzigingen in andere kolommen worden bijgewerkt in plaats van nieuwe geschiedenisrecords te genereren. In het volgende voorbeeld ziet u hoe u de city
kolom niet kunt bijhouden:
In het volgende voorbeeld ziet u hoe u geschiedenis bijhouden gebruikt met SCD-type 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Nadat u dit voorbeeld zonder de extra TRUNCATE
record hebt uitgevoerd, bevat de doeltabel de volgende records:
userId | naam | plaats | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | Nul |
125 | Mercedes | Guadalajara | 2 | Nul |
126 | Lelie | Cancun | 2 | Nul |
Testgegevens genereren
De onderstaande code wordt gegeven om een voorbeeldgegevensset te genereren voor gebruik in de voorbeeldquery's die in deze zelfstudie aanwezig zijn. Ervan uitgaande dat u over de juiste referenties beschikt om een nieuw schema te maken en een nieuwe tabel te maken, kunt u deze instructies uitvoeren met een notebook of Databricks SQL. De volgende code is niet bedoeld om te worden uitgevoerd als onderdeel van een Delta Live Tables-pijplijn:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Voorbeeld: Periodieke verwerking van momentopnamen
In het volgende voorbeeld ziet u de SCD-verwerking van het type 2 waarmee momentopnamen van een tabel worden opgenomen die zijn opgeslagen op mycatalog.myschema.mytable
. De resultaten van de verwerking worden geschreven naar een tabel met de naam target
.
mycatalog.myschema.mytable
records op de tijdstempel 2024-01-01 00:00:00
Sleutel | Weergegeven als |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
records op de tijdstempel 2024-01-01 12:00:00
Sleutel | Weergegeven als |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Nadat de momentopnamen zijn verwerkt, bevat de doeltabel de volgende records:
Sleutel | Weergegeven als | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | Nul |
3 | a3 | 2024-01-01 12:00:00 | Nul |
Voorbeeld: Verwerking van historische momentopnamen
In het volgende voorbeeld ziet u de SCD-verwerking van het type 2 waarmee een doeltabel wordt bijgewerkt op basis van brongebeurtenissen van twee momentopnamen die zijn opgeslagen in een cloudopslagsysteem:
Momentopname op timestamp
, opgeslagen in /<PATH>/filename1.csv
Sleutel | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Momentopname op timestamp + 5
, opgeslagen in /<PATH>/filename2.csv
Sleutel | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
In het volgende codevoorbeeld ziet u hoe scd-type 2-updates worden verwerkt met deze momentopnamen:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Nadat de momentopnamen zijn verwerkt, bevat de doeltabel de volgende records:
Sleutel | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | Nul |
3 | a3 | b3 | 2 | Nul |
4 | a4 | b4_new | 1 | Nul |
Gegevens toevoegen, wijzigen of verwijderen in een doelstreamingtabel
Als uw pijplijn tabellen publiceert naar Unity Catalog, kunt u DML-instructies (Data Manipulat Language) gebruiken, zoals invoeg-, update-, verwijder- en samenvoeginstructies om de doelstreamingtabellen te wijzigen die zijn gemaakt door APPLY CHANGES INTO
instructies.
Notitie
- DML-instructies die het tabelschema van een streamingtabel wijzigen, worden niet ondersteund. Zorg ervoor dat uw DML-instructies niet proberen het tabelschema te ontwikkelen.
- DML-instructies die een streamingtabel bijwerken, kunnen alleen worden uitgevoerd in een gedeeld Unity Catalog-cluster of een SQL-warehouse met Databricks Runtime 13.3 LTS en hoger.
- Omdat streaming alleen toevoeggegevensbronnen vereist, stelt u de vlag skipChangeCommits in bij het lezen van de bronstreamingtabel wanneer u de bronstreamingtabel leest. Wanneer
skipChangeCommits
deze is ingesteld, worden transacties die records in de brontabel verwijderen of wijzigen genegeerd. Als voor uw verwerking geen streamingtabel is vereist, kunt u een gerealiseerde weergave (die niet beschikt over de beperking voor alleen toevoegen) gebruiken als doeltabel.
Omdat Delta Live Tables een opgegeven SEQUENCE BY
kolom gebruikt en de juiste sequentiërende waarden doorgeeft aan de __START_AT
en __END_AT
kolommen van de doeltabel (voor SCD-type 2), moet u ervoor zorgen dat DML-instructies geldige waarden voor deze kolommen gebruiken om de juiste volgorde van records te behouden. Zie Hoe wordt CDC geïmplementeerd met de APPLY CHANGES API?
Zie Gegevens toevoegen, wijzigen of verwijderen in een streamingtabel voor meer informatie over het gebruik van DML-instructies met streamingtabellen.
In het volgende voorbeeld wordt een actieve record ingevoegd met een beginvolgorde van 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Een wijzigingsgegevensfeed lezen uit een APPLY CHANGES
doeltabel
In Databricks Runtime 15.2 en hoger kunt u een gegevensfeed voor wijzigingen lezen uit een streamingtabel die het doel is van APPLY CHANGES
of APPLY CHANGES FROM SNAPSHOT
query's op dezelfde manier als u een gegevensfeed van andere Delta-tabellen leest. Het volgende is vereist om de wijzigingengegevensfeed te lezen uit een doelstreamingtabel:
- De doelstreamingtabel moet worden gepubliceerd naar Unity Catalog. Zie Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen.
- Als u de wijzigingengegevensfeed wilt lezen uit de doelstreamingtabel, moet u Databricks Runtime 15.2 of hoger gebruiken. Als u de wijzigingengegevensfeed in een andere Delta Live Tables-pijplijn wilt lezen, moet de pijplijn zijn geconfigureerd voor het gebruik van Databricks Runtime 15.2 of hoger.
U leest de wijzigingengegevensfeed uit een doelstreamingtabel die is gemaakt in een Delta Live Tables-pijplijn op dezelfde manier als het lezen van een wijzigingengegevensfeed uit andere Delta-tabellen. Zie Delta Lake-wijzigingsgegevensfeeds gebruiken in Azure Databricks voor meer informatie over het gebruik van de delta-functionaliteit voor wijzigingenfeeds, waaronder voorbeelden in Python en SQL.
Notitie
De record voor de wijzigingengegevensfeed bevat metagegevens waarmee het type wijzigingsevenement wordt geïdentificeerd. Wanneer een record wordt bijgewerkt in een tabel, bevatten _change_type
de metagegevens voor de gekoppelde wijzigingsrecords doorgaans waarden die zijn ingesteld update_preimage
op en update_postimage
gebeurtenissen.
De _change_type
waarden verschillen echter als er updates worden aangebracht in de doelstreamingtabel, waaronder het wijzigen van primaire-sleutelwaarden. Wanneer wijzigingen updates voor primaire sleutels bevatten, worden de _change_type
metagegevensvelden ingesteld op insert
en delete
gebeurtenissen. Wijzigingen in primaire sleutels kunnen optreden wanneer handmatige updates worden aangebracht in een van de sleutelvelden met een of instructie UPDATE
of MERGE
, voor SCD-type 2-tabellen, wanneer het __start_at
veld wordt gewijzigd in een eerdere beginvolgordewaarde.
De APPLY CHANGES
query bepaalt de primaire-sleutelwaarden, die verschillen voor SCD-type 1- en SCD-type 2-verwerking:
- Voor SCD type 1-verwerking en de Python-interface van Delta Live Tables is de primaire sleutel de waarde van de
keys
parameter in deapply_changes()
functie. Voor de SQL-interface van Delta Live Tables is de primaire sleutel de kolommen die zijn gedefinieerd door deKEYS
component in deAPPLY CHANGES INTO
instructie. - Voor SCD-type 2 is de primaire sleutel de
keys
parameter ofKEYS
component plus de retourwaarde van decoalesce(__START_AT, __END_AT)
bewerking, waarbij__START_AT
en__END_AT
de bijbehorende kolommen uit de doelstreamingtabel zijn.
Gegevens ophalen over records die worden verwerkt door een CDC-query voor Delta Live Tables
Notitie
De volgende metrische gegevens worden alleen vastgelegd door APPLY CHANGES
query's en niet door APPLY CHANGES FROM SNAPSHOT
query's.
De volgende metrische gegevens worden vastgelegd door APPLY CHANGES
query's:
num_upserted_rows
: Het aantal uitvoerrijen dat tijdens een update in de gegevensset is opgeslagen.num_deleted_rows
: Het aantal bestaande uitvoerrijen dat tijdens een update uit de gegevensset is verwijderd.
De num_output_rows
metrische gegevens, uitvoer voor niet-CDC-stromen, worden niet vastgelegd voor apply changes
query's.
Welke gegevensobjecten worden gebruikt voor CDC-verwerking van Delta Live Tables?
Opmerking: De volgende gegevensstructuren zijn alleen van toepassing op APPLY CHANGES
verwerking, niet APPLY CHANGES FROM SNAPSHOT
op verwerking.
Wanneer u de doeltabel in de Hive-metastore declareert, worden er twee gegevensstructuren gemaakt:
- Een weergave met de naam die is toegewezen aan de doeltabel.
- Een interne back-uptabel die wordt gebruikt door Delta Live Tables voor het beheren van CDC-verwerking. Deze tabel krijgt een naam door de naam van de doeltabel voor te geven
__apply_changes_storage_
.
Als u bijvoorbeeld een doeltabel met de naam dlt_cdc_target
declareert, ziet u een weergave met de naam dlt_cdc_target
en een tabel met de naam __apply_changes_storage_dlt_cdc_target
in de metastore. Door een weergave te maken, kunnen Delta Live Tables de extra informatie (bijvoorbeeld tombstones en versies) filteren die nodig zijn om out-of-ordergegevens af te handelen. Als u de verwerkte gegevens wilt weergeven, voert u een query uit op de doelweergave. Omdat het schema van de __apply_changes_storage_
tabel kan veranderen om toekomstige functies of verbeteringen te ondersteunen, moet u geen query's uitvoeren op de tabel voor productiegebruik. Als u gegevens handmatig aan de tabel toevoegt, wordt ervan uitgegaan dat de records vóór andere wijzigingen komen omdat de versiekolommen ontbreken.
Als een pijplijn naar Unity Catalog publiceert, zijn de interne backingtabellen niet toegankelijk voor gebruikers.