Bagikan melalui


Tutorial: Membuat dan mengelola tabel Delta Lake

Tutorial ini menunjukkan operasi tabel Delta umum menggunakan data sampel. Delta Lake adalah lapisan penyimpanan yang dioptimalkan yang menyediakan fondasi untuk tabel pada Databricks. Kecuali ditentukan lain, semua tabel pada Databricks adalah tabel Delta.

Sebelum Anda mulai

Untuk menyelesaikan tutorial ini, Anda memerlukan:

Contoh-contoh ini mengandalkan himpunan data yang disebut Rekaman Orang Sintetis: 10K hingga 10M Rekaman. Himpunan data ini berisi catatan fiktif orang, termasuk nama depan dan belakang, jenis kelamin, dan usia mereka.

Pertama, unduh himpunan data untuk tutorial ini.

  1. Kunjungi halaman Catatan Rekor Orang Sintetis: 10K hingga 10M Rekor di Kaggle.
  2. Klik Unduh lalu Unduh himpunan data sebagai zip. Ini mengunduh file bernama archive.zip ke komputer lokal Anda.
  3. Ekstrak folder archive dari berkas archive.zip.

Selanjutnya, unggah himpunan person_10000.csv data ke volume Unity Catalog dalam ruang kerja Azure Databricks Anda. Azure Databricks merekomendasikan untuk mengunggah data Anda ke volume Unity Catalog karena volume menyediakan kemampuan untuk mengakses, menyimpan, mengendalikan, dan mengatur file.

  1. Buka Catalog Explorer dengan mengklik ikon Data.Katalog di bilah samping.
  2. Di Penjelajah Katalog, klik ikon Tambahkan atau plusTambahkan data dan Buat volume.
  3. Beri nama volume my-volume dan pilih Volume terkelola sebagai jenis volume.
  4. workspace Pilih katalog dan default skema, lalu klik Buat.
  5. Buka my-volume dan klik Unggah ke volume ini.
  6. Seret dan letakkan atau telusuri dan pilih person_10000.csv file dari dalam archive folder di komputer lokal Anda.
  7. Klik Unggah.

Terakhir, buat buku catatan untuk menjalankan kode sampel.

  1. Klik Tambahkan atau tambah ikonBaru di bilah samping.
  2. Klik ikon Buku Catatan.Notebook untuk membuat buku catatan baru.
  3. Pilih bahasa untuk buku catatan.

Membuat tabel

Buat tabel terkelola Unity Catalog baru bernama workspace.default.people_10k dari person_10000.csv. Delta Lake adalah default untuk semua perintah pembuatan, baca, dan tulis tabel di Azure Databricks.

Phyton

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

val df = spark.read
  .format("csv")
  .option("header", true)
  .schema(schema)
  .load("/Volumes/workspace/default/my-volume/person_10000.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")

// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)

SQL

-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
  person_id AS id,
  firstname,
  lastname,
  gender,
  age
FROM read_files(
  '/Volumes/workspace/default/my-volume/person_10000.csv',
  format => 'csv',
  header => true
);

-- View the new table.
SELECT * FROM workspace.default.people_10k;

Ada beberapa cara berbeda untuk membuat atau mengkloning tabel. Untuk informasi selengkapnya, lihat CREATE TABLE .

Di Databricks Runtime 13.3 LTS ke atas, Anda dapat menggunakan CREATE TABLE LIKE untuk membuat tabel Delta kosong baru yang menduplikasi properti skema dan tabel dari tabel Delta sumber. Ini dapat berguna saat mempromosikan tabel dari lingkungan pengembangan ke lingkungan produksi.

CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k

Penting

Fitur ini ada di Pratinjau Publik.

DeltaTableBuilder Gunakan API untuk Python dan Scala untuk membuat tabel kosong. Dibandingkan DataFrameWriter dengan dan DataFrameWriterV2, DeltaTableBuilder API memudahkan untuk menentukan informasi tambahan seperti komentar kolom, properti tabel, dan kolom yang dihasilkan.

Phyton

from delta.tables import DeltaTable

(
  DeltaTable.createIfNotExists(spark)
    .tableName("workspace.default.people_10k_2")
    .addColumn("id", "INT")
    .addColumn("firstName", "STRING")
    .addColumn("lastName", "STRING", comment="surname")
    .addColumn("gender", "STRING")
    .addColumn("age", "INT")
    .execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

Scala

import io.delta.tables.DeltaTable

DeltaTable.createOrReplace(spark)
  .tableName("workspace.default.people_10k")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build()
  )
  .addColumn("gender", "STRING")
  .addColumn("age", "INT")
  .execute()

display(spark.read.table("workspace.default.people_10k"))

Upsert ke tabel

Ubah rekaman yang ada dalam tabel atau tambahkan rekaman baru menggunakan operasi yang disebut upsert. Untuk menggabungkan serangkaian pembaruan dan penyisipan ke dalam tabel Delta yang ada, gunakan DeltaTable.merge metode di Python dan Scala dan MERGE INTO pernyataan di SQL.

Misalnya, gabungkan data dari tabel people_10k_updates sumber ke tabel workspace.default.people_10kDelta target . Ketika ada baris yang cocok di kedua tabel, Delta Lake memperbarui kolom data menggunakan ekspresi yang diberikan. Ketika tidak ada baris yang cocok, Delta Lake menambahkan baris baru.

Phyton

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10001, 'Billy', 'Luppitt', 'M', 55),
  (10002, 'Mary', 'Smith', 'F', 98),
  (10003, 'Elias', 'Leadbetter', 'M', 48),
  (10004, 'Jane', 'Doe', 'F', 30),
  (10005, 'Joshua', '', 'M', 90),
  (10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
  .merge(
    people_10k_updates.alias("people_10k_updates"),
    "people_10k.id = people_10k_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

Scala

import org.apache.spark.sql.types._
import io.delta.tables._

// Define schema
val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

// Create data as Seq of Tuples
val data = Seq(
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16)
)

// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
  "id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

deltaTable.as("people_10k")
  .merge(
    people_10k_updates.as("people_10k_updates"),
    "people_10k.id = people_10k_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)

SQL

-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16);

-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;

-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001

Di SQL, * operator memperbarui atau menyisipkan semua kolom dalam tabel target, dengan asumsi bahwa tabel sumber memiliki kolom yang sama dengan tabel target. Jika tabel target tidak memiliki kolom yang sama, kueri akan menampilkan kesalahan analisis. Selain itu, Anda harus menentukan nilai untuk setiap kolom dalam tabel Saat Anda melakukan operasi penyisipan. Nilai kolom bisa kosong, misalnya, ''. Saat Anda melakukan operasi sisipkan, Anda tidak perlu memperbarui semua nilai.

Membaca tabel

Gunakan nama atau jalur tabel untuk mengakses data dalam tabel Delta. Untuk mengakses tabel terkelola Unity Catalog, gunakan nama tabel yang sepenuhnya memenuhi syarat. Akses berbasis jalur hanya didukung untuk volume dan tabel eksternal, bukan untuk tabel terkelola. Untuk informasi selengkapnya, lihat Aturan jalur dan akses dalam volume Katalog Unity.

Phyton

people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

Scala

val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

SQL

SELECT * FROM workspace.default.people_10k;

Menulis ke tabel

Delta Lake menggunakan sintaks standar untuk menulis data ke tabel. Untuk menambahkan data baru ke tabel Delta yang sudah ada, gunakan mode penambahan. Tidak seperti upserting, menulis ke tabel tidak memeriksa rekaman duplikat.

Phyton

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df  = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

Scala

// Create the new data.
val data = Seq(
  (10007, "Miku", "Hatsune", "F", 25)
)

val df = spark.createDataFrame(data)
  .toDF("id", "firstName", "lastName", "gender", "age")

// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")

// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)

SQL

CREATE OR REPLACE TABLE workspace.default.people_10k_new (
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);

-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
  (10007, 'Miku', 'Hatsune', 'F', 25);

-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;

-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;

Hasil keluaran sel notebook Databricks menampilkan maksimum 10.000 baris atau 2 MB, tergantung mana yang lebih rendah. Karena workspace.default.people_10k berisi lebih dari 10.000 baris, hanya 10.000 baris pertama yang muncul di output buku catatan untuk display(df). Baris tambahan ada dalam tabel, tetapi tidak dirender dalam output buku catatan karena batas ini. Anda dapat melihat baris tambahan dengan memfilternya secara khusus.

Untuk mengganti semua data dalam tabel, gunakan mode timpa.

Phyton

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

Scala

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

SQL

INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2

Memperbarui tabel

Memperbarui data dalam tabel Delta berdasarkan predikat. Misalnya, ubah nilai dalam kolom dari gender ke FemaleF, dari Male ke M, dan dari Other ke O.

Phyton

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'Female'",
  set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'Male',
  set = { 'gender': lit('M') }
)

deltaTable.update(
  condition = col('gender') == 'Other',
  set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'Female'",
  Map("gender" -> "'F'")
)

// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  col("gender") === "Male",
  Map("gender" -> lit("M")));

deltaTable.update(
  col("gender") === "Other",
  Map("gender" -> lit("O")));

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Menghapus dari tabel

Hapus data yang cocok dengan predikat dari tabel Delta. Misalnya, kode di bawah ini menunjukkan dua operasi penghapusan: pertama menghapus baris di mana usia kurang dari 18, lalu menghapus baris di mana usia kurang dari 21.

Phyton

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Penting

Penghapusan menghapus data dari versi terbaru tabel Delta tetapi tidak menghapusnya dari penyimpanan fisik sampai versi lama secara eksplisit dikosongkan. Untuk informasi selengkapnya, lihat vakum.

Tampilkan riwayat tabel

Gunakan metode DeltaTable.history dalam Python dan Scala dan pernyataan DESCRIBE HISTORY di SQL untuk melihat informasi asal-usul untuk setiap penulisan ke tabel.

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

SQL

DESCRIBE HISTORY workspace.default.people_10k

Meminta versi tabel sebelumnya menggunakan fitur perjalanan waktu

Mengkueri rekam jepret tabel Delta yang lebih lama menggunakan perjalanan waktu Delta Lake. Untuk mengkueri versi tertentu, gunakan nomor versi atau tanda waktu tabel. Misalnya, kueri versi 0 atau waktu 2026-01-05T23:09:47.000+00:00 dari riwayat tabel.

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()

// Query using the version number.
display(deltaHistory.where("version == 0"))

// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

SQL

-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

Untuk tanda waktu, hanya string tanggal atau tanda waktu yang diterima. Misalnya, string harus diformat sebagai "2026-01-05T22:43:15.000+00:00" atau "2026-01-05 22:43:15".

Gunakan opsi DataFrameReader untuk membuat DataFrame dari tabel Delta yang diikat ke versi atau tanda waktu tertentu dari tabel tersebut.

Phyton

# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

Scala

// Query using the version number.
val dfVersion = spark.read
  .option("versionAsOf", 0)
  .table("workspace.default.people_10k")

// Query using the timestamp.
val dfTimestamp = spark.read
  .option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
  .table("workspace.default.people_10k")

display(dfVersion)
display(dfTimestamp)

SQL

-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;

Untuk informasi selengkapnya, lihat Bekerja dengan riwayat tabel.

Mengoptimalkan tabel

Beberapa perubahan pada tabel dapat membuat beberapa file kecil, yang memperlambat performa kueri baca. Gunakan operasi pengoptimalan untuk meningkatkan kecepatan dengan menggabungkan file kecil menjadi yang lebih besar. Lihat OPTIMIZE.

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE workspace.default.people_10k

Catatan

Jika pengoptimalan prediktif diaktifkan, Anda tidak perlu mengoptimalkan secara manual. Pengoptimalan prediktif secara otomatis mengelola tugas pemeliharaan. Untuk informasi selengkapnya, lihat Pengoptimalan prediktif untuk tabel terkelola Unity Catalog.

Urutan Z menurut kolom

Untuk melakukan z-order data dan lebih meningkatkan performa baca, tentukan kolom yang akan diurutkan dalam operasi. Misalnya, kolokasikan oleh kolom kardinalitas tinggi firstName. Untuk informasi selengkapnya tentang urutan z, lihat Melewatkan data.

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

SQL

OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)

Membersihkan rekam jepret dengan operasi vakum

Delta Lake memiliki isolasi rekam jepret untuk bacaan, yang berarti aman untuk menjalankan operasi pengoptimalan saat pengguna atau pekerjaan lain mengkueri tabel. Namun, Anda akhirnya harus membersihkan rekam jepret lama karena melakukannya mengurangi biaya penyimpanan, meningkatkan performa kueri, dan memastikan kepatuhan data. Jalankan VACUUM operasi untuk membersihkan rekam jepret lama. Lihat VACUUM.

Phyton

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

SQL

VACUUM workspace.default.people_10k

Untuk informasi selengkapnya tentang menggunakan operasi vakum secara efektif, lihat Menghapus file data yang tidak digunakan dengan vakum.

Langkah selanjutnya