Delen via


Overzicht van Linux Foundation Delta Lake

De oorspronkelijke inhoud (hier) van dit artikel is aangepast voor meer duidelijkheid. Gebruik dit artikel om snel de belangrijkste functies van Delta Lake te verkennen. In het artikel vindt u codefragmenten die aangeven hoe u gegevens uit Delta Lake-tabellen leest en hoe u hier gegevens heen schrijft op basis van query's over interactiviteit, batches en streaming. De codefragmenten zijn ook beschikbaar in een aantal notebooks (PySpark hier, Scala hier en C# hier)

Het volgende wordt behandeld:

  • Een tabel maken
  • Gegevens lezen
  • Tabelgegevens bijwerken
  • Tabelgegevens overschrijven
  • Voorwaardelijke update zonder overschrijven
  • Oudere gegevensversies lezen met behulp van Time Travel
  • Een gegevensstroom naar een tabel schrijven
  • Een stroom wijzigingen uit een tabel lezen
  • SQL-ondersteuning

Configuratie

Zorg ervoor dat u het onderstaande aanpast voor zover dat in uw omgeving past.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Resulteert in:

'/delta/delta-table-335323'

Een tabel maken

Als u een Delta Lake-tabel wilt schrijven, schrijft u een DataFrame op basis van een DataFrame in de Delta-indeling. U kunt de indeling van Parquet, CSV, JSON, enzovoort, wijzigen in Delta.

In de volgende code ziet u hoe u een nieuwe Delta Lake-tabel maakt met behulp van het schema dat uit uw DataFrame is afgeleid.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Resulteert in:

Id
0
1
2
3
4

Gegevens lezen

U leest gegevens in uw Delta Lake-tabel door het pad naar de bestanden en de Delta-indeling op te geven.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Resulteert in:

Id
1
3
4
0
2

De volgorde waarin de resultaten worden opgegeven, is anders dan hierboven staat aangezien er geen expliciete volgorde is opgegeven voordat de resultaten zijn uitgevoerd.

Tabelgegevens bijwerken

Delta Lake ondersteunt verschillende bewerkingen voor het wijzigen van tabellen met behulp van standaard DataFrame-API's. Deze bewerkingen zijn een van de verbeteringen die de Delta-indeling toevoegt. In het volgende voorbeeld wordt een batchtaak uitgevoerd om de gegevens in de tabel te overschrijven.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Resulteert in:

Id
7
8
5
9
6

Hier ziet u dat alle vijf records zijn bijgewerkt met nieuwe waarden.

Opslaan als catalogustabellen

Delta Lake kan schrijven naar beheerde of externe catalogustabellen.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Resulteert in:

database tableName isTemporary
standaardinstelling externaldeltatable false
standaardinstelling manageddeltatable false

Met deze code hebt u een nieuwe tabel in de catalogus gemaakt op basis van een bestaand DataFrame. Deze tabel wordt een beheerde tabel genoemd. Vervolgens hebt u een nieuwe externe tabel in de catalogus gedefinieerd waarvoor een bestaande locatie wordt gebruikt. Dit wordt een externe tabel genoemd. In de uitvoer ziet u dat beide tabellen, ongeacht de manier waarop ze zijn gemaakt, in de catalogus worden vermeld.

Bekijk nu de uitgebreide eigenschappen van beide tabellen

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Resulteert in:

col_name data_type comment
id bigint null
Gedetailleerde tabelgegevens
Database standaardinstelling
Tabel manageddeltatable
Eigenaar trusted-service-user
Gemaakt om za 25 apr 00:35:34 UTC 2020
Laatste toegang do 1 jan 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Type BEHEERD
Provider delta
Tabeleigenschappen [transient_lastDdlTime=1587774934]
statistieken 2407 bytes
Locatie abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< workspace name>/warehouse/manageddeltatable
Serde-bibliotheek org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Opslageigenschappen [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Resulteert in:

col_name data_type comment
id bigint null
Gedetailleerde tabelgegevens
Database standaardinstelling
Tabel externaldeltatable
Eigenaar trusted-service-user
Gemaakt om za 25 apr 00:35:38 UTC 2020
Laatste toegang do 1 jan 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Type EXTERN
Provider DELTA
Tabeleigenschappen [transient_lastDdlTime=1587774938]
Locatie abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152>
Serde-bibliotheek org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Opslageigenschappen [serialization.format=1]

Voorwaardelijke update zonder overschrijven

Delta Lake biedt programmatische API's voor het voorwaardelijk bijwerken, verwijderen en samenvoegen van gegevens (deze opdracht wordt vaak een upsert genoemd) in tabellen.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Resulteert in:

Id
106
108
5
7
9

Hier hebt u 100 toegevoegd aan elke even id.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Resulteert in:

Id
5
7
9

U ziet dat elke even rij is verwijderd.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Resulteert in:

Id
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Hier hebt u een combinatie van de bestaande gegevens. Aan de bestaande gegevens is een waarde van -1 toegewezen in het codepad update(WhenMatched). Die nieuwe gegevens die boven op het codefragment zijn gemaakt en die via het invoegcodepad (WhenNotMatched) zijn toegevoegd, zijn ook toegevoegd.

Geschiedenis

Delta Lake biedt de mogelijkheid om de geschiedenis van een tabel weer te geven. Dat wil zeggen: de wijzigingen die aan de onderliggende Delta-tabel zijn gemaakt. In de onderstaande cel ziet u hoe eenvoudig het is om de geschiedenis te controleren.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Resulteert in:

versie tijdstempel userId userName bewerking operationParameters taak notebook clusterId readVersion isolationLevel isBlindAppend
4 25-04-2020 00:36:27 null null SAMENVOEGEN [predicaat -> (oldData.ID = newData.ID)] null null null 3 null false
3 25-04-2020 00:36:08 null null DELETE [predicaat -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT)"]] null null null 2 null false
2 25-04-2020 00:35:51 null null UPDATE [predicaat -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null false
1 25-04-2020 00:35:05 null null SCHRIJVEN [modus -> Overschrijven, partitionBy -> []] null null null 0 null false
0 25-04-2020 00:34:34 null null SCHRIJVEN [modus -> ErrorIfExists, partitionBy -> []] null null null null null true

Hier ziet u alle aanpassingen die op de bovenstaande codefragmenten zijn aangebracht.

Oudere gegevensversies lezen met behulp van Time Travel

U kunt met behulp van de functie Time Travel een query uitvoeren op vorige momentopnamen van uw Delta Lake-tabel. Als u toegang wilt krijgen tot de gegevens die u hebt overschreven, kunt u een query uitvoeren op een momentopname van de tabel voordat u de eerste gegevensset hebt overschreven met behulp van de optie versionAsOf.

Zodra u de onderstaande cel uitvoert, ziet u als het goed is de eerste gegevensset van vóór u deze hebt overschreven. Time Travel is een krachtige functie die gebruikmaakt van de kracht van het Delta Lake-transactielogboek om toegang te krijgen tot gegevens die niet meer in de tabel aanwezig zijn. Als u de versie 0-optie verwijdert (of versie 1 opgeeft), ziet u weer de nieuwere gegevens. Zie Query uitvoeren op een oudere momentopname van een tabel voor meer informatie.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Resulteert in:

Id
0
1
4
3
2

Hier ziet u dat u terug bent gegaan naar de vroegste versie van de gegevens.

Een gegevensstroom naar een tabel schrijven

U kunt ook gegevens naar een Delta Lake-tabel schrijven met behulp van Gestructureerd streamen van Spark. Het Delta Lake-transactielogboek garandeert exact één verwerking, zelfs als er geen andere stromen of batchquery's gelijktijdig worden uitgevoerd op de tabel. Standaard worden stromen uitgevoerd in de toevoegmodus, waarmee nieuwe records aan de tabel worden toegevoegd.

Zie Lees- en schrijfbewerkingen in tabelstreaming voor meer informatie over Delta Lake-integratie met Gestructureerd streamen.

Bekijk bij de onderstaande cellen wat we aan het doen zijn:

  • Cel 30: de zojuist toegevoegde gegevens weergeven
  • Cel 31: de geschiedenis inspecteren
  • Cel 32: de gestructureerde streamingtaak stoppen
  • Cel 33 Geschiedenis controleren <- U ziet dat toevoegingen zijn gestopt

Eerst gaat u een eenvoudige Spark Streaming-taak instellen om een reeks te genereren en de taak naar uw Delta-tabel te laten schrijven.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Een stroom wijzigingen uit een tabel lezen

Terwijl de stream gegevens naar de Delta Lake-tabel schrijft, kunt u gegevens uit die tabel lezen als een streamingbron. U kunt bijvoorbeeld nog een streamingquery's starten waarmee alle wijzigingen van de Delta Lake-tabel worden afgedrukt.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Resulteert in:

Id
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Resulteert in:

versie tijdstempel bewerking operationParameters readVersion
5 25-04-2020 00:37:09 STREAMINGUPDATE [outputMode -> Toevoegen, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 25-04-2020 00:36:27 SAMENVOEGEN [predicaat -> (oldData.id = newData.id)] 3
3 25-04-2020 00:36:08 DELETE [predicaat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT)"]] 2
2 25-04-2020 00:35:51 UPDATE [predicaat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 25-04-2020 00:35:05 SCHRIJVEN [modus -> Overschrijven, partitionBy -> []] 0
0 25-04-2020 00:34:34 SCHRIJVEN [modus -> ErrorIfExists, partitionBy -> []] null

Hier worden enkele van de minder interessante kolommen verwijderd om de weergave van de geschiedenisweergave te vereenvoudigen.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Resulteert in:

versie tijdstempel bewerking operationParameters readVersion
5 25-04-2020 00:37:09 STREAMINGUPDATE [outputMode -> Toevoegen, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 25-04-2020 00:36:27 SAMENVOEGEN [predicaat -> (oldData.id = newData.id)] 3
3 25-04-2020 00:36:08 DELETE [predicaat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT)"]] 2
2 25-04-2020 00:35:51 UPDATE [predicaat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 25-04-2020 00:35:05 SCHRIJVEN [modus -> Overschrijven, partitionBy -> []] 0
0 25-04-2020 00:34:34 SCHRIJVEN [modus -> ErrorIfExists, partitionBy -> []] null

Parquet omzetten in Delta

U kunt ter plekke gegevens in de Parquet-indeling omzetten naar Delta.

Hier gaat u testen of de bestaande tabel een delta-indeling heeft of niet.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Resulteert in:

Niet waar

Nu gaat u de gegevens converteren naar de delta-indeling en controleren of het heeft gewerkt.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resulteert in:

Waar

SQL-ondersteuning

Delta biedt ondersteuning voor opdrachten van het hulpprogramma voor tabellen via SQL. U kunt SQL gebruiken voor het volgende:

  • De geschiedenis van een Delta-tabel ophalen
  • Een Delta-tabel isoleren
  • Een Parquet-bestand omzetten in een Delta-indeling
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Resulteert in:

versie tijdstempel userId userName bewerking operationParameters taak notebook clusterId readVersion isolationLevel isBlindAppend
5 25-04-2020 00:37:09 null null STREAMINGUPDATE [outputMode -> Ap... null null null 4 null true
4 25-04-2020 00:36:27 null null SAMENVOEGEN [predicaat -> (ol... null null null 3 null false
3 25-04-2020 00:36:08 null null DELETE [predicaat -> ["(... null null null 2 null false
2 25-04-2020 00:35:51 null null UPDATE [predicaat -> ((i... null null null 1 null false
1 25-04-2020 00:35:05 null null SCHRIJVEN [modus -> Overschrijven... null null null 0 null false
0 25-04-2020 00:34:34 null null SCHRIJVEN [modus -> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Resulteert in:

leertraject
abfss://data@arca...

Nu gaat u controleren of een tabel geen tabel met een deltanotatie is. Vervolgens converteert u de tabel naar de delta-indeling met behulp van Spark SQL en bevestigt u dat deze correct is geconverteerd.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resulteert in:

Waar

Zie de pagina met documentatie voor Delta Lake voor de volledige documentatie

Zie Delta Lake-project voor meer informatie.

Volgende stappen