Linux Foundation Delta Lake – áttekintés
Ez a cikk át lett alakítva, hogy jobban érthető legyen az eredeti megfelelője itt. Ez a cikk segít gyorsan megismerni a Delta Lake főbb funkcióit. A cikk kódrészleteket tartalmaz, amelyek bemutatják, hogyan lehet olvasni és írni Delta Lake-táblákból interaktív, kötegelt és streamelési lekérdezésekből. A kódrészletek a PySpark, a Scala here és a C# jegyzetfüzetek készletében is elérhetők itt
A következőket fogjuk tárgyalni:
- Tábla létrehozása
- Adatok olvasása
- Táblaadatok frissítése
- Táblaadatok felülírása
- Feltételes frissítés felülírás nélkül
- Az adatok régebbi verzióinak olvasása a Time Travel használatával
- Adatstream írása egy táblába
- Változások adatfolyamának olvasása egy táblából
- SQL-támogatás
Konfiguráció
Győződjön meg arról, hogy az alábbiakat a környezetének megfelelően módosítja.
import random
session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";
deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";
Találatok:
"/delta/delta-table-335323"
Tábla létrehozása
Delta Lake-tábla létrehozásához írjon ki egy DataFrame-et delta formátumban. A formátumot a Parquet, a CSV, a JSON stb. formátumról a különbözetre módosíthatja.
Az alábbi kód bemutatja, hogyan hozhat létre új Delta Lake-táblát a DataFrame-ből levont sémával.
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)
Találatok:
ID (Azonosító) |
---|
0 |
1 |
2 |
3 |
4 |
Adatok olvasása
A Delta Lake-táblában lévő adatokat a fájlok elérési útjának és a delta formátumának megadásával olvashatja el.
df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()
Találatok:
ID (Azonosító) |
---|
1 |
3 |
4 |
0 |
2 |
Az eredmények sorrendje eltér a fentitől, mivel az eredmények kimenetelése előtt nem volt explicit módon megadott sorrend.
Táblaadatok frissítése
A Delta Lake számos műveletet támogat a táblák standard DataFrame API-k használatával történő módosításához. Ezek a műveletek a Delta formátum által hozzáadott fejlesztések egyike. Az alábbi példa egy kötegelt feladatot futtat a tábla adatainak felülírásához.
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()
Találatok:
ID (Azonosító) |
---|
7 |
8 |
5 |
9 |
6 |
Itt láthatja, hogy mind az öt rekord frissült az új értékek tárolására.
Mentés katalógustáblákként
A Delta Lake írhat felügyelt vagy külső katalógustáblákba.
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show
Találatok:
adatbázis | tableName | ideiglenes |
---|---|---|
alapértelmezett | externaldeltatable | hamis |
alapértelmezett | manageddeltatable | hamis |
Ezzel a kóddal létrehozott egy új táblát a katalógusban egy meglévő adatkeretből, amelyet felügyelt táblának nevezünk. Ezután definiált egy új külső táblát a katalógusban, amely egy meglévő helyet, más néven külső táblát használ. A kimenetben mindkét tábla látható, függetlenül attól, hogy hogyan lettek létrehozva, a katalógusban szerepel.
Most már mindkét tábla kiterjesztett tulajdonságait megtekintheti
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)
Találatok:
col_name | data_type | megjegyzés |
---|---|---|
id | bigint | null |
Részletes táblázatadatok | ||
Adatbázis | alapértelmezett | |
Tábla | manageddeltatable | |
Tulajdonos | trusted-service-user | |
Létrehozás ideje | Szo ápr 25 00:35:34 UTC 2020 | |
Utolsó hozzáférés | Cs. jan. 01. 00:00:00 UTC 1970 | |
Created By (Létrehozó) | Spark 2.4.4.2.6.99.201-11401300 | |
Típus | KEZELT | |
Szolgáltató | delta | |
Táblázat tulajdonságai | [transient_lastDdlTime=1587774934] | |
Statisztika | 2407 bájt | |
Hely | abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< workspace name>/warehouse/manageddeltatable | |
Serde-könyvtár | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Tárolási tulajdonságok | [szerializálás.format=1] |
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)
Találatok:
col_name | data_type | megjegyzés |
---|---|---|
id | bigint | null |
Részletes táblázatadatok | ||
Adatbázis | alapértelmezett | |
Tábla | externaldeltatable | |
Tulajdonos | trusted-service-user | |
Létrehozás ideje | Szo ápr 25 00:35:38 UTC 2020 | |
Utolsó hozzáférés | Cs. jan. 01. 00:00:00 UTC 1970 | |
Created By (Létrehozó) | Spark 2.4.4.2.6.99.201-11401300 | |
Típus | KÜLSŐ | |
Szolgáltató | DELTA | |
Táblázat tulajdonságai | [transient_lastDdlTime=1587774938] | |
Hely | abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152> | |
Serde-könyvtár | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Tárolási tulajdonságok | [szerializálás.format=1] |
Feltételes frissítés felülírás nélkül
A Delta Lake programozott API-kat biztosít a feltételes frissítéshez, törléshez és adategyesítéshez (ezt a parancsot gyakran upsert-nek nevezik) a táblákban.
from delta.tables import *
from pyspark.sql.functions import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
var deltaTable = DeltaTable.ForPath(deltaTablePath);
deltaTable.Update(
condition: Expr("id % 2 == 0"),
set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(deltaTablePath)
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show
Találatok:
ID (Azonosító) |
---|
106 |
108 |
5 |
7 |
9 |
Itt most 100-t adott hozzá minden páros azonosítóhoz.
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show
Találatok:
ID (Azonosító) |
---|
5 |
7 |
9 |
Figyelje meg, hogy minden páros sor törölve lett.
new_data = spark.range(0,20).alias("newData")
delta_table.alias("oldData")\
.merge(new_data.alias("newData"), "oldData.id = newData.id")\
.whenMatchedUpdate(set = { "id": lit("-1")})\
.whenNotMatchedInsert(values = { "id": col("newData.id") })\
.execute()
delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");
deltaTable
.As("oldData")
.Merge(newData, "oldData.id = newData.id")
.WhenMatched()
.Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
.WhenNotMatched()
.Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
.Execute();
deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData").
merge(
newData.as("newData"),
"oldData.id = newData.id").
whenMatched.
update(Map("id" -> lit(-1))).
whenNotMatched.
insert(Map("id" -> col("newData.id"))).
execute()
deltaTable.toDF.show()
Találatok:
ID (Azonosító) |
---|
18 |
15 |
19 |
2 |
1 |
6 |
8 |
3 |
-1 |
10 |
13 |
0 |
16 |
4 |
-1 |
12 |
11 |
14 |
-1 |
17 |
Itt a meglévő adatok kombinációjával rendelkezik. A meglévő adatokhoz hozzá lett rendelve a -1 érték az update(WhenMatched) kódútvonalon. A kódrészlet tetején létrehozott és a beszúrási kód elérési útján (WhenNotMatched) hozzáadott új adatokat is hozzáadtuk.
Előzmények
A Delta Lake-nek lehetősége van egy tábla előzményeinek megtekintésére. Vagyis a mögöttes Delta-táblán végrehajtott módosítások. Az alábbi cella bemutatja, milyen egyszerű az előzmények vizsgálata.
delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)
Találatok:
version | időbélyeg | userId | userName (Felhasználónév) | művelet | operationParameters | feladat | jegyzetfüzet | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
4 | 2020-04-25 00:36:27 | null | null | EGYESÍTÉSE | [predikátum -> (oldData.ID = newData.ID )] |
null | null | null | 3 | null | hamis |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predikátum -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
null | null | null | 2 | null | hamis |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predikátum -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] | null | null | null | 1 | null | hamis |
1 | 2020-04-25 00:35:05 | null | null | ÍRNI | [mód -> Felülírás, partitionBy –> []] | null | null | null | 0 | null | hamis |
0 | 2020-04-25 00:34:34 | null | null | ÍRNI | [mód -> ErrorIfExists, partitionBy –> []] | null | null | null | null | null | true |
Itt láthatja a fenti kódrészleteken végrehajtott összes módosítást.
Az adatok régebbi verzióinak olvasása a Time Travel használatával
A Delta Lake-tábla korábbi pillanatképeit a Time Travel nevű funkcióval kérdezheti le. Ha hozzá szeretne férni a felülírt adatokhoz, lekérdezheti a tábla pillanatképét, mielőtt felülírja az első adatkészletet a versionAsOf beállítással.
Az alábbi cella futtatása után látnia kell az első adatkészletet, mielőtt felülírja azokat. A Time Travel egy hatékony funkció, amely kihasználja a Delta Lake tranzakciónaplójának előnyeit a táblában már nem szereplő adatok eléréséhez. Ha eltávolítja a 0-s verziót (vagy megadja az 1-es verziót), ismét megjelennek az újabb adatok. További információ: Tábla régebbi pillanatképének lekérdezése.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()
Találatok:
ID (Azonosító) |
---|
0 |
1 |
4 |
3 |
2 |
Itt láthatja, hogy visszament az adatok legkorábbi verziójára.
Adatstream írása táblázatba
Delta Lake-táblába is írhat a Spark strukturált streameléssel. A Delta Lake tranzakciónaplója pontosan egyszeri feldolgozást garantál, még akkor is, ha más streamek vagy kötegelt lekérdezések futnak egyidejűleg a táblán. Alapértelmezés szerint a streamek hozzáfűzési módban futnak, amely új rekordokat ad hozzá a táblához.
További információ a Delta Lake strukturált streameléssel való integrációjáról: Table Streaming Reads and Writes (Táblastreamelés olvasása és írása).
Az alábbi cellákban a következőket tesszük:
- 30. cella Az újonnan hozzáfűzött adatok megjelenítése
- 31. cella – Előzmények vizsgálata
- 32. cella: A strukturált streamelési feladat leállítása
- 33. cella – Előzmények <vizsgálata – Láthatja, hogy a hozzáfűzések leálltak
Először beállít egy egyszerű Spark Streaming-feladatot, amely létrehoz egy sorozatot, és írásra állítja a feladatot a Delta-táblába.
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
.selectExpr("value as id")\
.writeStream\
.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
.start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)
Változások adatfolyamának beolvasása egy táblából
Miközben a stream a Delta Lake táblába ír, ebből a táblából is olvashat streamforrásként. Elindíthat például egy másik streamelési lekérdezést, amely kinyomtatja a Delta Lake-tábla összes módosítását.
delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show
Találatok:
ID (Azonosító) |
---|
19 |
18 |
17 |
16 |
15 |
14 |
13 |
12 |
11 |
10 |
8 |
6 |
4 |
3 |
2 |
1 |
0 |
-1 |
-1 |
-1 |
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show
Találatok:
version | időbélyeg | művelet | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | STREAMELÉSI FRISSÍTÉS | [outputMode -> Append, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId –> 0] | 4 |
4 | 2020-04-25 00:36:27 | EGYESÍTÉSE | [predikátum -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predikátum -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predikátum -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | ÍRNI | [mód –> Felülírás, partitionBy –> []] | 0 |
0 | 2020-04-25 00:34:34 | ÍRNI | [mód –> ErrorIfExists, partitionBy –> []] | null |
Itt néhány kevésbé érdekes oszlopot elvet, hogy egyszerűbbé tegye az előzmények nézetének megtekintését.
stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show
Találatok:
version | időbélyeg | művelet | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | STREAMELÉSI FRISSÍTÉS | [outputMode -> Append, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId –> 0] | 4 |
4 | 2020-04-25 00:36:27 | EGYESÍTÉSE | [predikátum -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predikátum -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predikátum -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | ÍRNI | [mód –> Felülírás, partitionBy –> []] | 0 |
0 | 2020-04-25 00:34:34 | ÍRNI | [mód –> ErrorIfExists, partitionBy –> []] | null |
Parquet konvertálása Deltára
A Parquet formátumból a Delta formátumba történő helyben konvertálást is elvégezheti.
Itt tesztelni fogja, hogy a meglévő tábla különbözeti formátumban van-e.
parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
Találatok:
Hamis
Most átalakítja az adatokat delta formátumra, és ellenőrzi, hogy működött-e.
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Találatok:
Igaz
SQL-támogatás
A Delta az SQL-en keresztül támogatja a table utility parancsokat. Az SQL-t a következőre használhatja:
- DeltaTable előzményeinek lekérése
- DeltaTable vákuum
- Parquet-fájl konvertálása Deltára
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()
Találatok:
version | időbélyeg | userId | userName (Felhasználónév) | művelet | operationParameters | feladat | jegyzetfüzet | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
5 | 2020-04-25 00:37:09 | null | null | STREAMELÉSI FRISSÍTÉS | [outputMode –> Ap... | null | null | null | 4 | null | true |
4 | 2020-04-25 00:36:27 | null | null | EGYESÍTÉSE | [predikátum -> (ol... | null | null | null | 3 | null | hamis |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predikátum -> ["(... | null | null | null | 2 | null | hamis |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predikátum -> ((i... | null | null | null | 1 | null | hamis |
1 | 2020-04-25 00:35:05 | null | null | ÍRNI | [mód –> Felülírás... | null | null | null | 0 | null | hamis |
0 | 2020-04-25 00:34:34 | null | null | ÍRNI | [mód –> ErrorIfE... | null | null | null | null | null | true |
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()
Találatok:
path |
---|
abfss://data@arca... |
Most ellenőrizni fogja, hogy egy tábla nem különbözeti formátumú tábla-e. Ezután a Spark SQL használatával delta formátumba konvertálja a táblát, és meggyőződik arról, hogy megfelelően konvertálták.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId = (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Találatok:
Igaz
A teljes dokumentációt a Delta Lake dokumentációs oldalán találja.
További információ: Delta Lake Project.