Overzicht van Linux Foundation Delta Lake
De oorspronkelijke inhoud (hier) van dit artikel is aangepast voor meer duidelijkheid. Gebruik dit artikel om snel de belangrijkste functies van Delta Lake te verkennen. In het artikel vindt u codefragmenten die aangeven hoe u gegevens uit Delta Lake-tabellen leest en hoe u hier gegevens heen schrijft op basis van query's over interactiviteit, batches en streaming. De codefragmenten zijn ook beschikbaar in een aantal notebooks (PySpark hier, Scala hier en C# hier)
Het volgende wordt behandeld:
- Een tabel maken
- Gegevens lezen
- Tabelgegevens bijwerken
- Tabelgegevens overschrijven
- Voorwaardelijke update zonder overschrijven
- Oudere gegevensversies lezen met behulp van Time Travel
- Een gegevensstroom naar een tabel schrijven
- Een stroom wijzigingen uit een tabel lezen
- SQL-ondersteuning
Configuratie
Zorg ervoor dat u het onderstaande aanpast voor zover dat in uw omgeving past.
import random
session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";
deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";
Resulteert in:
'/delta/delta-table-335323'
Een tabel maken
Als u een Delta Lake-tabel wilt schrijven, schrijft u een DataFrame op basis van een DataFrame in de Delta-indeling. U kunt de indeling van Parquet, CSV, JSON, enzovoort, wijzigen in Delta.
In de volgende code ziet u hoe u een nieuwe Delta Lake-tabel maakt met behulp van het schema dat uit uw DataFrame is afgeleid.
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)
Resulteert in:
Id |
---|
0 |
1 |
2 |
3 |
4 |
Gegevens lezen
U leest gegevens in uw Delta Lake-tabel door het pad naar de bestanden en de Delta-indeling op te geven.
df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()
Resulteert in:
Id |
---|
1 |
3 |
4 |
0 |
2 |
De volgorde waarin de resultaten worden opgegeven, is anders dan hierboven staat aangezien er geen expliciete volgorde is opgegeven voordat de resultaten zijn uitgevoerd.
Tabelgegevens bijwerken
Delta Lake ondersteunt verschillende bewerkingen voor het wijzigen van tabellen met behulp van standaard DataFrame-API's. Deze bewerkingen zijn een van de verbeteringen die de Delta-indeling toevoegt. In het volgende voorbeeld wordt een batchtaak uitgevoerd om de gegevens in de tabel te overschrijven.
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()
Resulteert in:
Id |
---|
7 |
8 |
5 |
9 |
6 |
Hier ziet u dat alle vijf records zijn bijgewerkt met nieuwe waarden.
Opslaan als catalogustabellen
Delta Lake kan schrijven naar beheerde of externe catalogustabellen.
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show
Resulteert in:
database | tableName | isTemporary |
---|---|---|
standaardinstelling | externaldeltatable | false |
standaardinstelling | manageddeltatable | false |
Met deze code hebt u een nieuwe tabel in de catalogus gemaakt op basis van een bestaand DataFrame. Deze tabel wordt een beheerde tabel genoemd. Vervolgens hebt u een nieuwe externe tabel in de catalogus gedefinieerd waarvoor een bestaande locatie wordt gebruikt. Dit wordt een externe tabel genoemd. In de uitvoer ziet u dat beide tabellen, ongeacht de manier waarop ze zijn gemaakt, in de catalogus worden vermeld.
Bekijk nu de uitgebreide eigenschappen van beide tabellen
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)
Resulteert in:
col_name | data_type | comment |
---|---|---|
id | bigint | null |
Gedetailleerde tabelgegevens | ||
Database | standaardinstelling | |
Tabel | manageddeltatable | |
Eigenaar | trusted-service-user | |
Gemaakt om | za 25 apr 00:35:34 UTC 2020 | |
Laatste toegang | do 1 jan 00:00:00 UTC 1970 | |
Created By | Spark 2.4.4.2.6.99.201-11401300 | |
Type | BEHEERD | |
Provider | delta | |
Tabeleigenschappen | [transient_lastDdlTime=1587774934] | |
statistieken | 2407 bytes | |
Locatie | abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< workspace name>/warehouse/manageddeltatable | |
Serde-bibliotheek | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Opslageigenschappen | [serialization.format=1] |
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)
Resulteert in:
col_name | data_type | comment |
---|---|---|
id | bigint | null |
Gedetailleerde tabelgegevens | ||
Database | standaardinstelling | |
Tabel | externaldeltatable | |
Eigenaar | trusted-service-user | |
Gemaakt om | za 25 apr 00:35:38 UTC 2020 | |
Laatste toegang | do 1 jan 00:00:00 UTC 1970 | |
Created By | Spark 2.4.4.2.6.99.201-11401300 | |
Type | EXTERN | |
Provider | DELTA | |
Tabeleigenschappen | [transient_lastDdlTime=1587774938] | |
Locatie | abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152> | |
Serde-bibliotheek | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Opslageigenschappen | [serialization.format=1] |
Voorwaardelijke update zonder overschrijven
Delta Lake biedt programmatische API's voor het voorwaardelijk bijwerken, verwijderen en samenvoegen van gegevens (deze opdracht wordt vaak een upsert genoemd) in tabellen.
from delta.tables import *
from pyspark.sql.functions import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
var deltaTable = DeltaTable.ForPath(deltaTablePath);
deltaTable.Update(
condition: Expr("id % 2 == 0"),
set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(deltaTablePath)
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show
Resulteert in:
Id |
---|
106 |
108 |
5 |
7 |
9 |
Hier hebt u 100 toegevoegd aan elke even id.
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show
Resulteert in:
Id |
---|
5 |
7 |
9 |
U ziet dat elke even rij is verwijderd.
new_data = spark.range(0,20).alias("newData")
delta_table.alias("oldData")\
.merge(new_data.alias("newData"), "oldData.id = newData.id")\
.whenMatchedUpdate(set = { "id": lit("-1")})\
.whenNotMatchedInsert(values = { "id": col("newData.id") })\
.execute()
delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");
deltaTable
.As("oldData")
.Merge(newData, "oldData.id = newData.id")
.WhenMatched()
.Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
.WhenNotMatched()
.Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
.Execute();
deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData").
merge(
newData.as("newData"),
"oldData.id = newData.id").
whenMatched.
update(Map("id" -> lit(-1))).
whenNotMatched.
insert(Map("id" -> col("newData.id"))).
execute()
deltaTable.toDF.show()
Resulteert in:
Id |
---|
18 |
15 |
19 |
2 |
1 |
6 |
8 |
3 |
-1 |
10 |
13 |
0 |
16 |
4 |
-1 |
12 |
11 |
14 |
-1 |
17 |
Hier hebt u een combinatie van de bestaande gegevens. Aan de bestaande gegevens is een waarde van -1 toegewezen in het codepad update(WhenMatched). Die nieuwe gegevens die boven op het codefragment zijn gemaakt en die via het invoegcodepad (WhenNotMatched) zijn toegevoegd, zijn ook toegevoegd.
Geschiedenis
Delta Lake biedt de mogelijkheid om de geschiedenis van een tabel weer te geven. Dat wil zeggen: de wijzigingen die aan de onderliggende Delta-tabel zijn gemaakt. In de onderstaande cel ziet u hoe eenvoudig het is om de geschiedenis te controleren.
delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)
Resulteert in:
versie | tijdstempel | userId | userName | bewerking | operationParameters | taak | notebook | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
4 | 25-04-2020 00:36:27 | null | null | SAMENVOEGEN | [predicaat -> (oldData.ID = newData.ID )] |
null | null | null | 3 | null | false |
3 | 25-04-2020 00:36:08 | null | null | DELETE | [predicaat -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT)"]] |
null | null | null | 2 | null | false |
2 | 25-04-2020 00:35:51 | null | null | UPDATE | [predicaat -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] | null | null | null | 1 | null | false |
1 | 25-04-2020 00:35:05 | null | null | SCHRIJVEN | [modus -> Overschrijven, partitionBy -> []] | null | null | null | 0 | null | false |
0 | 25-04-2020 00:34:34 | null | null | SCHRIJVEN | [modus -> ErrorIfExists, partitionBy -> []] | null | null | null | null | null | true |
Hier ziet u alle aanpassingen die op de bovenstaande codefragmenten zijn aangebracht.
Oudere gegevensversies lezen met behulp van Time Travel
U kunt met behulp van de functie Time Travel een query uitvoeren op vorige momentopnamen van uw Delta Lake-tabel. Als u toegang wilt krijgen tot de gegevens die u hebt overschreven, kunt u een query uitvoeren op een momentopname van de tabel voordat u de eerste gegevensset hebt overschreven met behulp van de optie versionAsOf.
Zodra u de onderstaande cel uitvoert, ziet u als het goed is de eerste gegevensset van vóór u deze hebt overschreven. Time Travel is een krachtige functie die gebruikmaakt van de kracht van het Delta Lake-transactielogboek om toegang te krijgen tot gegevens die niet meer in de tabel aanwezig zijn. Als u de versie 0-optie verwijdert (of versie 1 opgeeft), ziet u weer de nieuwere gegevens. Zie Query uitvoeren op een oudere momentopname van een tabel voor meer informatie.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()
Resulteert in:
Id |
---|
0 |
1 |
4 |
3 |
2 |
Hier ziet u dat u terug bent gegaan naar de vroegste versie van de gegevens.
Een gegevensstroom naar een tabel schrijven
U kunt ook gegevens naar een Delta Lake-tabel schrijven met behulp van Gestructureerd streamen van Spark. Het Delta Lake-transactielogboek garandeert exact één verwerking, zelfs als er geen andere stromen of batchquery's gelijktijdig worden uitgevoerd op de tabel. Standaard worden stromen uitgevoerd in de toevoegmodus, waarmee nieuwe records aan de tabel worden toegevoegd.
Zie Lees- en schrijfbewerkingen in tabelstreaming voor meer informatie over Delta Lake-integratie met Gestructureerd streamen.
Bekijk bij de onderstaande cellen wat we aan het doen zijn:
- Cel 30: de zojuist toegevoegde gegevens weergeven
- Cel 31: de geschiedenis inspecteren
- Cel 32: de gestructureerde streamingtaak stoppen
- Cel 33 Geschiedenis controleren <- U ziet dat toevoegingen zijn gestopt
Eerst gaat u een eenvoudige Spark Streaming-taak instellen om een reeks te genereren en de taak naar uw Delta-tabel te laten schrijven.
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
.selectExpr("value as id")\
.writeStream\
.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
.start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)
Een stroom wijzigingen uit een tabel lezen
Terwijl de stream gegevens naar de Delta Lake-tabel schrijft, kunt u gegevens uit die tabel lezen als een streamingbron. U kunt bijvoorbeeld nog een streamingquery's starten waarmee alle wijzigingen van de Delta Lake-tabel worden afgedrukt.
delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show
Resulteert in:
Id |
---|
19 |
18 |
17 |
16 |
15 |
14 |
13 |
12 |
11 |
10 |
8 |
6 |
4 |
3 |
2 |
1 |
0 |
-1 |
-1 |
-1 |
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show
Resulteert in:
versie | tijdstempel | bewerking | operationParameters | readVersion |
---|---|---|---|---|
5 | 25-04-2020 00:37:09 | STREAMINGUPDATE | [outputMode -> Toevoegen, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 25-04-2020 00:36:27 | SAMENVOEGEN | [predicaat -> (oldData.id = newData.id )] |
3 |
3 | 25-04-2020 00:36:08 | DELETE | [predicaat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT)"]] |
2 |
2 | 25-04-2020 00:35:51 | UPDATE | [predicaat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 25-04-2020 00:35:05 | SCHRIJVEN | [modus -> Overschrijven, partitionBy -> []] | 0 |
0 | 25-04-2020 00:34:34 | SCHRIJVEN | [modus -> ErrorIfExists, partitionBy -> []] | null |
Hier worden enkele van de minder interessante kolommen verwijderd om de weergave van de geschiedenisweergave te vereenvoudigen.
stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show
Resulteert in:
versie | tijdstempel | bewerking | operationParameters | readVersion |
---|---|---|---|---|
5 | 25-04-2020 00:37:09 | STREAMINGUPDATE | [outputMode -> Toevoegen, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 25-04-2020 00:36:27 | SAMENVOEGEN | [predicaat -> (oldData.id = newData.id )] |
3 |
3 | 25-04-2020 00:36:08 | DELETE | [predicaat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT)"]] |
2 |
2 | 25-04-2020 00:35:51 | UPDATE | [predicaat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 25-04-2020 00:35:05 | SCHRIJVEN | [modus -> Overschrijven, partitionBy -> []] | 0 |
0 | 25-04-2020 00:34:34 | SCHRIJVEN | [modus -> ErrorIfExists, partitionBy -> []] | null |
Parquet omzetten in Delta
U kunt ter plekke gegevens in de Parquet-indeling omzetten naar Delta.
Hier gaat u testen of de bestaande tabel een delta-indeling heeft of niet.
parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
Resulteert in:
Niet waar
Nu gaat u de gegevens converteren naar de delta-indeling en controleren of het heeft gewerkt.
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Resulteert in:
Waar
SQL-ondersteuning
Delta biedt ondersteuning voor opdrachten van het hulpprogramma voor tabellen via SQL. U kunt SQL gebruiken voor het volgende:
- De geschiedenis van een Delta-tabel ophalen
- Een Delta-tabel isoleren
- Een Parquet-bestand omzetten in een Delta-indeling
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()
Resulteert in:
versie | tijdstempel | userId | userName | bewerking | operationParameters | taak | notebook | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
5 | 25-04-2020 00:37:09 | null | null | STREAMINGUPDATE | [outputMode -> Ap... | null | null | null | 4 | null | true |
4 | 25-04-2020 00:36:27 | null | null | SAMENVOEGEN | [predicaat -> (ol... | null | null | null | 3 | null | false |
3 | 25-04-2020 00:36:08 | null | null | DELETE | [predicaat -> ["(... | null | null | null | 2 | null | false |
2 | 25-04-2020 00:35:51 | null | null | UPDATE | [predicaat -> ((i... | null | null | null | 1 | null | false |
1 | 25-04-2020 00:35:05 | null | null | SCHRIJVEN | [modus -> Overschrijven... | null | null | null | 0 | null | false |
0 | 25-04-2020 00:34:34 | null | null | SCHRIJVEN | [modus -> ErrorIfE... | null | null | null | null | null | true |
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()
Resulteert in:
leertraject |
---|
abfss://data@arca... |
Nu gaat u controleren of een tabel geen tabel met een deltanotatie is. Vervolgens converteert u de tabel naar de delta-indeling met behulp van Spark SQL en bevestigt u dat deze correct is geconverteerd.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId = (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Resulteert in:
Waar
Zie de pagina met documentatie voor Delta Lake voor de volledige documentatie
Zie Delta Lake-project voor meer informatie.