Oktatóanyag: Delta Lake
Ez az oktatóanyag bemutatja az Azure Databricks gyakori Delta Lake-műveleteit, beleértve a következőket:
- Tábla létrehozása.
- Felfelé egy táblához.
- Olvasás egy táblából.
- Táblaelőzmények megjelenítése.
- Tábla egy korábbi verziójának lekérdezése.
- Tábla optimalizálása.
- Egy Z-order index hozzáadása.
- Nem hivatkozott fájlok törlése.
A cikkben szereplő Python-, R-, Scala- és SQL-kódot egy Azure Databricks-fürthöz csatolt jegyzetfüzetből futtathatja. A cikkben szereplő SQL-kódot a Databricks SQL-ben lévő SQL-raktárhoztársított lekérdezésből is futtathatja.
Feljegyzés
Az alábbi példakódok egy kétszintű névtér-jelölést használnak, amely egy sémából (más néven adatbázisból) és egy táblából vagy nézetből (például default.people10m
) áll. Ha ezeket a példákat a Unity Catalogra szeretné használni, cserélje le a kétszintű névteret egy katalógusból, sémából és táblázatból vagy nézetből (példáulmain.default.people10m
) álló Unity Catalog háromszintű névtér-jelölésre.
Tábla létrehozása
Az Azure Databricksben létrehozott összes tábla alapértelmezés szerint a Delta Lake-t használja.
Feljegyzés
A Delta Lake az azure Databricks összes olvasási, írási és táblázat-létrehozási parancsának alapértelmezett értéke.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Az előző műveletek létrehoznak egy új felügyelt táblát az adatokból levont sémával. A Delta-tábla létrehozásakor elérhető lehetőségekről további információt a CREATE TABLE (TÁBLA LÉTREHOZÁSA) című témakörben talál.
Felügyelt táblák esetén az Azure Databricks határozza meg az adatok helyét. A hely lekéréséhez használhatja a DESCRIBE DETAIL utasítást, például:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Előfordulhat, hogy az adatok beszúrása előtt a séma megadásával szeretne táblázatot létrehozni. Ezt a következő SQL-parancsokkal hajthatja végre:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
A Databricks Runtime 13.3 LTS-ben és újabb verziókban létrehozhat egy új üres Delta-táblát, CREATE TABLE LIKE
amely duplikálja a forrás Delta-tábla séma- és táblatulajdonságait. Ez különösen hasznos lehet a táblák fejlesztési környezetből éles környezetbe való előléptetéséhez, például a következő kód példájában:
CREATE TABLE prod.people10m LIKE dev.people10m
A Delta Lake API-jának használatával DeltaTableBuilder
táblákat is létrehozhat. A DataFrameWriter API-khoz képest ez az API megkönnyíti a további információk megadását, például az oszlopbejegyzéseket, a táblatulajdonságokat és a létrehozott oszlopokat.
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Upsert to a table
Ha frissítéseket és beszúrásokat szeretne egy meglévő Delta-táblába egyesíteni, használja a MERGE INTO utasítást . A következő utasítás például adatokat vesz fel a forrástáblából, és egyesíti őket a cél Delta-táblával. Ha mindkét táblában egyező sor található, a Delta Lake a megadott kifejezéssel frissíti az adatoszlopot. Ha nincs egyező sor, a Delta Lake új sort ad hozzá. Ez a művelet upsert néven ismert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Ha megadja *
, ez frissíti vagy beszúrja a céltábla összes oszlopát. Ez feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.
A művelet végrehajtásakor INSERT
(például ha a meglévő adathalmazban nincs egyező sor) meg kell adnia egy értéket a tábla minden oszlopához. Azonban nem kell frissítenie az összes értéket.
Az eredmények megtekintéséhez kérdezze le a táblát.
SELECT * FROM people_10m WHERE id >= 9999998
Táblázat olvasása
A Delta-táblák adatait a táblanév vagy a tábla elérési útja alapján érheti el, ahogyan az alábbi példákban is látható:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Írás táblába
A Delta Lake szabványos szintaxist használ az adatok táblákba való írásához.
Ha új adatokat szeretne atomilag hozzáadni egy meglévő Delta-táblához, használja append
a módot az alábbi példákhoz hasonlóan:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
A tábla összes adatának atomi cseréjéhez használja overwrite
a módot az alábbi példákhoz hasonlóan:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Tábla frissítése
A Delta-táblák predikátumának megfelelő adatokat frissítheti. Például egy névvel ellátott people10m
táblában vagy a következő /tmp/delta/people-10m
elérési úton módosíthatja az oszlopban M
Female
Male
F
lévő gender
rövidítést:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Törlés táblából
Egy predikátumnak megfelelő adatokat eltávolíthat egy Delta-táblából. Például egy névvel ellátott people10m
táblában vagy a következő /tmp/delta/people-10m
elérési úton törölheti az oszlopban birthDate
1955
lévő értékekkel rendelkező személyeknek megfelelő összes sort, a következőt futtathatja:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Fontos
delete
Eltávolítja az adatokat a Delta-tábla legújabb verziójából, de nem távolítja el azokat a fizikai tárolóból, amíg a régi verziók nem lesznek explicit módon porszívózva. Részletekért lásd a vákuumot .
Táblaelőzmények megjelenítése
Egy tábla előzményeinek megtekintéséhez használja a DESCRIBE HISTORY utasítást, amely az egyes táblákba történő íráshoz a táblaverziót, a műveletet, a felhasználót stb. is tartalmazza.
DESCRIBE HISTORY people_10m
A tábla egy korábbi verziójának lekérdezése (időutazás)
A Delta Lake időutazással lekérdezheti egy Delta-tábla régebbi pillanatképét.
A tábla régebbi verziójának lekérdezéséhez adjon meg egy verziót vagy időbélyeget egy SELECT
utasításban. Ha például a fenti előzményekből szeretné lekérdezni a 0-s verziót, használja a következőt:
SELECT * FROM people_10m VERSION AS OF 0
vagy
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Időbélyegek esetén a rendszer csak dátum- vagy időbélyeg-sztringeket fogad el, például "2019-01-01"
és "2019-01-01'T'00:00:00.000Z"
.
A DataFrameReader beállításai lehetővé teszik, hogy dataFrame-et hozzon létre egy Delta-táblából, amely a tábla egy adott verziójára van rögzítve, például Pythonban:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
vagy másik lehetőségként:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
További részletekért lásd : A Delta Lake táblaelőzményeinek ismertetése.
Táblázat optimalizálása
Ha több módosítást hajtott végre egy táblán, előfordulhat, hogy sok kis fájllal rendelkezik. Az olvasási lekérdezések sebességének javítása érdekében kis méretű fájlokat összecsukhat OPTIMIZE
nagyobbakra:
OPTIMIZE people_10m
Z-sorrend oszlopok szerint
Az olvasási teljesítmény további javítása érdekében a Z-Ordering segítségével azonos fájlkészletben keresheti meg a kapcsolódó információkat. Ezt a közös helységet a Delta Lake adatátugrási algoritmusai automatikusan használják az olvasni kívánt adatok mennyiségének drasztikus csökkentésére. A Z-Order adatokhoz meg kell adnia a záradékban a sorrendbe ZORDER BY
rendezendő oszlopokat. Például a társkereséshez futtassa a következőt gender
:
OPTIMIZE people_10m
ZORDER BY (gender)
A futtatáskor OPTIMIZE
elérhető összes beállításért tekintse meg a Delta Lake-beli optimalizálással rendelkező adatfájlok tömörítését.
Pillanatképek törlése a VACUUM
A Delta Lake pillanatkép-elkülönítést biztosít az olvasásokhoz, ami azt jelenti, hogy biztonságosan futtatható OPTIMIZE
, még akkor is, ha más felhasználók vagy feladatok kérdezik le a táblát. Végül azonban törölnie kell a régi pillanatképeket. Ezt a parancs futtatásával VACUUM
teheti meg:
VACUUM people_10m
A hatékony használatról VACUUM
további információt a nem használt adatfájlok eltávolítása vákuummal című témakörben talál.