Executar upsert em uma tabela Delta Lake usando mesclagem

Você pode fazer upsert de dados de uma tabela de origem, exibição ou DataFrame em uma tabela Delta de destino usando a operação MERGE SQL. O Delta Lake dá suporte a inserções, atualizações e exclusões no MERGE e dá suporte à sintaxe estendida além dos padrões SQL para facilitar casos de uso avançados.

Suponha que você tenha uma tabela de origem chamada people10mupdates ou um caminho de origem em /tmp/delta/people-10m-updates que contenha novos dados para uma tabela de destino chamada people10m ou um caminho de destino em /tmp/delta/people-10m. Alguns desses novos registros podem já estar presentes nos dados de destino. Para mesclar os novos dados, você deseja atualizar as linhas em que a pessoa id já está presente e inserir as novas linhas em que nenhuma correspondência id está presente. Execute a consulta a seguir:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Confira a documentação da API do Delta Lake para obter detalhes da sintaxe de Scala e Python. Para obter detalhes de sintaxe do SQL, confira MERGE INTO

Modificar todas as linhas sem correspondência usando mesclagem

No Databricks SQL e no Databricks Runtime 12.2 LTS e superior, é possível usar a cláusula WHEN NOT MATCHED BY SOURCE para UPDATE ou DELETE registros na tabela de destino que não tenham registros correspondentes na tabela de origem. O Databricks recomenda adicionar uma cláusula condicional opcional para evitar reescrever totalmente a tabela de destino.

O exemplo de código a seguir mostra a sintaxe básica desse uso para exclusões, substituindo a tabela de destino pelo conteúdo da tabela de origem e excluindo registros não correspondentes na tabela de destino. Para obter um padrão mais escalonável para tabelas em que as atualizações e exclusões de origem estão associadas ao tempo, consulte Sincronizar incrementalmente a tabela Delta com a origem.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

O exemplo a seguir adiciona condições à cláusula WHEN NOT MATCHED BY SOURCE e especifica valores a serem atualizados em linhas de destino sem correspondência.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semântica de operação de mesclagem

Aqui está uma descrição detalhada da semântica de operação programática merge.

  • Pode haver qualquer número de cláusulas whenMatched e whenNotMatched.

  • As cláusulas whenMatched são executadas quando uma linha de origem corresponde a uma linha da tabela de destino com base na condição de correspondência. Essas cláusulas têm a semântica a seguir.

    • whenMatched as cláusulas podem ter no máximo uma update e uma ação delete. A update ação em merge atualiza apenas as colunas especificadas (semelhantes à updateoperação) da linha de destino de acordo. A delete ação exclui a linha correspondente.

    • Cada cláusula whenMatched pode ter uma condição opcional. Se essa condição de cláusula existir, a ação update ou delete será executada para qualquer linha correspondente de pares de linhas de destino de origem somente quando a condição de cláusula for verdadeira.

    • Se houver várias cláusulas whenMatched, elas serão avaliadas na ordem em que são especificadas. Todas as cláusulas whenMatched, exceto a última, devem ter condições.

    • Se nenhuma das condições whenMatched for avaliada como verdadeira para um par de linhas de origem e de destino que corresponde à condição de mesclagem, a linha de destino será deixada inalterada.

    • Para atualizar todas as colunas da tabela Delta de destino com as colunas correspondentes do conjunto de fonte de origem, use whenMatched(...).updateAll(). Isso é o mesmo que:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      Para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta gera um erro de análise.

      Observação

      Esse comportamento muda quando a migração automática de esquema está habilitada. Confira Evolução do esquema automático para obter detalhes.

  • As cláusulas whenNotMatched são executadas quando uma linha de origem não corresponde a uma linha de destino com base na condição de correspondência. Essas cláusulas têm a semântica a seguir.

    • whenNotMatched Cláusulas podem ter apenas a ação insert. A nova linha é gerada com base na coluna especificada e nas expressões correspondentes. Você não precisa especificar todas as colunas na tabela de destino. Para colunas de destino não especificadas, NULL é inserido.

    • Cada cláusula whenNotMatched pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de origem será inserida somente se essa condição for verdadeira para essa linha. Caso contrário, a coluna de origem será ignorada.

    • Se houver várias cláusulas whenNotMatched, elas serão avaliadas na ordem em que são especificadas. Todas as cláusulas whenNotMatched, exceto a última, devem ter condições.

    • Para inserir todas as colunas da tabela Delta de destino com as colunas correspondentes do conjunto de fonte de origem, use whenNotMatched(...).insertAll(). Isso é o mesmo que:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      Para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta gera um erro de análise.

      Observação

      Esse comportamento muda quando a migração automática de esquema está habilitada. Confira Evolução do esquema automático para obter detalhes.

  • As cláusulas whenNotMatchedBySource são executadas quando uma linha de destino não corresponde a uma linha de origem com base na condição de mesclagem. Essas cláusulas têm a semântica a seguir.

    • As cláusulas whenNotMatchedBySource podem especificar as ações delete e update.
    • Cada cláusula whenNotMatchedBySource pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de destino será modificada somente se essa condição for verdadeira para essa linha. Caso contrário, a linha de destino será deixada inalterada.
    • Se houver várias cláusulas whenNotMatchedBySource, elas serão avaliadas na ordem em que são especificadas. Todas as cláusulas whenNotMatchedBySource, exceto a última, devem ter condições.
    • Por definição, as cláusulas whenNotMatchedBySource não têm uma linha de origem para extrair valores de coluna e, portanto, as colunas de origem não podem ser referenciadas. Para cada coluna a ser modificada, você pode especificar um literal ou executar uma ação na coluna de destino, como SET target.deleted_count = target.deleted_count + 1.

Importante

  • Uma operação merge poderá falhar se várias linhas do conjunto de registros de origem corresponderem e a mesclagem tentar atualizar as mesmas linhas da tabela Delta de destino. Conforme a semântica de SQL de mesclagem, essa operação de atualização é ambígua, pois não é claro qual linha de origem deve ser usada para atualizar a linha de destino correspondente. É possível pré-processar a tabela de origem para eliminar a possibilidade de várias correspondências.
  • Você poderá aplicar uma operação de SQL MERGE em uma EXIBIÇÃO DE SQL somente se a exibição tiver sido definida como CREATE VIEW viewName AS SELECT * FROM deltaTable.

Desduplicação de dados ao gravar em tabelas Delta

Um caso de uso comum de ETL é coletar logs na tabela Delta anexando-os a uma tabela. No entanto, geralmente as fontes podem gerar registros de log duplicados e etapas de eliminação de duplicação de downstream são necessárias para cuidar deles. Com o merge, você pode evitar inserir os registros duplicados.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Observação

O conjunto de registros que contém os novos logs precisa ser desduplicado em si mesmo. Pela semântica SQL de mesclagem, ela corresponde e elimina a duplicação dos novos dados com os dados existentes na tabela, mas, se houver dados duplicados dentro do novo conjunto de dados, ele será inserido. Portanto, desduplique os novos dados antes de mesclá-los na tabela.

Se você sabe que poderá obter registros duplicados apenas por alguns dias, poderá otimizar ainda mais sua consulta particionando a tabela por data e, em seguida, especificando o intervalo de datas da tabela de destino para correspondência.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Isso é mais eficiente do que o comando anterior, pois ele procura duplicatas somente nos últimos 7 dias de logs, não na tabela inteira. Além disso, você pode usar essa mesclagem somente inserção com o streaming estruturado para executar a eliminação de duplicação contínua dos logs.

  • Em uma consulta de streaming, você pode usar a operação de mesclagem no foreachBatch para gravar continuamente todos os dados de streaming em uma tabela Delta com eliminação de duplicação. Consulte o exemplo de transmissão a seguir para obter mais informações sobre o foreachBatch.
  • Em outra consulta de streaming, você pode ler continuamente os dados com eliminação de duplicação dessa tabela Delta. Isso é possível porque uma mesclagem somente inserção acrescenta novos dados à tabela Delta.

SCD (dados de alteração lenta) e CDC (captura de dados de alterações) com o Delta Lake

O Delta Live Tables tem suporte nativo para acompanhar e aplicar o SCD Tipo 1 e Tipo 2. Use APPLY CHANGES INTO com o Delta Live Tables para garantir que os registros fora de ordem sejam tratados corretamente ao processar feeds CDC. Confira API APPLY CHANGES: simplificar a captura de dados de alteração no Delta Live Tables.

Sincronizar incrementalmente a tabela Delta com a origem

No Databricks SQL e no Databricks Runtime 12.2 LTS e superior, é possível usar WHEN NOT MATCHED BY SOURCE para criar condições arbitrárias para excluir e substituir atomicamente uma parte de uma tabela. Isso pode ser especialmente útil quando você tem uma tabela de origem em que os registros podem ser alterados ou excluídos por vários dias após a entrada de dados inicial, mas eventualmente se estabelecer em um estado final.

A consulta a seguir mostra como usar esse padrão para selecionar 5 dias de registros da origem, atualizar registros correspondentes no destino, inserir novos registros da origem para o destino e excluir todos os registros sem correspondência dos últimos 5 dias no destino.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Ao fornecer o mesmo filtro booleano nas tabelas de origem e destino, você pode propagar dinamicamente as alterações das tabelas de origem para as de destino, incluindo exclusões.

Observação

Embora esse padrão possa ser usado sem cláusulas condicionais, isso levaria à reescrita completa da tabela de destino, o que pode ser caro.