Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este tutorial se muestran las operaciones comunes de tabla Delta mediante datos de ejemplo. Delta Lake es la capa de almacenamiento optimizada que proporciona la base para las tablas en Databricks. A menos que se especifique lo contrario, todas las tablas de Databricks son tablas delta.
Antes de empezar
Para completar este tutorial, necesita lo siguiente:
- Permiso para usar un recurso de proceso existente o crear un nuevo recurso de proceso. Consulte Computación.
- Permisos del catálogo de Unity:
USE CATALOG,USE SCHEMA, yCREATE TABLEen el catálogoworkspace. Para establecer estos permisos, consulte el administrador de Databricks o los privilegios de Unity Catalog y objetos protegibles.
Estos ejemplos se basan en un conjunto de datos denominado Registros de personas sintéticas: de 10 000 a 10 millones de registros. Este conjunto de datos contiene registros ficticios de personas, incluidos sus nombres y apellidos, sexo y edad.
En primer lugar, descargue el conjunto de datos de este tutorial.
- Visite la página Registros de personas sintéticas: 10 000 a 10 millones de registros en Kaggle.
- Haga clic en Descargar y, a continuación, en Descargar conjunto de datos como zip. De este modo, se descarga un archivo denominado
archive.zipen el equipo local. - Extraiga la
archivecarpeta delarchive.ziparchivo.
A continuación, cargue el person_10000.csv conjunto de datos en un volumen de Catálogo de Unity dentro del área de trabajo de Azure Databricks. Azure Databricks recomienda cargar los datos en un volumen de catálogo de Unity, ya que los volúmenes proporcionan funcionalidades para acceder, almacenar, gobernar y organizar archivos.
- Para abrir el Explorador de catálogos, haga clic en
Catálogo en la barra lateral.
- En el Explorador de catálogos, haga clic en
Agregar datos y Crear un volumen. - Asigne al volumen el nombre
my-volumey seleccione Volumen administrado como tipo de volumen. - Seleccione el
workspacecatálogo y eldefaultesquema y, a continuación, haga clic en Crear. - Abra
my-volumey haga clic en Cargar en este volumen. - Arrastrar y soltar o buscar y seleccionar el archivo
person_10000.csven la carpetaarchivedel equipo local. - Haga clic en Cargar.
Por último, cree un cuaderno para ejecutar el código de ejemplo.
- Haga clic en
Nuevo en la barra lateral. - Haga clic en
Cuaderno para crear un cuaderno.
- Elija un idioma para el cuaderno.
Creación de una tabla
Cree una nueva tabla administrada del catálogo de Unity denominada workspace.default.people_10k desde person_10000.csv. Delta Lake es el valor predeterminado para todos los comandos de creación, lectura y escritura de tablas en Azure Databricks.
Pitón
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
SQL
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
Hay varias maneras diferentes de crear o clonar tablas. Para obtener más información, consulte CREATE TABLE.
En Databricks Runtime 13.3 LTS y versiones posteriores, puede usar CREATE TABLE LIKE para crear una nueva tabla Delta vacía que duplique las propiedades de esquema y tabla de una tabla Delta de origen. Esto puede ser útil al promover tablas de un entorno de desarrollo en producción.
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
Importante
Esta característica está en versión preliminar pública.
Use la DeltaTableBuilder API para Python y Scala para crear una tabla vacía. En comparación con DataFrameWriter y DataFrameWriterV2, la DeltaTableBuilder API facilita la especificación de información adicional, como comentarios de columna, propiedades de tabla y columnas generadas.
Pitón
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
Scala
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
Actualización/inserción (upsert) en una tabla
Modifique los registros existentes en una tabla o agregue otros nuevos mediante una operación llamada upsert. Para combinar un conjunto de actualizaciones e inserciones en una tabla Delta existente, use el DeltaTable.merge método en Python y Scala y la MERGE INTO instrucción en SQL.
Por ejemplo, combine datos de la tabla people_10k_updates de origen con la tabla workspace.default.people_10kDelta de destino . Cuando hay una fila coincidente en ambas tablas, Delta Lake actualiza la columna de datos mediante la expresión especificada. Cuando no hay ninguna fila que coincida, Delta Lake agrega una nueva fila.
Pitón
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
Scala
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
SQL
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
En SQL, el * operador actualiza o inserta todas las columnas de la tabla de destino, suponiendo que la tabla de origen tenga las mismas columnas que la tabla de destino. Si la tabla de destino no tiene las mismas columnas, la consulta produce un error de análisis. Además, debe especificar un valor para cada columna de la tabla al realizar una operación de inserción. Los valores de columna pueden estar vacíos, por ejemplo, ''. Al realizar una operación de inserción, no es necesario actualizar todos los valores.
Lectura de una tabla
Use el nombre o la ruta de acceso de la tabla para acceder a los datos de las tablas Delta. Para acceder a las tablas administradas de Unity Catalog, use un nombre de tabla completamente cualificado. El acceso basado en rutas de acceso solo se admite para volúmenes y tablas externas, no para tablas administradas. Para obtener más información, consulte Reglas de ruta de acceso y acceso en volúmenes del catálogo de Unity.
Pitón
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
Scala
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SQL
SELECT * FROM workspace.default.people_10k;
Escritura en una tabla
Delta Lake usa la sintaxis estándar para escribir datos en tablas. Para agregar nuevos datos a una tabla Delta existente, use el modo de anexión. A diferencia de upserting, escribir en una tabla no comprueba si hay registros duplicados.
Pitón
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
Scala
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
SQL
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
Las salidas de celda del notebook de Databricks muestran un máximo de 10.000 filas o 2 MB, lo que sea menor. Dado que workspace.default.people_10k contiene más de 10 000 filas, solo las primeras 10 000 filas aparecen en la salida del cuaderno para display(df). Las filas adicionales están presentes en la tabla, pero no se representan en la salida del cuaderno debido a este límite. Puede ver las filas adicionales filtrando específicamente para ellas.
Para reemplazar todos los datos de una tabla, use el modo de sobrescritura.
Pitón
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
Scala
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
SQL
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
Actualización de una tabla
Actualice los datos de una tabla Delta en función de un predicado. Por ejemplo, cambie los valores de la gender columna de Female a F, de Male a My de Other a O.
Pitón
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Eliminación en una tabla
Quite los datos que coincidan con un predicado de una tabla Delta. Por ejemplo, el código siguiente muestra dos operaciones de eliminación: la primera eliminación de filas en las que la edad es inferior a 18 y, a continuación, elimina las filas en las que la edad es inferior a 21.
Pitón
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Importante
La eliminación quita los datos de la versión más reciente de la tabla Delta, pero no lo quita del almacenamiento físico hasta que las versiones anteriores se vacían explícitamente. Para obtener más información, consulte vacío.
Visualización del historial de tablas
Use el DeltaTable.history método en Python y Scala y la DESCRIBE HISTORY instrucción en SQL para ver la información de procedencia de cada escritura en una tabla.
Pitón
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
SQL
DESCRIBE HISTORY workspace.default.people_10k
Consulta una versión anterior de la tabla usando viaje en el tiempo
Consulte una instantánea anterior de una tabla Delta mediante el viaje en tiempo de Delta Lake. Para consultar una versión específica, use el número de versión o la marca de tiempo de la tabla. Por ejemplo, consulta la versión 0 o la marca de tiempo 2026-01-05T23:09:47.000+00:00 del historial de la tabla.
Pitón
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
SQL
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
En el caso de las marcas de tiempo, solo se aceptan cadenas de fecha o marca de tiempo. Por ejemplo, las cadenas deben tener el formato "2026-01-05T22:43:15.000+00:00" o "2026-01-05 22:43:15".
Utilice las opciones de DataFrameReader para crear un DataFrame a partir de una tabla Delta que está fijada en una versión o marca de tiempo específica de la tabla.
Pitón
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
Scala
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
SQL
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
Para obtener más información, consulte Trabajar con historial de tablas.
Optimización de una tabla
Varios cambios en una tabla pueden crear varios archivos pequeños, lo que ralentiza el rendimiento de las consultas de lectura. Use la operación de optimización para mejorar la velocidad mediante la combinación de archivos pequeños en archivos más grandes. Consulte OPTIMIZE.
Pitón
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE workspace.default.people_10k
Nota:
Si la optimización predictiva está habilitada, no es necesario optimizar manualmente. La optimización predictiva administra automáticamente las tareas de mantenimiento. Para obtener más información, consulte Optimización predictiva para tablas administradas del catálogo de Unity.
Orden Z por columnas
Para ordenar datos en z y así mejorar aún más el rendimiento de lectura, especifique las columnas por las cuales se debe ordenar durante la operación. Por ejemplo, colócate por la columna firstNamede cardinalidad alta . Para obtener más información sobre el "orden z", consulte Omisión de datos.
Pitón
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
SQL
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
Limpiar instantáneas con la operación vacuum
Delta Lake tiene aislamiento de instantáneas para lecturas, lo que significa que es seguro ejecutar una operación de optimización mientras otros usuarios o trabajos consultan la tabla. Sin embargo, finalmente debe limpiar las instantáneas antiguas porque, al hacerlo, se reducen los costos de almacenamiento, se mejora el rendimiento de las consultas y se garantiza el cumplimiento de los datos. Ejecute la VACUUM operación para limpiar las instantáneas antiguas. Consulte VACUUM.
Pitón
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
SQL
VACUUM workspace.default.people_10k
Para obtener más información sobre el uso eficaz de la operación de vacío, consulte Eliminación de archivos de datos sin usar con vacío.