Ringkasan Linux Foundation Delta Lake
Artikel ini telah diadaptasi untuk lebih jelas dari mitra aslinya di sini. Artikel ini membantu Anda dengan cepat menjelajahi fitur utama Delta Lake. Artikel ini menyediakan cuplikan kode yang menunjukkan cara membaca dan menulis ke tabel Delta Lake dari kueri interaktif, batch, dan streaming. Cuplikan kode juga tersedia dalam satu set notebook PySpark di sini, Skala di sini, dan C # di sini
Inilah yang akan kami bahas:
- Membuat tabel
- Membaca data
- Memperbarui data tabel
- Menimpa data tabel
- Pembaruan bersyarat tanpa menimpa
- Membaca versi data yang lebih lama menggunakan Perjalanan Waktu
- Menulis aliran data ke tabel
- Membaca aliran perubahan dari tabel
- Dukungan SQL
Konfigurasi
Pastikan Anda memodifikasi hal di bawah ini sebagaimana mestinya untuk lingkungan Anda.
import random
session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";
deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";
menghasilkan:
'/delta/delta-table-335323'
Membuat tabel
Untuk membuat tabel Delta Lake, tulis DataFrame dari DataFrame dalam format delta. Anda dapat mengubah format dari Parquet, CSV, JSON, dan sebagainya, ke delta.
Kode yang mengikuti memperlihatkan kepada Anda cara membuat tabel Delta Lake baru menggunakan skema yang disimpulkan dari DataFrame Anda.
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)
menghasilkan:
ID |
---|
0 |
1 |
2 |
3 |
4 |
Membaca data
Anda membaca data dalam tabel Delta Lake Anda dengan menentukan jalur ke file dan format delta.
df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()
menghasilkan:
ID |
---|
1 |
3 |
4 |
0 |
2 |
Urutan hasilnya berbeda dari di atas karena tidak ada urutan yang secara eksplisit ditentukan sebelum menghasilkan hasil.
Memperbarui data tabel
Delta Lake mendukung beberapa operasi untuk memodifikasi tabel menggunakan API DataFrame standar. Operasi ini adalah salah satu penyempurnaan yang ditambahkan format delta. Contoh berikut menjalankan tugas batch untuk menimpa data dalam tabel.
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()
menghasilkan:
ID |
---|
7 |
8 |
5 |
9 |
6 |
Di sini Anda bisa melihat bahwa kelima rekaman telah diperbarui untuk menyimpan nilai baru.
Simpan sebagai tabel katalogtalog
Delta Lake dapat menulis ke tabel katalog terkelola atau eksternal.
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show
menghasilkan:
database | tableName | isTemporary |
---|---|---|
default | externaldeltatable | false |
default | manageddeltatable | false |
Dengan kode ini, Anda membuat tabel baru di katalog dari dataframe yang sudah ada, yang disebut sebagai tabel terkelola. Lalu Anda menentukan tabel eksternal baru di katalog yang menggunakan lokasi yang sudah ada, yang disebut sebagai tabel eksternal. Dalam output Anda dapat melihat kedua tabel, tidak peduli bagaimana mereka dibuat, tercantum dalam katalog.
Sekarang Anda dapat melihat properti yang diperluas dari kedua tabel ini
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)
menghasilkan:
col_name | data_type | komentar |
---|---|---|
id | bigint | null |
Informasi Tabel Terperinci | ||
Database | default | |
Tabel | manageddeltatable | |
Pemilik | pengguna layanan tepercaya | |
Waktu Dibuat | Sab 25 Apr 00:35:34 UTC 2020 | |
Akses Terakhir | Kam 01 Jan 00:00:00 UTC 1970 | |
Dibuat Oleh | Spark 2.4.4.2.6.99.201-11401300 | |
Jenis | TERKELOLA | |
Penyedia | delta | |
Properti Tabel | [transient_lastDdlTime=1587774934] | |
Statistik | 2407 byte | |
Lokasi | abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<workspace name>/warehouse/manageddeltatable | |
Pustaka Serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Properti Penyimpanan | [serialization.format=1] |
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)
menghasilkan:
col_name | data_type | komentar |
---|---|---|
id | bigint | null |
Informasi Tabel Terperinci | ||
Database | default | |
Tabel | externaldeltatable | |
Pemilik | pengguna layanan tepercaya | |
Waktu Dibuat | Sab 25 Apr 00:35:38 UTC 2020 | |
Akses Terakhir | Kam 01 Jan 00:00:00 UTC 1970 | |
Dibuat Oleh | Spark 2.4.4.2.6.99.201-11401300 | |
Jenis | EKSTERNAL | |
Penyedia | DELTA | |
Properti Tabel | [transient_lastDdlTime=1587774938] | |
Lokasi | abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152 | |
Pustaka Serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Properti Penyimpanan | [serialization.format=1] |
Pembaruan bersyarat tanpa menimpa
Delta Lake menyediakan API terprogram untuk memperbarui, menghapus, dan menggabungkan kondisional (perintah ini biasanya disebut sebagai data upsert) ke dalam tabel.
from delta.tables import *
from pyspark.sql.functions import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
var deltaTable = DeltaTable.ForPath(deltaTablePath);
deltaTable.Update(
condition: Expr("id % 2 == 0"),
set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(deltaTablePath)
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show
menghasilkan:
ID |
---|
106 |
108 |
5 |
7 |
9 |
Di sini Anda baru saja menambahkan 100 ke setiap ID genap.
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show
menghasilkan:
ID |
---|
5 |
7 |
9 |
Perhatikan bahwa setiap baris genap telah dihapus.
new_data = spark.range(0,20).alias("newData")
delta_table.alias("oldData")\
.merge(new_data.alias("newData"), "oldData.id = newData.id")\
.whenMatchedUpdate(set = { "id": lit("-1")})\
.whenNotMatchedInsert(values = { "id": col("newData.id") })\
.execute()
delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");
deltaTable
.As("oldData")
.Merge(newData, "oldData.id = newData.id")
.WhenMatched()
.Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
.WhenNotMatched()
.Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
.Execute();
deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData").
merge(
newData.as("newData"),
"oldData.id = newData.id").
whenMatched.
update(Map("id" -> lit(-1))).
whenNotMatched.
insert(Map("id" -> col("newData.id"))).
execute()
deltaTable.toDF.show()
menghasilkan:
ID |
---|
18 |
15 |
19 |
2 |
1 |
6 |
8 |
3 |
-1 |
10 |
13 |
0 |
16 |
4 |
-1 |
12 |
11 |
14 |
-1 |
17 |
Di sini Anda memiliki kombinasi data yang ada. Data yang ada telah ditetapkan nilai -1 dalam jalur kode update(WhenMatched). Data baru yang dibuat di bagian atas cuplikan dan ditambahkan melalui jalur kode sisipan (WhenNotMatched), juga ditambahkan.
Riwayat
Delta Lake memiliki kemampuan untuk memungkinkan melihat riwayat tabel. Yaitu, perubahan yang dilakukan pada Tabel Delta yang mendasarinya. Sel di bawah ini memperlihatkan betapa sederhananya memeriksa riwayat.
delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)
menghasilkan:
versi | tanda waktu | userId | userName | operasi | operationParameters | tugas | buku catatan | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
4 | 2020-04-25 00:36:27 | null | null | GABUNG | [predikat -> (OldData.ID = newData.ID )] |
null | null | null | 3 | null | false |
3 | 2020-04-25 00:36:08 | null | null | HAPUS | [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
null | null | null | 2 | null | false |
2 | 2020-04-25 00:35:51 | null | null | PERBARUI | [predicate -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] | null | null | null | 1 | null | false |
1 | 2020-04-25 00:35:05 | null | null | TULIS | [mode -> Overwrite, partitionBy -> []] | null | null | null | 0 | null | false |
0 | 2020-04-25 00:34:34 | null | null | TULIS | [mode -> ErrorIfExists, partitionBy -> []] | null | null | null | null | null | true |
Di sini Anda dapat melihat semua modifikasi yang dilakukan di atas cuplikan kode di atas.
Membaca versi data yang lebih lama menggunakan Perjalanan Waktu
Anda dapat melakukan kueri rekam jepret tabel Delta Lake sebelumnya dengan menggunakan fitur yang disebut Perjalanan Waktu. Jika Anda ingin mengakses data yang Anda timpa, Anda bisa meminta rekam jepret tabel sebelum anda menimpa kumpulan data pertama menggunakan opsi versionAsOf.
Setelah Anda menjalankan sel di bawah ini, Anda akan melihat kumpulan data pertama sebelum Anda menimpanya. Perjalanan Waktu adalah fitur canggih yang memanfaatkan kekuatan log transaksi Delta Lake untuk mengakses data yang tidak lagi ada dalam tabel. Menghapus opsi versi 0 (atau menentukan versi 1) akan memungkinkan Anda melihat data yang lebih baru lagi. Untuk informasi selengkapnya, lihat Mengkueri rekam jepret tabel yang lebih lama.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()
menghasilkan:
ID |
---|
0 |
1 |
4 |
3 |
2 |
Di sini Anda dapat melihat Bahwa Anda telah kembali ke versi data paling awal.
Menulis aliran data ke tabel
Anda juga dapat menulis ke tabel Delta Lake menggunakan Streaming Terstrukur Spark. Log transaksi Delta Lake menjamin persis sekali pemrosesan, bahkan ketika ada aliran lain atau kueri batch berjalan bersamaan terhadap tabel. Secara default, aliran berjalan dalam mode penambahan, yang menambahkan rekaman baru ke tabel.
Untuk informasi selengkapnya tentang integrasi Delta Lake dengan Streaming Terstruktur, lihat Baca dan Tulis Streaming Tabel.
Dalam sel-sel di bawah ini, inilah yang kami lakukan:
- Sel 30 Memperlihatkan data yang baru ditambahkan
- Sel 31 Pemeriksaan Riwayat
- Sel 32 Hentikan pekerjaan streaming terstruktur
- Sel 33 Periksa riwayat <--Anda akan melihat penambahan telah berhenti
Pertama, Anda akan menyiapkan pekerjaan Spark Streaming sederhana untuk menghasilkan urutan dan membuat pekerjaan menulis ke Tabel Delta Anda.
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
.selectExpr("value as id")\
.writeStream\
.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
.start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)
Membaca aliran perubahan dari tabel
Saat aliran sedang menulis ke meja Delta Lake, Anda juga dapat membaca dari tabel itu sebagai sumber streaming. Misalnya, Anda dapat memulai kueri streaming lain yang mencetak semua perubahan yang dibuat pada tabel Delta Lake.
delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show
menghasilkan:
ID |
---|
19 |
18 |
17 |
16 |
15 |
14 |
13 |
12 |
11 |
10 |
8 |
6 |
4 |
3 |
2 |
1 |
0 |
-1 |
-1 |
-1 |
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show
menghasilkan:
versi | tanda waktu | operasi | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | PEMBARUAN STREAMING | [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | GABUNG | [predikat -> (OldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | HAPUS | [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | PERBARUI | [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | TULIS | [mode -> Overwrite, partitionBy -> []] | 0 |
0 | 2020-04-25 00:34:34 | TULIS | [mode -> ErrorIfExists, partitionBy -> []] | null |
Di sini Anda menghilangkan beberapa kolom yang kurang menarik untuk menyederhanakan pengalaman melihat tampilan riwayat.
stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show
menghasilkan:
versi | tanda waktu | operasi | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | PEMBARUAN STREAMING | [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | GABUNG | [predikat -> (OldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | HAPUS | [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | PERBARUI | [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | TULIS | [mode -> Overwrite, partitionBy -> []] | 0 |
0 | 2020-04-25 00:34:34 | TULIS | [mode -> ErrorIfExists, partitionBy -> []] | null |
Konversikan Parquet ke Delta
Anda dapat melakukan konversi di tempat dari format Parquet ke Delta.
Di sini Anda akan menguji apakah tabel yang ada dalam format delta atau tidak.
parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
menghasilkan:
False
Sekarang Anda akan mengonversi data ke format delta dan memverifikasi bahwa data berfungsi.
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
menghasilkan:
True
Dukungan SQL
Delta mendukung perintah utilitas tabel melalui SQL. Anda dapat menggunakan SQL untuk:
- Mendapatkan riwayat DeltaTable
- Vakum DeltaTable
- Mengonversi file Parquet ke Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()
menghasilkan:
versi | tanda waktu | userId | userName | operasi | operationParameters | tugas | buku catatan | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
5 | 2020-04-25 00:37:09 | null | null | PEMBARUAN STREAMING | [outputMode -> Ap... | null | null | null | 4 | null | true |
4 | 2020-04-25 00:36:27 | null | null | GABUNG | [predicate -> (ol... | null | null | null | 3 | null | false |
3 | 2020-04-25 00:36:08 | null | null | HAPUS | [predicate -> ["(... | null | null | null | 2 | null | false |
2 | 2020-04-25 00:35:51 | null | null | PERBARUI | [predicate -> ((i... | null | null | null | 1 | null | false |
1 | 2020-04-25 00:35:05 | null | null | TULIS | [mode -> Overwrit... | null | null | null | 0 | null | false |
0 | 2020-04-25 00:34:34 | null | null | TULIS | [mode -> ErrorIfE... | null | null | null | null | null | true |
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()
menghasilkan:
jalur |
---|
abfss://data@arca... |
Sekarang, Anda akan memverifikasi bahwa tabel bukan tabel format delta. Kemudian, Anda akan mengonversi tabel ke format delta menggunakan Spark SQL dan mengonfirmasi bahwa tabel dikonversi dengan benar.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId = (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
menghasilkan:
True
Untuk dokumentasi lengkap, lihat Halaman Dokumentasi Delta Lake
Untuk informasi selengkapnya, lihat Proyek Delta Lake.