Upsert в таблицу Delta Lake с помощью слияния
Данные из исходной таблицы, представления или DataFrame можно передать с помощью операции upsert в целевую таблицу Delta с помощью операции SQL MERGE
. Delta Lake поддерживает вставки, обновления и удаления, MERGE
а также поддерживает расширенный синтаксис за пределами стандартов SQL для упрощения расширенных вариантов использования.
Предположим, у вас есть исходная таблица с именем people10mupdates
или исходный путь /tmp/delta/people-10m-updates
, содержащий новые данные для целевой таблицы с именем people10m
или целевым путем /tmp/delta/people-10m
. Некоторые из этих новых записей уже могут присутствовать в целевых данных. Чтобы объединить новые данные, необходимо обновить строки, в которых уже имеется пользователь id
, и вставить новые строки, в которых нет соответствующих id
. Вы можете выполнить следующий запрос:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Внимание
Только одна строка из исходной таблицы может соответствовать заданной строке в целевой таблице. В Databricks Runtime 16.0 и более поздних версий вычисляет условия, MERGE
указанные в WHEN MATCHED
предложениях и ON
определяющих повторяющиеся совпадения. В Databricks Runtime 15.4 LTS и ниже MERGE
операции рассматривают только условия, указанные в предложении ON
.
Дополнительные сведения о синтаксисе Scala и Python см. в документации по API Delta Lake. Сведения о синтаксисе SQL см. в разделе MERGE INTO
Изменение всех несовпаденных строк с помощью слияния
В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE
предложение UPDATE
для или DELETE
записи в целевой таблице, которые не имеют соответствующих записей в исходной таблице. Databricks рекомендует добавить необязательное условное предложение, чтобы избежать полной перезаписи целевой таблицы.
В следующем примере кода показан базовый синтаксис использования для удаления, перезапись целевой таблицы с содержимым исходной таблицы и удаление несовпаденных записей в целевой таблице. Дополнительные масштабируемые шаблоны для таблиц, в которых исходные обновления и удаления привязаны к времени, см. в разделе Добавочная синхронизация таблицы Delta с источником.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
В следующем примере добавляются условия в WHEN NOT MATCHED BY SOURCE
предложение и указываются значения для обновления в несовпаденных целевых строках.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Семантика операции слияния
Ниже приведено подробное описание семантики программной merge
операции.
Может существовать любое количество предложений
whenMatched
иwhenNotMatched
.Предложения
whenMatched
выполняются, когда исходная строка соответствует строке целевой таблицы на основе условия соответствия. Эти предложения имеют следующую семантику.Предложения
whenMatched
могут иметь не более одного действияupdate
и одного действияdelete
. Действиеupdate
обновляетmerge
только указанные столбцы (аналогичноupdate
операции) соответствующей целевой строки. Действиеdelete
удаляет сопоставленную строку.Каждое предложение
whenMatched
может иметь необязательное условие. Если это условие предложения существует, действиеupdate
илиdelete
выполняется для любой соответствующей пары исходной и целевой строк, только если условие предложения истинно.Если существует несколько предложений
whenMatched
, они вычисляются в том порядке, в котором указаны. Все предложенияwhenMatched
, за исключением последнего, должны иметь условия.Если ни одно из условий
whenMatched
не вычисляется как истинное для исходной и целевой пары строк, которая соответствует условию слияния, целевая строка остается неизменной.Чтобы заменить все столбцы целевой таблицы Delta соответствующими столбцами исходного набора данных, используйте
whenMatched(...).updateAll()
. Это соответствует следующей записи:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
для всех столбцов целевой таблицы Delta. Следовательно, это действие предполагает, что в исходной таблице есть те же столбцы, что и в целевой таблице, иначе запрос выдаст ошибку анализа.
Примечание.
Это поведение изменяется при включенной автоматической эволюции схемы. Дополнительные сведения см. в статье об автоматической эволюции схемы.
Предложения
whenNotMatched
выполняются, когда исходная строка не соответствует какой-либо целевой строке на основе условия соответствия. Эти предложения имеют следующую семантику.Предложения
whenNotMatched
могут иметь только действиеinsert
. Новая строка создается на основе указанного столбца и соответствующих выражений. Нет необходимости указывать все столбцы в целевой таблице. Для неуказанных целевых столбцов вставляется значениеNULL
.Каждое предложение
whenNotMatched
может иметь необязательное условие. Если условие предложения присутствует, исходная строка вставляется, только если это условие истинно для этой строки. В противном случае исходный столбец игнорируется.Если существует несколько предложений
whenNotMatched
, они вычисляются в том порядке, в котором указаны. Все предложенияwhenNotMatched
, за исключением последнего, должны иметь условия.Чтобы заменить все столбцы целевой таблицы Delta соответствующими столбцами исходного набора данных, используйте
whenNotMatched(...).insertAll()
. Это соответствует следующей записи:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
для всех столбцов целевой таблицы Delta. Следовательно, это действие предполагает, что в исходной таблице есть те же столбцы, что и в целевой таблице, иначе запрос выдаст ошибку анализа.
Примечание.
Это поведение изменяется при включенной автоматической эволюции схемы. Дополнительные сведения см. в статье об автоматической эволюции схемы.
whenNotMatchedBySource
предложения выполняются, если целевая строка не соответствует исходной строке в зависимости от условия слияния. Эти предложения имеют следующую семантику.whenNotMatchedBySource
предложения могут указыватьdelete
иupdate
выполнять действия.- Каждое предложение
whenNotMatchedBySource
может иметь необязательное условие. Если условие предложения присутствует, целевая строка изменяется только в том случае, если это условие имеет значение true для этой строки. В противном случае целевая строка остается без изменений. - Если существует несколько предложений
whenNotMatchedBySource
, они вычисляются в том порядке, в котором указаны. Все предложенияwhenNotMatchedBySource
, за исключением последнего, должны иметь условия. - По определению
whenNotMatchedBySource
предложения не имеют исходной строки для извлечения значений столбцов, поэтому на исходные столбцы нельзя ссылаться. Для каждого столбца, который необходимо изменить, можно указать литерал или выполнить действие в целевом столбце, напримерSET target.deleted_count = target.deleted_count + 1
.
Внимание
- Операция
merge
может завершиться ошибкой, если несколько строк исходного набора данных совпадают и операция слияния пытается обновить одни и те же строки целевой таблицы Delta. В соответствии с семантикой слияния SQL такая операция обновления неоднозначна, так как неясно, какую исходную строку следует использовать для обновления соответствующей целевой строки. Вы можете предварительно обработать исходную таблицу, чтобы исключить возможность множественных совпадений. - Вы можете применить операцию SQL
MERGE
к представлению SQL, только если представление определено какCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Дедупликация данных при записи в таблицы Delta
Распространенным вариантом использования извлечения. преобразования и загрузки является сбору журналов в таблице Delta путем добавления их в таблицу. Однако часто источники могут создавать дублирующиеся записи журнала, и тогда требуются нижестоящие шаги по дедупликации. С помощью merge
можно избежать вставки дублирующихся записей.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Примечание.
Набор данных, содержащий новые журналы, нужно дедуплицировать внутри самого себя. По семантике SQL слияния оно сопоставляет новые данные с существующими данными в таблице и выполняет дедупликацию, но если дублирующиеся данные есть в новом наборе данных, они вставляются. Таким образом, перед слиянием в таблицу новых данных следует провести их дедупликацию.
Если вы знаете, что вы можете получить повторяющиеся записи только в течение нескольких дней, вы можете оптимизировать запрос дальше, секционируя таблицу по дате, а затем указывая диапазон дат целевой таблицы для сопоставления.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Это более эффективно, чем предыдущая команда, так как она ищет дубликаты только за последние семь дней журналов, а не во всей таблице. Кроме того, вы можете использовать слияние только со вставкой со структурированной потоковой передачей для выполнения непрерывной дедупликации журналов.
- В запросах потоковой передачи можно использовать операцию слияния в
foreachBatch
для непрерывной записи потоковых данных в таблицу Delta с дедупликацией. Дополнительные сведения оforeachBatch
см. в следующем примере потоковой передачи. - В другом запросе потоковой передачи вы можете постоянно считывать дедуплицированные данные из этой таблицы Delta. Это возможно, поскольку слияние только со вставкой добавляет новые данные в таблицу Delta.
Медленно меняющиеся данные (SCD) и запись измененных данных (CDC) с Delta Lake
Delta Live Tables имеет встроенную поддержку отслеживания и применения SCD Type 1 и Type 2. Используйте APPLY CHANGES INTO
с разностными динамическими таблицами, чтобы обеспечить правильную обработку записей вне порядка при обработке веб-каналов CDC. См . API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц.
Добавочная синхронизация таблицы Delta с источником
В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE
для создания произвольных условий для атомарного удаления и замены части таблицы. Это может быть особенно полезно, если у вас есть исходная таблица, в которой записи могут изменяться или удаляться в течение нескольких дней после начальной записи данных, но в конечном итоге урегулируются до окончательного состояния.
Следующий запрос показывает использование этого шаблона для выбора 5 дней записей из источника, обновления соответствующих записей в целевом объекте, вставки новых записей из источника в целевой объект и удаления всех несовпаденных записей за последние 5 дней в целевом объекте.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Предоставляя тот же логический фильтр в исходных и целевых таблицах, вы можете динамически распространять изменения из источника в целевые таблицы, включая удаления.
Примечание.
Хотя этот шаблон можно использовать без каких-либо условных предложений, это приведет к полной перезаписи целевой таблицы, которая может быть дорогой.