Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Tutorial ini memperkenalkan operasi Delta Lake umum di Azure Databricks, termasuk yang berikut ini:
- Membuat tabel.
- Upsert pada tabel.
- Membaca dari tabel.
- Menampilkan riwayat tabel.
- Mengkueri versi tabel yang lebih lama.
- Mengoptimalkan tabel.
- Menambahkan indeks Z-order.
- Vakum file yang tidak direferensikan.
Anda dapat menjalankan contoh kode Python, Scala, dan SQL dalam artikel ini dari dalam buku catatan yang dilampirkan ke sumber daya komputasi Azure Databricks seperti kluster. Anda juga bisa menjalankan kode SQL dalam artikel ini dari dalam kueri yang terkait dengan gudang SQL pada Databricks SQL.
Menyiapkan data sumber
Tutorial ini bergantung pada himpunan data yang disebut People 10 M. Ini berisi 10 juta catatan fiktif yang menyimpan fakta tentang orang-orang, seperti nama depan dan belakang, tanggal lahir, dan gaji. Tutorial ini mengasumsikan bahwa himpunan data ini berada dalam volume Unity Catalog yang terkait dengan ruang kerja Azure Databricks target Anda.
Untuk mendapatkan himpunan data People 10 M untuk tutorial ini, lakukan hal berikut:
- Buka halaman Orang 10 M di Kaggle.
- Klik Unduh untuk mengunduh file bernama
archive.zipke komputer lokal Anda. - Ekstrak file bernama
export.csvdariarchive.zipfile. Fileexport.csvberisi data untuk tutorial ini.
Untuk mengunggah export.csv file ke dalam volume, lakukan hal berikut:
- Pada bilah samping, klik Katalog.
- Di Catalog Explorer, telusuri dan buka volume tempat Anda ingin mengunggah
export.csvfile. - Klik Unggah ke volume ini.
- Seret dan letakkan, atau telusuri dan pilih,
export.csvfile di komputer lokal Anda. - Klik Unggah.
Dalam contoh kode berikut, ganti /Volumes/main/default/my-volume/export.csv dengan jalur ke export.csv file dalam volume target Anda.
Membuat tabel
Semua tabel yang dibuat pada Azure Databricks menggunakan Delta Lake secara default. Databricks merekomendasikan penggunaan tabel terkelola Unity Catalog.
Dalam contoh kode sebelumnya dan contoh kode berikut, ganti nama main.default.people_10m tabel dengan katalog tiga bagian, skema, dan nama tabel target Anda di Katalog Unity.
Catatan
Delta Lake adalah default untuk semua perintah baca, tulis, dan pembuatan tabel Azure Databricks.
Phyton
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.write.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Operasi sebelumnya membuat tabel terkelola baru. Untuk informasi tentang opsi yang tersedia saat Anda membuat tabel Delta, lihat CREATE TABLE.
Di Databricks Runtime 13.3 LTS ke atas, Anda dapat menggunakan CREATE TABLE LIKE untuk membuat tabel Delta kosong baru yang menduplikasi skema dan properti tabel untuk tabel Delta sumber. Ini bisa sangat berguna saat mempromosikan tabel dari lingkungan pengembangan ke dalam produksi, seperti yang ditunjukkan dalam contoh kode berikut:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Untuk membuat tabel kosong, Anda juga dapat menggunakan DeltaTableBuilder API di Delta Lake untuk Python dan Scala. Dibandingkan dengan API DataFrameWriter yang setara, API ini memudahkan untuk menentukan informasi tambahan seperti komentar kolom, properti tabel, dan kolom yang dihasilkan.
Penting
Fitur ini ada di Pratinjau Publik.
Phyton
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert ke tabel
Untuk menggabungkan serangkaian pembaruan dan penyisipan ke dalam tabel Delta yang ada, Anda menggunakan metode
Phyton
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Di SQL, jika Anda menentukan *, ini memperbarui atau menyisipkan semua kolom dalam tabel target, dengan asumsi bahwa tabel sumber memiliki kolom yang sama dengan tabel target. Jika tabel target tidak memiliki kolom yang sama, kueri akan menampilkan kesalahan analisis.
Anda harus menentukan nilai untuk setiap kolom dalam tabel Saat Anda melakukan operasi penyisipan (misalnya, ketika tidak ada baris yang cocok di himpunan data yang ada). Namun, Anda tidak perlu memperbarui semua nilai.
Untuk melihat hasilnya, kueri tabel.
Phyton
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Membaca tabel
Anda mengakses data dalam tabel Delta dengan nama tabel atau jalur tabel, seperti yang diperlihatkan dalam contoh berikut:
Phyton
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Menulis ke tabel
Delta Lake menggunakan sintaks standar untuk menulis data ke tabel.
Untuk menambahkan data baru secara atomik ke tabel Delta yang sudah ada, gunakan mode penambahan seperti yang ditunjukkan dalam contoh berikut:
Phyton
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Untuk mengganti semua data dalam tabel, gunakan mode timpa seperti dalam contoh berikut:
Phyton
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Memperbarui tabel
Anda dapat memperbarui data yang cocok dengan predikat dalam tabel Delta. Misalnya, dalam tabel contohpeople_10m, untuk mengubah singkatan di gender kolom dari M atau ke F atau MaleFemale, Anda bisa menjalankan yang berikut ini:
Phyton
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Menghapus dari tabel
Anda dapat menghapus data yang cocok dengan predikat dari tabel Delta. Misalnya, dalam tabel contoh people_10m , untuk menghapus semua baris yang terkait dengan orang-orang dengan nilai di birthDate kolom dari sebelumnya 1955, Anda bisa menjalankan yang berikut ini:
Phyton
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Penting
Penghapusan menghapus data dari versi terbaru tabel Delta tetapi tidak menghapusnya dari penyimpanan fisik sampai versi lama secara eksplisit dikosongkan. Lihat vakum untuk detailnya.
Tampilkan riwayat tabel
Untuk melihat riwayat tabel, Anda menggunakan metode DeltaTable.history untuk Python dan Scala, dan pernyataan DESCRIBE HISTORY di SQL, yang menyediakan informasi yang terbukti, termasuk versi tabel, operasi, pengguna, dan sebagainya, untuk setiap penulisan ke tabel.
Phyton
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Mengkueri versi tabel yang lebih lama (perjalanan waktu)
Perjalanan waktu Delta Lake memungkinkan Anda untuk menanyakan snapshot yang lebih lama dari tabel Delta.
Untuk mengkueri versi tabel yang lebih lama, tentukan versi tabel atau tanda waktu. Misalnya, untuk mengkueri versi 0 atau tanda 2024-05-15T22:43:15.000+00:00Z waktu dari riwayat sebelumnya, gunakan yang berikut ini:
Phyton
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Untuk tanda waktu, hanya string tanggal atau tanda waktu yang diterima, misalnya, "2024-05-15T22:43:15.000+00:00" atau "2024-05-15 22:43:15".
Opsi DataFrameReader memungkinkan Anda membuat DataFrame dari tabel Delta yang diperbaiki ke versi atau tanda waktu tabel tertentu, misalnya:
Phyton
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Untuk detailnya, lihat Bekerja dengan riwayat tabel.
Mengoptimalkan tabel
Setelah Anda melakukan beberapa perubahan pada tabel, Anda mungkin memiliki banyak file kecil. Untuk meningkatkan kecepatan kueri baca, Anda dapat menggunakan operasi optimalkan untuk menciutkan file kecil menjadi yang lebih besar:
Phyton
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Catatan
Jika pengoptimalan prediktif diaktifkan, Anda tidak perlu berjalan OPTIMIZE secara manual. Pengoptimalan prediktif secara otomatis mengelola tugas pemeliharaan. Untuk informasi selengkapnya, lihat Pengoptimalan prediktif untuk tabel terkelola Unity Catalog.
Urutan Z menurut kolom
Untuk meningkatkan performa baca lebih lanjut, Anda dapat menyusun informasi terkait dalam kumpulan file yang sama dengan urutan z. Algoritma lompati data Delta Lake menggunakan kolokasi ini untuk mengurangi jumlah data yang perlu dibaca secara dramatis. Untuk data z-order, Anda menentukan kolom yang akan diurutkan dalam operasi z-order by. Misalnya, untuk mengalokasikan menurut gender, jalankan:
Phyton
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Untuk kumpulan opsi lengkap yang tersedia saat menjalankan operasi pengoptimalan, lihat Mengoptimalkan tata letak file data.
Membersihkan rekam jepret dengan VACUUM
Delta Lake menyediakan isolasi rekam jepret untuk bacaan, yang berarti aman untuk menjalankan operasi pengoptimalan bahkan saat pengguna atau pekerjaan lain mengkueri tabel. Namun akhirnya, Anda harus membersihkan snapshot lama. Anda dapat melakukan ini dengan menjalankan operasi vakum:
Phyton
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Untuk detail tentang menggunakan operasi vakum secara efektif, lihat Menghapus file data yang tidak digunakan dengan vakum.