Executar upsert em uma tabela Delta Lake usando mesclagem
Você pode fazer upsert de dados de uma tabela de origem, exibição ou DataFrame em uma tabela Delta de destino usando a operação MERGE
SQL. O Delta Lake dá suporte a inserções, atualizações e exclusões no MERGE
e dá suporte à sintaxe estendida além dos padrões SQL para facilitar casos de uso avançados.
Suponha que você tenha uma tabela de origem chamada people10mupdates
ou um caminho de origem em /tmp/delta/people-10m-updates
que contenha novos dados para uma tabela de destino chamada people10m
ou um caminho de destino em /tmp/delta/people-10m
. Alguns desses novos registros podem já estar presentes nos dados de destino. Para mesclar os novos dados, você deseja atualizar as linhas em que a pessoa id
já está presente e inserir as novas linhas em que nenhuma correspondência id
está presente. Execute a consulta a seguir:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Importante
Somente uma única linha da tabela de origem pode corresponder a uma determinada linha na tabela de destino. No Databricks Runtime 16.0 e superior, MERGE
avalia as condições especificadas nas WHEN MATCHED
cláusulas and ON
para determinar correspondências duplicadas. No Databricks Runtime 15.4 LTS e inferior, MERGE
as operações consideram apenas as condições especificadas na ON
cláusula.
Confira a documentação da API do Delta Lake para obter detalhes da sintaxe de Scala e Python. Para obter detalhes de sintaxe do SQL, confira MERGE INTO
Modificar todas as linhas sem correspondência usando mesclagem
No Databricks SQL e no Databricks Runtime 12.2 LTS e superior, é possível usar a cláusula WHEN NOT MATCHED BY SOURCE
para UPDATE
ou DELETE
registros na tabela de destino que não tenham registros correspondentes na tabela de origem. O Databricks recomenda adicionar uma cláusula condicional opcional para evitar reescrever totalmente a tabela de destino.
O exemplo de código a seguir mostra a sintaxe básica desse uso para exclusões, substituindo a tabela de destino pelo conteúdo da tabela de origem e excluindo registros não correspondentes na tabela de destino. Para obter um padrão mais escalonável para tabelas em que as atualizações e exclusões de origem estão associadas ao tempo, consulte Sincronizar incrementalmente a tabela Delta com a origem.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
O exemplo a seguir adiciona condições à cláusula WHEN NOT MATCHED BY SOURCE
e especifica valores a serem atualizados em linhas de destino sem correspondência.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semântica de operação de mesclagem
Aqui está uma descrição detalhada da semântica de operação programática merge
.
Pode haver qualquer número de cláusulas
whenMatched
ewhenNotMatched
.As cláusulas
whenMatched
são executadas quando uma linha de origem corresponde a uma linha da tabela de destino com base na condição de correspondência. Essas cláusulas têm a semântica a seguir.whenMatched
as cláusulas podem ter no máximo umaupdate
e uma açãodelete
. A açãoupdate
emmerge
atualiza apenas as colunas especificadas (semelhantes àupdate
operação) da linha de destino de acordo. Adelete
ação exclui a linha correspondente.Cada cláusula
whenMatched
pode ter uma condição opcional. Se essa condição de cláusula existir, a açãoupdate
oudelete
será executada para qualquer linha correspondente de pares de linhas de destino de origem somente quando a condição de cláusula for verdadeira.Se houver várias cláusulas
whenMatched
, elas serão avaliadas na ordem em que são especificadas. Todas as cláusulaswhenMatched
, exceto a última, devem ter condições.Se nenhuma das condições
whenMatched
for avaliada como verdadeira para um par de linhas de origem e de destino que corresponde à condição de mesclagem, a linha de destino será deixada inalterada.Para atualizar todas as colunas da tabela Delta de destino com as colunas correspondentes do conjunto de fonte de origem, use
whenMatched(...).updateAll()
. Isso é o mesmo que:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
Para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta gera um erro de análise.
Observação
Esse comportamento muda quando a migração automática de esquema está habilitada. Confira Evolução do esquema automático para obter detalhes.
As cláusulas
whenNotMatched
são executadas quando uma linha de origem não corresponde a uma linha de destino com base na condição de correspondência. Essas cláusulas têm a semântica a seguir.whenNotMatched
Cláusulas podem ter apenas a açãoinsert
. A nova linha é gerada com base na coluna especificada e nas expressões correspondentes. Você não precisa especificar todas as colunas na tabela de destino. Para colunas de destino não especificadas,NULL
é inserido.Cada cláusula
whenNotMatched
pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de origem será inserida somente se essa condição for verdadeira para essa linha. Caso contrário, a coluna de origem será ignorada.Se houver várias cláusulas
whenNotMatched
, elas serão avaliadas na ordem em que são especificadas. Todas as cláusulaswhenNotMatched
, exceto a última, devem ter condições.Para inserir todas as colunas da tabela Delta de destino com as colunas correspondentes do conjunto de fonte de origem, use
whenNotMatched(...).insertAll()
. Isso é o mesmo que:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
Para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta gera um erro de análise.
Observação
Esse comportamento muda quando a migração automática de esquema está habilitada. Confira Evolução do esquema automático para obter detalhes.
As cláusulas
whenNotMatchedBySource
são executadas quando uma linha de destino não corresponde a uma linha de origem com base na condição de mesclagem. Essas cláusulas têm a semântica a seguir.- As cláusulas
whenNotMatchedBySource
podem especificar as açõesdelete
eupdate
. - Cada cláusula
whenNotMatchedBySource
pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de destino será modificada somente se essa condição for verdadeira para essa linha. Caso contrário, a linha de destino será deixada inalterada. - Se houver várias cláusulas
whenNotMatchedBySource
, elas serão avaliadas na ordem em que são especificadas. Todas as cláusulaswhenNotMatchedBySource
, exceto a última, devem ter condições. - Por definição, as cláusulas
whenNotMatchedBySource
não têm uma linha de origem para extrair valores de coluna e, portanto, as colunas de origem não podem ser referenciadas. Para cada coluna a ser modificada, você pode especificar um literal ou executar uma ação na coluna de destino, comoSET target.deleted_count = target.deleted_count + 1
.
- As cláusulas
Importante
- Uma operação
merge
poderá falhar se várias linhas do conjunto de registros de origem corresponderem e a mesclagem tentar atualizar as mesmas linhas da tabela Delta de destino. Conforme a semântica de SQL de mesclagem, essa operação de atualização é ambígua, pois não é claro qual linha de origem deve ser usada para atualizar a linha de destino correspondente. É possível pré-processar a tabela de origem para eliminar a possibilidade de várias correspondências. - Você poderá aplicar uma operação de SQL
MERGE
em uma EXIBIÇÃO DE SQL somente se a exibição tiver sido definida comoCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Desduplicação de dados ao gravar em tabelas Delta
Um caso de uso comum de ETL é coletar logs na tabela Delta anexando-os a uma tabela. No entanto, geralmente as fontes podem gerar registros de log duplicados e etapas de eliminação de duplicação de downstream são necessárias para cuidar deles. Com o merge
, você pode evitar inserir os registros duplicados.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Observação
O conjunto de registros que contém os novos logs precisa ser desduplicado em si mesmo. Pela semântica SQL de mesclagem, ela corresponde e elimina a duplicação dos novos dados com os dados existentes na tabela, mas, se houver dados duplicados dentro do novo conjunto de dados, ele será inserido. Portanto, desduplique os novos dados antes de mesclá-los na tabela.
Se você sabe que poderá obter registros duplicados apenas por alguns dias, poderá otimizar ainda mais sua consulta particionando a tabela por data e, em seguida, especificando o intervalo de datas da tabela de destino para correspondência.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Isso é mais eficiente do que o comando anterior, pois ele procura duplicatas somente nos últimos 7 dias de logs, não na tabela inteira. Além disso, você pode usar essa mesclagem somente inserção com o streaming estruturado para executar a eliminação de duplicação contínua dos logs.
- Em uma consulta de streaming, você pode usar a operação de mesclagem no
foreachBatch
para gravar continuamente todos os dados de streaming em uma tabela Delta com eliminação de duplicação. Consulte o exemplo de transmissão a seguir para obter mais informações sobre oforeachBatch
. - Em outra consulta de streaming, você pode ler continuamente os dados com eliminação de duplicação dessa tabela Delta. Isso é possível porque uma mesclagem somente inserção acrescenta novos dados à tabela Delta.
SCD (dados de alteração lenta) e CDC (captura de dados de alterações) com o Delta Lake
O Delta Live Tables tem suporte nativo para acompanhar e aplicar o SCD Tipo 1 e Tipo 2. Use APPLY CHANGES INTO
com o Delta Live Tables para garantir que os registros fora de ordem sejam tratados corretamente ao processar feeds CDC. Confira As APIs APPLY CHANGES: Simplifique a captura de dados de alterações com Delta Live Tables.
Sincronizar incrementalmente a tabela Delta com a origem
No Databricks SQL e no Databricks Runtime 12.2 LTS e superior, é possível usar WHEN NOT MATCHED BY SOURCE
para criar condições arbitrárias para excluir e substituir atomicamente uma parte de uma tabela. Isso pode ser especialmente útil quando você tem uma tabela de origem em que os registros podem ser alterados ou excluídos por vários dias após a entrada de dados inicial, mas eventualmente se estabelecer em um estado final.
A consulta a seguir mostra como usar esse padrão para selecionar 5 dias de registros da origem, atualizar registros correspondentes no destino, inserir novos registros da origem para o destino e excluir todos os registros sem correspondência dos últimos 5 dias no destino.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Ao fornecer o mesmo filtro booleano nas tabelas de origem e destino, você pode propagar dinamicamente as alterações das tabelas de origem para as de destino, incluindo exclusões.
Observação
Embora esse padrão possa ser usado sem cláusulas condicionais, isso levaria à reescrita completa da tabela de destino, o que pode ser caro.