Omówienie usługi Delta Lake programu Linux Foundation

Ten artykuł został dostosowany do większej jasności od oryginalnego odpowiednika tutaj. Ten artykuł ułatwia szybkie zapoznanie się z głównymi funkcjami usługi Delta Lake. Artykuł zawiera fragmenty kodu, które pokazują, jak odczytywać i zapisywać w tabelach usługi Delta Lake z interakcyjnych, wsadowych i przesyłanych strumieniowo zapytań. Fragmenty kodu są również dostępne w zestawie notesów PySpark tutaj, Scala tutaj i C# tutaj

Oto, co omówimy:

  • Tworzenie tabeli
  • Odczyt danych
  • Aktualizowanie danych tabeli
  • Zastępowanie danych tabeli
  • Aktualizacja warunkowa bez zastępowania
  • Odczytywanie starszych wersji danych przy użyciu funkcji Time Travel
  • Zapisywanie strumienia danych w tabeli
  • Odczytywanie strumienia zmian z tabeli
  • Obsługa języka SQL

Konfigurowanie

Upewnij się, że zmodyfikujesz poniższe zależnie od środowiska.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Wyniki:

"/delta/delta-table-335323"

Tworzenie tabeli

Aby utworzyć tabelę usługi Delta Lake, napisz ramkę danych w formacie różnicowym. Możesz zmienić format z Parquet, CSV, JSON itd. na różnicę.

Poniższy kod pokazuje, jak utworzyć nową tabelę usługi Delta Lake przy użyciu schematu wnioskowanego z ramki danych.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Wyniki:

ID (Identyfikator)
0
1
2
3
4

Odczyt danych

Dane są odczytywane w tabeli usługi Delta Lake, określając ścieżkę do plików i format różnicowy.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Wyniki:

ID (Identyfikator)
1
3
4
0
2

Kolejność wyników różni się od powyższego, ponieważ nie określono jawnie kolejności przed wyświetleniem wyników.

Aktualizowanie danych tabeli

Usługa Delta Lake obsługuje kilka operacji modyfikowania tabel przy użyciu standardowych interfejsów API ramki danych. Te operacje są jednym z ulepszeń, które dodaje format różnicowy. Poniższy przykład uruchamia zadanie wsadowe, aby zastąpić dane w tabeli.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Wyniki:

ID (Identyfikator)
7
8
5
9
6

W tym miejscu widać, że wszystkie pięć rekordów zostało zaktualizowanych do przechowywania nowych wartości.

Zapisz jako tabele wykazu

Usługa Delta Lake może zapisywać w tabelach zarządzanych lub zewnętrznych wykazów.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Wyniki:

database tableName Istemporary
default externaldeltatable fałsz
default manageddeltatable fałsz

W tym kodzie utworzono nową tabelę w wykazie z istniejącej ramki danych, nazywanej tabelą zarządzaną. Następnie zdefiniowano nową tabelę zewnętrzną w katalogu, która używa istniejącej lokalizacji, nazywanej tabelą zewnętrzną. W danych wyjściowych można zobaczyć obie tabele, bez względu na sposób ich tworzenia, są wyświetlane w wykazie.

Teraz możesz przyjrzeć się rozszerzonym właściwościom obu tych tabel

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Wyniki:

col_name Data_type komentarz
identyfikator bigint null
Szczegółowe informacje o tabeli
baza danych default
Tabela manageddeltatable
Właściciel zaufany użytkownik usługi
Czas utworzenia Sat Apr 25 00:35:34 UTC 2020
Ostatni dostęp Czw 01 stycznia 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Typ ZARZĄDZANE
Dostawca delta
Właściwości tabeli [transient_lastDdlTime=1587774934]
Statystyki 2407 bajtów
Lokalizacja abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< nazwa> przestrzeni pracy/magazyn/manageddeltatable
Biblioteka Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Właściwości magazynu [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Wyniki:

col_name Data_type komentarz
identyfikator bigint null
Szczegółowe informacje o tabeli
baza danych default
Tabela externaldeltatable
Właściciel zaufany użytkownik usługi
Czas utworzenia Sat Apr 25 00:35:38 UTC 2020
Ostatni dostęp Czw 01 stycznia 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Typ ZEWNĘTRZNYCH
Dostawca DELTA
Właściwości tabeli [transient_lastDdlTime=1587774938]
Lokalizacja abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152>
Biblioteka Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Właściwości magazynu [serialization.format=1]

Aktualizacja warunkowa bez zastępowania

Usługa Delta Lake udostępnia programowe interfejsy API do aktualizacji warunkowej, usuwania i scalania (to polecenie jest często nazywane danymi upsert) w tabelach.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Wyniki:

ID (Identyfikator)
106
108
5
7
9

W tym miejscu właśnie dodano 100 do każdego identyfikatora parzystego.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Wyniki:

ID (Identyfikator)
5
7
9

Zwróć uwagę, że każdy wiersz parzystowy został usunięty.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Wyniki:

ID (Identyfikator)
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

W tym miejscu masz kombinację istniejących danych. Istniejące dane zostały przypisane do wartości -1 w ścieżce kodu update(WhenMatched). Dodano również nowe dane utworzone w górnej części fragmentu kodu i zostały dodane za pośrednictwem ścieżki kodu wstawiania (WhenNotMatched).

Historia

Usługa Delta Lake ma możliwość wglądu w historię tabeli. Oznacza to, że zmiany wprowadzone w podstawowej tabeli delty. W poniższej komórce pokazano, jak proste jest sprawdzenie historii.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Wyniki:

Wersja sygnatura czasowa userId userName operation operationParameters zadanie notes clusterId readVersion Isolationlevel isBlindAppend
4 2020-04-25 00:36:27 null null SCALANIA [predykat -> (oldData.ID = newData.ID)] null null null 3 null fałsz
3 2020-04-25 00:36:08 null null DELETE [predykat -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null fałsz
2 2020-04-25 00:35:51 null null UPDATE [predykat -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null fałsz
1 2020-04-25 00:35:05 null null NAPISZ [tryb —> Zastąp, partitionBy —> []] null null null 0 null fałsz
0 2020-04-25 00:34:34 null null NAPISZ [tryb —> ErrorIfExists, partitionBy —> []] null null null null null true

W tym miejscu można zobaczyć wszystkie modyfikacje wprowadzone w powyższych fragmentach kodu.

Odczytywanie starszych wersji danych przy użyciu funkcji Time Travel

Istnieje możliwość wykonywania zapytań względem poprzednich migawek tabeli usługi Delta Lake przy użyciu funkcji o nazwie Time Travel. Jeśli chcesz uzyskać dostęp do danych, które zastąpisz, możesz wykonać zapytanie dotyczące migawki tabeli przed zastąpieniem pierwszego zestawu danych przy użyciu opcji versionAsOf.

Po uruchomieniu poniższej komórki przed zastąpieniem powinien zostać wyświetlony pierwszy zestaw danych. Time Travel to zaawansowana funkcja, która wykorzystuje możliwości dziennika transakcji usługi Delta Lake w celu uzyskania dostępu do danych, które nie są już w tabeli. Usunięcie opcji w wersji 0 (lub określenie wersji 1) umożliwi ponowne wyświetlanie nowszych danych. Aby uzyskać więcej informacji, zobacz Wykonywanie zapytań o starszą migawkę tabeli.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Wyniki:

ID (Identyfikator)
0
1
4
3
2

W tym miejscu widać, że wrócisz do najwcześniejszej wersji danych.

Zapisywanie strumienia danych w tabeli

Możesz również napisać do tabeli usługi Delta Lake przy użyciu przesyłania strumieniowego ze strukturą platformy Spark. Dziennik transakcji usługi Delta Lake gwarantuje dokładnie jednokrotne przetwarzanie, nawet jeśli istnieją inne strumienie lub zapytania wsadowe uruchomione współbieżnie względem tabeli. Domyślnie strumienie są uruchamiane w trybie dołączania, który dodaje nowe rekordy do tabeli.

Aby uzyskać więcej informacji na temat integracji usługi Delta Lake z przesyłaniem strumieniowym ze strukturą, zobacz Odczyty i zapisy przesyłane strumieniowo tabel.

W poniższych komórkach oto, co robimy:

  • Komórka 30 Pokaż nowo dołączone dane
  • Komórka 31 — Inspekcja historii
  • Komórka 32 Zatrzymaj ustrukturyzowane zadanie przesyłania strumieniowego
  • Komórka 33 Inspekcja historii <— zauważysz, że dołączenia zostały zatrzymane

Najpierw skonfigurujesz proste zadanie przesyłania strumieniowego platformy Spark w celu wygenerowania sekwencji i zapisania zadania do tabeli delty.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Odczytywanie strumienia zmian z tabeli

Podczas zapisywania strumienia w tabeli usługi Delta Lake można również odczytać z tej tabeli jako źródło przesyłania strumieniowego. Możesz na przykład uruchomić kolejne zapytanie przesyłania strumieniowego, które wyświetla wszystkie zmiany wprowadzone w tabeli usługi Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Wyniki:

ID (Identyfikator)
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Wyniki:

Wersja sygnatura czasowa operation operationParameters readVersion
5 2020-04-25 00:37:09 AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO [outputMode —> Dołącz, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 SCALANIA [predykat -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predykat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predykat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 NAPISZ [tryb —> Zastąp, partitionBy —> []] 0
0 2020-04-25 00:34:34 NAPISZ [tryb —> ErrorIfExists, partitionBy —> []] null

W tym miejscu upuszczasz niektóre z mniej interesujących kolumn, aby uprościć wyświetlanie widoku historii.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Wyniki:

Wersja sygnatura czasowa operation operationParameters readVersion
5 2020-04-25 00:37:09 AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO [outputMode —> Dołącz, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 SCALANIA [predykat -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predykat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predykat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 NAPISZ [tryb —> Zastąp, partitionBy —> []] 0
0 2020-04-25 00:34:34 NAPISZ [tryb —> ErrorIfExists, partitionBy —> []] null

Konwertowanie parquet na różnicę

Konwersję w miejscu można wykonać z formatu Parquet na różnicę.

W tym miejscu sprawdzisz, czy istniejąca tabela ma format różnicowy, czy nie.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Wyniki:

Fałsz

Teraz przekonwertujesz dane na format różnicowy i sprawdzisz, czy to zadziałało.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Wyniki:

Prawda

Obsługa języka SQL

Funkcja Delta obsługuje polecenia narzędzia tabel za pomocą języka SQL. Za pomocą programu SQL można wykonywać następujące czynności:

  • Uzyskiwanie historii tabeli DeltaTable
  • Opróżnij tabelę deltaTable
  • Konwertowanie pliku Parquet na różnicę
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Wyniki:

Wersja sygnatura czasowa userId userName operation operationParameters zadanie notes clusterId readVersion Isolationlevel isBlindAppend
5 2020-04-25 00:37:09 null null AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO [outputMode —> Ap... null null null 4 null true
4 2020-04-25 00:36:27 null null SCALANIA [predykat -> (ol... null null null 3 null fałsz
3 2020-04-25 00:36:08 null null DELETE [predykat -> ["(... null null null 2 null fałsz
2 2020-04-25 00:35:51 null null UPDATE [predykat -> ((i... null null null 1 null fałsz
1 2020-04-25 00:35:05 null null NAPISZ [tryb —> Zastępowanie... null null null 0 null fałsz
0 2020-04-25 00:34:34 null null NAPISZ [tryb —> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Wyniki:

path
abfss://data@arca...

Teraz sprawdzisz, czy tabela nie jest tabelą formatu różnicowego. Następnie przekonwertujesz tabelę na format różnicowy przy użyciu usługi Spark SQL i potwierdzisz, że została ona poprawnie przekonwertowana.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Wyniki:

Prawda

Aby uzyskać pełną dokumentację, zobacz stronę dokumentacji usługi Delta Lake

Aby uzyskać więcej informacji, zobacz Projekt usługi Delta Lake.

Następne kroki