Självstudie: Delta Lake
Den här självstudien beskriver vanliga Delta Lake-åtgärder i Azure Databricks, inklusive följande:
- Skapa en tabell.
- Upsert till en tabell.
- Läs från en tabell.
- Visa tabellhistorik.
- Fråga en tidigare version av en tabell.
- Optimera en tabell.
- Lägg till ett Z-orderindex.
- Dammsug oupphörliga filer.
Du kan köra python-, R-, Scala- och SQL-exempelkoden i den här artikeln från en notebook-fil som är kopplad till ett Azure Databricks-kluster. Du kan också köra SQL-koden i den här artikeln från en fråga som är associerad med ett SQL-lager i Databricks SQL.
Kommentar
Några av följande kodexempel använder en namnområdes notation på två nivåer som består av ett schema (kallas även en databas) och en tabell eller vy (till exempel default.people10m
). Om du vill använda de här exemplen med Unity Catalog ersätter du namnområdet på två nivåer med en namnområdes notation på tre nivåer i Unity Catalog som består av en katalog, ett schema och en tabell eller vy (till exempel main.default.people10m
).
Skapa en tabell
Alla tabeller som skapats i Azure Databricks använder Delta Lake som standard.
Kommentar
Delta Lake är standardvärdet för alla kommandon för läsning, skrivningar och tabellskapande i Azure Databricks.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Föregående åtgärder skapar en ny hanterad tabell med hjälp av schemat som härleddes från data. Information om tillgängliga alternativ när du skapar en Delta-tabell finns i SKAPA TABELL.
För hanterade tabeller avgör Azure Databricks platsen för data. Om du vill hämta platsen kan du använda instruktionen DESCRIBE DETAIL , till exempel:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Ibland kanske du vill skapa en tabell genom att ange schemat innan du infogar data. Du kan slutföra detta med följande SQL-kommandon:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
I Databricks Runtime 13.3 LTS och senare kan du använda CREATE TABLE LIKE
för att skapa en ny tom Delta-tabell som duplicerar schema- och tabellegenskaperna för en Delta-källtabell. Detta kan vara särskilt användbart när du befordrar tabeller från en utvecklingsmiljö till produktion, till exempel i följande kodexempel:
CREATE TABLE prod.people10m LIKE dev.people10m
Du kan också använda API:et DeltaTableBuilder
i Delta Lake för att skapa tabeller. Jämfört med Api:erna för DataFrameWriter gör det här API:et det enklare att ange ytterligare information som kolumnkommenterar, tabellegenskaper och genererade kolumner.
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Upsert till en tabell
Om du vill sammanfoga en uppsättning uppdateringar och infogningar i en befintlig Delta-tabell använder du MERGE INTO-instruktionen. Följande instruktion tar till exempel data från källtabellen och sammanfogar dem till deltatabellen för mål. När det finns en matchande rad i båda tabellerna uppdaterar Delta Lake datakolumnen med det angivna uttrycket. När det inte finns någon matchande rad lägger Delta Lake till en ny rad. Den här åtgärden kallas för en upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Om du anger *
uppdateras eller infogas alla kolumner i måltabellen. Detta förutsätter att källtabellen har samma kolumner som de i måltabellen, annars utlöser frågan ett analysfel.
Du måste ange ett värde för varje kolumn i tabellen när du utför en INSERT
åtgärd (till exempel när det inte finns någon matchande rad i den befintliga datamängden). Du behöver dock inte uppdatera alla värden.
Om du vill se resultatet frågar du tabellen.
SELECT * FROM people_10m WHERE id >= 9999998
Läsa en tabell
Du får åtkomst till data i Delta-tabeller efter tabellnamnet eller tabellsökvägen, enligt följande exempel:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Skriva till en tabell
Delta Lake använder standardsyntax för att skriva data till tabeller.
Om du vill lägga till nya data atomiskt i en befintlig Delta-tabell använder du append
läget som i följande exempel:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Om du vill ersätta alla data i en tabell atomiskt använder du overwrite
läget som i följande exempel:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Uppdatera en tabell
Du kan uppdatera data som matchar ett predikat i en Delta-tabell. Om du till exempel vill ändra en förkortning i gender
kolumnen från M
eller F
till Male
eller Female
i en tabell med namnet people10m
eller en sökväg på /tmp/delta/people-10m
kan du köra följande:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Ta bort från en tabell
Du kan ta bort data som matchar ett predikat från en Delta-tabell. Du kan till exempel köra följande i en tabell med namnet people10m
eller en sökväg på /tmp/delta/people-10m
, för att ta bort alla rader som motsvarar personer med ett värde i birthDate
kolumnen från före 1955
:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Viktigt!
delete
tar bort data från den senaste versionen av Delta-tabellen men tar inte bort dem från den fysiska lagringen förrän de gamla versionerna uttryckligen har dammsugats. Mer information finns i vakuum .
Visa tabellhistorik
Om du vill visa historiken för en tabell använder du INSTRUKTIONEN BESKRIVA HISTORIA , som innehåller härkomstinformation, inklusive tabellversion, åtgärd, användare och så vidare, för varje skrivning till en tabell.
DESCRIBE HISTORY people_10m
Fråga en tidigare version av tabellen (tidsresa)
Med Delta Lake-tidsresor kan du fråga en äldre ögonblicksbild av en Delta-tabell.
Om du vill köra frågor mot en äldre version av en tabell anger du en version eller tidsstämpel i en SELECT
instruktion. Om du till exempel vill fråga version 0 från historiken ovan använder du:
SELECT * FROM people_10m VERSION AS OF 0
eller
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
För tidsstämplar accepteras endast datum- eller tidsstämpelsträngar, till exempel "2019-01-01"
och "2019-01-01'T'00:00:00.000Z"
.
Med DataFrameReader-alternativ kan du skapa en DataFrame från en Delta-tabell som är fast i en viss version av tabellen, till exempel i Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
eller, alternativt:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Mer information finns i Arbeta med Delta Lake-tabellhistorik.
Optimera en tabell
När du har gjort flera ändringar i en tabell kan det finnas många små filer. För att förbättra hastigheten för läsfrågor kan du använda OPTIMIZE
för att komprimera små filer till större:
OPTIMIZE people_10m
Z-ordning efter kolumner
För att förbättra läsprestanda ytterligare kan du samplaceera relaterad information i samma uppsättning filer med Z-Ordering. Den här samlokaliteten används automatiskt av Delta Lake-datahoppande algoritmer för att dramatiskt minska mängden data som behöver läsas. För Z-Order-data anger du de kolumner som ska beställas ZORDER BY
i -satsen. Om du till exempel vill samplacera efter gender
kör du:
OPTIMIZE people_10m
ZORDER BY (gender)
Fullständig uppsättning alternativ som är tillgängliga när du kör finns OPTIMIZE
i Komprimera datafiler med optimering på Delta Lake.
Rensa ögonblicksbilder med VACUUM
Delta Lake tillhandahåller ögonblicksbildisolering för läsningar, vilket innebär att det är säkert att köra OPTIMIZE
även när andra användare eller jobb kör frågor mot tabellen. Men så småningom bör du rensa gamla ögonblicksbilder. Du kan göra detta genom att VACUUM
köra kommandot:
VACUUM people_10m
Mer information om hur du använder VACUUM
effektivt finns i Ta bort oanvända datafiler med vakuum.