Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este tutorial demonstra operações comuns da tabela Delta usando dados de exemplo. Delta Lake é a camada de armazenamento otimizada que fornece a base para tabelas no Databricks. A menos que especificado de outra forma, todas as tabelas no Databricks são tabelas Delta.
Antes de começar
Para concluir este tutorial, você precisará:
- Permissão para usar um recurso de computação existente ou criar um novo recurso de computação. Consulte Compute.
- Permissões do Catálogo do Unity:
USE CATALOG,USE SCHEMA, eCREATE TABLEno catálogoworkspace. Para definir essas permissões, consulte o administrador do Databricks ou Privilégios e objetos protegíveis do Catálogo Unity.
Esses exemplos dependem de um conjunto de dados chamado Registros de Pessoas Sintéticas: Registros de 10K a 10M. Esse conjunto de dados contém registros fictícios de pessoas, incluindo seus nomes e sobrenomes, sexo e idade.
Primeiro, baixe o conjunto de dados deste tutorial.
- Visite a página de Registros de Pessoas Sintéticas: 10 mil a 10 milhões de registros no Kaggle.
- Clique em Baixar e , em seguida, baixe o conjunto de dados como zip. Isso baixa um arquivo nomeado
archive.zippara seu computador local. - Extraia a
archivepasta doarchive.ziparquivo.
Em seguida, carregue o person_10000.csv conjunto de dados em um volume no Catálogo Unity em seu ambiente de trabalho do Azure Databricks. O Azure Databricks recomenda carregar seus dados em um volume do Catálogo do Unity porque os volumes fornecem recursos para acessar, armazenar, controlar e organizar arquivos.
- Abra o Gerenciador de Catálogos clicando no
Catálogo na barra lateral.
- No Gerenciador de Catálogos, clique em
Adicionar dados e Criar um volume. - Nomeie o volume
my-volumee selecione Volume gerenciado como o tipo de volume. - Selecione o
workspacecatálogo e odefaultesquema e clique em Criar. - Abra
my-volumee clique em Carregar para este volume. - Arraste e solte ou navegue até e selecione o
person_10000.csvarquivo no interior daarchivepasta no seu computador local. - Clique em Carregar.
Por fim, crie um bloco de anotações para executar o código de exemplo.
- Clique em
Novo na barra lateral. - Clique no
Bloco de anotações para criar um novo bloco de anotações.
- Escolha um idioma para o bloco de anotações.
Criar uma tabela
Crie uma nova tabela gerenciada do Catálogo do Unity com o nome workspace.default.people_10k de person_10000.csv. O Delta Lake é o padrão para todos os comandos de criação, leitura e gravação de tabela no Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala (linguagem de programação)
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
SQL
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
Há várias maneiras diferentes de criar ou clonar tabelas. Para obter mais informações, consulte CREATE TABLE.
No Databricks Runtime 13.3 LTS e posteriores, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica a estrutura e as propriedades de tabela da tabela Delta de origem. Isso pode ser útil ao migrar tabelas de um ambiente de desenvolvimento para um ambiente em produção.
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
Importante
Esse recurso está em uma versão prévia.
Use a DeltaTableBuilder API para Python e Scala para criar uma tabela vazia. Em comparação DataFrameWriter com e DataFrameWriterV2, a DeltaTableBuilder API facilita a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.
Python
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
Scala (linguagem de programação)
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
Executar upsert para uma tabela
Modifique os registros existentes em uma tabela ou adicione novos usando uma operação chamada upsert. Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use o DeltaTable.merge método em Python e Scala e a instrução MERGE INTO no SQL.
Por exemplo, mesclar dados da tabela people_10k_updates de origem para a tabela workspace.default.people_10kDelta de destino. Quando há uma linha correspondente nas duas tabelas, o Delta Lake atualiza a coluna de dados usando a expressão especificada. Quando não há nenhuma linha correspondente, o Delta Lake adiciona uma nova linha.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
Scala (linguagem de programação)
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
SQL
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
No SQL, o * operador atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta gerará um erro de análise. Além disso, você deve especificar um valor para cada coluna em sua tabela ao executar uma operação de inserção. Os valores de coluna podem estar vazios, por exemplo, ''. Ao executar uma operação de inserção, você não precisa atualizar todos os valores.
Ler uma tabela
Use o nome da tabela ou o caminho para acessar dados em tabelas Delta. Para acessar tabelas gerenciadas do Catálogo do Unity, use um nome de tabela totalmente qualificado. O acesso baseado em caminho só tem suporte para volumes e tabelas externas, não para tabelas gerenciadas. Para mais informações, consulte Regras de Caminho e Acesso em volumes do Unity Catalog.
Python
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
Scala (linguagem de programação)
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SQL
SELECT * FROM workspace.default.people_10k;
Gravar em uma tabela
O Delta Lake usa a sintaxe padrão para gravar dados em tabelas. Para adicionar novos dados a uma tabela Delta existente, use o modo de acréscimo. Ao contrário de usar upsert, gravar em uma tabela não verifica a existência de registros duplicados.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
Scala (linguagem de programação)
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
SQL
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
O máximo que as saídas da célula do notebook do Databricks exibem é de 10.000 linhas ou 2 MB, o que for menor. Como workspace.default.people_10k contém mais de 10.000 linhas, apenas as primeiras 10.000 linhas aparecem na saída do notebook para display(df). As linhas adicionais estão presentes na tabela, mas não são renderizadas na saída do notebook devido a esse limite. Você pode exibir as linhas adicionais filtrando especificamente para elas.
Para substituir todos os dados em uma tabela, use o modo de substituição.
Python
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
Scala (linguagem de programação)
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
SQL
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
Atualizar uma tabela
Atualize dados em uma tabela Delta com base em um predicado. Por exemplo, altere os valores na gender coluna de Female para F, de Male para M, e de Other para O.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala (linguagem de programação)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Excluir de uma tabela
Remova os dados que correspondem a um predicado de uma tabela Delta. Por exemplo, o código abaixo demonstra duas operações de exclusão: primeiro excluir linhas em que a idade é menor que 18 e, em seguida, excluir linhas em que a idade é menor que 21.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala (linguagem de programação)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Importante
Excluir remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente purgadas. Para obter mais informações, consulte vácuo.
Exibir o histórico de tabelas
Utilize o DeltaTable.history método em Python e Scala e a DESCRIBE HISTORY instrução em SQL para exibir as informações de proveniência de cada gravação em uma tabela.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
Scala (linguagem de programação)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
SQL
DESCRIBE HISTORY workspace.default.people_10k
Consultar uma versão anterior da tabela usando a viagem no tempo
Recuperar um instantâneo mais antigo de uma tabela Delta usando a viagem no tempo do Delta Lake. Para consultar uma versão específica, use o número de versão ou o carimbo de data/hora da tabela. Por exemplo, consulte a versão 0 ou o timestamp 2026-01-05T23:09:47.000+00:00 do histórico da tabela.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
Scala (linguagem de programação)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
SQL
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
Para timestamps, somente formatos de data ou timestamp são aceitos. Por exemplo, as cadeias de caracteres devem ser formatadas como "2026-01-05T22:43:15.000+00:00" ou "2026-01-05 22:43:15".
Use DataFrameReader para criar um DataFrame de uma tabela Delta fixada em uma versão específica ou timestamp da tabela.
Python
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
Scala (linguagem de programação)
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
SQL
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
Para obter mais informações, consulte Trabalhar com o histórico de tabelas.
Otimizar uma tabela
Várias alterações em uma tabela podem criar vários arquivos pequenos, o que retarda o desempenho da consulta de leitura. Use a operação de otimização para melhorar a velocidade combinando arquivos pequenos em arquivos maiores. Consulte OPTIMIZE.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
Scala (linguagem de programação)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE workspace.default.people_10k
Observação
Se a otimização preditiva estiver habilitada, você não precisará otimizar manualmente. A otimização preditiva gerencia automaticamente as tarefas de manutenção. Para obter mais informações, consulte Otimização preditiva para tabelas gerenciadas do Catálogo do Unity.
Ordem Z por colunas
Para obter dados de ordem z e melhorar ainda mais o desempenho de leitura, especifique as colunas a serem ordenadas na operação. Por exemplo, agrupar pela coluna firstName de alta cardinalidade. Para mais informações sobre z-ordering, consulte Data Skipping.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
Scala (linguagem de programação)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
SQL
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
Limpar instantâneos com a operação de vácuo
O Delta Lake tem isolamento de instantâneo para leituras, o que significa que é seguro executar uma operação de otimização enquanto outros usuários ou processos consultam a tabela. No entanto, você deve eventualmente limpar instantâneos antigos porque isso reduz os custos de armazenamento, melhora o desempenho da consulta e garante a conformidade dos dados. Execute a VACUUM operação para limpar instantâneos antigos. Consulte VACUUM.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
Scala (linguagem de programação)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
SQL
VACUUM workspace.default.people_10k
Para obter mais informações sobre como usar a operação de vácuo efetivamente, consulte Remover arquivos de dados não utilizados com vácuo.