Förenklad insamling av ändringsdata med API:et APPLY CHANGES i Delta Live Tables

Delta Live Tables förenklar insamling av ändringsdata (CDC) med API:et APPLY CHANGES . Tidigare användes -instruktionen MERGE INTO ofta för bearbetning av CDC-poster på Azure Databricks. Kan dock MERGE INTO ge felaktiga resultat på grund av poster som inte är sekvenserade eller kräva komplex logik för att ordna om poster.

Genom att automatiskt hantera out-of-sequence-poster säkerställer API:et APPLY CHANGES i Delta Live Tables korrekt bearbetning av CDC-poster och tar bort behovet av att utveckla komplex logik för hantering av out-of-sequence-poster.

APPLY CHANGES API:et stöds i SQL- och Python-gränssnitten Delta Live Tables, inklusive stöd för uppdatering av tabeller med SCD-typ 1 och typ 2:

  • Använd SCD typ 1 för att uppdatera poster direkt. Historiken behålls inte för poster som uppdateras.
  • Använd SCD typ 2 för att behålla en historik över poster, antingen på alla uppdateringar eller vid uppdateringar av en angiven uppsättning kolumner.

Syntax och andra referenser finns i:

Kommentar

Den här artikeln beskriver hur du uppdaterar tabeller i din Delta Live Tables-pipeline baserat på ändringar i källdata. Information om hur du registrerar och frågar ändringsinformation på radnivå för Delta-tabeller finns i Använda Delta Lake-ändringsdataflöde i Azure Databricks.

Hur implementeras CDC med Delta Live Tables?

Du måste ange en kolumn i källdata som ska sekvensera poster, som Delta Live Tables tolkar som en monotont ökande representation av rätt ordning på källdata. Delta Live Tables hanterar automatiskt data som kommer i fel ordning. För SCD Typ 2-ändringar sprider Delta Live Tables lämpliga sekvenseringsvärden till kolumnerna __START_AT och __END_AT i måltabellen. Det bör finnas en distinkt uppdatering per nyckel vid varje sekvenseringsvärde och NULL-sekvenseringsvärden stöds inte.

Om du vill utföra CDC-bearbetning med Delta Live Tables skapar du först en strömmande tabell och använder sedan en APPLY CHANGES INTO -instruktion för att ange källa, nycklar och sekvensering för ändringsflödet. Om du vill skapa måluppspelningstabellen använder du -instruktionen CREATE OR REFRESH STREAMING TABLEcreate_streaming_table() i SQL eller funktionen i Python. Om du vill skapa instruktionen som definierar CDC-bearbetningen använder du -instruktionen APPLY CHANGESapply_changes() i SQL eller funktionen i Python. Syntaxinformation finns i Ändra datainsamling med SQL i Delta Live Tables eller Ändra datainsamling med Python i Delta Live Tables.

Vilka dataobjekt används för CDC-bearbetning av Delta Live Tables?

När du deklarerar måltabellen i Hive-metaarkivet skapas två datastrukturer:

  • En vy med det namn som tilldelats måltabellen.
  • En intern bakgrundstabell som används av Delta Live Tables för att hantera CDC-bearbetning. Den här tabellen namnges genom att prepending __apply_changes_storage_ till måltabellens namn.

Om du till exempel deklarerar en måltabell med namnet dlt_cdc_targetvisas en vy med namnet dlt_cdc_target och en tabell med namnet __apply_changes_storage_dlt_cdc_target i metaarkivet. Om du skapar en vy kan Delta Live Tables filtrera bort den extra information (till exempel gravstenar och versioner) som krävs för att hantera data som inte är i ordning. Om du vill visa bearbetade data frågar du målvyn. Eftersom schemat för __apply_changes_storage_ tabellen kan ändras för att stödja framtida funktioner eller förbättringar bör du inte fråga tabellen om produktionsanvändning. Om du lägger till data manuellt i tabellen antas posterna komma före andra ändringar eftersom versionskolumnerna saknas.

Om en pipeline publiceras i Unity Catalog är de interna bakgrundstabellerna inte tillgängliga för användare.

Hämta data om poster som bearbetas av en CDC-fråga för Delta Live Tables

Följande mått samlas in av apply changes frågor:

  • num_upserted_rows: Antalet utdatarader som har ökats till datamängden under en uppdatering.
  • num_deleted_rows: Antalet befintliga utdatarader som tagits bort från datauppsättningen under en uppdatering.

Måttet num_output_rows , som är utdata för icke-CDC-flöden, samlas inte in för apply changes frågor.

Begränsningar

Målet för APPLY CHANGES INTO frågan eller apply_changes funktionen kan inte användas som källa för en strömmande tabell. En tabell som läser från målet för en APPLY CHANGES INTO fråga eller apply_changes funktion måste vara en materialiserad vy.

SCD-typ 1 och SCD typ 2 på Azure Databricks

Följande avsnitt innehåller exempel som visar SCD-typ 1 för Delta Live Tables och typ 2-frågor som uppdaterar måltabeller baserat på källhändelser som:

  1. Skapa nya användarposter.
  2. Ta bort en användarpost.
  3. Uppdatera användarposter. I exemplet scd typ 1 kommer de sista UPDATE åtgärderna sent och tas bort från måltabellen, vilket visar hanteringen av out-of-order-händelser.

I följande exempel förutsätter vi att du är bekant med att konfigurera och uppdatera Delta Live Tables-pipelines. Se Självstudie: Kör din första Delta Live Tables-pipeline.

Om du vill köra de här exemplen måste du börja med att skapa en exempeldatauppsättning. Se Generera testdata.

Följande är indataposterna för dessa exempel:

Användar-ID name ort operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Om du avkommenterar den sista raden i exempeldata infogas följande post som anger var poster ska trunkeras:

Användar-ID name ort operation sequenceNum
null null null TRUNCATE 3

Kommentar

Alla följande exempel innehåller alternativ för att ange både DELETE och åtgärder, men var och TRUNCATE en av dessa är valfria.

Bearbeta SCD-typ 1-uppdateringar

I följande kodexempel visas bearbetning av SCD typ 1-uppdateringar:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

När du har kört SCD typ 1-exemplet innehåller måltabellen följande poster:

Användar-ID name ort
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

När du har kört SCD typ 1-exemplet med den ytterligare TRUNCATE posten, poster 124 och 126 trunkeras på grund av TRUNCATE åtgärden vid sequenceNum=3, och måltabellen innehåller följande post:

Användar-ID name ort
125 Mercedes Guadalajara

Bearbeta SCD-typ 2-uppdateringar

I följande kodexempel visas bearbetning av SCD typ 2-uppdateringar:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

När du har kört SCD typ 2-exemplet innehåller måltabellen följande poster:

Användar-ID name ort __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 NULL
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 NULL
126 Lily Cancun 2 NULL

En SCD-typ 2-fråga kan också ange en delmängd av utdatakolumner som ska spåras för historik i måltabellen. Ändringar i andra kolumner uppdateras i stället för att nya historikposter genereras. I följande exempel visas hur du city undantar kolumnen från spårning:

I följande exempel visas hur du använder spårningshistorik med SCD-typ 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

När du har kört det här exemplet utan den ytterligare TRUNCATE posten innehåller måltabellen följande poster:

Användar-ID name ort __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 NULL
125 Mercedes Guadalajara 2 NULL
126 Lily Cancun 2 NULL

Generera testdata

Koden nedan tillhandahålls för att generera en exempeldatauppsättning för användning i exempelfrågorna som finns i den här självstudien. Förutsatt att du har rätt autentiseringsuppgifter för att skapa ett nytt schema och skapa en ny tabell kan du köra dessa instruktioner med antingen en notebook-fil eller Databricks SQL. Följande kod är inte avsedd att köras som en del av en Delta Live Tables-pipeline:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Lägga till, ändra eller ta bort data i en direktuppspelningstabell

Om din pipeline publicerar tabeller till Unity Catalog kan du använda DML-instruktioner (datamanipuleringsspråk ), inklusive infognings-, uppdaterings-, borttagnings- och sammanslagningsinstruktioner, för att ändra målströmningstabeller som skapats av APPLY CHANGES INTO -instruktioner.

Kommentar

  • DML-instruktioner som ändrar tabellschemat för en strömmande tabell stöds inte. Se till att DML-uttrycken inte försöker utveckla tabellschemat.
  • DML-instruktioner som uppdaterar en strömmande tabell kan endast köras i ett delat Unity Catalog-kluster eller ett SQL-lager med Databricks Runtime 13.3 LTS och senare.
  • Eftersom direktuppspelning kräver tilläggsdatakällor anger du flaggan skipChangeCommits när du läser källströmningstabellen om bearbetningen kräver strömning från en källströmningstabell med ändringar (till exempel av DML-instruktioner). När skipChangeCommits har angetts ignoreras transaktioner som tar bort eller ändrar poster i källtabellen. Om bearbetningen inte kräver en direktuppspelningstabell kan du använda en materialiserad vy (som inte har begränsningen endast för tillägg) som måltabell.

Eftersom Delta Live Tables använder en angiven SEQUENCE BY kolumn och sprider lämpliga sekvenseringsvärden till kolumnerna __START_AT och __END_AT i måltabellen (för SCD-typ 2), måste du se till att DML-uttryck använder giltiga värden för dessa kolumner för att upprätthålla rätt ordning på posterna. Se Hur implementeras CDC med Delta Live Tables?.

Mer information om hur du använder DML-instruktioner med strömmande tabeller finns i Lägga till, ändra eller ta bort data i en strömmande tabell.

I följande exempel infogas en aktiv post med en startsekvens på 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);