Esercitazione: Delta Lake
Questa esercitazione introduce le operazioni Delta Lake comuni in Azure Databricks, incluse le seguenti:
- Creare una tabella.
- Upsert in una tabella.
- Leggere da una tabella.
- Visualizzare la cronologia di una tabella.
- Eseguire una query in una versione precedente di una tabella.
- Ottimizzare una tabella.
- Aggiungere un indice di ordine Z.
- Rimuovere file senza riferimenti.
È possibile eseguire l'esempio di codice Python, Scala e SQL in questo articolo da un notebook collegato a una risorsa di calcolo di Azure Databricks, ad esempio un cluster. È anche possibile eseguire il codice SQL in questo articolo dall'interno di una query associata a sql warehouse in Databricks SQL.
Preparare i dati di origine
Questa esercitazione si basa su un set di dati denominato People 10 M. Contiene 10 milioni di record fittizi che contengono fatti su persone, come nome e cognome, data di nascita e stipendio. Questa esercitazione presuppone che questo set di dati si trova in un volume di Catalogo Unity associato all'area di lavoro di Azure Databricks di destinazione.
Per ottenere il set di dati People 10 M per questa esercitazione, eseguire le operazioni seguenti:
- Vai alla pagina Persone 10 M in Kaggle.
- Fare clic su Scarica per scaricare un file denominato
archive.zip
nel computer locale. - Estrarre il file denominato
export.csv
dalarchive.zip
file. Ilexport.csv
file contiene i dati per questa esercitazione.
Per caricare il export.csv
file nel volume, eseguire le operazioni seguenti:
- Sulla barra laterale fare clic su Catalogo.
- In Esplora cataloghi passare e aprire il volume in cui si vuole caricare il
export.csv
file. - Fare clic su Carica in questo volume.
- Trascinare e rilasciare o passare a e selezionare il file nel
export.csv
computer locale. - Fare clic su Carica.
Negli esempi di codice seguenti sostituire /Volumes/main/default/my-volume/export.csv
con il percorso del export.csv
file nel volume di destinazione.
Creare una tabella
Per impostazione predefinita, tutte le tabelle create in Azure Databricks usano Delta Lake. Databricks consiglia di usare le tabelle gestite di Unity Catalog.
Nell'esempio di codice precedente e negli esempi di codice seguenti sostituire il nome main.default.people_10m
della tabella con il catalogo, lo schema e il nome della tabella di destinazione nel catalogo unity.
Nota
Delta Lake è l'impostazione predefinita per tutti i comandi di lettura, scrittura e creazione di tabelle in Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Le operazioni precedenti creano una nuova tabella gestita. Per informazioni sulle opzioni disponibili quando si crea una tabella Delta, vedere CREATE TABLE.
In Databricks Runtime 13.3 LTS e versioni successive è possibile usare CREATE TABLE LIKE per creare una nuova tabella Delta vuota che duplica lo schema e le proprietà della tabella per una tabella Delta di origine. Ciò può essere particolarmente utile quando si promuovono tabelle da un ambiente di sviluppo all'ambiente di produzione, come illustrato nell'esempio di codice seguente:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Per creare una tabella vuota, è anche possibile usare l'API DeltaTableBuilder
in Delta Lake per Python e Scala. Rispetto alle API DataFrameWriter equivalenti, queste API semplificano la specifica di informazioni aggiuntive, ad esempio commenti di colonna, proprietà di tabella e colonne generate.
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert in una tabella
Per unire un set di aggiornamenti e inserimenti in una tabella Delta esistente, usare il DeltaTable.merge
metodo per Python e Scala e l'istruzione MERGE INTO per SQL. L'esempio seguente, ad esempio, accetta i dati dalla tabella di origine e lo unisce alla tabella Delta di destinazione. Quando è presente una riga corrispondente in entrambe le tabelle, Delta Lake aggiorna la colonna di dati usando l'espressione specificata. Quando non è presente alcuna riga corrispondente, Delta Lake aggiunge una nuova riga. Questa operazione è nota come upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
In SQL, se si specifica *
, questo aggiorna o inserisce tutte le colonne nella tabella di destinazione, presupponendo che la tabella di origine abbia le stesse colonne della tabella di destinazione. Se la tabella di destinazione non ha le stesse colonne, la query genera un errore di analisi.
È necessario specificare un valore per ogni colonna della tabella quando si esegue un'operazione di inserimento, ad esempio quando non è presente alcuna riga corrispondente nel set di dati esistente. Non è tuttavia necessario aggiornare tutti i valori.
Per visualizzare i risultati, eseguire una query sulla tabella.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Leggere una tabella
È possibile accedere ai dati nelle tabelle Delta in base al nome della tabella o al percorso della tabella, come illustrato negli esempi seguenti:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Scrivere in una tabella
Delta Lake usa la sintassi standard per la scrittura di dati nelle tabelle.
Per aggiungere in modo atomico nuovi dati a una tabella Delta esistente, usare la modalità di accodamento come illustrato negli esempi seguenti:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Per sostituire tutti i dati in una tabella, usare la modalità di sovrascrittura come negli esempi seguenti:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Aggiornare una tabella
È possibile aggiornare i dati corrispondenti a un predicato in una tabella Delta. Nella tabella di esempio people_10m
, ad esempio, per modificare un'abbreviazione nella gender
colonna da M
o F
a Male
o Female
, è possibile eseguire quanto segue:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Eliminare da una tabella
È possibile rimuovere dati corrispondenti a un predicato da una tabella Delta. Ad esempio, nella tabella di esempio people_10m
, per eliminare tutte le righe corrispondenti alle persone con un valore nella birthDate
colonna da prima 1955
di , è possibile eseguire quanto segue:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Importante
L'eliminazione rimuove i dati dalla versione più recente della tabella Delta, ma non lo rimuove dallo spazio di archiviazione fisico fino a quando le versioni precedenti non vengono cancellate in modo esplicito. Per informazioni dettagliate, vedere vacuum .
Visualizzare la cronologia delle tabelle
Per visualizzare la cronologia di una tabella, usare il DeltaTable.history
metodo per Python e Scala e l'istruzione DESCRIBE HISTORY in SQL, che fornisce informazioni sulla provenienza, tra cui la versione della tabella, l'operazione, l'utente e così via, per ogni scrittura in una tabella.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Eseguire una query su una versione precedente della tabella (tempo di spostamento)
Il viaggio in tempo delta Lake consente di eseguire query su uno snapshot precedente di una tabella Delta.
Per eseguire una query su una versione precedente di una tabella, specificare la versione o il timestamp della tabella. Ad esempio, per eseguire una query sulla versione 0 o sul timestamp 2024-05-15T22:43:15.000+00:00Z
della cronologia precedente, usare quanto segue:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Per i timestamp, vengono accettate solo stringhe di data o timestamp, ad esempio "2024-05-15T22:43:15.000+00:00"
o "2024-05-15 22:43:15"
.
Le opzioni DataFrameReader consentono di creare un dataframe da una tabella Delta fissa a una versione o un timestamp specifico della tabella, ad esempio:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Per informazioni dettagliate, vedere Usare la cronologia delle tabelle Delta Lake.
Ottimizzare una tabella
Dopo aver eseguito più modifiche a una tabella, potrebbero essere presenti molti file di piccole dimensioni. Per migliorare la velocità delle query di lettura, è possibile usare l'operazione di ottimizzazione per comprimere file di piccole dimensioni in file di dimensioni maggiori:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Ordine Z per colonne
Per migliorare ulteriormente le prestazioni di lettura, è possibile collocare le informazioni correlate nello stesso set di file ordinando z. Gli algoritmi di data-skipping di Delta Lake usano questa collocazione per ridurre drasticamente la quantità di dati che devono essere letti. Per ordinare i dati z, specificare le colonne da ordinare nell'operazione z order by. Ad esempio, per collocare da gender
, eseguire:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Per il set completo di opzioni disponibili durante l'esecuzione dell'operazione di ottimizzazione, vedere Ottimizzare il layout del file di dati.
Pulire gli snapshot con VACUUM
Delta Lake offre l'isolamento dello snapshot per le letture, il che significa che è sicuro eseguire un'operazione di ottimizzazione anche se altri utenti o processi eseguono query sulla tabella. Alla fine, tuttavia, è necessario pulire gli snapshot precedenti. A tale scopo, eseguire l'operazione vacuum:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Per informazioni dettagliate sull'uso efficace dell'operazione vacuum, vedere Rimuovere i file di dati inutilizzati con vuoto.