Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa esercitazione illustra le operazioni comuni sulle tabelle Delta usando dati di esempio. Delta Lake è il livello di archiviazione ottimizzato che fornisce le basi per le tabelle in Databricks. Se non diversamente specificato, tutte le tabelle di Databricks sono tabelle Delta.
Prima di iniziare
Per completare questa esercitazione, è necessario:
- Autorizzazione per usare una risorsa di calcolo esistente o creare una nuova risorsa di calcolo. Vedere Calcolo.
- Autorizzazioni del catalogo Unity:
USE CATALOG,USE SCHEMA, eCREATE TABLEsul catalogoworkspace. Per impostare queste autorizzazioni, contattare l'amministratore di Databricks o i privilegi di Unity Catalog e gli oggetti sicurizzabili.
Questi esempi si basano su un set di dati denominato Synthetic Person Records: da 10K a 10M Record. Questo set di dati contiene record fittizi di persone, inclusi il nome e il cognome, il sesso e l'età.
Prima di tutto, scaricare il set di dati per questa esercitazione.
- Visita la pagina Record di persone sintetiche: da 10.000 a 10 milioni di record su Kaggle.
- Fare clic su Scarica e quindi su Scarica set di dati come zip. Verrà scaricato un file denominato
archive.zipnel computer locale. - Estrarre la
archivecartella dalarchive.zipfile.
Caricare quindi il person_10000.csv set di dati in un volume di Unity Catalog all'interno dell'area di lavoro di Azure Databricks. Azure Databricks consiglia di caricare i dati in un volume di Unity Catalog perché i volumi offrono funzionalità per l'accesso, l'archiviazione, la governance e l'organizzazione dei file.
- Aprire Esplora cataloghi facendo clic
Catalogo nella barra laterale.
- In Esplora cataloghi fare clic
Aggiungi dati e Crea un volume. - Assegnare un nome al volume
my-volumee selezionare Volume gestito come tipo di volume. - Selezionare il
workspacecatalogo e lodefaultschema e quindi fare clic su Crea. - Aprire
my-volumee fare clic su Carica in questo volume. - Trascinare e rilasciare o sfogliare e selezionare il file
person_10000.csvdalla cartellaarchivesul computer locale. - Fare clic su Carica.
Creare infine un notebook per l'esecuzione del codice di esempio.
- Fare clic
Nuovo nella barra laterale. - Fare clic
Notebook per creare un nuovo notebook.
- Scegliere una lingua per il notebook.
Creare una tabella
Creare una nuova tabella gestita del catalogo Unity denominata workspace.default.people_10k da person_10000.csv. Delta Lake è l'impostazione predefinita per tutti i comandi di creazione, lettura e scrittura di tabelle in Azure Databricks.
Pitone
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Linguaggio di programmazione Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
SQL
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
Esistono diversi modi per creare o clonare tabelle. Per altre informazioni, vedere CREATE TABLE.
In Databricks Runtime 13.3 LTS e versioni successive è possibile usare CREATE TABLE LIKE per creare una nuova tabella Delta vuota che duplica lo schema e le proprietà della tabella di una tabella Delta di origine. Ciò può essere utile quando si promuovono tabelle da un ambiente di sviluppo all'ambiente di produzione.
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Usare l'API DeltaTableBuilder per Python e Scala per creare una tabella vuota. Rispetto a DataFrameWriter e DataFrameWriterV2, l'API DeltaTableBuilder semplifica la specifica di informazioni aggiuntive, ad esempio commenti di colonna, proprietà della tabella e colonne generate.
Pitone
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
Linguaggio di programmazione Scala
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
Upsert in una tabella
Modificare i record esistenti in una tabella o aggiungerne di nuovi usando un'operazione denominata upsert. Per unire un set di aggiornamenti e inserimenti in una tabella Delta esistente, usare il DeltaTable.merge metodo in Python e Scala e l'istruzione MERGE INTO in SQL.
Ad esempio, unire dati dalla tabella people_10k_updates di origine alla tabella workspace.default.people_10kDelta di destinazione . Quando è presente una riga corrispondente in entrambe le tabelle, Delta Lake aggiorna la colonna di dati usando l'espressione specificata. Quando non è presente alcuna riga corrispondente, Delta Lake aggiunge una nuova riga.
Pitone
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
Linguaggio di programmazione Scala
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
SQL
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
In SQL l'operatore * aggiorna o inserisce tutte le colonne nella tabella di destinazione, presupponendo che la tabella di origine abbia le stesse colonne della tabella di destinazione. Se la tabella di destinazione non ha le stesse colonne, la query genera un errore di analisi. Inoltre, è necessario specificare un valore per ogni colonna della tabella quando si esegue un'operazione di inserimento. I valori di colonna possono essere vuoti, ad esempio ''. Quando si esegue un'operazione di inserimento, non è necessario aggiornare tutti i valori.
Leggere una tabella
Usare il nome o il percorso della tabella per accedere ai dati nelle tabelle Delta. Per accedere alle tabelle gestite di Unity Catalog, usare un nome di tabella completo. L'accesso basato sul percorso è supportato solo per i volumi e le tabelle esterne, non per le tabelle gestite. Per altre informazioni, vedere Regole di percorso e accesso nei volumi del catalogo Unity.
Pitone
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
Linguaggio di programmazione Scala
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SQL
SELECT * FROM workspace.default.people_10k;
Scrivere in una tabella
Delta Lake usa la sintassi standard per la scrittura di dati nelle tabelle. Per aggiungere nuovi dati a una tabella Delta esistente, utilizzare la modalità di accodamento. A differenza dell'upserting, la scrittura in una tabella non verifica la presenza di record duplicati.
Pitone
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
Linguaggio di programmazione Scala
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
SQL
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
Gli output delle celle del notebook di Databricks visualizzano un massimo di 10.000 righe o 2 MB, a seconda del valore inferiore. Poiché workspace.default.people_10k contiene più di 10.000 righe, solo le prime 10.000 righe vengono visualizzate nell'output del notebook per display(df). Le righe aggiuntive sono presenti nella tabella, ma non vengono visualizzate nell'output del notebook a causa di questo limite. È possibile visualizzare le righe aggiuntive filtrandole in modo specifico.
Per sostituire tutti i dati in una tabella, usare la modalità di sovrascrittura.
Pitone
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
Linguaggio di programmazione Scala
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
SQL
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
Aggiornare una tabella
Aggiornare i dati in una tabella Delta in base a un predicato. Ad esempio, modificare i valori nella gender colonna da Female a F, da Male a Me da Other a O.
Pitone
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Linguaggio di programmazione Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Eliminare da una tabella
Rimuovere i dati che corrispondono a un predicato da una tabella Delta. Ad esempio, il codice seguente illustra due operazioni di eliminazione: prima eliminando le righe in cui l'età è inferiore a 18, quindi eliminando le righe in cui l'età è minore di 21.
Pitone
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Linguaggio di programmazione Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Importante
L'eliminazione rimuove i dati dalla versione più recente della tabella Delta, ma non lo rimuove dall'archiviazione fisica fino a quando le versioni precedenti non vengono cancellate in modo esplicito. Per altre informazioni, vedere vacuum.
Visualizzare la cronologia delle tabelle
Usare il DeltaTable.history metodo in Python e Scala e l'istruzione DESCRIBE HISTORY in SQL per visualizzare le informazioni sulla provenienza per ogni scrittura in una tabella.
Pitone
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
Linguaggio di programmazione Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
SQL
DESCRIBE HISTORY workspace.default.people_10k
Eseguire una query su una versione precedente della tabella usando il tempo di spostamento
Eseguire una query su uno snapshot precedente di una tabella Delta usando il viaggio nel tempo in Delta Lake. Per eseguire una query su una versione specifica, usare il numero di versione o il timestamp della tabella. Ad esempio, la versione 0 della query o il timestamp 2026-01-05T23:09:47.000+00:00 dalla cronologia della tabella.
Pitone
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
Linguaggio di programmazione Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
SQL
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
Per i timestamp, vengono accettate solo stringhe di data o timestamp. Ad esempio, le stringhe devono essere formattate come "2026-01-05T22:43:15.000+00:00" o "2026-01-05 22:43:15".
Utilizzare DataFrameReader le opzioni per creare un DataFrame da una tabella Delta vincolata a una versione o a un timestamp specifico della tabella.
Pitone
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
Linguaggio di programmazione Scala
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
SQL
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
Per altre informazioni, vedere Usare la cronologia delle tabelle.
Ottimizzare una tabella
Più modifiche apportate a una tabella possono creare diversi file di piccole dimensioni, rallentando le prestazioni delle query di lettura. Usare l'operazione di ottimizzazione per migliorare la velocità combinando file di piccole dimensioni in quelli più grandi. Vedi OPTIMIZE.
Pitone
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
Linguaggio di programmazione Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE workspace.default.people_10k
Nota
Se l'ottimizzazione predittiva è abilitata, non è necessario ottimizzare manualmente. L'ottimizzazione predittiva gestisce automaticamente le attività di manutenzione. Per altre informazioni, vedere Ottimizzazione predittiva per le tabelle gestite del catalogo Unity.
Ordine Z per colonne
Per ordinare i dati tramite Z-order e migliorare ulteriormente le prestazioni di lettura, specificare le colonne su cui ordinare nell'operazione. Ad esempio, posizionare in base alla colonna di cardinalità elevata firstName. Per ulteriori informazioni sull'ordinamento per livelli (z-ordering), vedere Salto dei dati.
Pitone
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
Linguaggio di programmazione Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
SQL
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
Pulire gli snapshot con l'operazione di vacuum
Delta Lake ha l'isolamento dello snapshot per le letture, il che significa che è sicuro eseguire un'operazione di ottimizzazione mentre altri utenti o processi eseguono query sulla tabella. Tuttavia, è consigliabile pulire gli snapshot precedenti perché ciò riduce i costi di archiviazione, migliora le prestazioni delle query e garantisce la conformità dei dati. Eseguire l'operazione VACUUM per pulire gli snapshot precedenti. Vedi VACUUM.
Pitone
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
Linguaggio di programmazione Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
SQL
VACUUM workspace.default.people_10k
Per altre informazioni sull'uso efficace dell'operazione vacuum, vedere Rimuovere i file di dati inutilizzati con vuoto.