Esercitazione: Delta Lake

Questa esercitazione introduce le operazioni Delta Lake comuni in Azure Databricks, incluse le seguenti:

È possibile eseguire l'esempio di codice Python, R, Scala e SQL in questo articolo da un notebook collegato a un cluster di Azure Databricks. È anche possibile eseguire il codice SQL in questo articolo dall'interno di una query associata a sql warehouse in Databricks SQL.

Nota

Alcuni degli esempi di codice seguenti usano una notazione dello spazio dei nomi a due livelli costituita da uno schema (detto anche database) e da una tabella o una vista (ad esempio, default.people10m). Per usare questi esempi con Unity Catalog, sostituire lo spazio dei nomi a due livelli con la notazione dello spazio dei nomi a tre livelli del catalogo Unity costituita da un catalogo, uno schema e una tabella o una vista (ad esempio, main.default.people10m).

Creare una tabella

Per impostazione predefinita, tutte le tabelle create in Azure Databricks usano Delta Lake.

Nota

Delta Lake è l'impostazione predefinita per tutti i comandi di lettura, scrittura e creazione di tabelle in Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Le operazioni precedenti creano una nuova tabella gestita usando lo schema dedotto dai dati. Per informazioni sulle opzioni disponibili quando si crea una tabella Delta, vedere CREATE TABLE.

Per le tabelle gestite, Azure Databricks determina la posizione dei dati. Per ottenere il percorso, è possibile usare l'istruzione DESCRIBE DETAIL , ad esempio:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

In alcuni casi può essere necessario creare una tabella specificando lo schema prima di inserire i dati. È possibile completare questa operazione con i comandi SQL seguenti:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

In Databricks Runtime 13.3 LTS e versioni successive è possibile usare CREATE TABLE LIKE per creare una nuova tabella Delta vuota che duplica lo schema e le proprietà della tabella per una tabella Delta di origine. Ciò può essere particolarmente utile quando si promuovono tabelle da un ambiente di sviluppo all'ambiente di produzione, ad esempio nell'esempio di codice seguente:

CREATE TABLE prod.people10m LIKE dev.people10m

È anche possibile usare l'API DeltaTableBuilder in Delta Lake per creare tabelle. Rispetto alle API DataFrameWriter, questa API semplifica la specifica di informazioni aggiuntive, ad esempio commenti di colonna, proprietà di tabella e colonne generate.

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert in una tabella

Per unire un set di aggiornamenti e inserimenti in una tabella Delta esistente, utilizzare l'istruzione MERGE INTO . Ad esempio, l'istruzione seguente accetta i dati dalla tabella di origine e lo unisce alla tabella Delta di destinazione. Quando è presente una riga corrispondente in entrambe le tabelle, Delta Lake aggiorna la colonna di dati usando l'espressione specificata. Quando non è presente alcuna riga corrispondente, Delta Lake aggiunge una nuova riga. Questa operazione è nota come upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Se si specifica *, questo aggiorna o inserisce tutte le colonne nella tabella di destinazione. Ciò presuppone che la tabella di origine abbia le stesse colonne della tabella di destinazione. In caso contrario, la query genererà un errore di analisi.

È necessario specificare un valore per ogni colonna della tabella quando si esegue un'operazione INSERT , ad esempio quando non è presente alcuna riga corrispondente nel set di dati esistente. Non è tuttavia necessario aggiornare tutti i valori.

Per visualizzare i risultati, eseguire una query sulla tabella.

SELECT * FROM people_10m WHERE id >= 9999998

Leggere una tabella

È possibile accedere ai dati nelle tabelle Delta in base al nome della tabella o al percorso della tabella, come illustrato negli esempi seguenti:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Scrivere in una tabella

Delta Lake usa la sintassi standard per la scrittura di dati nelle tabelle.

Per aggiungere in modo atomico nuovi dati a una tabella Delta esistente, usare la append modalità come negli esempi seguenti:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Per sostituire in modo atomico tutti i dati in una tabella, usare overwrite la modalità come negli esempi seguenti:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Aggiornare una tabella

È possibile aggiornare i dati corrispondenti a un predicato in una tabella Delta. Ad esempio, in una tabella denominata people10m o in un percorso in /tmp/delta/people-10mper modificare un'abbreviazione nella gender colonna da M o F a Male o Female, è possibile eseguire quanto segue:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Eliminare da una tabella

È possibile rimuovere dati corrispondenti a un predicato da una tabella Delta. Ad esempio, in una tabella denominata people10m o in un percorso in /tmp/delta/people-10m, per eliminare tutte le righe corrispondenti alle persone con un valore nella birthDate colonna da prima 1955di , è possibile eseguire quanto segue:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Importante

delete rimuove i dati dalla versione più recente della tabella Delta, ma non lo rimuove dallo spazio di archiviazione fisico fino a quando le versioni precedenti non vengono esplicitamente vuoto. Per informazioni dettagliate, vedere vacuum .

Visualizzare la cronologia delle tabelle

Per visualizzare la cronologia di una tabella, utilizzare l'istruzione DESCRIBE HISTORY , che fornisce informazioni sulla provenienza, tra cui la versione della tabella, l'operazione, l'utente e così via, per ogni scrittura in una tabella.

DESCRIBE HISTORY people_10m

Eseguire una query su una versione precedente della tabella (tempo di spostamento)

Il viaggio in tempo delta Lake consente di eseguire query su uno snapshot precedente di una tabella Delta.

Per eseguire una query su una versione precedente di una tabella, specificare una versione o un timestamp in un'istruzione SELECT . Ad esempio, per eseguire una query sulla versione 0 della cronologia precedente, usare:

SELECT * FROM people_10m VERSION AS OF 0

or

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Per i timestamp, vengono accettate solo stringhe di data o timestamp, ad esempio "2019-01-01" e "2019-01-01'T'00:00:00.000Z".

Le opzioni DataFrameReader consentono di creare un dataframe da una tabella Delta fissa a una versione specifica della tabella, ad esempio in Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

oppure, in alternativa:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Per informazioni dettagliate, vedere Usare la cronologia delle tabelle Delta Lake.

Ottimizzare una tabella

Dopo aver eseguito più modifiche a una tabella, potrebbero essere presenti molti file di piccole dimensioni. Per migliorare la velocità delle query di lettura, è possibile usare OPTIMIZE per comprimere file di piccole dimensioni in file di dimensioni maggiori:

OPTIMIZE people_10m

Ordine Z per colonne

Per migliorare ulteriormente le prestazioni di lettura, è possibile condividere le informazioni correlate nello stesso set di file tramite Z-Ordering. Questa co-località viene usata automaticamente dagli algoritmi di data skipping di Delta Lake per ridurre drasticamente la quantità di dati da leggere. Per i dati dell'ordine Z, specificare le colonne da ordinare nella ZORDER BY clausola . Ad esempio, per co-individuare in genderbase a , eseguire:

OPTIMIZE people_10m
ZORDER BY (gender)

Per il set completo di opzioni disponibili durante l'esecuzione OPTIMIZEdi , vedere Compattare i file di dati con Optimize in Delta Lake.

Pulire gli snapshot con VACUUM

Delta Lake offre l'isolamento dello snapshot per le letture, il che significa che è possibile eseguire OPTIMIZE in modo sicuro anche se altri utenti o processi eseguono query sulla tabella. Alla fine, tuttavia, è necessario pulire gli snapshot precedenti. A tale scopo, eseguire il VACUUM comando :

VACUUM people_10m

Per informazioni dettagliate sull'uso VACUUM efficace, vedere Rimuovere i file di dati inutilizzati con vacuum.