Tutorial: Delta Lake

En este tutorial se presentan las operaciones comunes de Delta Lake en Azure Databricks, incluidas las siguientes:

Puede ejecutar el código de ejemplo de Python, R, Scala y SQL de este artículo desde un cuaderno asociado a un clúster de Azure Databricks. También puede ejecutar el código de SQL de este artículo desde una consulta asociada a un almacén de SQL en Databricks SQL.

Nota:

Algunos de los ejemplos de código siguientes usan una notación de espacio de nombres de dos niveles que consta de un esquema (también denominado base de datos) y una tabla o vista (por ejemplo, default.people10m). Para usar estos ejemplos con Unity Catalog, reemplace el espacio de nombres de dos niveles por notación de espacio de nombres de tres niveles de Unity Catalog, que consta de un catálogo, un esquema y una tabla o vista (por ejemplo, main.default.people10m).

Creación de una tabla

Todas las tablas creadas en Azure Databricks usan Delta Lake de manera predeterminada.

Nota:

Delta Lake es el valor predeterminado para todos los comandos de lectura, escritura y creación de tablas de Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Las operaciones anteriores crean una nueva tabla administrada mediante el esquema que se infirió de los datos. Para información sobre las opciones disponibles al crear una tabla de Delta, consulte CREATE TABLE (Databricks SQL).

Para las tablas administradas, Azure Databricks determina la ubicación de los datos. Para obtener la ubicación, puede usar la instrucción DESCRIBE DETAIL, por ejemplo:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

A veces, puede que desee crear una tabla especificando el esquema antes de insertar datos. Puede completarlo con los siguientes comandos SQL:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

En Databricks Runtime 13.3 LTS y versiones posteriores, puede usar CREATE TABLE LIKE para crear una nueva tabla Delta vacía que duplica las propiedades de esquema y tabla de una tabla Delta de origen. Esto puede ser especialmente útil al promover tablas de un entorno de desarrollo en producción, como en el ejemplo de código siguiente:

CREATE TABLE prod.people10m LIKE dev.people10m

También puede usar la API DeltaTableBuilder en Delta Lake para crear tablas. En comparación con DataFrameWriter API, esta API facilita la especificación de información adicional, como comentarios de columna, propiedades de tabla y columnas generadas.

Importante

Esta característica está en versión preliminar pública.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Actualización/inserción (upsert) en una tabla

Para combinar un conjunto de actualizaciones e inserciones en una tabla de Delta existente, use la instrucción MERGE INTO. Por ejemplo, la siguiente instrucción toma datos de la tabla de origen y los combina en la tabla de Delta de destino. Cuando hay una fila coincidente en ambas tablas, Delta Lake actualiza la columna de datos mediante la expresión especificada. Cuando no hay ninguna fila que coincida, Delta Lake agrega una nueva fila. Esta operación se conoce como upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Si especifica *, se actualizan o insertan todas las columnas de la tabla de destino. Esta acción supone que la tabla de origen tiene las mismas columnas que la tabla de destino; de lo contrario, la consulta producirá un error de análisis.

Debe especificar un valor para cada columna de la tabla al realizar una operación INSERT (por ejemplo, cuando no hay ninguna fila coincidente en el conjunto de datos existente). Sin embargo, no es necesario actualizar todos los valores.

Para ver los resultados, consulte la tabla.

SELECT * FROM people_10m WHERE id >= 9999998

Lectura de una tabla

Puede acceder a los datos de las tablas Delta por el nombre o la ruta de acceso de la tabla, como se muestra en los ejemplos siguientes:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Escritura en una tabla

Delta Lake usa una sintaxis estándar para escribir datos en tablas.

Para agregar datos nuevos de forma atómica a una tabla Delta existente, use el modo append como en los ejemplos siguientes:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Para reemplazar de forma atómica todos los datos de una tabla, use el modo overwrite como en los ejemplos siguientes:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Actualización de una tabla

Puede actualizar aquellos datos que coincidan con un predicado en una tabla Delta. Por ejemplo, en una tabla denominada people10m o en una ruta de acceso en /tmp/delta/people-10m, para cambiar una abreviatura en la columna gender de M o F a Male o Female puede ejecutar lo siguiente:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Eliminación en una tabla

Puede quitar aquellos datos que coincidan con un predicado de una tabla Delta. Por ejemplo, en una tabla denominada people10m o en una ruta de acceso en /tmp/delta/people-10m, para eliminar todas las filas correspondientes a personas con un valor en la columna birthDateanterior a 1955 puede ejecutar lo siguiente:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Importante

delete quita los datos de la versión más reciente de la tabla Delta, pero no los quita del almacenamiento físico hasta que se utiliza vaccum explícitamente en las versiones anteriores. Para más información, consulte la información sobre vacuum.

Visualización del historial de tablas

Para ver el historial de una tabla, use la instrucción DESCRIBE HISTORY, que proporciona información de procedencia, incluida la versión de la tabla, la operación, el usuario, etc.para cada escritura en una tabla.

DESCRIBE HISTORY people_10m

Consulta de una versión anterior de la tabla (viaje en el tiempo)

El viaje en el tiempo de Delta Lake permite consultar una instantánea anterior de una tabla de Delta.

Para consultar una versión anterior de una tabla, especifique una versión o marca de tiempo en una instrucción SELECT. Por ejemplo, para consultar la versión 0 del historial anterior, use:

SELECT * FROM people_10m VERSION AS OF 0

o

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Para las marcas de tiempo, solo se aceptan cadenas de fecha o marca de tiempo, por ejemplo, "2019-01-01" y "2019-01-01'T'00:00:00.000Z".

Las opciones de DataFrameReader permiten crear un DataFrame a partir de una tabla Delta fijada a una versión específica de la tabla, por ejemplo en Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

o bien, como alternativa:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Para más información, consulte Trabajo con el historial de tablas de Delta Lake.

Optimización de una tabla

Una vez que haya realizado varios cambios en una tabla, es posible que tenga muchos archivos pequeños. Para mejorar la velocidad de las consultas de lectura, puede usar para OPTIMIZE contraer archivos pequeños en más grandes:

OPTIMIZE people_10m

Orden Z por columnas

Para mejorar aún más el rendimiento de lectura, puede coubicar información relacionada en el mismo conjunto de archivos mediante la ordenación Z. Los algoritmos de omisión de datos de Delta Lake usan esta coubicación automáticamente para reducir drásticamente la cantidad de datos que se deben leer. Para aplicar la ordenación Z a los datos, especifique las columnas en las que se va a ordenar en la cláusula ZORDER BY. Por ejemplo, para coubicar por gender, ejecute:

OPTIMIZE people_10m
ZORDER BY (gender)

Para obtener el conjunto completo de opciones disponibles al ejecutar OPTIMIZE, consulte Compactación de archivos de datos con optimización en Delta Lake.

Limpieza de instantáneas con VACUUM

Delta Lake proporciona aislamiento de instantáneas para lecturas, lo que significa que es seguro ejecutar OPTIMIZE incluso mientras otros usuarios o trabajos consultan la tabla. Sin embargo, al final debe limpiar las instantáneas antiguas. Para ello, ejecute el comando VACUUM:

VACUUM people_10m

Para más información sobre el uso eficaz de VACUUM, consulte Eliminación de archivos de datos sin usar con vacuum.