Förenklad insamling av ändringsdata med API:et APPLY CHANGES
i Delta Live Tables
Delta Live Tables förenklar insamling av ändringsdata (CDC) med API:et APPLY CHANGES
. Tidigare användes -instruktionen MERGE INTO
ofta för bearbetning av CDC-poster på Azure Databricks. Kan dock MERGE INTO
ge felaktiga resultat på grund av poster som inte är sekvenserade eller kräva komplex logik för att ordna om poster.
Genom att automatiskt hantera out-of-sequence-poster säkerställer API:et APPLY CHANGES
i Delta Live Tables korrekt bearbetning av CDC-poster och tar bort behovet av att utveckla komplex logik för hantering av out-of-sequence-poster.
APPLY CHANGES
API:et stöds i SQL- och Python-gränssnitten Delta Live Tables, inklusive stöd för uppdatering av tabeller med SCD-typ 1 och typ 2:
- Använd SCD typ 1 för att uppdatera poster direkt. Historiken behålls inte för poster som uppdateras.
- Använd SCD typ 2 för att behålla en historik över poster, antingen på alla uppdateringar eller vid uppdateringar av en angiven uppsättning kolumner.
Syntax och andra referenser finns i:
- Ändra datainsamling med Python i Delta Live Tables
- Ändra datainsamling med SQL i Delta Live Tables
- Kontrollera tombstone-hantering för SCD-typ 1-frågor
Kommentar
Den här artikeln beskriver hur du uppdaterar tabeller i din Delta Live Tables-pipeline baserat på ändringar i källdata. Information om hur du registrerar och frågar ändringsinformation på radnivå för Delta-tabeller finns i Använda Delta Lake-ändringsdataflöde i Azure Databricks.
Hur implementeras CDC med Delta Live Tables?
Du måste ange en kolumn i källdata som ska sekvensera poster, som Delta Live Tables tolkar som en monotont ökande representation av rätt ordning på källdata. Delta Live Tables hanterar automatiskt data som kommer i fel ordning. För SCD Typ 2-ändringar sprider Delta Live Tables lämpliga sekvenseringsvärden till kolumnerna __START_AT
och __END_AT
i måltabellen. Det bör finnas en distinkt uppdatering per nyckel vid varje sekvenseringsvärde och NULL-sekvenseringsvärden stöds inte.
Om du vill utföra CDC-bearbetning med Delta Live Tables skapar du först en strömmande tabell och använder sedan en APPLY CHANGES INTO
-instruktion för att ange källa, nycklar och sekvensering för ändringsflödet. Om du vill skapa måluppspelningstabellen använder du -instruktionen CREATE OR REFRESH STREAMING TABLE
create_streaming_table()
i SQL eller funktionen i Python. Om du vill skapa instruktionen som definierar CDC-bearbetningen använder du -instruktionen APPLY CHANGES
apply_changes()
i SQL eller funktionen i Python. Syntaxinformation finns i Ändra datainsamling med SQL i Delta Live Tables eller Ändra datainsamling med Python i Delta Live Tables.
Vilka dataobjekt används för CDC-bearbetning av Delta Live Tables?
När du deklarerar måltabellen i Hive-metaarkivet skapas två datastrukturer:
- En vy med det namn som tilldelats måltabellen.
- En intern bakgrundstabell som används av Delta Live Tables för att hantera CDC-bearbetning. Den här tabellen namnges genom att prepending
__apply_changes_storage_
till måltabellens namn.
Om du till exempel deklarerar en måltabell med namnet dlt_cdc_target
visas en vy med namnet dlt_cdc_target
och en tabell med namnet __apply_changes_storage_dlt_cdc_target
i metaarkivet. Om du skapar en vy kan Delta Live Tables filtrera bort den extra information (till exempel gravstenar och versioner) som krävs för att hantera data som inte är i ordning. Om du vill visa bearbetade data frågar du målvyn. Eftersom schemat för __apply_changes_storage_
tabellen kan ändras för att stödja framtida funktioner eller förbättringar bör du inte fråga tabellen om produktionsanvändning. Om du lägger till data manuellt i tabellen antas posterna komma före andra ändringar eftersom versionskolumnerna saknas.
Om en pipeline publiceras i Unity Catalog är de interna bakgrundstabellerna inte tillgängliga för användare.
Hämta data om poster som bearbetas av en CDC-fråga för Delta Live Tables
Följande mått samlas in av apply changes
frågor:
num_upserted_rows
: Antalet utdatarader som har ökats till datamängden under en uppdatering.num_deleted_rows
: Antalet befintliga utdatarader som tagits bort från datauppsättningen under en uppdatering.
Måttet num_output_rows
, som är utdata för icke-CDC-flöden, samlas inte in för apply changes
frågor.
Begränsningar
Målet för APPLY CHANGES INTO
frågan eller apply_changes
funktionen kan inte användas som källa för en strömmande tabell. En tabell som läser från målet för en APPLY CHANGES INTO
fråga eller apply_changes
funktion måste vara en materialiserad vy.
SCD-typ 1 och SCD typ 2 på Azure Databricks
Följande avsnitt innehåller exempel som visar SCD-typ 1 för Delta Live Tables och typ 2-frågor som uppdaterar måltabeller baserat på källhändelser som:
- Skapa nya användarposter.
- Ta bort en användarpost.
- Uppdatera användarposter. I exemplet scd typ 1 kommer de sista
UPDATE
åtgärderna sent och tas bort från måltabellen, vilket visar hanteringen av out-of-order-händelser.
I följande exempel förutsätter vi att du är bekant med att konfigurera och uppdatera Delta Live Tables-pipelines. Se Självstudie: Kör din första Delta Live Tables-pipeline.
Om du vill köra de här exemplen måste du börja med att skapa en exempeldatauppsättning. Se Generera testdata.
Följande är indataposterna för dessa exempel:
Användar-ID | name | ort | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Om du avkommenterar den sista raden i exempeldata infogas följande post som anger var poster ska trunkeras:
Användar-ID | name | ort | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Kommentar
Alla följande exempel innehåller alternativ för att ange både DELETE
och åtgärder, men var och TRUNCATE
en av dessa är valfria.
Bearbeta SCD-typ 1-uppdateringar
I följande kodexempel visas bearbetning av SCD typ 1-uppdateringar:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
När du har kört SCD typ 1-exemplet innehåller måltabellen följande poster:
Användar-ID | name | ort |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
När du har kört SCD typ 1-exemplet med den ytterligare TRUNCATE
posten, poster 124
och 126
trunkeras på grund av TRUNCATE
åtgärden vid sequenceNum=3
, och måltabellen innehåller följande post:
Användar-ID | name | ort |
---|---|---|
125 | Mercedes | Guadalajara |
Bearbeta SCD-typ 2-uppdateringar
I följande kodexempel visas bearbetning av SCD typ 2-uppdateringar:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
När du har kört SCD typ 2-exemplet innehåller måltabellen följande poster:
Användar-ID | name | ort | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | NULL |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | NULL |
126 | Lily | Cancun | 2 | NULL |
En SCD-typ 2-fråga kan också ange en delmängd av utdatakolumner som ska spåras för historik i måltabellen. Ändringar i andra kolumner uppdateras i stället för att nya historikposter genereras. I följande exempel visas hur du city
undantar kolumnen från spårning:
I följande exempel visas hur du använder spårningshistorik med SCD-typ 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
När du har kört det här exemplet utan den ytterligare TRUNCATE
posten innehåller måltabellen följande poster:
Användar-ID | name | ort | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | NULL |
125 | Mercedes | Guadalajara | 2 | NULL |
126 | Lily | Cancun | 2 | NULL |
Generera testdata
Koden nedan tillhandahålls för att generera en exempeldatauppsättning för användning i exempelfrågorna som finns i den här självstudien. Förutsatt att du har rätt autentiseringsuppgifter för att skapa ett nytt schema och skapa en ny tabell kan du köra dessa instruktioner med antingen en notebook-fil eller Databricks SQL. Följande kod är inte avsedd att köras som en del av en Delta Live Tables-pipeline:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Lägga till, ändra eller ta bort data i en direktuppspelningstabell
Om din pipeline publicerar tabeller till Unity Catalog kan du använda DML-instruktioner (datamanipuleringsspråk ), inklusive infognings-, uppdaterings-, borttagnings- och sammanslagningsinstruktioner, för att ändra målströmningstabeller som skapats av APPLY CHANGES INTO
-instruktioner.
Kommentar
- DML-instruktioner som ändrar tabellschemat för en strömmande tabell stöds inte. Se till att DML-uttrycken inte försöker utveckla tabellschemat.
- DML-instruktioner som uppdaterar en strömmande tabell kan endast köras i ett delat Unity Catalog-kluster eller ett SQL-lager med Databricks Runtime 13.3 LTS och senare.
- Eftersom direktuppspelning kräver tilläggsdatakällor anger du flaggan skipChangeCommits när du läser källströmningstabellen om bearbetningen kräver strömning från en källströmningstabell med ändringar (till exempel av DML-instruktioner). När
skipChangeCommits
har angetts ignoreras transaktioner som tar bort eller ändrar poster i källtabellen. Om bearbetningen inte kräver en direktuppspelningstabell kan du använda en materialiserad vy (som inte har begränsningen endast för tillägg) som måltabell.
Eftersom Delta Live Tables använder en angiven SEQUENCE BY
kolumn och sprider lämpliga sekvenseringsvärden till kolumnerna __START_AT
och __END_AT
i måltabellen (för SCD-typ 2), måste du se till att DML-uttryck använder giltiga värden för dessa kolumner för att upprätthålla rätt ordning på posterna. Se Hur implementeras CDC med Delta Live Tables?.
Mer information om hur du använder DML-instruktioner med strömmande tabeller finns i Lägga till, ändra eller ta bort data i en strömmande tabell.
I följande exempel infogas en aktiv post med en startsekvens på 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);