Actualización en una tabla de Delta Lake mediante combinación

Puede actualizar datos de una tabla de origen, vista o dataframe en una tabla Delta de destino mediante la operación SQL MERGE. Delta Lake admite inserciones, actualizaciones y eliminaciones en MERGE y admite la sintaxis extendida más allá de los estándares de SQL para facilitar casos de uso avanzados.

Supongamos que tiene una tabla de origen denominada people10mupdates o una ruta de acceso de origen en /tmp/delta/people-10m-updates que contiene nuevos datos para una tabla de destino denominada people10m o una ruta de acceso de destino en /tmp/delta/people-10m. Es posible que algunos de estos nuevos registros ya están presentes en los datos de destino. Para combinar los nuevos datos, quiere actualizar las filas en las que el id de la persona ya está presente e insertar las nuevas filas en las que no haya ningún id que coincida. Puede ejecutar la siguiente consulta:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Para obtener información sobre la sintaxis de Scala y Python, consulte la Documentación de la API de Delta Lake. Para más detalles sobre la sintaxis SQL, consulte MERGE INTO

Modificación de todas las filas no coincidentes mediante la combinación

En Databricks SQL, Databricks Runtime 12.1 y versiones posteriores, puede usar la cláusula WHEN NOT MATCHED BY SOURCE para los registros UPDATE o DELETE de la tabla de destino que no tienen los registros correspondientes en la tabla de origen. Databricks recomienda agregar una cláusula condicional opcional para evitar volver a escribir completamente la tabla de destino.

En el código de ejemplo siguiente se muestra la sintaxis básica para usar esto para las eliminaciones, sobrescribiendo la tabla de destino con el contenido de la tabla de origen y eliminando registros no coincidentes en la tabla de destino. Para obtener un patrón más escalable para las tablas en las que las actualizaciones y eliminaciones de origen están enlazadas por tiempo, consulte Sincronización incremental de la tabla Delta con el origen.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

En el ejemplo siguiente se agregan condiciones a la cláusula WHEN NOT MATCHED BY SOURCE y se especifican los valores que se van a actualizar en filas de destino no coincidentes.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semántica de la operación Merge

A continuación se encuentra una descripción detallada de la semántica de la operación merge mediante programación.

  • Puede haber varias cláusulas whenMatched y whenNotMatched.

  • Las cláusulas whenMatched se ejecutan cuando una fila de origen coincide con una fila de tabla de destino en función de la condición de coincidencia. Estas cláusulas tienen la semántica siguiente.

    • Las cláusulas whenMatched pueden tener como máximo una acción update y una acción delete. La acción update de merge solo actualiza las columnas especificadas (de forma similar a la operación update) de la fila de destino coincidente. La acción delete elimina la fila coincidente.

    • Cada cláusula whenMatched puede tener una condición opcional. Si esta condición de cláusula existe, la acción update o delete se ejecuta para cualquier fila del par de filas de origen-destino coincidente solo cuando la condición de la cláusula es true.

    • Si hay varias cláusulas whenMatched, se evalúan en el orden en que se especifican. Todas las cláusulas whenMatched, excepto la última, deben tener condiciones.

    • Si ninguna de las condiciones whenMatched se evalúa como true para un par de filas de origen y destino que coincida con la condición de combinación, la fila de destino se deja sin modificar.

    • Para actualizar todas las columnas de la tabla Delta de destino con las columnas correspondientes del conjunto de datos de origen, use whenMatched(...).updateAll(). Esto equivale a:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas las columnas de la tabla Delta de destino. Por lo tanto, esta acción supone que la tabla de origen tiene las mismas columnas que la tabla de destino; si no es así, la consulta genera un error de análisis.

      Nota:

      Este comportamiento cambia cuando se habilita la migración automática de esquemas. Para más información, consulte Evolución automática del esquema.

  • Las cláusulas whenNotMatched se ejecutan cuando una fila de origen no coincide con ninguna fila de destino según la condición de coincidencia. Estas cláusulas tienen la semántica siguiente.

    • Las cláusulas whenNotMatched solo pueden tener la acción insert. La nueva fila se genera en función de la columna especificada y las expresiones correspondientes. No es preciso especificar todas las columnas de la tabla de destino. Para las columnas de destino no especificadas, se inserta NULL.

    • Cada cláusula whenNotMatched puede tener una condición opcional. Si la condición de la cláusula está presente, se inserta una fila de origen, pero solo si la condición es "true" en esa fila. De lo contrario, se omite la columna de origen.

    • Si hay varias cláusulas whenNotMatched, se evalúan en el orden en que se especifican. Todas las cláusulas whenNotMatched, excepto la última, deben tener condiciones.

    • Para insertar todas las columnas de la tabla Delta de destino con las columnas correspondientes del conjunto de datos de origen, use whenNotMatched(...).insertAll(). Esto equivale a:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas las columnas de la tabla Delta de destino. Por lo tanto, esta acción supone que la tabla de origen tiene las mismas columnas que la tabla de destino; si no es así, la consulta genera un error de análisis.

      Nota:

      Este comportamiento cambia cuando se habilita la migración automática de esquemas. Para más información, consulte Evolución automática del esquema.

  • Las cláusulas whenNotMatchedBySource se ejecutan cuando una fila de destino no coincide con ninguna fila de origen según la condición de combinación. Estas cláusulas tienen la semántica siguiente.

    • Las cláusulas whenNotMatchedBySource pueden especificar las acciones delete y update.
    • Cada cláusula whenNotMatchedBySource puede tener una condición opcional. Si la condición de la cláusula está presente, solo se insertan aquellas filas de destino en las que se cumpla la condición. De lo contrario, la fila de destino se quedan sin cambios.
    • Si hay varias cláusulas whenNotMatchedBySource, se evalúan en el orden en que se especifican. Todas las cláusulas whenNotMatchedBySource, excepto la última, deben tener condiciones.
    • Por definición, las cláusulas whenNotMatchedBySource no tienen una fila de origen para extraer valores de columna, por lo que no se puede hacer referencia a las columnas de origen. Para cada columna que se va a modificar, puede especificar un literal o realizar una acción en la columna de destino, como SET target.deleted_count = target.deleted_count + 1.

Importante

  • Una operación merge puede generar un error si varias filas del conjunto de datos de origen coinciden y la fusión mediante combinación intenta actualizar las mismas filas de la tabla Delta de destino. Según la semántica SQL de la fusión mediante combinación, esta operación de actualización es ambigua, ya que no está claro qué fila de origen se debe usar para actualizar la fila de destino coincidente. Puede procesar previamente la tabla de origen para eliminar la posibilidad de que haya varias coincidencias.
  • Puede aplicar una operación SQL MERGE a SQL VIEW solo si la vista se ha definido como CREATE VIEW viewName AS SELECT * FROM deltaTable.

Desduplicación de datos al escribir en tablas Delta

Un caso de uso común de extracción, transformación y carga de datos es recopilar registros en la tabla Delta anexándolos a una tabla. Sin embargo, a menudo los orígenes pueden generar registros duplicados y se necesitan pasos de desduplicación de bajada para ocuparse de ellos. Con merge, puede evitar insertar los registros duplicados.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Nota:

El conjunto de datos que contiene los nuevos registros debe desduplicarse dentro de sí mismo. Por la semántica SQL de la fusión mediante combinación, coincide con los nuevos datos y los desduplica con los datos existentes en la tabla, pero si hay datos duplicados dentro del nuevo conjunto de datos, se insertan. Por lo tanto, desduplica los nuevos datos antes de combinarlos en la tabla.

Si sabe que solo puede obtener registros duplicados durante unos días, para optimizar aún más la consulta cree particiones de la tabla por fecha y, después, especifique el intervalo de fechas de la tabla de destino con el que debe coincidir.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Esto es más eficaz que el comando anterior, ya que busca duplicados solo en los siete últimos días de registros, no en toda la tabla. Además, puede usar esta fusión mediante combinación de solo inserción con Structured Streaming para realizar la desduplicación continua de los registros.

  • En una consulta de streaming, puede usar la operación Merge en foreachBatch para escribir continuamente los datos de streaming en una tabla Delta con desduplicación. Consulte el siguiente ejemplo de streaming para más información sobre foreachBatch.
  • En otra consulta de streaming, puede leer continuamente datos desduplicados de esta tabla Delta. Esto es posible porque las fusiones mediante combinación de solo inserción solo anexan nuevos datos a la tabla Delta.

Datos de variación lenta (SCD) y captura de datos modificados (CDC) con Delta Lake

Las tablas de Delta Live tiene compatibilidad nativa con el seguimiento y la aplicación de DVL de tipo 1 y tipo 2. Use APPLY CHANGES INTO con Delta Live Tables para asegurarse de que los registros desordenados se controlan correctamente al procesar fuentes de CDC. Vea Captura simplificada de datos modificados con APPLY CHANGES API en Delta Live Tables.

Sincronización incremental de la tabla Delta con el origen

En Databricks SQL, Databricks Runtime 12.1 y versiones posteriores, puede usar WHEN NOT MATCHED BY SOURCE para crear condiciones arbitrarias para eliminar y reemplazar de forma atómica una parte de una tabla. Esto puede ser especialmente útil cuando tiene una tabla de origen en la que los registros pueden cambiarse o eliminarse durante varios días después de la entrada de datos inicial, pero acaban quedándose en un estado final.

En la consulta siguiente se muestra el uso de este patrón para seleccionar cinco días de registros en el origen, actualizar los registros coincidentes en el destino, insertar nuevos registros desde el origen al destino y eliminar todos los registros no coincidentes de los últimos cinco días en el destino.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Al proporcionar el mismo filtro booleano en las tablas de origen y de destino, puede propagar dinámicamente los cambios de las tablas de origen a las de destino, incluidas las eliminaciones.

Nota:

Aunque este patrón se puede usar sin cláusulas condicionales, esto provocaría una reescritura completa de la tabla de destino que puede ser onerosa.