Eseguire l'upsert in una tabella Delta Lake usando l'unione

È possibile eseguire l'upsert dei dati da una tabella di origine, una vista o un dataframe in una tabella Delta di destinazione usando l'operazione MERGE SQL. Delta Lake supporta inserimenti, aggiornamenti ed eliminazioni in MERGEe supporta la sintassi estesa oltre gli standard SQL per facilitare i casi d'uso avanzati.

Si supponga di avere una tabella di origine denominata people10mupdates o un percorso di origine in /tmp/delta/people-10m-updates che contiene nuovi dati per una tabella di destinazione denominata people10m o un percorso di destinazione in /tmp/delta/people-10m. Alcuni di questi nuovi record potrebbero essere già presenti nei dati di destinazione. Per unire i nuovi dati, è necessario aggiornare le righe in cui è già presente la persona id e inserire le nuove righe in cui non è presente alcuna corrispondenza id . È possibile eseguire la query seguente:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Per informazioni dettagliate sulla sintassi scala e Python, vedere la documentazione dell'API Delta Lake. Per informazioni dettagliate sulla sintassi SQL, vedere MERGE INTO

Modificare tutte le righe senza corrispondenza usando l'unione

In Databricks SQL e Databricks Runtime 12.2 LTS e versioni successive è possibile usare la WHEN NOT MATCHED BY SOURCE clausola per UPDATE o DELETE i record nella tabella di destinazione che non contengono record corrispondenti nella tabella di origine. Databricks consiglia di aggiungere una clausola condizionale facoltativa per evitare di riscrivere completamente la tabella di destinazione.

Nell'esempio di codice seguente viene illustrata la sintassi di base dell'utilizzo di questo oggetto per le eliminazioni, sovrascrivendo la tabella di destinazione con il contenuto della tabella di origine ed eliminando record non corrispondenti nella tabella di destinazione. Per un modello più scalabile per le tabelle in cui gli aggiornamenti e le eliminazioni di origine sono associati al tempo, vedere Sincronizzare in modo incrementale la tabella Delta con l'origine.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Nell'esempio seguente vengono aggiunte condizioni alla WHEN NOT MATCHED BY SOURCE clausola e vengono specificati i valori da aggiornare in righe di destinazione non corrispondenti.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semantica dell'operazione di merge

Di seguito è riportata una descrizione dettagliata della semantica dell'operazione merge a livello di codice.

  • Può essere presente un numero qualsiasi di whenMatched clausole e whenNotMatched .

  • whenMatched le clausole vengono eseguite quando una riga di origine corrisponde a una riga della tabella di destinazione in base alla condizione di corrispondenza. Queste clausole hanno la semantica seguente.

    • whenMatched le clausole possono avere al massimo una update e un'azione delete . L'azione update in merge aggiorna solo le colonne specificate (analogamente all'operazioneupdate) della riga di destinazione corrispondente. L'azione delete elimina la riga corrispondente.

    • Ogni whenMatched clausola può avere una condizione facoltativa. Se questa condizione di clausola esiste, l'azione update o delete viene eseguita per qualsiasi coppia di righe di destinazione di origine corrispondente solo quando la condizione della clausola è true.

    • Se sono presenti più whenMatched clausole, vengono valutate nell'ordine in cui vengono specificate. Tutte le whenMatched clausole, ad eccezione dell'ultima, devono avere condizioni.

    • Se nessuna delle condizioni restituisce true per una coppia di righe di whenMatched origine e di destinazione corrispondente alla condizione di unione, la riga di destinazione rimane invariata.

    • Per aggiornare tutte le colonne della tabella Delta di destinazione con le colonne corrispondenti del set di dati di origine, usare whenMatched(...).updateAll(). Equivale a:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      per tutte le colonne della tabella Delta di destinazione. Pertanto, questa azione presuppone che la tabella di origine abbia le stesse colonne della tabella di destinazione. In caso contrario, la query genera un errore di analisi.

      Nota

      Questo comportamento cambia quando è abilitata la migrazione automatica dello schema. Per informazioni dettagliate, vedere l'evoluzione automatica dello schema.

  • whenNotMatched le clausole vengono eseguite quando una riga di origine non corrisponde ad alcuna riga di destinazione in base alla condizione di corrispondenza. Queste clausole hanno la semantica seguente.

    • whenNotMatched le clausole possono avere solo l'azione insert . La nuova riga viene generata in base alla colonna specificata e alle espressioni corrispondenti. Non è necessario specificare tutte le colonne nella tabella di destinazione. Per le colonne di destinazione non specificate, NULL viene inserito .

    • Ogni whenNotMatched clausola può avere una condizione facoltativa. Se la condizione della clausola è presente, viene inserita una riga di origine solo se tale condizione è true per tale riga. In caso contrario, la colonna di origine viene ignorata.

    • Se sono presenti più whenNotMatched clausole, vengono valutate nell'ordine in cui vengono specificate. Tutte le whenNotMatched clausole, ad eccezione dell'ultima, devono avere condizioni.

    • Per inserire tutte le colonne della tabella Delta di destinazione con le colonne corrispondenti del set di dati di origine, usare whenNotMatched(...).insertAll(). Equivale a:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      per tutte le colonne della tabella Delta di destinazione. Pertanto, questa azione presuppone che la tabella di origine abbia le stesse colonne della tabella di destinazione. In caso contrario, la query genera un errore di analisi.

      Nota

      Questo comportamento cambia quando è abilitata la migrazione automatica dello schema. Per informazioni dettagliate, vedere l'evoluzione automatica dello schema.

  • whenNotMatchedBySource le clausole vengono eseguite quando una riga di destinazione non corrisponde ad alcuna riga di origine in base alla condizione di merge. Queste clausole hanno la semantica seguente.

    • whenNotMatchedBySource le clausole possono specificare delete e update azioni.
    • Ogni whenNotMatchedBySource clausola può avere una condizione facoltativa. Se la condizione della clausola è presente, una riga di destinazione viene modificata solo se tale condizione è true per tale riga. In caso contrario, la riga di destinazione rimane invariata.
    • Se sono presenti più whenNotMatchedBySource clausole, vengono valutate nell'ordine in cui vengono specificate. Tutte le whenNotMatchedBySource clausole, ad eccezione dell'ultima, devono avere condizioni.
    • Per definizione, whenNotMatchedBySource le clausole non hanno una riga di origine da cui eseguire il pull dei valori delle colonne e quindi non è possibile fare riferimento alle colonne di origine. Per ogni colonna da modificare, è possibile specificare un valore letterale o eseguire un'azione nella colonna di destinazione, ad esempio SET target.deleted_count = target.deleted_count + 1.

Importante

  • Un'operazione merge può non riuscire se più righe del set di dati di origine corrispondono e il merge tenta di aggiornare le stesse righe della tabella Delta di destinazione. In base alla semantica SQL di merge, tale operazione di aggiornamento è ambigua perché non è chiaro quale riga di origine deve essere usata per aggiornare la riga di destinazione corrispondente. È possibile pre-elaborare la tabella di origine per eliminare la possibilità di più corrispondenze.
  • È possibile applicare un'operazione SQL MERGE in una VISTA SQL solo se la vista è stata definita come CREATE VIEW viewName AS SELECT * FROM deltaTable.

Deduplicazione dei dati durante la scrittura in tabelle Delta

Un caso d'uso ETL comune consiste nel raccogliere i log nella tabella Delta aggiungendoli a una tabella. Tuttavia, spesso le origini possono generare record di log duplicati e i passaggi di deduplicazione downstream sono necessari per gestirli. Con mergeè possibile evitare di inserire i record duplicati.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Nota

Il set di dati contenente i nuovi log deve essere deduplicato all'interno di se stesso. Dalla semantica SQL di merge, corrisponde e deduplica i nuovi dati con i dati esistenti nella tabella, ma se sono presenti dati duplicati all'interno del nuovo set di dati, vengono inseriti. Di conseguenza, deduplicare i nuovi dati prima di eseguire l'unione nella tabella.

Se si sa che è possibile ottenere record duplicati solo per alcuni giorni, è possibile ottimizzare ulteriormente la query partizionando la tabella in base alla data e quindi specificando l'intervallo di date della tabella di destinazione su cui trovare la corrispondenza.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Questa operazione è più efficiente del comando precedente perché cerca duplicati solo negli ultimi 7 giorni di log, non nell'intera tabella. Inoltre, è possibile usare questo merge di solo inserimento con Structured Streaming per eseguire la deduplicazione continua dei log.

  • In una query di streaming è possibile usare l'operazione di unione in foreachBatch per scrivere continuamente tutti i dati di streaming in una tabella Delta con deduplicazione. Per altre informazioni su , vedere l'esempio di streaming seguente.foreachBatch
  • In un'altra query di streaming è possibile leggere continuamente i dati deduplicati da questa tabella Delta. Ciò è possibile perché un'unione di sola inserimento aggiunge nuovi dati alla tabella Delta.

Dati a modifica lenta (SCD) e Change Data Capture (CDC) con Delta Lake

Le tabelle live delta supportano in modo nativo il rilevamento e l'applicazione di scD type 1 e type 2. Usare APPLY CHANGES INTO con le tabelle live Delta per assicurarsi che i record non in ordine vengano gestiti correttamente durante l'elaborazione dei feed CDC. Vedere APPLY CHANGES API :Semplificare Change Data Capture in Tabelle Live Delta.

Sincronizzare in modo incrementale la tabella Delta con l'origine

In Databricks SQL e Databricks Runtime 12.2 LTS e versioni successive è possibile usare WHEN NOT MATCHED BY SOURCE per creare condizioni arbitrarie per eliminare e sostituire in modo atomico una parte di una tabella. Ciò può essere particolarmente utile quando si dispone di una tabella di origine in cui i record possono cambiare o essere eliminati per diversi giorni dopo l'immissione iniziale dei dati, ma alla fine si risolvano in uno stato finale.

La query seguente illustra l'uso di questo modello per selezionare 5 giorni di record dall'origine, aggiornare i record corrispondenti nella destinazione, inserire nuovi record dall'origine alla destinazione ed eliminare tutti i record non corrispondenti degli ultimi 5 giorni nella destinazione.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Fornendo lo stesso filtro booleano sulle tabelle di origine e di destinazione, è possibile propagare dinamicamente le modifiche dall'origine alle tabelle di destinazione, incluse le eliminazioni.

Nota

Anche se questo modello può essere usato senza clausole condizionali, ciò comporta la riscrittura completa della tabella di destinazione che può risultare costosa.