Tutorial: Delta Lake

Tutorial ini memperkenalkan operasi Delta Lake umum di Azure Databricks, termasuk yang berikut ini:

Anda dapat menjalankan contoh python, R, Scala, dan kode SQL dalam artikel ini dari dalam buku catatan yang dilampirkan ke kluster Azure Databricks. Anda juga bisa menjalankan kode SQL dalam artikel ini dari dalam kueri yang terkait dengan gudang SQL pada Databricks SQL.

Catatan

Beberapa contoh kode berikut menggunakan notasi namespace dua tingkat yang terdiri dari skema (juga disebut database) dan tabel atau tampilan (misalnya, default.people10m). Untuk menggunakan contoh ini dengan Katalog Unity, ganti namespace dua tingkat dengan notasi namespace tiga tingkat Unity Catalog yang terdiri dari katalog, skema, dan tabel atau tampilan (misalnya, main.default.people10m).

Membuat tabel

Semua tabel yang dibuat pada Azure Databricks menggunakan Delta Lake secara default.

Catatan

Delta Lake adalah default untuk semua perintah baca, tulis, dan pembuatan tabel Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Operasi sebelumnya membuat tabel terkelola yang baru menggunakan skema yang disimpulkan dari data. Untuk informasi tentang opsi yang tersedia saat Anda membuat tabel Delta, lihat CREATE TABLE.

Untuk tabel terkelola, Azure Databricks menentukan lokasi untuk data. Untuk mendapatkan lokasi, Anda dapat menggunakan pernyataan JELASKAN DETAIL, misalnya:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Terkadang Anda mungkin ingin membuat tabel dengan menentukan skema sebelum menyisipkan data. Anda dapat menyelesaikan ini dengan perintah SQL berikut:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

Di Databricks Runtime 13.3 LTS ke atas, Anda dapat menggunakan CREATE TABLE LIKE untuk membuat tabel Delta kosong baru yang menduplikasi properti skema dan tabel untuk tabel Delta sumber. Ini bisa sangat berguna saat mempromosikan tabel dari lingkungan pengembangan ke dalam produksi, seperti dalam contoh kode berikut:

CREATE TABLE prod.people10m LIKE dev.people10m

Anda juga dapat menggunakan DeltaTableBuilder API di Delta Lake untuk membuat tabel. Dibandingkan dengan API DataFrameWriter, API ini memudahkan untuk menentukan informasi tambahan seperti komentar kolom, properti tabel, dan kolom yang dihasilkan.

Penting

Fitur ini ada di Pratinjau Publik.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert ke tabel

Untuk menggabungkan serangkaian pembaruan dan penyisipan ke dalam tabel Delta yang ada, Anda menggunakan pernyataan MERGE INTO. Misalnya, pernyataan berikut mengambil data dari tabel sumber dan menggabungkannya ke dalam tabel Delta target. Ketika ada baris yang cocok di kedua tabel, Delta Lake memperbarui kolom data menggunakan ekspresi yang diberikan. Ketika tidak ada baris yang cocok, Delta Lake menambahkan baris baru. Operasi ini dikenal sebagai upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Jika Anda menentukan *, ini memperbarui atau menyisipkan semua kolom dalam tabel target. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak kueri akan melemparkan kesalahan analisis.

Anda harus menentukan nilai untuk setiap kolom di tabel Anda saat anda melakukan operasi (misalnya, ketika tidak ada baris yang INSERT cocok dalam himpunan data yang ada). Namun, Anda tidak perlu memperbarui semua nilai.

Untuk melihat hasilnya, kueri tabel.

SELECT * FROM people_10m WHERE id >= 9999998

Membaca tabel

Anda mengakses data dalam tabel Delta dengan nama tabel atau jalur tabel, seperti yang diperlihatkan dalam contoh berikut:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Menulis ke tabel

Delta Lake menggunakan sintaks standar untuk menulis data ke tabel.

Untuk menambahkan data baru secara atomik ke tabel Delta yang sudah ada, gunakan append mode seperti dalam contoh berikut:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Untuk mengganti semua data dalam tabel secara atomik, gunakan overwrite mode seperti dalam contoh berikut:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Memperbarui tabel

Anda dapat memperbarui data yang cocok dengan predikat dalam tabel Delta. Misalnya, dalam tabel bernama people10m atau jalur di /tmp/delta/people-10m, untuk mengubah singkatan di kolom gender dari M atau F menjadi Male atau Female, Anda dapat menjalankan hal berikut:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Menghapus dari tabel

Anda dapat menghapus data yang cocok dengan predikat dari tabel Delta. Misalnya, dalam tabel bernama people10m atau jalur di /tmp/delta/people-10m, untuk menghapus semua baris yang sesuai dengan orang dengan nilai di kolom birthDate dari sebelum 1955, Anda dapat menjalankan hal berikut:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Penting

delete menghapus data dari versi terbaru tabel Delta, tetapi tidak menghapusnya dari penyimpanan fisik hingga versi lama divakum secara eksplisit. Lihat vakum untuk detailnya.

Tampilkan riwayat tabel

Untuk melihat riwayat tabel, gunakan pernyataan JELASKAN RIWAYAT, yang memberikan informasi asal, termasuk versi tabel, operasi, pengguna, dan sebagainya, untuk setiap penulisan ke tabel.

DESCRIBE HISTORY people_10m

Mengkueri versi tabel yang lebih lama (perjalanan waktu)

Perjalanan waktu Delta Lake memungkinkan Anda untuk menanyakan snapshot yang lebih lama dari tabel Delta.

Untuk mengkueri versi tabel yang lebih lama, tentukan versi atau stempel waktu dalam SELECT pernyataan. Misalnya, untuk kueri versi 0 dari riwayat di atas, gunakan:

SELECT * FROM people_10m VERSION AS OF 0

or

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Untuk stempel waktu, hanya untai (karakter) tanggal atau stempel waktu yang diterima, misalnya, "2019-01-01" dan "2019-01-01'T'00:00:00.000Z".

Opsi DataFrameReader memungkinkan Anda membuat DataFrame dari tabel Delta yang diperbaiki ke versi tabel tertentu, misalnya di Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

atau, secara bergantian:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Untuk detailnya, lihat Bekerja dengan riwayat tabel Delta Lake.

Mengoptimalkan tabel

Setelah Anda melakukan beberapa perubahan pada tabel, Anda mungkin memiliki banyak file kecil. Untuk meningkatkan kecepatan membaca kueri, Anda dapat menggunakan OPTIMIZE untuk menciutkan file kecil menjadi file yang lebih besar:

OPTIMIZE people_10m

Urutan Z menurut kolom

Untuk meningkatkan performa baca lebih lanjut, Anda dapat menemukan informasi terkait dalam kumpulan file yang sama dengan Z-Ordering. Co-lokalitas ini secara otomatis digunakan oleh algoritma data-skipping Delta Lake untuk secara dramatis mengurangi jumlah data yang perlu dibaca. Untuk data Z-Order, Anda menentukan kolom yang akan dipesan dalam ZORDER BY klausa. Misalnya, untuk menemukan bersama dengan gender, jalankan:

OPTIMIZE people_10m
ZORDER BY (gender)

Untuk kumpulan opsi lengkap yang tersedia saat menjalankan OPTIMIZE, lihat Memampatkan file data dengan pengoptimalan di Delta Lake.

Membersihkan rekam jepret dengan VACUUM

Delta Lake menyediakan isolasi snapshot untuk membaca, yang berarti aman untuk dijalankan OPTIMIZE bahkan ketika pengguna atau pekerjaan lain sedang menanyakan tabel. Namun akhirnya, Anda harus membersihkan snapshot lama. Anda dapat melakukannya dengan menjalankan perintah VACUUM berikut:

VACUUM people_10m

Untuk detail tentang penggunaan VACUUM secara efektif, lihat Menghapus file data yang tidak digunakan dengan vakum.