Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este tutorial demonstra operações comuns de tabela Delta usando dados de exemplo. Delta Lake é a camada de armazenamento otimizada que fornece a base para as tabelas em Databricks. Salvo especificação em contrário, todas as tabelas nos Databricks são tabelas Delta.
Antes de começar
Para concluir este tutorial, você precisa:
- Permissão para usar um recurso de computação existente ou criar um novo recurso de computação. Consulte Compute.
- Permissões do Catálogo Unity:
USE CATALOG,USE SCHEMA, eCREATE TABLEno catálogoworkspace. Para definir essas permissões, consulte o administrador do Databricks ou privilégios do Catálogo Unity e objetos securizáveis.
Estes exemplos baseiam-se num conjunto de dados chamado Synthetic Person Records: 10K to 10M Records. Este conjunto de dados contém registos fictícios de pessoas, incluindo os seus primeiros e apelidos, género e idade.
Primeiro, descarregue o conjunto de dados deste tutorial.
- Visite a página Synthetic Person Records: 10K to 10M Records em Kaggle.
- Clique em Download e depois Transferir conjunto de dados como ficheiro zip. Isto descarrega um ficheiro nomeado
archive.zippara a sua máquina local. - Extrai a
archivepasta doarchive.zipficheiro.
De seguida, carregue o person_10000.csv conjunto de dados para um volume do Unity Catalog dentro do seu espaço de trabalho Azure Databricks. O Azure Databricks recomenda carregar os seus dados para um volume do Unity Catalog, pois os volumes oferecem capacidades para aceder, armazenar, governar e organizar ficheiros.
- Abra o Explorador de Catálogos clicando
Catálogo na barra lateral.
- No Explorador de Catálogo, clique
Adicionar dados e Criar um volume. - Nomeie o volume
my-volumee selecione Volume Gerido como tipo de volume. - Selecione o
workspacecatálogo e odefaultesquema, e depois clique em Criar. - Abra
my-volumee clique em Carregar para este volume. - Arraste e larga ou navega e seleciona o
person_10000.csvficheiro dentro daarchivepasta na tua máquina local. - Clique em Carregar.
Por fim, cria um caderno para executar o código de exemplo.
- Clica
Novo na barra lateral. - Clica
Caderno para criar um novo caderno.
- Escolhe uma língua para o caderno.
Criar uma tabela
Crie uma nova tabela gerida pelo Unity Catalog nomeada workspace.default.people_10k a partir de person_10000.csv. Delta Lake é o padrão para todos os comandos de criação, leitura e escrita de tabelas no Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
linguagem de programação Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
SQL
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
Existem várias formas diferentes de criar ou clonar tabelas. Para obter mais informações, consulte CREATE TABLE.
No Databricks Runtime 13.3 LTS e superiores, pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplique o esquema e as propriedades de tabela de uma tabela Delta de origem. Isto pode ser útil ao promover tabelas de um ambiente de desenvolvimento para produção.
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
Importante
Esta funcionalidade está em Pré-visualização Pública.
Usa a DeltaTableBuilder API para Python e Scala para criar uma tabela vazia. Comparado com DataFrameWriter e DataFrameWriterV2, a DeltaTableBuilder API facilita a especificação de informação adicional como comentários de colunas, propriedades de tabelas e colunas geradas.
Python
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
linguagem de programação Scala
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
Upsert para uma mesa
Modificar registos existentes numa tabela ou adicionar novos usando uma operação chamada upsert. Para fundir um conjunto de atualizações e inserções numa tabela Delta existente, use o DeltaTable.merge método em Python e Scala e a MERGE INTO instrução em SQL.
Por exemplo, mesclar dados da tabela de origem people_10k_updates para a tabela Delta alvo workspace.default.people_10k. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há uma linha correspondente, o Delta Lake adiciona uma nova linha.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
linguagem de programação Scala
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
SQL
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
Em SQL, o * operador atualiza ou insere todas as colunas na tabela de destino, assumindo que a tabela de origem tem as mesmas colunas que a tabela de destino. Se a tabela alvo não tiver as mesmas colunas, a consulta gera um erro de análise. Além disso, deve especificar um valor para cada coluna da sua tabela quando realiza uma operação de inserção. Os valores das colunas podem estar vazios, por exemplo, ''. Quando realiza uma operação de inserção, não precisa de atualizar todos os valores.
Ler uma tabela
Use o nome da tabela ou o caminho para aceder aos dados nas tabelas Delta. Para aceder às tabelas geridas pelo Unity Catalog, utilize um nome de tabela totalmente qualificado. O acesso baseado em caminhos é suportado apenas para volumes e tabelas externas, não para tabelas geridas. Para mais informações, consulte Regras de caminho e acesso nos volumes do Catálogo Unity.
Python
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
linguagem de programação Scala
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SQL
SELECT * FROM workspace.default.people_10k;
Escrever numa tabela
O Delta Lake utiliza a sintaxe padrão para escrever dados em tabelas. Para adicionar novos dados a uma tabela Delta existente, utilize o modo de anexação. Ao contrário do upserting, inserir dados numa tabela não verifica entradas duplicadas.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
linguagem de programação Scala
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
SQL
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
As saídas das células dos notebooks Databricks exibem um máximo de 10.000 linhas ou 2 MB, o que for inferior. Como workspace.default.people_10k contém mais de 10.000 linhas, apenas as primeiras 10.000 linhas aparecem na saída do caderno para display(df). As linhas adicionais estão presentes na tabela, mas não são renderizadas na saída do caderno devido a este limite. Podes ver as linhas adicionais filtrando especificamente para elas.
Para substituir todos os dados numa tabela, use o modo de sobrescrição.
Python
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
linguagem de programação Scala
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
SQL
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
Atualizar uma tabela
Atualize os dados numa tabela Delta com base num predicado. Por exemplo, altere os valores na gender coluna de Female para F, de Male para M, e de Other para O.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
linguagem de programação Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Excluir de uma tabela
Remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, o código abaixo demonstra duas operações de eliminação: primeiro eliminar linhas com menos de 18 anos, depois eliminar linhas com menos de 21 anos.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
linguagem de programação Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Importante
Eliminar remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente limpas. Para mais informações, veja vácuo.
Exibir histórico da tabela
Use o método DeltaTable.history em Python e Scala e a instrução DESCRIBE HISTORY em SQL para visualizar a informação de proveniência de cada escrita numa tabela.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
linguagem de programação Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
SQL
DESCRIBE HISTORY workspace.default.people_10k
Consulte uma versão anterior da tabela usando viagem no tempo
Consulta um snapshot mais antigo de uma tabela Delta usando a funcionalidade de viagem no tempo do Delta Lake. Para consultar uma versão específica, use o número de versão ou carimbo temporal da tabela. Por exemplo, consultar a versão 0 ou data/hora 2026-01-05T23:09:47.000+00:00 do histórico da tabela.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
linguagem de programação Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
SQL
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
Para timestamps, apenas são aceites cadeias de data ou de timestamp. Por exemplo, as cadeias devem ser formatadas como "2026-01-05T22:43:15.000+00:00" ou "2026-01-05 22:43:15".
Use as opções DataFrameReader para criar um DataFrame a partir de uma tabela Delta fixa a uma versão específica ou a um timestamp da tabela.
Python
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
linguagem de programação Scala
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
SQL
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
Para mais informações, consulte Trabalho com histórico de tabelas.
Otimizar uma tabela
Múltiplas alterações a uma tabela podem criar vários ficheiros pequenos, o que atrasa o desempenho da consulta de leitura. Use a operação de otimização para melhorar a velocidade, combinando ficheiros pequenos em ficheiros maiores. Consulte OPTIMIZE.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
linguagem de programação Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE workspace.default.people_10k
Nota
Se a otimização preditiva estiver ativada, não precisa de otimizar manualmente. A otimização preditiva gerencia automaticamente as tarefas de manutenção. Para obter mais informações, consulte Otimização preditiva para tabelas gerenciadas do Unity Catalog.
Ordem Z por colunas
Para aplicar a ordenação Z nos dados e melhorar ainda mais o desempenho da leitura, especifique as colunas pelas quais ordenar na operação. Por exemplo, coloque na coluna de alta cardinalidade firstName. Para mais informações sobre a ordenação z, veja Salto de dados.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
linguagem de programação Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
SQL
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
Limpar instantâneos com a operação de aspiração
Delta Lake tem isolamento de snapshots para leituras, o que significa que é seguro executar uma operação de otimização enquanto outros utilizadores ou tarefas estão a consultar a tabela. No entanto, deves eventualmente limpar snapshots antigos porque isso reduz custos de armazenamento, melhora o desempenho das consultas e assegura a conformidade dos dados. Executa a VACUUM operação para limpar snapshots antigos. Consulte VACUUM.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
linguagem de programação Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
SQL
VACUUM workspace.default.people_10k
Para mais informações sobre como utilizar eficazmente a operação de vácuo, veja Remover ficheiros de dados não utilizados com vácuo.