Übersicht über Linux Foundation Delta Lake

Die ursprüngliche Version dieses Artikels finden Sie hier. Sie wurde aus Gründen der Übersichtlichkeit überarbeitet. Anhand dieses Artikels können Sie sich schnell mit den wichtigsten Features von Delta Lake vertraut machen. In den Codeausschnitten dieses Artikels wird gezeigt, wie Sie interaktive Abfragen, Batchabfragen und Streamingabfragen verwenden, um Lese- und Schreibvorgänge für Delta Lake-Tabellen auszuführen. Die Codeausschnitte sind auch in einer Reihe von Notebooks für PySpark, Scala und C# verfügbar.

In diesem Artikel wird Folgendes behandelt:

  • Erstellen einer Tabelle
  • Lesen von Daten
  • Aktualisieren von Tabellendaten
  • Überschreiben von Tabellendaten
  • Bedingtes Aktualisieren ohne Überschreiben
  • Lesen älterer Versionen von Daten mittels Zeitreise
  • Schreiben eines Datenstroms in eine Tabelle
  • Lesen eines Datenstrom mit Änderungen aus einer Tabelle
  • SQL-Unterstützung

Konfiguration

Passen Sie Folgendes an Ihre Umgebung an:

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Ergebnis:

'/delta/delta-table-335323'

Erstellen einer Tabelle

Schreiben Sie zum Erstellen einer Delta Lake-Tabelle einen Datenrahmen auf der Grundlage eines Datenrahmens im Deltaformat. Das Format kann von Parquet, CSV, JSON usw. in Delta geändert werden.

Der folgende Code zeigt, wie Sie mithilfe des von Ihrem Datenrahmen abgeleiteten Schemas eine neue Delta Lake-Tabelle erstellen:

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Ergebnis:

id
0
1
2
3
4

Lesen von Daten

Geben Sie zum Lesen von Daten aus Ihrer Delta Lake-Tabelle den Pfad der Dateien und das Deltaformat an:

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Ergebnis:

id
1
3
4
0
2

Die Ergebnisreihenfolge unterscheidet sich von der obigen Reihenfolge, da vor der Ausgabe der Ergebnisse keine explizite Reihenfolge angegeben wurde.

Aktualisieren von Tabellendaten

Delta Lake unterstützt mehrere Vorgänge zum Ändern von Tabellen mithilfe standardmäßiger Datenrahmen-APIs. Diese Vorgänge sind eine der Verbesserungen, die das Deltaformat mit sich bringt. Im folgenden Beispiel wird ein Batchauftrag ausgeführt, um die Daten in der Tabelle zu überschreiben:

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Ergebnis:

id
7
8
5
9
6

Wie Sie sehen, wurden alle fünf Datensätze mit neuen Werten aktualisiert.

Speichern als Katalogtabellen

Von Delta Lake kann in verwaltete oder externe Katalogtabellen geschrieben werden:

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Ergebnis:

database tableName isTemporary
default externaldeltatable false
default manageddeltatable false

Mithilfe dieses Codes wurde auf der Grundlage eines vorhandenen Datenrahmens eine neue Tabelle im Katalog erstellt. Diese wird als verwaltete Tabelle bezeichnet. Anschließend wurde eine neue externe Tabelle im Katalog definiert, die einen vorhandenen Speicherort verwendet. Diese wird als externe Tabelle bezeichnet. In der Ausgabe sehen Sie, dass beide Tabellen im Katalog aufgeführt werden – unabhängig davon, wie sie erstellt wurden.

Nun können Sie sich die erweiterten Eigenschaften der beiden Tabellen ansehen:

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Ergebnis:

col_name data_type comment
id BIGINT NULL
Ausführliche Tabelleninformationen
Datenbank default
Tabelle manageddeltatable
Besitzer trusted-service-user
Erstellungszeit Sat Apr 25 00:35:34 UTC 2020
Letzter Zugriff Thu Jan 01 00:00:00 UTC 1970
Erstellt von Spark 2.4.4.2.6.99.201-11401300
type MANAGED
Anbieter delta
Tabelleneigenschaften [transient_lastDdlTime=1587774934]
Statistik 2\.407 Bytes
Standort abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<Name des Arbeitsbereichs>/warehouse/manageddeltatable
SerDe-Bibliothek org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Speichereigenschaften [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Ergebnis:

col_name data_type comment
id BIGINT NULL
Ausführliche Tabelleninformationen
Datenbank default
Tabelle externaldeltatable
Besitzer trusted-service-user
Erstellungszeit Sat Apr 25 00:35:38 UTC 2020
Letzter Zugriff Thu Jan 01 00:00:00 UTC 1970
Erstellt von Spark 2.4.4.2.6.99.201-11401300
type EXTERNAL
Anbieter DELTA
Tabelleneigenschaften [transient_lastDdlTime=1587774938]
Standort abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
SerDe-Bibliothek org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Speichereigenschaften [serialization.format=1]

Bedingtes Aktualisieren ohne Überschreiben

Delta Lake bietet programmgesteuerte APIs für die bedingte Aktualisierung, Löschung und Zusammenführung von Tabellendaten. (Dieser Befehl wird häufig als Upsert bezeichnet.)

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Ergebnis:

id
106
108
5
7
9

Hier wurde einfach 100 zu jeder geraden ID addiert.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Ergebnis:

id
5
7
9

Wie Sie sehen, wurde jede Zeile mit einem geraden Wert gelöscht.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Ergebnis:

id
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Hier sehen Sie eine Kombination der vorhandenen Daten. Den vorhandenen Daten wurde im update-Codepfad (WhenMatched) der Wert „-1“ zugewiesen. Die neuen Daten, die am Anfang des Codeausschnitts erstellt und über den insert-Codepfad (WhenNotMatched) hinzugefügt wurden, wurden ebenfalls hinzugefügt.

Verlauf

Mit Delta Lake haben Sie die Möglichkeit, sich den Verlauf einer Tabelle anzusehen. Sprich: Sie können sich die Änderungen ansehen, die an der zugrunde liegenden Deltatabelle vorgenommen wurden. Die folgende Zelle zeigt, wie einfach sich der Verlauf untersuchen lässt:

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Ergebnis:

version timestamp userId userName operation operationParameters Auftrag Notebook clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 NULL NULL MERGE [predicate -> (oldData.ID = newData.ID)] NULL NULL NULL 3 NULL false
3 2020-04-25 00:36:08 NULL NULL Delete [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] NULL NULL NULL 2 NULL false
2 2020-04-25 00:35:51 NULL NULL UPDATE [predicate -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] NULL NULL NULL 1 NULL false
1 2020-04-25 00:35:05 NULL NULL WRITE [mode -> Overwrite, partitionBy -> []] NULL NULL NULL 0 NULL false
0 2020-04-25 00:34:34 NULL NULL WRITE [mode -> ErrorIfExists, partitionBy -> []] NULL NULL NULL NULL NULL true

Hier sehen Sie alle Änderungen, die über die obigen Codeausschnitte vorgenommen wurden.

Lesen älterer Versionen von Daten mittels Zeitreise

Mithilfe des Zeitreise-Features können Sie ältere Momentaufnahmen Ihrer Delta Lake-Tabelle abfragen. Wenn Sie auf die überschriebenen Daten zugreifen möchten, können Sie mithilfe der Option „versionAsOf“ eine Momentaufnahme der Tabelle abfragen, die erstellt wurde, bevor die ersten Daten überschrieben wurden.

Nach dem Ausführen der folgenden Zelle sollten die ersten Daten vor der Überschreibung angezeigt werden. Die Zeitreise ist ein praktisches Feature, das sich das Delta Lake-Transaktionsprotokoll zunutze macht, um auf Daten zuzugreifen, die sich nicht mehr in der Tabelle befinden. Wenn Sie die Option für die Version 0 entfernen (oder die Version 1 angeben), werden wieder die neueren Daten angezeigt. Weitere Informationen finden Sie unter Abfragen einer älteren Momentaufnahme einer Tabelle (Zeitreise).

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Ergebnis:

id
0
1
4
3
2

Hier sehen Sie wieder die früheste Version der Daten.

Schreiben eines Datenstroms in eine Tabelle

Schreibvorgänge für eine Delta Lake-Tabelle können auch mithilfe des strukturierten Streamings von Spark durchgeführt werden. Durch das Delta Lake-Transaktionsprotokoll wird eine Exactly Once-Verarbeitung garantiert, auch wenn parallel andere Datenströme oder Batchabfragen für die Tabelle aktiv sind. Datenströme werden standardmäßig im Anfügemodus ausgeführt, wodurch der Tabelle neue Datensätze hinzugefügt werden.

Weitere Informationen zur Delta Lake-Integration für strukturiertes Streaming finden Sie unter Tabelle: Streaming für Lese- und Schreibvorgänge.

In den folgenden Zellen passiert Folgendes:

  • Zelle 30: Anzeigen der neu angefügten Daten
  • Zelle 31: Untersuchen des Verlaufs
  • Zelle 32: Beenden des Auftrags für strukturiertes Streaming
  • Zelle 33: Untersuchen des Verlaufs < – Sie werden sehen, dass das Anfügen beendet wurde.

Richten Sie zunächst einen einfachen Spark-Streamingauftrag ein, um eine Sequenz zu generieren und dafür zu sorgen, dass durch den Auftrag in die Deltatabelle geschrieben wird.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Lesen eines Datenstrom mit Änderungen aus einer Tabelle

Während der Datenstrom in die Delta Lake-Tabelle schreibt, können Sie diese Tabelle auch als Streamingquelle verwenden und daraus lesen. So können Sie beispielsweise eine andere Streamingabfrage starten, die alle an der Delta Lake-Tabelle vorgenommenen Änderungen ausgibt.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Ergebnis:

id
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Ergebnis:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 Delete [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] NULL

Hier werden einige der weniger interessanten Spalten gelöscht, um die Verlaufsansicht übersichtlicher zu machen:

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Ergebnis:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 Delete [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] NULL

Konvertieren von Parquet in Delta

Daten können direkt vom Parquet-Format in das Deltaformat konvertiert werden.

Hier wird getestet, ob die vorhandene Tabelle im Deltaformat vorliegt:

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Ergebnis:

Falsch

Hier werden Daten in das Deltaformat konvertiert, und es wird überprüft, ob der Vorgang erfolgreich war:

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Ergebnis:

Richtig

SQL-Unterstützung

Von Delta werden tabellenbezogene Hilfsprogrammbefehle über SQL unterstützt. SQL kann für Folgendes verwendet werden:

  • Abrufen des Verlaufs einer Deltatabelle
  • Bereinigen einer Deltatabelle
  • Konvertieren einer Parquet-Datei in Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Ergebnis:

version timestamp userId userName operation operationParameters Auftrag Notebook clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 NULL NULL STREAMING UPDATE [outputMode > Ap... NULL NULL NULL 4 NULL true
4 2020-04-25 00:36:27 NULL NULL MERGE [predicate > (ol... NULL NULL NULL 3 NULL false
3 2020-04-25 00:36:08 NULL NULL Delete [predicate > ["(... NULL NULL NULL 2 NULL false
2 2020-04-25 00:35:51 NULL NULL UPDATE [predicate > ((i... NULL NULL NULL 1 NULL false
1 2020-04-25 00:35:05 NULL NULL WRITE [mode > Overwrit... NULL NULL NULL 0 NULL false
0 2020-04-25 00:34:34 NULL NULL WRITE [mode > ErrorIfE... NULL NULL NULL NULL NULL true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Ergebnis:

path
abfss://data@arca...

Jetzt überprüfen Sie die Tabelle, um sicherzugehen, dass sie nicht im Deltaformat vorliegt. Anschließend konvertieren Sie sie mithilfe von Spark SQL in das Deltaformat, und es wird überprüft, ob sie ordnungsgemäß konvertiert wurde:

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Ergebnis:

Richtig

Eine vollständige Dokumentation finden Sie auf der Delta Lake-Dokumentationsseite.

Weitere Informationen finden Sie unter Delta Lake-Projekt.

Nächste Schritte