Sdílet prostřednictvím


Co je zachytávání dat změn (CDC)?

Zachytávání dat změn (CDC) je vzor integrace dat, který zachycuje změny provedené v datech ve zdrojovém systému, jako jsou vložení, aktualizace a odstranění. Tyto změny, reprezentované jako seznam, se běžně označují jako CDC feed. Pokud pracujete s informačním kanálem CDC, můžete data zpracovávat mnohem rychleji a nemusíte číst celou zdrojovou datovou sadu. Transakční databáze, jako jsou SQL Server, MySQL a Oracle, generují informační kanály CDC. Tabulky Delta generují vlastní kanál CDC, označovaný jako kanál CDF (Change Data Feed).

Následující diagram ukazuje, že když se aktualizuje řádek ve zdrojové tabulce obsahující data zaměstnanců, vygeneruje novou sadu řádků v informačním kanálu CDC, který obsahuje pouze změny. Každý řádek informačního kanálu CDC obvykle obsahuje další metadata, včetně operace, jako je UPDATE a sloupec, který lze použít k deterministickému seřazení jednotlivých řádků v informačním kanálu CDC, abyste mohli zpracovávat aktualizace mimo pořadí. Například sequenceNum sloupec v následujícím diagramu určuje pořadí řádků v informačním kanálu CDC:

Přehled zachytávání změn dat

Zpracování toků změn v datech: Zachovat pouze nejnovější data vs. zachovat historické verze dat

Zpracování změněného datového kanálu se označuje jako pomalu se měnící dimenze (slowly changing dimensions, SCD). Při zpracování informačního kanálu CDC si můžete vybrat:

  • Uchováváte pouze nejnovější data (to znamená přepsat existující data)? To se označuje jako SCD Type 1.
  • Nebo uchováváte historii změn dat? Označuje se jako SCD Type 2.

Zpracování typu 1 typu SCD zahrnuje přepsání starých dat s novými daty při každé změně. To znamená, že se neuchovávají žádné historie změn. K dispozici je pouze nejnovější verze dat. Jedná se o jednoduchý přístup, který se často používá v případě, že historie změn není důležitá, například oprava chyb nebo aktualizace nekritické pole, jako jsou e-mailové adresy zákazníků.

Přehled zachytávání dat změn typu SCD typu 1

Zpracování typu SCD 2 udržuje historický záznam změn dat vytvořením dalších záznamů pro zachycení různých verzí dat v průběhu času. Každá verze dat je opatřena časovým razítkem a/nebo označená metadaty, která uživatelům umožňují sledovat, kdy došlo ke změně. To je užitečné, když je důležité sledovat vývoj dat, jako je sledování změn zákazníků v průběhu času pro účely analýzy.

Přehled zachytávání změn v datech SCD typ 2

Příklady zpracování scd typu 1 a typu 2 pomocí deklarativních kanálů Sparku Lakeflow

Příklady v této části ukazují, jak používat SCD Type 1 a Type 2.

Krok 1: Příprava ukázkových dat

V tomto příkladu vygenerujete ukázkový informační kanál CDC. Nejprve vytvořte poznámkový blok a vložte do něj následující kód. Aktualizujte proměnné na začátku bloku kódu na katalog a schéma, kde máte oprávnění vytvářet tabulky a zobrazení.

Tento kód vytvoří novou tabulku Delta, která obsahuje několik záznamů změn. Schéma je následující:

  • id - Celé číslo, jedinečný identifikátor tohoto zaměstnance
  • name - Řetězec, jméno zaměstnance
  • role - Řetězec, role zaměstnance
  • country - Řetězec, kód země, kde zaměstnanec pracuje
  • operation - Změnit typ(například INSERT, UPDATE, nebo DELETE)
  • sequenceNum – Celé číslo identifikuje logické pořadí událostí CDC ve zdrojových datech. Deklarativní kanály Sparku Lakeflow používají toto sekvencování ke zpracování událostí změn, které přicházejí mimo pořadí.
# update these to the catalog and schema where you have permissions
# to create tables and views.

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"

def write_employees_cdf_to_delta():
 data = [
   (1, "Alex", "chef", "FR", "INSERT", 1),
   (2, "Jessica", "owner", "US", "INSERT", 2),
   (3, "Mikhail", "security", "UK", "INSERT", 3),
   (4, "Gary", "cleaner", "UK", "INSERT", 4),
   (5, "Chris", "owner", "NL", "INSERT", 6),
   # out of order update, this should be dropped from SCD Type 1
   (5, "Chris", "manager", "NL", "UPDATE", 5),
   (6, "Pat", "mechanic", "NL", "DELETE", 8),
   (6, "Pat", "mechanic", "NL", "INSERT", 7)
 ]
 columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
 df = spark.createDataFrame(data, columns)
 df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")

write_employees_cdf_to_delta()

Náhled těchto dat můžete zobrazit pomocí následujícího příkazu SQL:

SELECT *
FROM mycatalog.myschema.employees_cdf

Krok 2: Použití typu SCD 1 k zachování pouze nejnovějších dat

Doporučujeme použít AUTO CDC rozhraní API v deklarativních kanálech Spark Lakeflow ke zpracování datového kanálu změn do tabulky SCD Type 1.

  1. Vytvořte nový poznámkový blok.
  2. Vložte do něj následující kód.
  3. Vytvořte kanál a připojte se k němu.

Funkce employees_cdf přečte tabulku, kterou jsme právě vytvořili jako datový proud, protože create_auto_cdc_flow rozhraní API, které použijete ke zpracování zachytávání dat změn, očekává jako vstup datový proud změn. Zabalíte ho dekorátorem @dp.temporary_view , protože tento datový proud nechcete materializovat do tabulky.

Pak použijete dp.create_target_table ke vytvoření streamovací tabulky, která obsahuje výsledek zpracování tohoto datového kanálu změn.

Nakonec použijete dp.create_auto_cdc_flow ke zpracování datového kanálu změn. Pojďme se podívat na jednotlivé argumenty:

  • target – Cílová streamovací tabulka, kterou jste definovali dříve.
  • source – Zobrazení streamu záznamů změn, které jste definovali dříve.
  • keys – Identifikuje jedinečné řádky v kanálu změn. Protože používáte id jako jedinečný identifikátor, stačí zadat id jediný identifikační sloupec.
  • sequence_by – Název sloupce, který určuje logické pořadí událostí CDC ve zdrojových datech. Toto sekvencování potřebujete ke zpracování událostí změn, které přicházejí mimo pořadí. Zadejte sequenceNum jako sloupec sekvencování.
  • apply_as_deletes – Vzhledem k tomu, že ukázková data obsahují operace odstranění, použijete apply_as_deletes k určení, kdy by se událost CDC měla považovat za DELETE místo upsertu.
  • except_column_list – Obsahuje seznam sloupců, které nechcete zahrnout do cílové tabulky. V tomto příkladu použijete tento argument k vyloučení sequenceNum a operation.
  • stored_as_scd_type – Označuje typ SCD, který chcete použít.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"

@dp.temporary_view
def employees_cdf():
 return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")

dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_current}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 1
)

Spusťte toto potrubí kliknutím na Start.

Potom v editoru SQL spusťte následující dotaz, abyste ověřili, že se záznamy změn správně zpracovaly:

SELECT *
FROM mycatalog.myschema.employees_current

Poznámka:

Aktualizace mimo pořadí pro zaměstnance Chrise byla správně vynechána, protože jeho role je stále nastavena na Vlastník místo Manažer.

Příklad zachycení změn dat SCD Typu 1

Krok 3: Zachování historických dat pomocí typu SCD 2

V tomto příkladu vytvoříte druhou cílovou tabulku s názvem employees_historical, která obsahuje úplnou historii změn záznamů zaměstnanců.

Přidejte tento kód do pipeline. Jediný rozdíl je v tom, že stored_as_scd_type je nastaven na 2 místo 1.

dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_historical}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 2
)

Spusťte toto potrubí kliknutím na Start.

Potom v editoru SQL spusťte následující dotaz, abyste ověřili, že se záznamy změn správně zpracovaly:

SELECT *
FROM mycatalog.myschema.employees_historical

Zobrazí se všechny změny zaměstnanců, včetně těch, kteří byli odstraněni, například Pat.

Příklad zachycení změn dat SCD typu 2

Krok 4: Vyčištění prostředků

Jakmile budete hotovi, vyčistěte prostředky pomocí následujícího postupu:

  1. Odstraňte potrubí

    Poznámka:

    Když odstraníte kanál, automaticky smaže tabulky employees a employees_historical.

    1. Klikněte na Úlohy a kanály a vyhledejte název kanálu, který chcete odstranit.
    2. Klikněte na ikonu Přetečení. Ve stejném řádku jako název kanálu a potom klikněte na Odstranit.
  2. Odstraňte poznámkový blok.

  3. Odstraňte tabulku obsahující datový kanál změn:

    1. Klikněte na Nový > dotaz.
    2. Vložte a spusťte následující kód SQL a podle potřeby upravte katalog a schéma:
DROP TABLE mycatalog.myschema.employees_cdf

Nevýhody použití MERGE INTO a foreachBatch zachytávání dat změn

Databricks poskytuje MERGE INTO příkaz SQL, který můžete použít s rozhraním foreachBatch API k přenesení řádků do tabulky Delta. V této části se dozvíte, jak lze tuto techniku použít pro jednoduché případy použití, ale tato metoda se při použití ve scénářích reálného světa stává stále složitější a křehkější.

V tomto příkladu použijete stejný ukázkový datový kanál změn, který jste použili v předchozích příkladech.

Naivní implementace s MERGE INTO a foreachBatch

Vytvořte poznámkový blok a zkopírujte do něj následující kód. Podle potřeby změňte proměnné catalog, schema a employees_table. Proměnné catalog by schema měly být nastavené na umístění v katalogu Unity, kde můžete vytvářet tabulky.

Když poznámkový blok spustíte, provede to toto:

  • Vytvoří cílovou tabulku v objektu create_table. Na rozdíl od create_auto_cdc_flowtoho, který tento krok zpracovává automaticky, musíte zadat schéma.
  • Přečte datový kanál změn jako datový proud. Každý mikrobatch se zpracovává pomocí upsertToDelta metody, která spouští MERGE INTO příkaz.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"

def upsertToDelta(microBatchDF, batchId):
 microBatchDF.createOrReplaceTempView("updates")
 microBatchDF.sparkSession.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING)
 """)

create_table()

cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")

cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()

Pokud chcete zobrazit výsledky, spusťte následující dotaz SQL:

SELECT *
FROM mycatalog.myschema.employees_merge

Výsledky jsou bohužel nesprávné, jak je znázorněno níže:

Příklad zachytávání změn MERGE INTO dat

Několik aktualizací stejného klíče ve stejném mikrobatchu

Prvním problémem je, že kód nezpracuje více aktualizací stejného klíče ve stejném mikrobatchu. Například, použijete INSERT k vložení zaměstnance Chrise a potom aktualizujete jeho roli z vlastníka na manažera. Výsledkem by měl být jeden řádek, ale místo toho jsou dva řádky.

Která změna vyhrává, když v mikrobatchu existuje více aktualizací stejného klíče?

Změna dat zaznamenává několik aktualizací stejného klíče ve stejném příkladu mikrobatchu.

Logika se stává složitější. Následující příklad kódu získá nejnovější řádek podle sequenceNum a slučuje pouze tato data do cílové tabulky následujícím způsobem:

  • Seskupí podle primárního klíče id.
  • Vybere všechny sloupce pro řádek, který má maximální hodnotu sequenceNum v dávce pro daný klíč.
  • Rozloží řádek zpět ven.

Aktualizujte metodu upsertToDelta následujícím způsobem a spusťte kód:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

Když se dotazujete na cílovou tabulku, uvidíte, že zaměstnanec s názvem Chris má správnou roli, ale stále existují další problémy, které je potřeba vyřešit, protože jste stále odstranili záznamy, které se zobrazují v cílové tabulce.

Změna dat zachytává několik aktualizací stejného klíče ve stejném příkladu mikrobatchu.

Aktualizace mimo pořadí napříč mikrodávkami

Tato část zkoumá problém aktualizací v nesprávném pořadí napříč mikrodávkami. Následující diagram znázorňuje problém: co když má řádek s Chrisem operaci UPDATE v prvním mikrobatchi, která je následována INSERT v pozdějším mikrobatchi? Kód to nezvládá správně.

Která změna vyhrává, když dojde k aktualizaci stejného klíče mimo pořadí napříč několika mikrobatchy?

Změna dat zaznamenává aktualizace mimo pořadí v příkladu mikrobatchů.

Chcete-li tento problém vyřešit, rozbalte kód pro uložení verze v každém řádku následujícím způsobem:

  • sequenceNum Uložte, kdy byl řádek naposledy aktualizován.
  • U každého nového řádku zkontrolujte, jestli je časové razítko větší než časové razítko uložené, a pak použijte následující logiku:
    • Pokud je větší, použijte nová data z cíle.
    • Jinak ponechte data ve zdroji.

Nejprve aktualizujte metodu createTable, aby ukládala sequenceNum. Použijete ji k verzování každého řádku.

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING, sequenceNum INT)
 """)

Nejprve aktualizujte upsertToDelta, aby zpracovával verze řádků. Klauzule UPDATE SETMERGE INTO musí samostatně zpracovat každý sloupec.

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

Zpracování odstranění

Kód má bohužel stále problém. Nezpracuje DELETE operace, jak vyplývá ze skutečnosti, že zaměstnanec Pat je stále v cílové tabulce.

Předpokládejme, že odstranění přicházejí do stejné mikrodávky. Pokud je chcete zpracovat, aktualizujte metodu upsertToDelta znovu, aby se řádek odstranil, když záznam změn dat indikuje odstranění, jak je znázorněno na následujícím obrázku:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

Zpracování aktualizací přicházejících ve špatném pořadí po odstranění

Bohužel, výše uvedený kód stále není úplně správný, protože nezpracovává případy, kdy DELETE je následován mimo pořadí UPDATE mezi mikrobatchy.

Příklad zpracování zachytávání změn dat a aktualizací přicházejících mimo pořadí po odstraněních.

Algoritmus pro zpracování tohoto případu musí zaznamenat odstranění, aby mohl zpracovat následné aktualizace mimo pořadí. Jak to udělat:

  • Místo okamžitého odstranění řádků proveďte měkké odstranění pomocí časového razítka nebo sequenceNum. Obnovitelně odstraněné řádky jsou označené k odstranění.
  • Přesměrujte všechny uživatele na zobrazení, které filtruje záhrobky.
  • Vytvořte úlohu vyčištění, která v průběhu času odebere náhrobky.

Použijte následující kód:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

Vaši uživatelé nemůžou přímo použít cílovou tabulku, takže vytvořte zobrazení, které můžou dotazovat:

CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL

Nakonec vytvořte úlohu čištění, která pravidelně odebírá náhrobky řádků:

DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY