Compartilhar via


Tutorial: Criar e gerenciar tabelas do Delta Lake

Este tutorial demonstra operações comuns da tabela Delta usando dados de exemplo. Delta Lake é a camada de armazenamento otimizada que fornece a base para tabelas no Databricks. A menos que especificado de outra forma, todas as tabelas no Databricks são tabelas Delta.

Antes de começar

Para concluir este tutorial, você precisará:

  • Permissão para usar um recurso de computação existente ou criar um novo recurso de computação. Consulte Compute.
  • Permissões do Catálogo do Unity: USE CATALOG, USE SCHEMA, e CREATE TABLE no catálogo workspace. Para definir essas permissões, consulte o administrador do Databricks ou Privilégios e objetos protegíveis do Catálogo Unity.

Esses exemplos dependem de um conjunto de dados chamado Registros de Pessoas Sintéticas: Registros de 10K a 10M. Esse conjunto de dados contém registros fictícios de pessoas, incluindo seus nomes e sobrenomes, sexo e idade.

Primeiro, baixe o conjunto de dados deste tutorial.

  1. Visite a página de Registros de Pessoas Sintéticas: 10 mil a 10 milhões de registros no Kaggle.
  2. Clique em Baixar e , em seguida, baixe o conjunto de dados como zip. Isso baixa um arquivo nomeado archive.zip para seu computador local.
  3. Extraia a archive pasta do archive.zip arquivo.

Em seguida, carregue o person_10000.csv conjunto de dados em um volume no Catálogo Unity em seu ambiente de trabalho do Azure Databricks. O Azure Databricks recomenda carregar seus dados em um volume do Catálogo do Unity porque os volumes fornecem recursos para acessar, armazenar, controlar e organizar arquivos.

  1. Abra o Gerenciador de Catálogos clicando no ícone Dados.Catálogo na barra lateral.
  2. No Gerenciador de Catálogos, clique em Adicionar ou mais íconeAdicionar dados e Criar um volume.
  3. Nomeie o volume my-volume e selecione Volume gerenciado como o tipo de volume.
  4. Selecione o workspace catálogo e o default esquema e clique em Criar.
  5. Abra my-volume e clique em Carregar para este volume.
  6. Arraste e solte ou navegue até e selecione o person_10000.csv arquivo no interior da archive pasta no seu computador local.
  7. Clique em Carregar.

Por fim, crie um bloco de anotações para executar o código de exemplo.

  1. Clique em Adicionar ou mais íconeNovo na barra lateral.
  2. Clique no ícone do bloco de anotações.Bloco de anotações para criar um novo bloco de anotações.
  3. Escolha um idioma para o bloco de anotações.

Criar uma tabela

Crie uma nova tabela gerenciada do Catálogo do Unity com o nome workspace.default.people_10k de person_10000.csv. O Delta Lake é o padrão para todos os comandos de criação, leitura e gravação de tabela no Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala (linguagem de programação)

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

val df = spark.read
  .format("csv")
  .option("header", true)
  .schema(schema)
  .load("/Volumes/workspace/default/my-volume/person_10000.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")

// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)

SQL

-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
  person_id AS id,
  firstname,
  lastname,
  gender,
  age
FROM read_files(
  '/Volumes/workspace/default/my-volume/person_10000.csv',
  format => 'csv',
  header => true
);

-- View the new table.
SELECT * FROM workspace.default.people_10k;

Há várias maneiras diferentes de criar ou clonar tabelas. Para obter mais informações, consulte CREATE TABLE.

No Databricks Runtime 13.3 LTS e posteriores, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica a estrutura e as propriedades de tabela da tabela Delta de origem. Isso pode ser útil ao migrar tabelas de um ambiente de desenvolvimento para um ambiente em produção.

CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k

Importante

Esse recurso está em uma versão prévia.

Use a DeltaTableBuilder API para Python e Scala para criar uma tabela vazia. Em comparação DataFrameWriter com e DataFrameWriterV2, a DeltaTableBuilder API facilita a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.

Python

from delta.tables import DeltaTable

(
  DeltaTable.createIfNotExists(spark)
    .tableName("workspace.default.people_10k_2")
    .addColumn("id", "INT")
    .addColumn("firstName", "STRING")
    .addColumn("lastName", "STRING", comment="surname")
    .addColumn("gender", "STRING")
    .addColumn("age", "INT")
    .execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

Scala (linguagem de programação)

import io.delta.tables.DeltaTable

DeltaTable.createOrReplace(spark)
  .tableName("workspace.default.people_10k")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build()
  )
  .addColumn("gender", "STRING")
  .addColumn("age", "INT")
  .execute()

display(spark.read.table("workspace.default.people_10k"))

Executar upsert para uma tabela

Modifique os registros existentes em uma tabela ou adicione novos usando uma operação chamada upsert. Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use o DeltaTable.merge método em Python e Scala e a instrução MERGE INTO no SQL.

Por exemplo, mesclar dados da tabela people_10k_updates de origem para a tabela workspace.default.people_10kDelta de destino. Quando há uma linha correspondente nas duas tabelas, o Delta Lake atualiza a coluna de dados usando a expressão especificada. Quando não há nenhuma linha correspondente, o Delta Lake adiciona uma nova linha.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10001, 'Billy', 'Luppitt', 'M', 55),
  (10002, 'Mary', 'Smith', 'F', 98),
  (10003, 'Elias', 'Leadbetter', 'M', 48),
  (10004, 'Jane', 'Doe', 'F', 30),
  (10005, 'Joshua', '', 'M', 90),
  (10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
  .merge(
    people_10k_updates.alias("people_10k_updates"),
    "people_10k.id = people_10k_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

Scala (linguagem de programação)

import org.apache.spark.sql.types._
import io.delta.tables._

// Define schema
val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

// Create data as Seq of Tuples
val data = Seq(
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16)
)

// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
  "id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

deltaTable.as("people_10k")
  .merge(
    people_10k_updates.as("people_10k_updates"),
    "people_10k.id = people_10k_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)

SQL

-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16);

-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;

-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001

No SQL, o * operador atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta gerará um erro de análise. Além disso, você deve especificar um valor para cada coluna em sua tabela ao executar uma operação de inserção. Os valores de coluna podem estar vazios, por exemplo, ''. Ao executar uma operação de inserção, você não precisa atualizar todos os valores.

Ler uma tabela

Use o nome da tabela ou o caminho para acessar dados em tabelas Delta. Para acessar tabelas gerenciadas do Catálogo do Unity, use um nome de tabela totalmente qualificado. O acesso baseado em caminho só tem suporte para volumes e tabelas externas, não para tabelas gerenciadas. Para mais informações, consulte Regras de Caminho e Acesso em volumes do Unity Catalog.

Python

people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

Scala (linguagem de programação)

val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

SQL

SELECT * FROM workspace.default.people_10k;

Gravar em uma tabela

O Delta Lake usa a sintaxe padrão para gravar dados em tabelas. Para adicionar novos dados a uma tabela Delta existente, use o modo de acréscimo. Ao contrário de usar upsert, gravar em uma tabela não verifica a existência de registros duplicados.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df  = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

Scala (linguagem de programação)

// Create the new data.
val data = Seq(
  (10007, "Miku", "Hatsune", "F", 25)
)

val df = spark.createDataFrame(data)
  .toDF("id", "firstName", "lastName", "gender", "age")

// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")

// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)

SQL

CREATE OR REPLACE TABLE workspace.default.people_10k_new (
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);

-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
  (10007, 'Miku', 'Hatsune', 'F', 25);

-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;

-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;

O máximo que as saídas da célula do notebook do Databricks exibem é de 10.000 linhas ou 2 MB, o que for menor. Como workspace.default.people_10k contém mais de 10.000 linhas, apenas as primeiras 10.000 linhas aparecem na saída do notebook para display(df). As linhas adicionais estão presentes na tabela, mas não são renderizadas na saída do notebook devido a esse limite. Você pode exibir as linhas adicionais filtrando especificamente para elas.

Para substituir todos os dados em uma tabela, use o modo de substituição.

Python

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

Scala (linguagem de programação)

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

SQL

INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2

Atualizar uma tabela

Atualize dados em uma tabela Delta com base em um predicado. Por exemplo, altere os valores na gender coluna de Female para F, de Male para M, e de Other para O.

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'Female'",
  set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'Male',
  set = { 'gender': lit('M') }
)

deltaTable.update(
  condition = col('gender') == 'Other',
  set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala (linguagem de programação)

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'Female'",
  Map("gender" -> "'F'")
)

// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  col("gender") === "Male",
  Map("gender" -> lit("M")));

deltaTable.update(
  col("gender") === "Other",
  Map("gender" -> lit("O")));

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Excluir de uma tabela

Remova os dados que correspondem a um predicado de uma tabela Delta. Por exemplo, o código abaixo demonstra duas operações de exclusão: primeiro excluir linhas em que a idade é menor que 18 e, em seguida, excluir linhas em que a idade é menor que 21.

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala (linguagem de programação)

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Importante

Excluir remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente purgadas. Para obter mais informações, consulte vácuo.

Exibir o histórico de tabelas

Utilize o DeltaTable.history método em Python e Scala e a DESCRIBE HISTORY instrução em SQL para exibir as informações de proveniência de cada gravação em uma tabela.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

Scala (linguagem de programação)

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

SQL

DESCRIBE HISTORY workspace.default.people_10k

Consultar uma versão anterior da tabela usando a viagem no tempo

Recuperar um instantâneo mais antigo de uma tabela Delta usando a viagem no tempo do Delta Lake. Para consultar uma versão específica, use o número de versão ou o carimbo de data/hora da tabela. Por exemplo, consulte a versão 0 ou o timestamp 2026-01-05T23:09:47.000+00:00 do histórico da tabela.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

Scala (linguagem de programação)

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()

// Query using the version number.
display(deltaHistory.where("version == 0"))

// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

SQL

-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

Para timestamps, somente formatos de data ou timestamp são aceitos. Por exemplo, as cadeias de caracteres devem ser formatadas como "2026-01-05T22:43:15.000+00:00" ou "2026-01-05 22:43:15".

Use DataFrameReader para criar um DataFrame de uma tabela Delta fixada em uma versão específica ou timestamp da tabela.

Python

# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

Scala (linguagem de programação)

// Query using the version number.
val dfVersion = spark.read
  .option("versionAsOf", 0)
  .table("workspace.default.people_10k")

// Query using the timestamp.
val dfTimestamp = spark.read
  .option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
  .table("workspace.default.people_10k")

display(dfVersion)
display(dfTimestamp)

SQL

-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;

Para obter mais informações, consulte Trabalhar com o histórico de tabelas.

Otimizar uma tabela

Várias alterações em uma tabela podem criar vários arquivos pequenos, o que retarda o desempenho da consulta de leitura. Use a operação de otimização para melhorar a velocidade combinando arquivos pequenos em arquivos maiores. Consulte OPTIMIZE.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

Scala (linguagem de programação)

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE workspace.default.people_10k

Observação

Se a otimização preditiva estiver habilitada, você não precisará otimizar manualmente. A otimização preditiva gerencia automaticamente as tarefas de manutenção. Para obter mais informações, consulte Otimização preditiva para tabelas gerenciadas do Catálogo do Unity.

Ordem Z por colunas

Para obter dados de ordem z e melhorar ainda mais o desempenho de leitura, especifique as colunas a serem ordenadas na operação. Por exemplo, agrupar pela coluna firstName de alta cardinalidade. Para mais informações sobre z-ordering, consulte Data Skipping.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

Scala (linguagem de programação)

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

SQL

OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)

Limpar instantâneos com a operação de vácuo

O Delta Lake tem isolamento de instantâneo para leituras, o que significa que é seguro executar uma operação de otimização enquanto outros usuários ou processos consultam a tabela. No entanto, você deve eventualmente limpar instantâneos antigos porque isso reduz os custos de armazenamento, melhora o desempenho da consulta e garante a conformidade dos dados. Execute a VACUUM operação para limpar instantâneos antigos. Consulte VACUUM.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

Scala (linguagem de programação)

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

SQL

VACUUM workspace.default.people_10k

Para obter mais informações sobre como usar a operação de vácuo efetivamente, consulte Remover arquivos de dados não utilizados com vácuo.

Próximas etapas