Tutorial: Delta Lake
En este tutorial se presentan las operaciones comunes de Delta Lake en Azure Databricks, incluidas las siguientes:
- Creación de una tabla.
- Actualización/inserción (upsert) en una tabla.
- Lectura desde una tabla.
- Visualización del historial de tablas.
- Consulta de una versión anterior de una tabla.
- Optimización de una tabla.
- Adicción de un índice de orden Z.
- Vaciado de archivos sin referencia.
Puede ejecutar el código de ejemplo de Python, R, Scala y SQL de este artículo desde un cuaderno asociado a un clúster de Azure Databricks. También puede ejecutar el código de SQL de este artículo desde una consulta asociada a un almacén de SQL en Databricks SQL.
Nota:
Algunos de los ejemplos de código siguientes usan una notación de espacio de nombres de dos niveles que consta de un esquema (también denominado base de datos) y una tabla o vista (por ejemplo, default.people10m
). Para usar estos ejemplos con Unity Catalog, reemplace el espacio de nombres de dos niveles por notación de espacio de nombres de tres niveles de Unity Catalog, que consta de un catálogo, un esquema y una tabla o vista (por ejemplo, main.default.people10m
).
Creación de una tabla
Todas las tablas creadas en Azure Databricks usan Delta Lake de manera predeterminada.
Nota:
Delta Lake es el valor predeterminado para todos los comandos de lectura, escritura y creación de tablas de Azure Databricks.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Las operaciones anteriores crean una nueva tabla administrada mediante el esquema que se infirió de los datos. Para información sobre las opciones disponibles al crear una tabla de Delta, consulte CREATE TABLE (Databricks SQL).
Para las tablas administradas, Azure Databricks determina la ubicación de los datos. Para obtener la ubicación, puede usar la instrucción DESCRIBE DETAIL, por ejemplo:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
A veces, puede que desee crear una tabla especificando el esquema antes de insertar datos. Puede completarlo con los siguientes comandos SQL:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
En Databricks Runtime 13.3 LTS y versiones posteriores, puede usar CREATE TABLE LIKE
para crear una nueva tabla Delta vacía que duplica las propiedades de esquema y tabla de una tabla Delta de origen. Esto puede ser especialmente útil al promover tablas de un entorno de desarrollo en producción, como en el ejemplo de código siguiente:
CREATE TABLE prod.people10m LIKE dev.people10m
También puede usar la API DeltaTableBuilder
en Delta Lake para crear tablas. En comparación con DataFrameWriter API, esta API facilita la especificación de información adicional, como comentarios de columna, propiedades de tabla y columnas generadas.
Importante
Esta característica está en versión preliminar pública.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Actualización/inserción (upsert) en una tabla
Para combinar un conjunto de actualizaciones e inserciones en una tabla de Delta existente, use la instrucción MERGE INTO. Por ejemplo, la siguiente instrucción toma datos de la tabla de origen y los combina en la tabla de Delta de destino. Cuando hay una fila coincidente en ambas tablas, Delta Lake actualiza la columna de datos mediante la expresión especificada. Cuando no hay ninguna fila que coincida, Delta Lake agrega una nueva fila. Esta operación se conoce como upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Si especifica *
, se actualizan o insertan todas las columnas de la tabla de destino. Esta acción supone que la tabla de origen tiene las mismas columnas que la tabla de destino; de lo contrario, la consulta producirá un error de análisis.
Debe especificar un valor para cada columna de la tabla al realizar una operación INSERT
(por ejemplo, cuando no hay ninguna fila coincidente en el conjunto de datos existente). Sin embargo, no es necesario actualizar todos los valores.
Para ver los resultados, consulte la tabla.
SELECT * FROM people_10m WHERE id >= 9999998
Lectura de una tabla
Puede acceder a los datos de las tablas Delta por el nombre o la ruta de acceso de la tabla, como se muestra en los ejemplos siguientes:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Escritura en una tabla
Delta Lake usa una sintaxis estándar para escribir datos en tablas.
Para agregar datos nuevos de forma atómica a una tabla Delta existente, use el modo append
como en los ejemplos siguientes:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Para reemplazar de forma atómica todos los datos de una tabla, use el modo overwrite
como en los ejemplos siguientes:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Actualización de una tabla
Puede actualizar aquellos datos que coincidan con un predicado en una tabla Delta. Por ejemplo, en una tabla denominada people10m
o en una ruta de acceso en /tmp/delta/people-10m
, para cambiar una abreviatura en la columna gender
de M
o F
a Male
o Female
puede ejecutar lo siguiente:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Eliminación en una tabla
Puede quitar aquellos datos que coincidan con un predicado de una tabla Delta. Por ejemplo, en una tabla denominada people10m
o en una ruta de acceso en /tmp/delta/people-10m
, para eliminar todas las filas correspondientes a personas con un valor en la columna birthDate
anterior a 1955
puede ejecutar lo siguiente:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Importante
delete
quita los datos de la versión más reciente de la tabla Delta, pero no los quita del almacenamiento físico hasta que se utiliza vaccum explícitamente en las versiones anteriores. Para más información, consulte la información sobre vacuum.
Visualización del historial de tablas
Para ver el historial de una tabla, use la instrucción DESCRIBE HISTORY, que proporciona información de procedencia, incluida la versión de la tabla, la operación, el usuario, etc.para cada escritura en una tabla.
DESCRIBE HISTORY people_10m
Consulta de una versión anterior de la tabla (viaje en el tiempo)
El viaje en el tiempo de Delta Lake permite consultar una instantánea anterior de una tabla de Delta.
Para consultar una versión anterior de una tabla, especifique una versión o marca de tiempo en una instrucción SELECT
. Por ejemplo, para consultar la versión 0 del historial anterior, use:
SELECT * FROM people_10m VERSION AS OF 0
o
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Para las marcas de tiempo, solo se aceptan cadenas de fecha o marca de tiempo, por ejemplo, "2019-01-01"
y "2019-01-01'T'00:00:00.000Z"
.
Las opciones de DataFrameReader permiten crear un DataFrame a partir de una tabla Delta fijada a una versión específica de la tabla, por ejemplo en Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
o bien, como alternativa:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Para más información, consulte Trabajo con el historial de tablas de Delta Lake.
Optimización de una tabla
Una vez que haya realizado varios cambios en una tabla, es posible que tenga muchos archivos pequeños. Para mejorar la velocidad de las consultas de lectura, puede usar para OPTIMIZE
contraer archivos pequeños en más grandes:
OPTIMIZE people_10m
Orden Z por columnas
Para mejorar aún más el rendimiento de lectura, puede coubicar información relacionada en el mismo conjunto de archivos mediante la ordenación Z. Los algoritmos de omisión de datos de Delta Lake usan esta coubicación automáticamente para reducir drásticamente la cantidad de datos que se deben leer. Para aplicar la ordenación Z a los datos, especifique las columnas en las que se va a ordenar en la cláusula ZORDER BY
. Por ejemplo, para coubicar por gender
, ejecute:
OPTIMIZE people_10m
ZORDER BY (gender)
Para obtener el conjunto completo de opciones disponibles al ejecutar OPTIMIZE
, consulte Compactación de archivos de datos con optimización en Delta Lake.
Limpieza de instantáneas con VACUUM
Delta Lake proporciona aislamiento de instantáneas para lecturas, lo que significa que es seguro ejecutar OPTIMIZE
incluso mientras otros usuarios o trabajos consultan la tabla. Sin embargo, al final debe limpiar las instantáneas antiguas. Para ello, ejecute el comando VACUUM
:
VACUUM people_10m
Para más información sobre el uso eficaz de VACUUM
, consulte Eliminación de archivos de datos sin usar con vacuum.