Ringkasan Linux Foundation Delta Lake

Artikel ini telah diadaptasi untuk lebih jelas dari mitra aslinya di sini. Artikel ini membantu Anda dengan cepat menjelajahi fitur utama Delta Lake. Artikel ini menyediakan cuplikan kode yang menunjukkan cara membaca dan menulis ke tabel Delta Lake dari kueri interaktif, batch, dan streaming. Cuplikan kode juga tersedia dalam satu set notebook PySpark di sini, Skala di sini, dan C # di sini

Inilah yang akan kami bahas:

  • Membuat tabel
  • Membaca data
  • Memperbarui data tabel
  • Menimpa data tabel
  • Pembaruan bersyarat tanpa menimpa
  • Membaca versi data yang lebih lama menggunakan Perjalanan Waktu
  • Menulis aliran data ke tabel
  • Membaca aliran perubahan dari tabel
  • Dukungan SQL

Konfigurasi

Pastikan Anda memodifikasi hal di bawah ini sebagaimana mestinya untuk lingkungan Anda.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

menghasilkan:

'/delta/delta-table-335323'

Membuat tabel

Untuk membuat tabel Delta Lake, tulis DataFrame dari DataFrame dalam format delta. Anda dapat mengubah format dari Parquet, CSV, JSON, dan sebagainya, ke delta.

Kode yang mengikuti memperlihatkan kepada Anda cara membuat tabel Delta Lake baru menggunakan skema yang disimpulkan dari DataFrame Anda.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

menghasilkan:

ID
0
1
2
3
4

Membaca data

Anda membaca data dalam tabel Delta Lake Anda dengan menentukan jalur ke file dan format delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

menghasilkan:

ID
1
3
4
0
2

Urutan hasilnya berbeda dari di atas karena tidak ada urutan yang secara eksplisit ditentukan sebelum menghasilkan hasil.

Memperbarui data tabel

Delta Lake mendukung beberapa operasi untuk memodifikasi tabel menggunakan API DataFrame standar. Operasi ini adalah salah satu penyempurnaan yang ditambahkan format delta. Contoh berikut menjalankan tugas batch untuk menimpa data dalam tabel.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

menghasilkan:

ID
7
8
5
9
6

Di sini Anda bisa melihat bahwa kelima rekaman telah diperbarui untuk menyimpan nilai baru.

Simpan sebagai tabel katalogtalog

Delta Lake dapat menulis ke tabel katalog terkelola atau eksternal.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

menghasilkan:

database tableName isTemporary
default externaldeltatable false
default manageddeltatable false

Dengan kode ini, Anda membuat tabel baru di katalog dari dataframe yang sudah ada, yang disebut sebagai tabel terkelola. Lalu Anda menentukan tabel eksternal baru di katalog yang menggunakan lokasi yang sudah ada, yang disebut sebagai tabel eksternal. Dalam output Anda dapat melihat kedua tabel, tidak peduli bagaimana mereka dibuat, tercantum dalam katalog.

Sekarang Anda dapat melihat properti yang diperluas dari kedua tabel ini

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

menghasilkan:

col_name data_type komentar
id bigint null
Informasi Tabel Terperinci
Database default
Tabel manageddeltatable
Pemilik pengguna layanan tepercaya
Waktu Dibuat Sab 25 Apr 00:35:34 UTC 2020
Akses Terakhir Kam 01 Jan 00:00:00 UTC 1970
Dibuat Oleh Spark 2.4.4.2.6.99.201-11401300
Jenis TERKELOLA
Penyedia delta
Properti Tabel [transient_lastDdlTime=1587774934]
Statistik 2407 byte
Lokasi abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<workspace name>/warehouse/manageddeltatable
Pustaka Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Properti Penyimpanan [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

menghasilkan:

col_name data_type komentar
id bigint null
Informasi Tabel Terperinci
Database default
Tabel externaldeltatable
Pemilik pengguna layanan tepercaya
Waktu Dibuat Sab 25 Apr 00:35:38 UTC 2020
Akses Terakhir Kam 01 Jan 00:00:00 UTC 1970
Dibuat Oleh Spark 2.4.4.2.6.99.201-11401300
Jenis EKSTERNAL
Penyedia DELTA
Properti Tabel [transient_lastDdlTime=1587774938]
Lokasi abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
Pustaka Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Properti Penyimpanan [serialization.format=1]

Pembaruan bersyarat tanpa menimpa

Delta Lake menyediakan API terprogram untuk memperbarui, menghapus, dan menggabungkan kondisional (perintah ini biasanya disebut sebagai data upsert) ke dalam tabel.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

menghasilkan:

ID
106
108
5
7
9

Di sini Anda baru saja menambahkan 100 ke setiap ID genap.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

menghasilkan:

ID
5
7
9

Perhatikan bahwa setiap baris genap telah dihapus.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

menghasilkan:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Di sini Anda memiliki kombinasi data yang ada. Data yang ada telah ditetapkan nilai -1 dalam jalur kode update(WhenMatched). Data baru yang dibuat di bagian atas cuplikan dan ditambahkan melalui jalur kode sisipan (WhenNotMatched), juga ditambahkan.

Riwayat

Delta Lake memiliki kemampuan untuk memungkinkan melihat riwayat tabel. Yaitu, perubahan yang dilakukan pada Tabel Delta yang mendasarinya. Sel di bawah ini memperlihatkan betapa sederhananya memeriksa riwayat.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

menghasilkan:

versi tanda waktu userId userName operasi operationParameters tugas buku catatan clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 null null GABUNG [predikat -> (OldData.ID = newData.ID)] null null null 3 null false
3 2020-04-25 00:36:08 null null HAPUS [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null false
2 2020-04-25 00:35:51 null null PERBARUI [predicate -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null false
1 2020-04-25 00:35:05 null null TULIS [mode -> Overwrite, partitionBy -> []] null null null 0 null false
0 2020-04-25 00:34:34 null null TULIS [mode -> ErrorIfExists, partitionBy -> []] null null null null null true

Di sini Anda dapat melihat semua modifikasi yang dilakukan di atas cuplikan kode di atas.

Membaca versi data yang lebih lama menggunakan Perjalanan Waktu

Anda dapat melakukan kueri rekam jepret tabel Delta Lake sebelumnya dengan menggunakan fitur yang disebut Perjalanan Waktu. Jika Anda ingin mengakses data yang Anda timpa, Anda bisa meminta rekam jepret tabel sebelum anda menimpa kumpulan data pertama menggunakan opsi versionAsOf.

Setelah Anda menjalankan sel di bawah ini, Anda akan melihat kumpulan data pertama sebelum Anda menimpanya. Perjalanan Waktu adalah fitur canggih yang memanfaatkan kekuatan log transaksi Delta Lake untuk mengakses data yang tidak lagi ada dalam tabel. Menghapus opsi versi 0 (atau menentukan versi 1) akan memungkinkan Anda melihat data yang lebih baru lagi. Untuk informasi selengkapnya, lihat Mengkueri rekam jepret tabel yang lebih lama.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

menghasilkan:

ID
0
1
4
3
2

Di sini Anda dapat melihat Bahwa Anda telah kembali ke versi data paling awal.

Menulis aliran data ke tabel

Anda juga dapat menulis ke tabel Delta Lake menggunakan Streaming Terstrukur Spark. Log transaksi Delta Lake menjamin persis sekali pemrosesan, bahkan ketika ada aliran lain atau kueri batch berjalan bersamaan terhadap tabel. Secara default, aliran berjalan dalam mode penambahan, yang menambahkan rekaman baru ke tabel.

Untuk informasi selengkapnya tentang integrasi Delta Lake dengan Streaming Terstruktur, lihat Baca dan Tulis Streaming Tabel.

Dalam sel-sel di bawah ini, inilah yang kami lakukan:

  • Sel 30 Memperlihatkan data yang baru ditambahkan
  • Sel 31 Pemeriksaan Riwayat
  • Sel 32 Hentikan pekerjaan streaming terstruktur
  • Sel 33 Periksa riwayat <--Anda akan melihat penambahan telah berhenti

Pertama, Anda akan menyiapkan pekerjaan Spark Streaming sederhana untuk menghasilkan urutan dan membuat pekerjaan menulis ke Tabel Delta Anda.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Membaca aliran perubahan dari tabel

Saat aliran sedang menulis ke meja Delta Lake, Anda juga dapat membaca dari tabel itu sebagai sumber streaming. Misalnya, Anda dapat memulai kueri streaming lain yang mencetak semua perubahan yang dibuat pada tabel Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

menghasilkan:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

menghasilkan:

versi tanda waktu operasi operationParameters readVersion
5 2020-04-25 00:37:09 PEMBARUAN STREAMING [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 GABUNG [predikat -> (OldData.id = newData.id)] 3
3 2020-04-25 00:36:08 HAPUS [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 PERBARUI [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 TULIS [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 TULIS [mode -> ErrorIfExists, partitionBy -> []] null

Di sini Anda menghilangkan beberapa kolom yang kurang menarik untuk menyederhanakan pengalaman melihat tampilan riwayat.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

menghasilkan:

versi tanda waktu operasi operationParameters readVersion
5 2020-04-25 00:37:09 PEMBARUAN STREAMING [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 GABUNG [predikat -> (OldData.id = newData.id)] 3
3 2020-04-25 00:36:08 HAPUS [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 PERBARUI [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 TULIS [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 TULIS [mode -> ErrorIfExists, partitionBy -> []] null

Konversikan Parquet ke Delta

Anda dapat melakukan konversi di tempat dari format Parquet ke Delta.

Di sini Anda akan menguji apakah tabel yang ada dalam format delta atau tidak.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

menghasilkan:

False

Sekarang Anda akan mengonversi data ke format delta dan memverifikasi bahwa data berfungsi.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

menghasilkan:

True

Dukungan SQL

Delta mendukung perintah utilitas tabel melalui SQL. Anda dapat menggunakan SQL untuk:

  • Mendapatkan riwayat DeltaTable
  • Vakum DeltaTable
  • Mengonversi file Parquet ke Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

menghasilkan:

versi tanda waktu userId userName operasi operationParameters tugas buku catatan clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 null null PEMBARUAN STREAMING [outputMode -> Ap... null null null 4 null true
4 2020-04-25 00:36:27 null null GABUNG [predicate -> (ol... null null null 3 null false
3 2020-04-25 00:36:08 null null HAPUS [predicate -> ["(... null null null 2 null false
2 2020-04-25 00:35:51 null null PERBARUI [predicate -> ((i... null null null 1 null false
1 2020-04-25 00:35:05 null null TULIS [mode -> Overwrit... null null null 0 null false
0 2020-04-25 00:34:34 null null TULIS [mode -> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

menghasilkan:

jalur
abfss://data@arca...

Sekarang, Anda akan memverifikasi bahwa tabel bukan tabel format delta. Kemudian, Anda akan mengonversi tabel ke format delta menggunakan Spark SQL dan mengonfirmasi bahwa tabel dikonversi dengan benar.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

menghasilkan:

True

Untuk dokumentasi lengkap, lihat Halaman Dokumentasi Delta Lake

Untuk informasi selengkapnya, lihat Proyek Delta Lake.

Langkah berikutnya