Sdílet prostřednictvím


Přenesení do tabulky Delta Lake pomocí sloučení

Pomocí operace SQL můžete přenést data ze zdrojové tabulky, zobrazení nebo datového rámce do cílové tabulky MERGE Delta. Delta Lake podporuje vkládání, aktualizace a odstraňování MERGEa podporuje rozšířenou syntaxi nad rámec standardů SQL pro usnadnění pokročilých případů použití.

Předpokládejme, že máte zdrojovou tabulku s názvem people10mupdates nebo zdrojovou cestou /tmp/delta/people-10m-updates , která obsahuje nová data pro cílovou tabulku s názvem people10m nebo cílovou cestou na /tmp/delta/people-10madrese . Některé z těchto nových záznamů už můžou být v cílových datech. Chcete-li sloučit nová data, chcete aktualizovat řádky, ve kterých je osoba id již přítomna, a vložit nové řádky, ve kterých není žádná shoda id přítomna. Můžete spustit následující dotaz:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Důležité

Pouze jeden řádek ze zdrojové tabulky se může shodovat s daným řádkem v cílové tabulce. V Databricks Runtime 16.0 a novějších MERGE vyhodnocuje podmínky zadané v WHEN MATCHED klauzulích a ON určí duplicitní shody. V Databricks Runtime 15.4 LTS a níže MERGE operace berou v úvahu pouze podmínky uvedené v ON klauzuli.

Podrobnosti o syntaxi jazyka Scala a Python najdete v dokumentaci k rozhraní Delta Lake API. Podrobnosti o syntaxi SQL najdete v tématu MERGE INTO

Úprava všech neseřazených řádků pomocí sloučení

V Databricks SQL a Databricks Runtime 12.2 LTS a vyšší můžete klauzuli použít WHEN NOT MATCHED BY SOURCE k UPDATE nebo DELETE záznamům v cílové tabulce, které nemají odpovídající záznamy ve zdrojové tabulce. Databricks doporučuje přidat volitelnou podmíněnou klauzuli, aby se zabránilo úplnému přepisování cílové tabulky.

Následující příklad kódu ukazuje základní syntaxi použití pro odstranění, přepsání cílové tabulky obsahem zdrojové tabulky a odstraněním chybějících záznamů v cílové tabulce. Škálovatelný vzor pro tabulky, ve kterých jsou aktualizace a odstranění zdroje vázané na čas, najdete v tématu Přírůstková synchronizace tabulky Delta se zdrojem.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Následující příklad přidá podmínky do WHEN NOT MATCHED BY SOURCE klauzule a určuje hodnoty, které se mají aktualizovat v nezařazených cílových řádcích.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Sémantika operace sloučení

Následuje podrobný popis merge sémantiky programových operací.

  • Může existovat libovolný počet whenMatched klauzulí a whenNotMatched klauzulí.

  • whenMatched Klauzule se provádějí, když zdrojový řádek odpovídá cílovému řádku tabulky na základě podmínky shody. Tyto klauzule mají následující sémantiku.

    • whenMatched klauzule mohou mít maximálně jednu update a jednu delete akci. Akce update pouze merge aktualizuje zadané sloupce (podobně jako update operace) odpovídajícího cílového řádku. Akce delete odstraní odpovídající řádek.

    • Každá whenMatched klauzule může mít volitelnou podmínku. Pokud tato podmínka klauzule existuje, provede se akce delete pro všechny odpovídající dvojice řádků zdrojového cíle pouze v případě, update že podmínka klauzule je pravdivá.

    • Pokud existuje více whenMatched klauzulí, vyhodnocují se v uvedeném pořadí. Všechny whenMatched klauzule, s výjimkou poslední, musí mít podmínky.

    • Pokud se žádná z whenMatched podmínek nevyhodnotí jako true pro dvojici zdrojového a cílového řádku, která odpovídá podmínce sloučení, zůstane cílový řádek beze změny.

    • Chcete-li aktualizovat všechny sloupce cílové tabulky Delta odpovídajícími sloupci zdrojové datové sady, použijte whenMatched(...).updateAll(). To odpovídá:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      pro všechny sloupce cílové tabulky Delta. Proto tato akce předpokládá, že zdrojová tabulka má stejné sloupce jako v cílové tabulce, jinak dotaz vyvolá chybu analýzy.

      Poznámka:

      Toto chování se změní při povolení automatického vývoje schématu. Podrobnosti najdete v automatickém vývoji schématu.

  • whenNotMatched Klauzule se spouští, když zdrojový řádek neodpovídá žádnému cílovému řádku na základě podmínky shody. Tyto klauzule mají následující sémantiku.

    • whenNotMatched klauzule mohou mít pouze insert akci. Nový řádek se vygeneruje na základě zadaného sloupce a odpovídajících výrazů. Nemusíte zadávat všechny sloupce v cílové tabulce. Pro nezadané cílové sloupce NULL se vloží.

    • Každá whenNotMatched klauzule může mít volitelnou podmínku. Pokud podmínka klauzule existuje, vloží se zdrojový řádek pouze v případě, že je podmínka pro tento řádek pravdivá. Jinak se zdrojový sloupec ignoruje.

    • Pokud existuje více whenNotMatched klauzulí, vyhodnocují se v uvedeném pořadí. Všechny whenNotMatched klauzule, s výjimkou poslední, musí mít podmínky.

    • Chcete-li vložit všechny sloupce cílové tabulky Delta s odpovídajícími sloupci zdrojové datové sady, použijte whenNotMatched(...).insertAll(). To odpovídá:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      pro všechny sloupce cílové tabulky Delta. Proto tato akce předpokládá, že zdrojová tabulka má stejné sloupce jako v cílové tabulce, jinak dotaz vyvolá chybu analýzy.

      Poznámka:

      Toto chování se změní při povolení automatického vývoje schématu. Podrobnosti najdete v automatickém vývoji schématu.

  • whenNotMatchedBySource Klauzule se spustí, když cílový řádek neodpovídá žádnému zdrojovému řádku na základě podmínky sloučení. Tyto klauzule mají následující sémantiku.

    • whenNotMatchedBySource klauzule mohou určovat delete a update akce.
    • Každá whenNotMatchedBySource klauzule může mít volitelnou podmínku. Pokud je podmínka klauzule přítomná, cílový řádek se upraví jenom v případě, že je tato podmínka pro tento řádek pravdivá. V opačném případě zůstane cílový řádek beze změny.
    • Pokud existuje více whenNotMatchedBySource klauzulí, vyhodnocují se v uvedeném pořadí. Všechny whenNotMatchedBySource klauzule, s výjimkou poslední, musí mít podmínky.
    • Podle definice whenNotMatchedBySource klauzule nemají zdrojový řádek pro vyžádání hodnot sloupců, a proto na zdrojové sloupce nelze odkazovat. Pro každý sloupec, který chcete upravit, můžete buď zadat literál, nebo provést akci s cílovým sloupcem, například SET target.deleted_count = target.deleted_count + 1.

Důležité

  • merge Operace může selhat, pokud se shoduje více řádků zdrojové datové sady a pokus o sloučení se pokusí aktualizovat stejné řádky cílové tabulky Delta. Podle sémantiky sloučení SQL je taková operace aktualizace nejednoznačná, protože není jasné, který zdrojový řádek by se měl použít k aktualizaci odpovídajícího cílového řádku. Zdrojová tabulka můžete předem zpracovat, abyste vyloučili možnost více shod.
  • Operaci SQL MERGE můžete použít v ZOBRAZENÍ SQL pouze v případě, že zobrazení bylo definováno jako CREATE VIEW viewName AS SELECT * FROM deltaTable.

Odstranění duplicitních dat při zápisu do tabulek Delta

Běžným případem použití ETL je shromáždit protokoly do tabulky Delta tak, že je připojíte k tabulce. Často ale zdroje můžou generovat duplicitní záznamy protokolu a kroky odstranění duplicitních dat jsou potřeba k jejich péči. Díky mergetomu se můžete vyhnout vkládání duplicitních záznamů.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Poznámka:

Datová sada obsahující nové protokoly musí být odstraněna duplicitními daty uvnitř sebe. Sémantika sloučení SQL odpovídá novým datům v tabulce a odstranění duplicit s existujícími daty v tabulce, ale pokud v nové datové sadě existují duplicitní data, vloží se. Proto před sloučením do tabulky deduplikujte nová data.

Pokud víte, že můžete získat duplicitní záznamy jenom na několik dní, můžete dotaz dále optimalizovat rozdělením tabulky podle data a zadáním rozsahu kalendářních dat cílové tabulky, podle kterého se má shodovat.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

To je efektivnější než předchozí příkaz, protože hledá duplicity pouze za posledních 7 dnů protokolů, ne celou tabulku. Kromě toho můžete použít toto sloučení jen pro vložení se strukturovaným streamováním k průběžnému odstraňování duplicitních dat protokolů.

  • V dotazu streamování můžete pomocí operace foreachBatch sloučení průběžně zapisovat všechna streamovaná data do tabulky Delta s odstraněním duplicitních dat. Další informace o foreachBatch.
  • V jiném streamovacím dotazu můžete průběžně číst odstraněná data z této tabulky Delta. Je to možné, protože sloučení pouze vložení připojí nová data do tabulky Delta.

Pomalu se měnící data (SCD) a zachytávání dat (CDC) pomocí Delta Lake

Delta Live Tables má nativní podporu pro sledování a použití SCD Type 1 a Type 2. Pomocí APPLY CHANGES INTO rozdílových živých tabulek se ujistěte, že se záznamy mimo pořadí zpracovávají správně při zpracování informačních kanálů CDC. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.

Přírůstková synchronizace tabulky Delta se zdrojem

V Databricks SQL a Databricks Runtime 12.2 LTS a novějších verzích můžete vytvořit WHEN NOT MATCHED BY SOURCE libovolné podmínky pro atomické odstranění a nahrazení části tabulky. To může být užitečné zejména v případě, že máte zdrojovou tabulku, kde se záznamy můžou po dobu několika dnů po počátečním zadání dat změnit nebo odstranit, ale nakonec se usadit do konečného stavu.

Následující dotaz ukazuje použití tohoto vzoru k výběru 5 dnů záznamů ze zdroje, aktualizaci odpovídajících záznamů v cíli, vložení nových záznamů ze zdroje do cíle a odstranění všech chybějících záznamů z posledních 5 dnů v cíli.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Poskytnutím stejného logického filtru zdrojových a cílových tabulek můžete dynamicky šířit změny ze zdroje do cílových tabulek, včetně odstranění.

Poznámka:

I když se tento model dá použít bez podmíněných klauzulí, může to vést k úplnému přepsání cílové tabulky, která může být nákladná.