Självstudie: Delta Lake

Den här självstudien beskriver vanliga Delta Lake-åtgärder i Azure Databricks, inklusive följande:

Du kan köra python-, R-, Scala- och SQL-exempelkoden i den här artikeln från en notebook-fil som är kopplad till ett Azure Databricks-kluster. Du kan också köra SQL-koden i den här artikeln från en fråga som är associerad med ett SQL-lager i Databricks SQL.

Kommentar

Några av följande kodexempel använder en namnområdes notation på två nivåer som består av ett schema (kallas även en databas) och en tabell eller vy (till exempel default.people10m). Om du vill använda de här exemplen med Unity Catalog ersätter du namnområdet på två nivåer med en namnområdes notation på tre nivåer i Unity Catalog som består av en katalog, ett schema och en tabell eller vy (till exempel main.default.people10m).

Skapa en tabell

Alla tabeller som skapats i Azure Databricks använder Delta Lake som standard.

Kommentar

Delta Lake är standardvärdet för alla kommandon för läsning, skrivningar och tabellskapande i Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Föregående åtgärder skapar en ny hanterad tabell med hjälp av schemat som härleddes från data. Information om tillgängliga alternativ när du skapar en Delta-tabell finns i SKAPA TABELL.

För hanterade tabeller avgör Azure Databricks platsen för data. Om du vill hämta platsen kan du använda instruktionen DESCRIBE DETAIL , till exempel:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Ibland kanske du vill skapa en tabell genom att ange schemat innan du infogar data. Du kan slutföra detta med följande SQL-kommandon:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

I Databricks Runtime 13.3 LTS och senare kan du använda CREATE TABLE LIKE för att skapa en ny tom Delta-tabell som duplicerar schema- och tabellegenskaperna för en Delta-källtabell. Detta kan vara särskilt användbart när du befordrar tabeller från en utvecklingsmiljö till produktion, till exempel i följande kodexempel:

CREATE TABLE prod.people10m LIKE dev.people10m

Du kan också använda API:et DeltaTableBuilder i Delta Lake för att skapa tabeller. Jämfört med Api:erna för DataFrameWriter gör det här API:et det enklare att ange ytterligare information som kolumnkommenterar, tabellegenskaper och genererade kolumner.

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert till en tabell

Om du vill sammanfoga en uppsättning uppdateringar och infogningar i en befintlig Delta-tabell använder du MERGE INTO-instruktionen. Följande instruktion tar till exempel data från källtabellen och sammanfogar dem till deltatabellen för mål. När det finns en matchande rad i båda tabellerna uppdaterar Delta Lake datakolumnen med det angivna uttrycket. När det inte finns någon matchande rad lägger Delta Lake till en ny rad. Den här åtgärden kallas för en upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Om du anger *uppdateras eller infogas alla kolumner i måltabellen. Detta förutsätter att källtabellen har samma kolumner som de i måltabellen, annars utlöser frågan ett analysfel.

Du måste ange ett värde för varje kolumn i tabellen när du utför en INSERT åtgärd (till exempel när det inte finns någon matchande rad i den befintliga datamängden). Du behöver dock inte uppdatera alla värden.

Om du vill se resultatet frågar du tabellen.

SELECT * FROM people_10m WHERE id >= 9999998

Läsa en tabell

Du får åtkomst till data i Delta-tabeller efter tabellnamnet eller tabellsökvägen, enligt följande exempel:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Skriva till en tabell

Delta Lake använder standardsyntax för att skriva data till tabeller.

Om du vill lägga till nya data atomiskt i en befintlig Delta-tabell använder du append läget som i följande exempel:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Om du vill ersätta alla data i en tabell atomiskt använder du overwrite läget som i följande exempel:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Uppdatera en tabell

Du kan uppdatera data som matchar ett predikat i en Delta-tabell. Om du till exempel vill ändra en förkortning i gender kolumnen från M eller F till Male eller Femalei en tabell med namnet people10m eller en sökväg på /tmp/delta/people-10mkan du köra följande:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Ta bort från en tabell

Du kan ta bort data som matchar ett predikat från en Delta-tabell. Du kan till exempel köra följande i en tabell med namnet people10m eller en sökväg på /tmp/delta/people-10m, för att ta bort alla rader som motsvarar personer med ett värde i birthDate kolumnen från före 1955:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Viktigt!

delete tar bort data från den senaste versionen av Delta-tabellen men tar inte bort dem från den fysiska lagringen förrän de gamla versionerna uttryckligen har dammsugats. Mer information finns i vakuum .

Visa tabellhistorik

Om du vill visa historiken för en tabell använder du INSTRUKTIONEN BESKRIVA HISTORIA , som innehåller härkomstinformation, inklusive tabellversion, åtgärd, användare och så vidare, för varje skrivning till en tabell.

DESCRIBE HISTORY people_10m

Fråga en tidigare version av tabellen (tidsresa)

Med Delta Lake-tidsresor kan du fråga en äldre ögonblicksbild av en Delta-tabell.

Om du vill köra frågor mot en äldre version av en tabell anger du en version eller tidsstämpel i en SELECT instruktion. Om du till exempel vill fråga version 0 från historiken ovan använder du:

SELECT * FROM people_10m VERSION AS OF 0

eller

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

För tidsstämplar accepteras endast datum- eller tidsstämpelsträngar, till exempel "2019-01-01" och "2019-01-01'T'00:00:00.000Z".

Med DataFrameReader-alternativ kan du skapa en DataFrame från en Delta-tabell som är fast i en viss version av tabellen, till exempel i Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

eller, alternativt:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Mer information finns i Arbeta med Delta Lake-tabellhistorik.

Optimera en tabell

När du har gjort flera ändringar i en tabell kan det finnas många små filer. För att förbättra hastigheten för läsfrågor kan du använda OPTIMIZE för att komprimera små filer till större:

OPTIMIZE people_10m

Z-ordning efter kolumner

För att förbättra läsprestanda ytterligare kan du samplaceera relaterad information i samma uppsättning filer med Z-Ordering. Den här samlokaliteten används automatiskt av Delta Lake-datahoppande algoritmer för att dramatiskt minska mängden data som behöver läsas. För Z-Order-data anger du de kolumner som ska beställas ZORDER BY i -satsen. Om du till exempel vill samplacera efter genderkör du:

OPTIMIZE people_10m
ZORDER BY (gender)

Fullständig uppsättning alternativ som är tillgängliga när du kör finns OPTIMIZEi Komprimera datafiler med optimering på Delta Lake.

Rensa ögonblicksbilder med VACUUM

Delta Lake tillhandahåller ögonblicksbildisolering för läsningar, vilket innebär att det är säkert att köra OPTIMIZE även när andra användare eller jobb kör frågor mot tabellen. Men så småningom bör du rensa gamla ögonblicksbilder. Du kan göra detta genom att VACUUM köra kommandot:

VACUUM people_10m

Mer information om hur du använder VACUUM effektivt finns i Ta bort oanvända datafiler med vakuum.