Tutorial: Delta Lake
Tutorial ini memperkenalkan operasi Delta Lake umum di Azure Databricks, termasuk yang berikut ini:
- Membuat tabel.
- Upsert pada tabel.
- Membaca dari tabel.
- Menampilkan riwayat tabel.
- Mengkueri versi tabel yang lebih lama.
- Mengoptimalkan tabel.
- Menambahkan indeks Z-order.
- Vakum file yang tidak direferensikan.
Anda dapat menjalankan contoh python, R, Scala, dan kode SQL dalam artikel ini dari dalam buku catatan yang dilampirkan ke kluster Azure Databricks. Anda juga bisa menjalankan kode SQL dalam artikel ini dari dalam kueri yang terkait dengan gudang SQL pada Databricks SQL.
Catatan
Beberapa contoh kode berikut menggunakan notasi namespace dua tingkat yang terdiri dari skema (juga disebut database) dan tabel atau tampilan (misalnya, default.people10m
). Untuk menggunakan contoh ini dengan Katalog Unity, ganti namespace dua tingkat dengan notasi namespace tiga tingkat Unity Catalog yang terdiri dari katalog, skema, dan tabel atau tampilan (misalnya, main.default.people10m
).
Membuat tabel
Semua tabel yang dibuat pada Azure Databricks menggunakan Delta Lake secara default.
Catatan
Delta Lake adalah default untuk semua perintah baca, tulis, dan pembuatan tabel Azure Databricks.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Operasi sebelumnya membuat tabel terkelola yang baru menggunakan skema yang disimpulkan dari data. Untuk informasi tentang opsi yang tersedia saat Anda membuat tabel Delta, lihat CREATE TABLE.
Untuk tabel terkelola, Azure Databricks menentukan lokasi untuk data. Untuk mendapatkan lokasi, Anda dapat menggunakan pernyataan JELASKAN DETAIL, misalnya:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Terkadang Anda mungkin ingin membuat tabel dengan menentukan skema sebelum menyisipkan data. Anda dapat menyelesaikan ini dengan perintah SQL berikut:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
Di Databricks Runtime 13.3 LTS ke atas, Anda dapat menggunakan CREATE TABLE LIKE
untuk membuat tabel Delta kosong baru yang menduplikasi properti skema dan tabel untuk tabel Delta sumber. Ini bisa sangat berguna saat mempromosikan tabel dari lingkungan pengembangan ke dalam produksi, seperti dalam contoh kode berikut:
CREATE TABLE prod.people10m LIKE dev.people10m
Anda juga dapat menggunakan DeltaTableBuilder
API di Delta Lake untuk membuat tabel. Dibandingkan dengan API DataFrameWriter, API ini memudahkan untuk menentukan informasi tambahan seperti komentar kolom, properti tabel, dan kolom yang dihasilkan.
Penting
Fitur ini ada di Pratinjau Publik.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Upsert ke tabel
Untuk menggabungkan serangkaian pembaruan dan penyisipan ke dalam tabel Delta yang ada, Anda menggunakan pernyataan MERGE INTO. Misalnya, pernyataan berikut mengambil data dari tabel sumber dan menggabungkannya ke dalam tabel Delta target. Ketika ada baris yang cocok di kedua tabel, Delta Lake memperbarui kolom data menggunakan ekspresi yang diberikan. Ketika tidak ada baris yang cocok, Delta Lake menambahkan baris baru. Operasi ini dikenal sebagai upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Jika Anda menentukan *
, ini memperbarui atau menyisipkan semua kolom dalam tabel target. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak kueri akan melemparkan kesalahan analisis.
Anda harus menentukan nilai untuk setiap kolom di tabel Anda saat anda melakukan operasi (misalnya, ketika tidak ada baris yang INSERT
cocok dalam himpunan data yang ada). Namun, Anda tidak perlu memperbarui semua nilai.
Untuk melihat hasilnya, kueri tabel.
SELECT * FROM people_10m WHERE id >= 9999998
Membaca tabel
Anda mengakses data dalam tabel Delta dengan nama tabel atau jalur tabel, seperti yang diperlihatkan dalam contoh berikut:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Menulis ke tabel
Delta Lake menggunakan sintaks standar untuk menulis data ke tabel.
Untuk menambahkan data baru secara atomik ke tabel Delta yang sudah ada, gunakan append
mode seperti dalam contoh berikut:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Untuk mengganti semua data dalam tabel secara atomik, gunakan overwrite
mode seperti dalam contoh berikut:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Memperbarui tabel
Anda dapat memperbarui data yang cocok dengan predikat dalam tabel Delta. Misalnya, dalam tabel bernama people10m
atau jalur di /tmp/delta/people-10m
, untuk mengubah singkatan di kolom gender
dari M
atau F
menjadi Male
atau Female
, Anda dapat menjalankan hal berikut:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Menghapus dari tabel
Anda dapat menghapus data yang cocok dengan predikat dari tabel Delta. Misalnya, dalam tabel bernama people10m
atau jalur di /tmp/delta/people-10m
, untuk menghapus semua baris yang sesuai dengan orang dengan nilai di kolom birthDate
dari sebelum 1955
, Anda dapat menjalankan hal berikut:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Penting
delete
menghapus data dari versi terbaru tabel Delta, tetapi tidak menghapusnya dari penyimpanan fisik hingga versi lama divakum secara eksplisit. Lihat vakum untuk detailnya.
Tampilkan riwayat tabel
Untuk melihat riwayat tabel, gunakan pernyataan JELASKAN RIWAYAT, yang memberikan informasi asal, termasuk versi tabel, operasi, pengguna, dan sebagainya, untuk setiap penulisan ke tabel.
DESCRIBE HISTORY people_10m
Mengkueri versi tabel yang lebih lama (perjalanan waktu)
Perjalanan waktu Delta Lake memungkinkan Anda untuk menanyakan snapshot yang lebih lama dari tabel Delta.
Untuk mengkueri versi tabel yang lebih lama, tentukan versi atau stempel waktu dalam SELECT
pernyataan. Misalnya, untuk kueri versi 0 dari riwayat di atas, gunakan:
SELECT * FROM people_10m VERSION AS OF 0
or
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Untuk stempel waktu, hanya untai (karakter) tanggal atau stempel waktu yang diterima, misalnya, "2019-01-01"
dan "2019-01-01'T'00:00:00.000Z"
.
Opsi DataFrameReader memungkinkan Anda membuat DataFrame dari tabel Delta yang diperbaiki ke versi tabel tertentu, misalnya di Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
atau, secara bergantian:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Untuk detailnya, lihat Bekerja dengan riwayat tabel Delta Lake.
Mengoptimalkan tabel
Setelah Anda melakukan beberapa perubahan pada tabel, Anda mungkin memiliki banyak file kecil. Untuk meningkatkan kecepatan membaca kueri, Anda dapat menggunakan OPTIMIZE
untuk menciutkan file kecil menjadi file yang lebih besar:
OPTIMIZE people_10m
Urutan Z menurut kolom
Untuk meningkatkan performa baca lebih lanjut, Anda dapat menemukan informasi terkait dalam kumpulan file yang sama dengan Z-Ordering. Co-lokalitas ini secara otomatis digunakan oleh algoritma data-skipping Delta Lake untuk secara dramatis mengurangi jumlah data yang perlu dibaca. Untuk data Z-Order, Anda menentukan kolom yang akan dipesan dalam ZORDER BY
klausa. Misalnya, untuk menemukan bersama dengan gender
, jalankan:
OPTIMIZE people_10m
ZORDER BY (gender)
Untuk kumpulan opsi lengkap yang tersedia saat menjalankan OPTIMIZE
, lihat Memampatkan file data dengan pengoptimalan di Delta Lake.
Membersihkan rekam jepret dengan VACUUM
Delta Lake menyediakan isolasi snapshot untuk membaca, yang berarti aman untuk dijalankan OPTIMIZE
bahkan ketika pengguna atau pekerjaan lain sedang menanyakan tabel. Namun akhirnya, Anda harus membersihkan snapshot lama. Anda dapat melakukannya dengan menjalankan perintah VACUUM
berikut:
VACUUM people_10m
Untuk detail tentang penggunaan VACUUM
secara efektif, lihat Menghapus file data yang tidak digunakan dengan vakum.