Dela via


Översikt över Linux Foundation Delta Lake

Den här artikeln har anpassats för tydlighetens skull från den ursprungliga motsvarigheten här. Den här artikeln hjälper dig att snabbt utforska de viktigaste funktionerna i Delta Lake. Artikeln innehåller kodfragment som visar hur du läser från och skriver till Delta Lake-tabeller från interaktiva frågor, batchfrågor och strömningsfrågor. Kodfragmenten är också tillgängliga i en uppsättning notebook-filer PySpark här, Scala här och C# här

Här är vad vi kommer att täcka:

  • Skapa en tabell
  • Läsa data
  • Uppdatera tabelldata
  • Skriv över tabelldata
  • Villkorlig uppdatering utan överskrivning
  • Läsa äldre versioner av data med time travel
  • Skriva en dataström till en tabell
  • Läsa en dataström med ändringar från en tabell
  • SQL-stöd

Konfiguration

Se till att du ändrar nedanstående efter behov för din miljö.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Resulterar i:

'/delta/delta-table-335323'

Skapa en tabell

Om du vill skapa en Delta Lake-tabell skriver du en DataFrame ut en DataFrame i deltaformat. Du kan ändra formatet från Parquet, CSV, JSON och så vidare till delta.

Koden nedan visar hur du skapar en ny Delta Lake-tabell med hjälp av schemat som härleds från din DataFrame.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Resulterar i:

ID
0
1
2
3
4

Läsa data

Du läser data i Delta Lake-tabellen genom att ange sökvägen till filerna och deltaformatet.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Resulterar i:

ID
1
3
4
0
2

Resultatordningen skiljer sig från ovan eftersom ingen ordning uttryckligen angavs innan resultatet matas ut.

Uppdatera tabelldata

Delta Lake stöder flera åtgärder för att ändra tabeller med hjälp av standard-API:er för DataFrame. Dessa åtgärder är en av förbättringarna som deltaformatet lägger till. I följande exempel körs ett batchjobb för att skriva över data i tabellen.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Resulterar i:

ID
7
8
5
9
6

Här kan du se att alla fem posterna har uppdaterats för att innehålla nya värden.

Spara som katalogtabeller

Delta Lake kan skriva till hanterade eller externa katalogtabeller.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Resulterar i:

databas tableName isTemporary
standard externaldeltatable falskt
standard manageddeltatable falskt

Med den här koden skapade du en ny tabell i katalogen från en befintlig dataram, som kallas en hanterad tabell. Sedan definierade du en ny extern tabell i katalogen som använder en befintlig plats, som kallas för en extern tabell. I utdata visas båda tabellerna, oavsett hur de skapades, i katalogen.

Nu kan du titta på de utökade egenskaperna för båda dessa tabeller

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Resulterar i:

col_name data_type kommentar
id bigint null
Detaljerad tabellinformation
Databas standard
Tabell manageddeltatable
Ägare trusted-service-user
Genereringstid lör 25 apr 00:35:34 UTC 2020
Senaste åtkomst Tor jan 01 00:00:00 UTC 1970
Skapad av Spark 2.4.4.2.6.99.201-11401300
Typ HANTERADE
Leverantör delta
Tabellegenskaper [transient_lastDdlTime=1587774934]
Statistik 2 407 byte
Location abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< arbetsytans namn>/lager/manageddeltatable
Serde-bibliotek org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Lagringsegenskaper [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Resultat i:

col_name data_type kommentar
id bigint null
Detaljerad tabellinformation
Databas standard
Tabell externaldeltatable
Ägare trusted-service-user
Genereringstid Lör 25 apr 00:35:38 UTC 2020
Senaste åtkomst Tor jan 01 00:00:00 UTC 1970
Skapad av Spark 2.4.4.2.6.99.201-11401300
Typ EXTERNA
Leverantör DELTA
Tabellegenskaper [transient_lastDdlTime=1587774938]
Location abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152>
Serde-bibliotek org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Lagringsegenskaper [serialization.format=1]

Villkorsstyrd uppdatering utan att skriva över

Delta Lake tillhandahåller programmatiska API:er för villkorlig uppdatering, borttagning och sammanslagning (det här kommandot kallas ofta för en upsert) data i tabeller.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Resultat i:

ID
106
108
5
7
9

Här har du precis lagt till 100 till varje jämnt ID.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Resultat i:

ID
5
7
9

Observera att varje jämn rad har tagits bort.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Resultat i:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Här har du en kombination av befintliga data. Befintliga data har tilldelats värdet -1 i kodsökvägen update(WhenMatched). De nya data som skapades överst i kodfragmentet och lades till via sökvägen infoga kod (WhenNotMatched) lades också till.

Historik

Delta Lake's har möjlighet att göra det möjligt att titta på historiken för en tabell. Det vill: de ändringar som har gjorts i den underliggande deltatabellen. Cellen nedan visar hur enkelt det är att inspektera historiken.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Resultat i:

version timestamp userId userName operation operationParameters jobb notebook-fil clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 null null SAMMANFOGA [predikat -> (oldData.ID = newData.ID)] null null null 3 null falskt
3 2020-04-25 00:36:08 null null DELETE [predikat -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null falskt
2 2020-04-25 00:35:51 null null UPDATE [predikat -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null falskt
1 2020-04-25 00:35:05 null null SKRIVA [mode -> Skriv över, partitionBy –> []] null null null 0 null falskt
0 2020-04-25 00:34:34 null null SKRIVA [mode -> ErrorIfExists, partitionBy -> []] null null null null null true

Här kan du se alla ändringar som gjorts i kodfragmenten ovan.

Läsa äldre versioner av data med time travel

Du kan köra frågor mot tidigare ögonblicksbilder av Din Delta Lake-tabell med hjälp av en funktion som kallas Tidsresor. Om du vill komma åt de data som du skrev över kan du fråga en ögonblicksbild av tabellen innan du skriver över den första datauppsättningen med hjälp av alternativet versionAsOf.

När du har kört cellen nedan bör du se den första uppsättningen data från innan du skriver över den. Time Travel är en kraftfull funktion som utnyttjar kraften i Delta Lake-transaktionsloggen för att komma åt data som inte längre finns i tabellen. Om du tar bort alternativet version 0 (eller anger version 1) kan du se nyare data igen. Mer information finns i Fråga en äldre ögonblicksbild av en tabell.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Resulterar i:

ID
0
1
4
3
2

Här kan du se att du har gått tillbaka till den tidigaste versionen av data.

Skriva en dataström till en tabell

Du kan också skriva till en Delta Lake-tabell med hjälp av Sparks Structured Streaming. Delta Lake-transaktionsloggen garanterar bearbetning exakt en gång, även om det finns andra strömmar eller batchfrågor som körs samtidigt mot tabellen. Som standard körs strömmar i tilläggsläge, vilket lägger till nya poster i tabellen.

Mer information om Delta Lake-integrering med Structured Streaming finns i Läsa och skriva för tabellströmning.

I cellerna nedan gör vi följande:

  • Cell 30 Visa nyligen tillagda data
  • Cell 31 Inspektera historik
  • Cell 32 Stoppa det strukturerade direktuppspelningsjobbet
  • Cell 33 Inspektera historik <– Du kommer att märka att tilläggen har stoppats

Först ska du konfigurera ett enkelt Spark Streaming-jobb för att generera en sekvens och göra jobbet skrivet till deltatabellen.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Läsa en dataström med ändringar från en tabell

När strömmen skrivs till Delta Lake-tabellen kan du också läsa från den tabellen som en strömningskälla. Du kan till exempel starta en annan strömmande fråga som skriver ut alla ändringar som gjorts i Delta Lake-tabellen.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Resulterar i:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Resulterar i:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 DIREKTUPPSPELNINGSUPPDATERING [outputMode –> Tillägg, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 SAMMANFOGA [predikat -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predikat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predikat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 SKRIVA [mode -> Skriv över, partitionBy –> []] 0
0 2020-04-25 00:34:34 SKRIVA [mode -> ErrorIfExists, partitionBy -> []] null

Här släpper du några av de mindre intressanta kolumnerna för att förenkla visningsupplevelsen i historikvyn.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Resulterar i:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 DIREKTUPPSPELNINGSUPPDATERING [outputMode –> Tillägg, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 SAMMANFOGA [predikat -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predikat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predikat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 SKRIVA [mode -> Skriv över, partitionBy –> []] 0
0 2020-04-25 00:34:34 SKRIVA [mode -> ErrorIfExists, partitionBy -> []] null

Konvertera Parquet till Delta

Du kan göra en konvertering på plats från Parquet-formatet till Delta.

Här ska du testa om den befintliga tabellen är i deltaformat eller inte.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Resulterar i:

Falskt

Nu ska du konvertera data till deltaformat och kontrollera att de fungerade.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resulterar i:

Sant

SQL-stöd

Delta stöder tabellverktygskommandon via SQL. Du kan använda SQL för att:

  • Hämta deltatabellens historik
  • Dammsuga en DeltaTable
  • Konvertera en Parquet-fil till Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Resultat i:

version timestamp userId userName operation operationParameters jobb notebook-fil clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 null null DIREKTUPPSPELNINGSUPPDATERING [outputMode –> Ap... null null null 4 null true
4 2020-04-25 00:36:27 null null SAMMANFOGA [predikat -> (ol... null null null 3 null falskt
3 2020-04-25 00:36:08 null null DELETE [predikat -> ["(... null null null 2 null falskt
2 2020-04-25 00:35:51 null null UPDATE [predikat -> ((i... null null null 1 null falskt
1 2020-04-25 00:35:05 null null SKRIVA [mode -> Överskrivning... null null null 0 null falskt
0 2020-04-25 00:34:34 null null SKRIVA [mode -> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Resultat i:

path
abfss://data@arca...

Nu ska du kontrollera att en tabell inte är en deltaformattabell. Sedan konverterar du tabellen till deltaformat med Spark SQL och bekräftar att den har konverterats korrekt.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resultat i:

Sant

Fullständig dokumentation finns på Delta Lake-dokumentationssidan

Mer information finns i Delta Lake Project.

Nästa steg