Bagikan melalui


Tutorial: Delta Lake

Tutorial ini memperkenalkan operasi Delta Lake umum di Azure Databricks, termasuk yang berikut ini:

Anda dapat menjalankan contoh kode Python, Scala, dan SQL dalam artikel ini dari dalam buku catatan yang dilampirkan ke sumber daya komputasi Azure Databricks seperti kluster. Anda juga bisa menjalankan kode SQL dalam artikel ini dari dalam kueri yang terkait dengan gudang SQL pada Databricks SQL.

Menyiapkan data sumber

Tutorial ini bergantung pada himpunan data yang disebut People 10 M. Ini berisi 10 juta catatan fiktif yang menyimpan fakta tentang orang-orang, seperti nama depan dan belakang, tanggal lahir, dan gaji. Tutorial ini mengasumsikan bahwa himpunan data ini berada dalam volume Unity Catalog yang terkait dengan ruang kerja Azure Databricks target Anda.

Untuk mendapatkan himpunan data People 10 M untuk tutorial ini, lakukan hal berikut:

  1. Buka halaman Orang 10 M di Kaggle.
  2. Klik Unduh untuk mengunduh file bernama archive.zip ke komputer lokal Anda.
  3. Ekstrak file bernama export.csv dari archive.zip file. File export.csv berisi data untuk tutorial ini.

Untuk mengunggah export.csv file ke dalam volume, lakukan hal berikut:

  1. Pada bilah samping, klik Katalog.
  2. Di Catalog Explorer, telusuri dan buka volume tempat Anda ingin mengunggah export.csv file.
  3. Klik Unggah ke volume ini.
  4. Seret dan letakkan, atau telusuri dan pilih, export.csv file di komputer lokal Anda.
  5. Klik Unggah.

Dalam contoh kode berikut, ganti /Volumes/main/default/my-volume/export.csv dengan jalur ke export.csv file dalam volume target Anda.

Membuat tabel

Semua tabel yang dibuat pada Azure Databricks menggunakan Delta Lake secara default. Databricks merekomendasikan penggunaan tabel terkelola Unity Catalog.

Dalam contoh kode sebelumnya dan contoh kode berikut, ganti nama main.default.people_10m tabel dengan katalog tiga bagian, skema, dan nama tabel target Anda di Katalog Unity.

Catatan

Delta Lake adalah default untuk semua perintah baca, tulis, dan pembuatan tabel Azure Databricks.

Phyton

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.write.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Operasi sebelumnya membuat tabel terkelola baru. Untuk informasi tentang opsi yang tersedia saat Anda membuat tabel Delta, lihat CREATE TABLE.

Di Databricks Runtime 13.3 LTS ke atas, Anda dapat menggunakan CREATE TABLE LIKE untuk membuat tabel Delta kosong baru yang menduplikasi skema dan properti tabel untuk tabel Delta sumber. Ini bisa sangat berguna saat mempromosikan tabel dari lingkungan pengembangan ke dalam produksi, seperti yang ditunjukkan dalam contoh kode berikut:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Untuk membuat tabel kosong, Anda juga dapat menggunakan DeltaTableBuilder API di Delta Lake untuk Python dan Scala. Dibandingkan dengan API DataFrameWriter yang setara, API ini memudahkan untuk menentukan informasi tambahan seperti komentar kolom, properti tabel, dan kolom yang dihasilkan.

Penting

Fitur ini ada di Pratinjau Publik.

Phyton

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert ke tabel

Untuk menggabungkan serangkaian pembaruan dan penyisipan ke dalam tabel Delta yang ada, Anda menggunakan metode untuk Python dan Scala, dan pernyataan untuk SQL. Misalnya, contoh berikut mengambil data dari tabel sumber dan menggabungkannya ke dalam tabel Delta target. Ketika ada baris yang cocok di kedua tabel, Delta Lake memperbarui kolom data menggunakan ekspresi yang diberikan. Ketika tidak ada baris yang cocok, Delta Lake menambahkan baris baru. Operasi ini dikenal sebagai upsert.

Phyton

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Di SQL, jika Anda menentukan *, ini memperbarui atau menyisipkan semua kolom dalam tabel target, dengan asumsi bahwa tabel sumber memiliki kolom yang sama dengan tabel target. Jika tabel target tidak memiliki kolom yang sama, kueri akan menampilkan kesalahan analisis.

Anda harus menentukan nilai untuk setiap kolom dalam tabel Saat Anda melakukan operasi penyisipan (misalnya, ketika tidak ada baris yang cocok di himpunan data yang ada). Namun, Anda tidak perlu memperbarui semua nilai.

Untuk melihat hasilnya, kueri tabel.

Phyton

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Membaca tabel

Anda mengakses data dalam tabel Delta dengan nama tabel atau jalur tabel, seperti yang diperlihatkan dalam contoh berikut:

Phyton

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Menulis ke tabel

Delta Lake menggunakan sintaks standar untuk menulis data ke tabel.

Untuk menambahkan data baru secara atomik ke tabel Delta yang sudah ada, gunakan mode penambahan seperti yang ditunjukkan dalam contoh berikut:

Phyton

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Untuk mengganti semua data dalam tabel, gunakan mode timpa seperti dalam contoh berikut:

Phyton

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Memperbarui tabel

Anda dapat memperbarui data yang cocok dengan predikat dalam tabel Delta. Misalnya, dalam tabel contohpeople_10m, untuk mengubah singkatan di gender kolom dari M atau ke F atau MaleFemale, Anda bisa menjalankan yang berikut ini:

Phyton

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Menghapus dari tabel

Anda dapat menghapus data yang cocok dengan predikat dari tabel Delta. Misalnya, dalam tabel contoh people_10m , untuk menghapus semua baris yang terkait dengan orang-orang dengan nilai di birthDate kolom dari sebelumnya 1955, Anda bisa menjalankan yang berikut ini:

Phyton

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Penting

Penghapusan menghapus data dari versi terbaru tabel Delta tetapi tidak menghapusnya dari penyimpanan fisik sampai versi lama secara eksplisit dikosongkan. Lihat vakum untuk detailnya.

Tampilkan riwayat tabel

Untuk melihat riwayat tabel, Anda menggunakan metode DeltaTable.history untuk Python dan Scala, dan pernyataan DESCRIBE HISTORY di SQL, yang menyediakan informasi yang terbukti, termasuk versi tabel, operasi, pengguna, dan sebagainya, untuk setiap penulisan ke tabel.

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Mengkueri versi tabel yang lebih lama (perjalanan waktu)

Perjalanan waktu Delta Lake memungkinkan Anda untuk menanyakan snapshot yang lebih lama dari tabel Delta.

Untuk mengkueri versi tabel yang lebih lama, tentukan versi tabel atau tanda waktu. Misalnya, untuk mengkueri versi 0 atau tanda 2024-05-15T22:43:15.000+00:00Z waktu dari riwayat sebelumnya, gunakan yang berikut ini:

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Untuk tanda waktu, hanya string tanggal atau tanda waktu yang diterima, misalnya, "2024-05-15T22:43:15.000+00:00" atau "2024-05-15 22:43:15".

Opsi DataFrameReader memungkinkan Anda membuat DataFrame dari tabel Delta yang diperbaiki ke versi atau tanda waktu tabel tertentu, misalnya:

Phyton

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Untuk detailnya, lihat Bekerja dengan riwayat tabel.

Mengoptimalkan tabel

Setelah Anda melakukan beberapa perubahan pada tabel, Anda mungkin memiliki banyak file kecil. Untuk meningkatkan kecepatan kueri baca, Anda dapat menggunakan operasi optimalkan untuk menciutkan file kecil menjadi yang lebih besar:

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Catatan

Jika pengoptimalan prediktif diaktifkan, Anda tidak perlu berjalan OPTIMIZE secara manual. Pengoptimalan prediktif secara otomatis mengelola tugas pemeliharaan. Untuk informasi selengkapnya, lihat Pengoptimalan prediktif untuk tabel terkelola Unity Catalog.

Urutan Z menurut kolom

Untuk meningkatkan performa baca lebih lanjut, Anda dapat menyusun informasi terkait dalam kumpulan file yang sama dengan urutan z. Algoritma lompati data Delta Lake menggunakan kolokasi ini untuk mengurangi jumlah data yang perlu dibaca secara dramatis. Untuk data z-order, Anda menentukan kolom yang akan diurutkan dalam operasi z-order by. Misalnya, untuk mengalokasikan menurut gender, jalankan:

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Untuk kumpulan opsi lengkap yang tersedia saat menjalankan operasi pengoptimalan, lihat Mengoptimalkan tata letak file data.

Membersihkan rekam jepret dengan VACUUM

Delta Lake menyediakan isolasi rekam jepret untuk bacaan, yang berarti aman untuk menjalankan operasi pengoptimalan bahkan saat pengguna atau pekerjaan lain mengkueri tabel. Namun akhirnya, Anda harus membersihkan snapshot lama. Anda dapat melakukan ini dengan menjalankan operasi vakum:

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Untuk detail tentang menggunakan operasi vakum secara efektif, lihat Menghapus file data yang tidak digunakan dengan vakum.