Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Zachytávání dat změn (CDC) je vzor integrace dat, který zachycuje změny provedené v datech ve zdrojovém systému, jako jsou vložení, aktualizace a odstranění. Tyto změny, reprezentované jako seznam, se běžně označují jako CDC feed. Pokud pracujete s informačním kanálem CDC, můžete data zpracovávat mnohem rychleji a nemusíte číst celou zdrojovou datovou sadu. Transakční databáze, jako jsou SQL Server, MySQL a Oracle, generují informační kanály CDC. Tabulky Delta generují vlastní kanál CDC, označovaný jako kanál CDF (Change Data Feed).
Následující diagram ukazuje, že když se aktualizuje řádek ve zdrojové tabulce obsahující data zaměstnanců, vygeneruje novou sadu řádků v informačním kanálu CDC, který obsahuje pouze změny. Každý řádek informačního kanálu CDC obvykle obsahuje další metadata, včetně operace, jako je UPDATE a sloupec, který lze použít k deterministickému seřazení jednotlivých řádků v informačním kanálu CDC, abyste mohli zpracovávat aktualizace mimo pořadí. Například sequenceNum sloupec v následujícím diagramu určuje pořadí řádků v informačním kanálu CDC:
Zpracování toků změn v datech: Zachovat pouze nejnovější data vs. zachovat historické verze dat
Zpracování změněného datového kanálu se označuje jako pomalu se měnící dimenze (slowly changing dimensions, SCD). Při zpracování informačního kanálu CDC si můžete vybrat:
- Uchováváte pouze nejnovější data (to znamená přepsat existující data)? To se označuje jako SCD Type 1.
- Nebo uchováváte historii změn dat? Označuje se jako SCD Type 2.
Zpracování typu 1 typu SCD zahrnuje přepsání starých dat s novými daty při každé změně. To znamená, že se neuchovávají žádné historie změn. K dispozici je pouze nejnovější verze dat. Jedná se o jednoduchý přístup, který se často používá v případě, že historie změn není důležitá, například oprava chyb nebo aktualizace nekritické pole, jako jsou e-mailové adresy zákazníků.
Zpracování typu SCD 2 udržuje historický záznam změn dat vytvořením dalších záznamů pro zachycení různých verzí dat v průběhu času. Každá verze dat je opatřena časovým razítkem a/nebo označená metadaty, která uživatelům umožňují sledovat, kdy došlo ke změně. To je užitečné, když je důležité sledovat vývoj dat, jako je sledování změn zákazníků v průběhu času pro účely analýzy.
Příklady zpracování scd typu 1 a typu 2 pomocí deklarativních kanálů Sparku Lakeflow
Příklady v této části ukazují, jak používat SCD Type 1 a Type 2.
Krok 1: Příprava ukázkových dat
V tomto příkladu vygenerujete ukázkový informační kanál CDC. Nejprve vytvořte poznámkový blok a vložte do něj následující kód. Aktualizujte proměnné na začátku bloku kódu na katalog a schéma, kde máte oprávnění vytvářet tabulky a zobrazení.
Tento kód vytvoří novou tabulku Delta, která obsahuje několik záznamů změn. Schéma je následující:
-
id- Celé číslo, jedinečný identifikátor tohoto zaměstnance -
name- Řetězec, jméno zaměstnance -
role- Řetězec, role zaměstnance -
country- Řetězec, kód země, kde zaměstnanec pracuje -
operation- Změnit typ(napříkladINSERT,UPDATE, neboDELETE) -
sequenceNum– Celé číslo identifikuje logické pořadí událostí CDC ve zdrojových datech. Deklarativní kanály Sparku Lakeflow používají toto sekvencování ke zpracování událostí změn, které přicházejí mimo pořadí.
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5),
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
Náhled těchto dat můžete zobrazit pomocí následujícího příkazu SQL:
SELECT *
FROM mycatalog.myschema.employees_cdf
Krok 2: Použití typu SCD 1 k zachování pouze nejnovějších dat
Doporučujeme použít AUTO CDC rozhraní API v deklarativních kanálech Spark Lakeflow ke zpracování datového kanálu změn do tabulky SCD Type 1.
- Vytvořte nový poznámkový blok.
- Vložte do něj následující kód.
- Vytvořte kanál a připojte se k němu.
Funkce employees_cdf přečte tabulku, kterou jsme právě vytvořili jako datový proud, protože create_auto_cdc_flow rozhraní API, které použijete ke zpracování zachytávání dat změn, očekává jako vstup datový proud změn. Zabalíte ho dekorátorem @dp.temporary_view , protože tento datový proud nechcete materializovat do tabulky.
Pak použijete dp.create_target_table ke vytvoření streamovací tabulky, která obsahuje výsledek zpracování tohoto datového kanálu změn.
Nakonec použijete dp.create_auto_cdc_flow ke zpracování datového kanálu změn. Pojďme se podívat na jednotlivé argumenty:
-
target– Cílová streamovací tabulka, kterou jste definovali dříve. -
source– Zobrazení streamu záznamů změn, které jste definovali dříve. -
keys– Identifikuje jedinečné řádky v kanálu změn. Protože používáteidjako jedinečný identifikátor, stačí zadatidjediný identifikační sloupec. -
sequence_by– Název sloupce, který určuje logické pořadí událostí CDC ve zdrojových datech. Toto sekvencování potřebujete ke zpracování událostí změn, které přicházejí mimo pořadí. ZadejtesequenceNumjako sloupec sekvencování. -
apply_as_deletes– Vzhledem k tomu, že ukázková data obsahují operace odstranění, použijeteapply_as_deletesk určení, kdy by se událost CDC měla považovat zaDELETEmísto upsertu. -
except_column_list– Obsahuje seznam sloupců, které nechcete zahrnout do cílové tabulky. V tomto příkladu použijete tento argument k vyloučenísequenceNumaoperation. -
stored_as_scd_type– Označuje typ SCD, který chcete použít.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dp.temporary_view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
Spusťte toto potrubí kliknutím na Start.
Potom v editoru SQL spusťte následující dotaz, abyste ověřili, že se záznamy změn správně zpracovaly:
SELECT *
FROM mycatalog.myschema.employees_current
Poznámka:
Aktualizace mimo pořadí pro zaměstnance Chrise byla správně vynechána, protože jeho role je stále nastavena na Vlastník místo Manažer.
Krok 3: Zachování historických dat pomocí typu SCD 2
V tomto příkladu vytvoříte druhou cílovou tabulku s názvem employees_historical, která obsahuje úplnou historii změn záznamů zaměstnanců.
Přidejte tento kód do pipeline. Jediný rozdíl je v tom, že stored_as_scd_type je nastaven na 2 místo 1.
dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
Spusťte toto potrubí kliknutím na Start.
Potom v editoru SQL spusťte následující dotaz, abyste ověřili, že se záznamy změn správně zpracovaly:
SELECT *
FROM mycatalog.myschema.employees_historical
Zobrazí se všechny změny zaměstnanců, včetně těch, kteří byli odstraněni, například Pat.
Krok 4: Vyčištění prostředků
Jakmile budete hotovi, vyčistěte prostředky pomocí následujícího postupu:
Odstraňte potrubí
Poznámka:
Když odstraníte kanál, automaticky smaže tabulky
employeesaemployees_historical.- Klikněte na Úlohy a kanály a vyhledejte název kanálu, který chcete odstranit.
- Klikněte na
Ve stejném řádku jako název kanálu a potom klikněte na Odstranit.
Odstraňte poznámkový blok.
Odstraňte tabulku obsahující datový kanál změn:
- Klikněte na Nový > dotaz.
- Vložte a spusťte následující kód SQL a podle potřeby upravte katalog a schéma:
DROP TABLE mycatalog.myschema.employees_cdf
Nevýhody použití MERGE INTO a foreachBatch zachytávání dat změn
Databricks poskytuje MERGE INTO příkaz SQL, který můžete použít s rozhraním foreachBatch API k přenesení řádků do tabulky Delta. V této části se dozvíte, jak lze tuto techniku použít pro jednoduché případy použití, ale tato metoda se při použití ve scénářích reálného světa stává stále složitější a křehkější.
V tomto příkladu použijete stejný ukázkový datový kanál změn, který jste použili v předchozích příkladech.
Naivní implementace s MERGE INTO a foreachBatch
Vytvořte poznámkový blok a zkopírujte do něj následující kód. Podle potřeby změňte proměnné catalog, schema a employees_table. Proměnné catalog by schema měly být nastavené na umístění v katalogu Unity, kde můžete vytvářet tabulky.
Když poznámkový blok spustíte, provede to toto:
- Vytvoří cílovou tabulku v objektu
create_table. Na rozdíl odcreate_auto_cdc_flowtoho, který tento krok zpracovává automaticky, musíte zadat schéma. - Přečte datový kanál změn jako datový proud. Každý mikrobatch se zpracovává pomocí
upsertToDeltametody, která spouštíMERGE INTOpříkaz.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
Pokud chcete zobrazit výsledky, spusťte následující dotaz SQL:
SELECT *
FROM mycatalog.myschema.employees_merge
Výsledky jsou bohužel nesprávné, jak je znázorněno níže:
Několik aktualizací stejného klíče ve stejném mikrobatchu
Prvním problémem je, že kód nezpracuje více aktualizací stejného klíče ve stejném mikrobatchu. Například, použijete INSERT k vložení zaměstnance Chrise a potom aktualizujete jeho roli z vlastníka na manažera. Výsledkem by měl být jeden řádek, ale místo toho jsou dva řádky.
Která změna vyhrává, když v mikrobatchu existuje více aktualizací stejného klíče?
Logika se stává složitější. Následující příklad kódu získá nejnovější řádek podle sequenceNum a slučuje pouze tato data do cílové tabulky následujícím způsobem:
- Seskupí podle primárního klíče
id. - Vybere všechny sloupce pro řádek, který má maximální hodnotu
sequenceNumv dávce pro daný klíč. - Rozloží řádek zpět ven.
Aktualizujte metodu upsertToDelta následujícím způsobem a spusťte kód:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Když se dotazujete na cílovou tabulku, uvidíte, že zaměstnanec s názvem Chris má správnou roli, ale stále existují další problémy, které je potřeba vyřešit, protože jste stále odstranili záznamy, které se zobrazují v cílové tabulce.
Aktualizace mimo pořadí napříč mikrodávkami
Tato část zkoumá problém aktualizací v nesprávném pořadí napříč mikrodávkami. Následující diagram znázorňuje problém: co když má řádek s Chrisem operaci UPDATE v prvním mikrobatchi, která je následována INSERT v pozdějším mikrobatchi? Kód to nezvládá správně.
Která změna vyhrává, když dojde k aktualizaci stejného klíče mimo pořadí napříč několika mikrobatchy?
Chcete-li tento problém vyřešit, rozbalte kód pro uložení verze v každém řádku následujícím způsobem:
-
sequenceNumUložte, kdy byl řádek naposledy aktualizován. - U každého nového řádku zkontrolujte, jestli je časové razítko větší než časové razítko uložené, a pak použijte následující logiku:
- Pokud je větší, použijte nová data z cíle.
- Jinak ponechte data ve zdroji.
Nejprve aktualizujte metodu createTable, aby ukládala sequenceNum. Použijete ji k verzování každého řádku.
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
Nejprve aktualizujte upsertToDelta, aby zpracovával verze řádků. Klauzule UPDATE SETMERGE INTO musí samostatně zpracovat každý sloupec.
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Zpracování odstranění
Kód má bohužel stále problém. Nezpracuje DELETE operace, jak vyplývá ze skutečnosti, že zaměstnanec Pat je stále v cílové tabulce.
Předpokládejme, že odstranění přicházejí do stejné mikrodávky. Pokud je chcete zpracovat, aktualizujte metodu upsertToDelta znovu, aby se řádek odstranil, když záznam změn dat indikuje odstranění, jak je znázorněno na následujícím obrázku:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Zpracování aktualizací přicházejících ve špatném pořadí po odstranění
Bohužel, výše uvedený kód stále není úplně správný, protože nezpracovává případy, kdy DELETE je následován mimo pořadí UPDATE mezi mikrobatchy.
Algoritmus pro zpracování tohoto případu musí zaznamenat odstranění, aby mohl zpracovat následné aktualizace mimo pořadí. Jak to udělat:
- Místo okamžitého odstranění řádků proveďte měkké odstranění pomocí časového razítka nebo
sequenceNum. Obnovitelně odstraněné řádky jsou označené k odstranění. - Přesměrujte všechny uživatele na zobrazení, které filtruje záhrobky.
- Vytvořte úlohu vyčištění, která v průběhu času odebere náhrobky.
Použijte následující kód:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Vaši uživatelé nemůžou přímo použít cílovou tabulku, takže vytvořte zobrazení, které můžou dotazovat:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
Nakonec vytvořte úlohu čištění, která pravidelně odebírá náhrobky řádků:
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY