Partilhar via


Upsert em uma tabela Delta Lake usando mesclagem

Você pode atualizar dados de uma tabela de origem, exibição ou DataFrame para uma tabela Delta de destino usando a MERGE operação SQL. O Delta Lake oferece suporte a inserções, atualizações e exclusões no MERGE, e oferece suporte à sintaxe estendida além dos padrões SQL para facilitar casos de uso avançados.

Suponha que você tenha uma tabela de origem nomeada people10mupdates ou um caminho de origem em /tmp/delta/people-10m-updates que contenha novos dados para uma tabela de destino nomeada people10m ou um caminho de destino em /tmp/delta/people-10m. Alguns destes novos registos podem já estar presentes nos dados de destino. Para mesclar os novos dados, você deseja atualizar as linhas onde a pessoa id já está presente e inserir as novas linhas onde nenhuma correspondência id está presente. Você pode executar a seguinte consulta:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Importante

Apenas uma única linha da tabela de origem pode corresponder a uma determinada linha na tabela de destino. No Databricks Runtime 16.0 e superior, MERGE avalia as WHEN MATCHED condições especificadas nas cláusulas e ON para determinar correspondências duplicadas. No Databricks Runtime 15.4 LTS e inferior, MERGE as operações consideram apenas as condições especificadas na ON cláusula.

Consulte a documentação da API Delta Lake para obter detalhes de sintaxe Scala e Python. Para obter detalhes da sintaxe SQL, consulte MERGE INTO

Modificar todas as linhas incomparáveis usando mesclagem

No Databricks SQL e no Databricks Runtime 12.2 LTS e superior, você pode usar a WHEN NOT MATCHED BY SOURCE cláusula ou UPDATE DELETE registros na tabela de destino que não tenham registros correspondentes na tabela de origem. O Databricks recomenda a adição de uma cláusula condicional opcional para evitar a reescrita completa da tabela de destino.

O exemplo de código a seguir mostra a sintaxe básica de usar isso para exclusões, substituindo a tabela de destino pelo conteúdo da tabela de origem e excluindo registros incomparáveis na tabela de destino. Para obter um padrão mais escalável para tabelas em que as atualizações e exclusões de origem têm limite de tempo, consulte Sincronização incremental da tabela Delta com a origem.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

O exemplo a WHEN NOT MATCHED BY SOURCE seguir adiciona condições à cláusula e especifica valores a serem atualizados em linhas de destino incomparáveis.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semântica da operação de mesclagem

Segue-se uma descrição detalhada da semântica da merge operação programática.

  • Pode haver qualquer número de whenMatched e whenNotMatched cláusulas.

  • whenMatched As cláusulas são executadas quando uma linha de origem corresponde a uma linha da tabela de destino com base na condição de correspondência. Estas cláusulas têm a seguinte semântica.

    • whenMatched as cláusulas podem ter, no máximo, uma update única delete ação. A update ação em merge apenas atualiza as colunas especificadas (semelhante à update operação) da linha de destino correspondente. A delete ação exclui a linha correspondente.

    • Cada whenMatched cláusula pode ter uma condição facultativa. Se essa condição de cláusula existir, a ação ou delete será executada update para qualquer par de linha origem-destino correspondente somente quando a condição da cláusula for verdadeira.

    • Se houver várias whenMatched cláusulas, elas são avaliadas na ordem em que são especificadas. Todas as whenMatched cláusulas, exceto a última, devem ter condições.

    • Se nenhuma das whenMatched condições for avaliada como verdadeira para um par de linhas de origem e destino que corresponda à condição de mesclagem, a linha de destino será mantida inalterada.

    • Para atualizar todas as colunas da tabela Delta de destino com as colunas correspondentes do conjunto de dados de origem, use whenMatched(...).updateAll(). Isto é equivalente a:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta lançará um erro de análise.

      Nota

      Esse comportamento muda quando a evolução automática do esquema está habilitada. Consulte a evolução automática do esquema para obter detalhes.

  • whenNotMatched As cláusulas são executadas quando uma linha de origem não corresponde a nenhuma linha de destino com base na condição de correspondência. Estas cláusulas têm a seguinte semântica.

    • whenNotMatched cláusulas só podem ter a insert ação. A nova linha é gerada com base na coluna especificada e nas expressões correspondentes. Não é necessário especificar todas as colunas na tabela de destino. Para colunas de destino não especificadas, NULL é inserido.

    • Cada whenNotMatched cláusula pode ter uma condição facultativa. Se a condição da cláusula estiver presente, uma linha de origem será inserida somente se essa condição for verdadeira para essa linha. Caso contrário, a coluna de origem será ignorada.

    • Se houver várias whenNotMatched cláusulas, elas são avaliadas na ordem em que são especificadas. Todas as whenNotMatched cláusulas, exceto a última, devem ter condições.

    • Para inserir todas as colunas da tabela Delta de destino com as colunas correspondentes do conjunto de dados de origem, use whenNotMatched(...).insertAll(). Isto é equivalente a:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta lançará um erro de análise.

      Nota

      Esse comportamento muda quando a evolução automática do esquema está habilitada. Consulte a evolução automática do esquema para obter detalhes.

  • whenNotMatchedBySource As cláusulas são executadas quando uma linha de destino não corresponde a nenhuma linha de origem com base na condição de mesclagem. Estas cláusulas têm a seguinte semântica.

    • whenNotMatchedBySource cláusulas podem especificar delete e update ações.
    • Cada whenNotMatchedBySource cláusula pode ter uma condição facultativa. Se a condição da cláusula estiver presente, uma linha de destino será modificada somente se essa condição for verdadeira para essa linha. Caso contrário, a linha de destino será mantida inalterada.
    • Se houver várias whenNotMatchedBySource cláusulas, elas são avaliadas na ordem em que são especificadas. Todas as whenNotMatchedBySource cláusulas, exceto a última, devem ter condições.
    • Por definição, whenNotMatchedBySource as cláusulas não têm uma linha de origem da qual extrair valores de coluna e, portanto, as colunas de origem não podem ser referenciadas. Para cada coluna a ser modificada, você pode especificar um literal ou executar uma ação na coluna de destino, como SET target.deleted_count = target.deleted_count + 1.

Importante

  • Uma merge operação pode falhar se várias linhas do conjunto de dados de origem corresponderem e a mesclagem tentar atualizar as mesmas linhas da tabela Delta de destino. De acordo com a semântica SQL da mesclagem, essa operação de atualização é ambígua, pois não está claro qual linha de origem deve ser usada para atualizar a linha de destino correspondente. Você pode pré-processar a tabela de origem para eliminar a possibilidade de várias correspondências.
  • Você pode aplicar uma operação SQL MERGE em um SQL VIEW somente se o modo de exibição tiver sido definido como CREATE VIEW viewName AS SELECT * FROM deltaTable.

Desduplicação de dados ao gravar em tabelas Delta

Um caso de uso comum de ETL é coletar logs na tabela Delta anexando-os a uma tabela. No entanto, muitas vezes as fontes podem gerar registros de log duplicados e etapas de desduplicação a jusante são necessárias para cuidar deles. Com mergeo , você pode evitar inserir os registros duplicados.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Nota

O conjunto de dados que contém os novos logs precisa ser desduplicado dentro de si mesmo. Pela semântica SQL de mesclagem, ele corresponde e desduplica os novos dados com os dados existentes na tabela, mas se houver dados duplicados dentro do novo conjunto de dados, ele é inserido. Portanto, desduplique os novos dados antes de mesclar na tabela.

Se você souber que pode obter registros duplicados apenas por alguns dias, poderá otimizar ainda mais sua consulta particionando a tabela por data e, em seguida, especificando o intervalo de datas da tabela de destino a ser correspondida.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Isso é mais eficiente do que o comando anterior, pois procura duplicatas apenas nos últimos 7 dias de logs, não a tabela inteira. Além disso, você pode usar essa mesclagem somente inserção com o Streaming estruturado para executar a desduplicação contínua dos logs.

  • Em uma consulta de streaming, você pode usar a operação de mesclagem para gravar continuamente quaisquer dados de streaming em foreachBatch uma tabela Delta com desduplicação. Consulte o exemplo de streaming a seguir para obter mais informações sobre foreachBatch.
  • Em outra consulta de streaming, você pode ler continuamente dados desduplicados desta tabela Delta. Isso é possível porque uma mesclagem somente inserção acrescenta novos dados à tabela Delta.

Alteração lenta de dados (SCD) e captura de dados de alteração (CDC) com Delta Lake

Delta Live Tables tem suporte nativo para rastrear e aplicar SCD Tipo 1 e Tipo 2. Use APPLY CHANGES INTO com Delta Live Tables para garantir que os registros fora de ordem sejam manipulados corretamente ao processar feeds CDC. Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com o Delta Live Tables.

Sincronização incremental da tabela Delta com a origem

No Databricks SQL e no Databricks Runtime 12.2 LTS e superior, você pode usar WHEN NOT MATCHED BY SOURCE para criar condições arbitrárias para excluir e substituir atomicamente uma parte de uma tabela. Isso pode ser especialmente útil quando você tem uma tabela de origem onde os registros podem ser alterados ou excluídos por vários dias após a entrada inicial de dados, mas eventualmente se estabelecem em um estado final.

A consulta a seguir mostra o uso desse padrão para selecionar 5 dias de registros da origem, atualizar os registros correspondentes no destino, inserir novos registros da origem para o destino e excluir todos os registros incompatíveis dos últimos 5 dias no destino.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Ao fornecer o mesmo filtro booleano nas tabelas de origem e de destino, você pode propagar dinamicamente as alterações das tabelas de origem para as de destino, incluindo exclusões.

Nota

Embora este padrão possa ser usado sem quaisquer cláusulas condicionais, isso levaria a uma reformulação completa da tabela de destino, o que pode ser caro.