Pelebaran Tipe

Tabel dengan pelesiran tipe diaktifkan memungkinkan Anda mengubah jenis data kolom ke jenis yang lebih luas tanpa menulis ulang file data yang mendasar. Anda dapat mengubah jenis kolom secara manual atau menggunakan evolusi skema untuk mengembangkan jenis kolom.

Penting

Pengembangan tipe tersedia di Databricks Runtime 15.4 LTS dan seterusnya. Tabel dengan perluasan tipe diaktifkan hanya dapat dibaca di Databricks Runtime 15.4 LTS atau yang lebih tinggi.

Pelebaran jenis memerlukan Delta Lake. Semua tabel terkelola Unity Catalog menggunakan Delta Lake secara default.

Perubahan jenis yang didukung

Anda dapat melebarkan jenis sesuai dengan aturan berikut:

Jenis sumber Jenis yang lebih beragam didukung
BYTE SHORT, INT, BIGINT, DECIMAL, DOUBLE
SHORT int,BIGINT,DECIMAL,DOUBLE
INT BIGINT, , DECIMALDOUBLE
BIGINT DECIMAL
FLOAT DOUBLE
DECIMAL DECIMAL dengan lebih presisi dan dalam skala yang lebih besar
DATE TIMESTAMP_NTZ
VOID Tipe apa saja

Perubahan jenis didukung untuk kolom dan bidang tingkat atas yang ditumpuk di dalam struktur, peta, dan array.

Catatan

VOID ke jenis apa pun tidak mengharuskan perluasan tipe dihidupkan pada tabel. Setiap operasi yang memperbarui jenis VOID kolom berhasil tanpa konfigurasi tambahan. VOID perluasan jenis tersedia di Databricks Runtime 18.2 dan seterusnya.

Penting

Spark memotong bagian pecahan dari nilai secara default ketika operasi mempromosikan jenis bilangan bulat ke decimal atau double dan penyerapan hilir menulis nilai kembali ke kolom bilangan bulat. Untuk detail tentang perilaku kebijakan penugasan, lihat Penugasan penyimpanan.

Catatan

Saat mengubah jenis numerik apa pun menjadi decimal, presisi total harus sama dengan atau lebih besar dari presisi awal. Jika Anda juga meningkatkan skala, presisi total harus meningkat dengan jumlah yang sesuai.

Target minimum untuk jenis byte, short, dan int adalah decimal(10,0). Target minimum untuk long adalah decimal(20,0).

Jika Anda ingin menambahkan dua tempat desimal ke bidang dengan decimal(10,1), target minimum adalah decimal(12,3).

Aktifkan perluasan tipe

Anda dapat mengaktifkan pelebaran tipe pada tabel yang ada dengan mengatur delta.enableTypeWidening properti tabel ke true:

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

Anda juga dapat mengaktifkan pelebaran tipe saat pembuatan tabel.

  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

Penting

Saat Anda mengaktifkan pelebaran tipe, ini mengatur fitur tabel typeWidening, yang meningkatkan protokol pembaca dan penulis. Anda harus menggunakan Databricks Runtime 15.4 atau lebih tinggi untuk berinteraksi dengan tabel yang memiliki perluasan jenis diaktifkan. Jika klien eksternal juga berinteraksi dengan tabel, verifikasi bahwa klien tersebut mendukung fitur tabel ini. Lihat Kompatibilitas dan protokol fitur Delta Lake.

Menerapkan perubahan jenis secara manual

ALTER COLUMN Gunakan perintah untuk mengubah jenis secara manual:

ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

Operasi ini memperbarui skema tabel tanpa menulis ulang file data yang mendasar. Lihat ALTER TABLE untuk detail selengkapnya.

Melebarkan jenis dengan evolusi skema otomatis

Evolusi skema berfungsi dengan pelebaran jenis untuk memperbarui jenis data dalam tabel target agar sesuai dengan jenis data masuk.

Catatan

Tanpa pelebaran jenis diaktifkan, evolusi skema selalu mencoba menurunkan data untuk mencocokkan jenis kolom dalam tabel target. Jika Anda tidak ingin secara otomatis memperluas tipe data di tabel sasaran Anda, nonaktifkan memperluas tipe sebelum Anda menjalankan beban kerja dengan perubahan skema diaktifkan.

Untuk menggunakan evolusi skema untuk memperlebar jenis data kolom selama penyerapan, Anda harus memenuhi kondisi berikut:

  • Perintah write berjalan dengan evolusi skema otomatis yang diaktifkan.
  • Tabel target memiliki pelebaran tipe yang diaktifkan.
  • Jenis kolom sumber lebih lebar dari jenis kolom target.
  • Pelebaran tipe mendukung perubahan tipe.

Ketidakcocokan jenis yang tidak memenuhi semua kondisi ini akan mengikuti aturan penerapan skema yang berlaku. Lihat Penegakan skema.

Contoh berikut menunjukkan cara kerja pelebaran tipe dengan evolusi skema untuk operasi penulisan umum.

Python

# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable

source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")

(target_table.alias("target")
  .merge(source_df.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable

val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")

targetTable.alias("target")
  .merge(sourceDf.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

SQL

-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);

-- Example 1: Automatic type widening in INSERT INTO
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;

-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Pengisi Data Otomatis

Penting

Dukungan perluasan tipe di Auto Loader ada di Pratinjau Umum.

Auto Loader mendukung perluasan tipe dengan evolusi skema otomatis. Saat Anda menggunakan Auto Loader untuk menyerap data ke dalam tabel Delta dengan pelebaran jenis dan evolusi skema diaktifkan, jenis kolom secara otomatis diperlebar agar sesuai dengan data masuk.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

Lihat Perluasan tipe otomatis dengan Auto Loader. Selain itu, tabel target harus memiliki pelebaran tipe yang diaktifkan. Lihat Mengaktifkan perluasan tipe.

Menonaktifkan fitur pelebaran tabel jenis

Anda dapat mencegah perluasan jenis yang tidak disengaja pada tabel yang telah diaktifkan dengan mengatur properti ke false:

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

Pengaturan ini mencegah perubahan tipe ke depan pada tabel, tetapi tidak menghilangkan fitur pelebaran jenis tabel atau membatalkan perubahan tipe sebelumnya.

Jika Anda perlu menghapus sepenuhnya fitur pelebaran tipe di tabel, Anda dapat menggunakan perintah DROP FEATURE seperti yang ditunjukkan dalam contoh berikut:

 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]

Catatan

Tabel yang mengaktifkan pelebaran jenis menggunakan Databricks Runtime 15.4 LTS memerlukan penghapusan fitur typeWidening-preview sebagai gantinya.

Saat menghilangkan pelebaran tipe, Databricks menyusun ulang semua file data yang tidak sesuai dengan skema tabel saat ini. Lihat Menghilangkan fitur tabel Delta Lake dan menurunkan protokol tabel.

Streaming dari tabel Delta

Catatan

Dukungan untuk pelebaran jenis dalam Streaming Terstruktur tersedia di Databricks Runtime 16.4 LTS ke atas.

Saat melakukan streaming dari tabel Delta dengan pelebaran tipe diaktifkan, Anda dapat mengonfigurasi pelebaran tipe otomatis untuk kueri streaming dengan mengaktifkan evolusi skema menggunakan opsi mergeSchema pada tabel target. Tabel target harus memiliki perluasan tipe diaktifkan. Lihat Mengaktifkan perluasan tipe.

Python

(spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")
)

Scala

spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")

Ketika mergeSchema diaktifkan dan tabel target memiliki pelebaran tipe diaktifkan:

  • Perubahan jenis diterapkan secara otomatis ke tabel hilir tanpa memerlukan intervensi manual.
  • Kolom baru ditambahkan secara otomatis ke skema tabel hilir.

Tanpa mergeSchema diaktifkan, nilai ditangani sesuai dengan spark.sql.storeAssignmentPolicy konfigurasi, yang secara default menurunkan nilai agar sesuai dengan jenis kolom target. Untuk informasi selengkapnya tentang perilaku kebijakan penugasan, lihat Penugasan Penyimpanan.

Pengakuan perubahan tipe manual

Saat streaming dari tabel Delta, Anda dapat menyediakan lokasi pelacakan skema untuk melacak perubahan skema non-aditif termasuk perubahan jenis. Menyediakan lokasi pelacakan skema diperlukan di Databricks Runtime 18.0 ke bawah, dan bersifat opsional di Databricks Runtime 18.1 ke atas.

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

Scala

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

Setelah menyediakan lokasi pelacakan skema, aliran mengembangkan skema yang dilacak ketika mendeteksi perubahan jenis lalu berhenti. Pada saat itu, Anda dapat mengambil tindakan yang diperlukan untuk menangani perubahan jenis, seperti mengaktifkan pelebaran jenis pada tabel bawah atau memperbarui kueri streaming.

Untuk melanjutkan pemrosesan, atur konfigurasi spark.databricks.delta.streaming.allowSourceColumnTypeChange Spark atau opsi allowSourceColumnTypeChangepembaca DataFrame :

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  # alternatively to allow all future type changes for this stream:
  # .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

Scala

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  // alternatively to allow all future type changes for this stream:
  // .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

SQL

  -- To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
  -- To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
  -- To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"

titik pemeriksaan ID <checkpoint_id> dan versi tabel sumber Delta Lake <delta_source_table_version> ditampilkan dalam pesan kesalahan ketika aliran berhenti.

Alur Deklaratif Lakeflow Spark

Anda dapat mengaktifkan pelebaran tipe untuk Lakeflow Spark Declarative Pipelines di tingkat pipeline atau untuk tabel tertentu. Pelebaran tipe memungkinkan jenis kolom diperlebar secara otomatis selama eksekusi pipeline tanpa memerlukan penyegaran penuh pada tabel streaming. Jenis perubahan dalam tampilan materialisasi selalu memicu komputasi ulang penuh, dan ketika perubahan jenis diterapkan ke tabel sumber, tampilan materialisasi yang bergantung pada tabel tersebut memerlukan komputasi ulang penuh untuk mencerminkan jenis baru.

Mengaktifkan perluasan tipe untuk seluruh rangkaian

Untuk mengaktifkan pelebaran tipe untuk semua tabel dalam alur, atur konfigurasi alur pipelines.enableTypeWidening.

JSON

{
  "configuration": {
    "pipelines.enableTypeWidening": "true"
  }
}

YAML

configuration:
  pipelines.enableTypeWidening: 'true'

Mengaktifkan peluasan tipe untuk tabel tertentu

Anda juga dapat mengaktifkan pelebaran tipe untuk tabel tertentu dengan mengatur properti tabel delta.enableTypeWidening:

Python

import dlt

@dlt.table(
  table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
  return spark.readStream.table("source_table")

SQL

CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table

Kompatibilitas dengan pembaca hilir

Tabel dengan perluasan tipe diaktifkan hanya dapat dibaca di Databricks Runtime 15.4 LTS atau yang lebih tinggi. Jika Anda ingin tabel dengan pelebaran tipe yang diaktifkan dalam pipeline Anda dapat dibaca oleh pembaca di Databricks Runtime 14.3 atau versi lebih rendah, Anda harus:

  • Nonaktifkan perluasan tipe dengan menghapus properti delta.enableTypeWidening/pipelines.enableTypeWidening atau mengaturnya ke false, dan lakukan penyegaran penuh tabel.
  • Aktifkan Mode Kompatibilitas pada tabel Anda.

Berbagi Delta

Catatan

Dukungan untuk Pelebaran Tipe pada Delta Sharing tersedia di Databricks Runtime 16.1 ke atas.

Berbagi tabel Delta Lake dengan pelebaran tipe yang diaktifkan didukung dalam Berbagi Delta dari Databricks ke Databricks. Penyedia dan penerima harus berada di Databricks Runtime 16.1 atau lebih tinggi.

Untuk membaca Umpan Data Perubahan dari tabel Delta Lake dengan pelebaran tipe diaktifkan menggunakan Delta Sharing, Anda harus menetapkan format tanggapan ke delta.

spark.read
  .format("deltaSharing")
  .option("responseFormat", "delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "<start version>")
  .option("endingVersion", "<end version>")
  .load("<table>")

Mengubah umpan data pada perubahan tipe tidak didukung. Anda harus membagi operasi menjadi dua pembacaan terpisah, satu berakhir pada versi tabel yang mengandung perubahan tipe, dan yang lain dimulai pada versi yang mengandung perubahan tipe.

Keterbatasan

Kompatibilitas Apache Iceberg

Apache Iceberg tidak mendukung semua perubahan tipe yang dicakup oleh pelebaran tipe, lihat Iceberg Schema Evolution. Secara khusus, Azure Databricks tidak mendukung perubahan jenis berikut:

  • byte, short, int, long ke decimal atau double
  • peningkatan skala desimal
  • date ke timestampNTZ

Saat Anda mengaktifkan UniForm dengan kompatibilitas Iceberg pada tabel Delta Lake, menerapkan salah satu perubahan jenis ini menghasilkan kesalahan. Lihat Membaca tabel Delta dengan klien Iceberg.

Jika Anda menerapkan salah satu perubahan jenis yang tidak didukung ini ke tabel Delta Lake, Anda memiliki dua opsi:

  1. Bangun ulang metadata Iceberg: Gunakan perintah berikut untuk membangun ulang metadata Iceberg tanpa fitur tabel pelebaran jenis:

    ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')
    

    Ini memungkinkan Anda untuk mempertahankan kompatibilitas Uniform setelah menerapkan perubahan jenis yang tidak kompatibel.

  2. Hapus fitur tabel pelebaran jenis: Lihat Nonaktifkan fitur tabel pelebaran jenis.

Fungsi yang bergantung pada jenis

Beberapa fungsi SQL mengembalikan hasil yang bergantung pada jenis data input. Misalnya, hash fungsi mengembalikan nilai hash yang berbeda untuk nilai logika yang sama jika jenis argumen berbeda: hash(1::INT) mengembalikan hasil yang berbeda dari hash(1::BIGINT).

Jenis fungsi dependen lainnya adalah: xxhash64, , bit_get, bit_reversetypeof.

Untuk hasil yang stabil dalam kueri yang menggunakan fungsi-fungsi ini, secara eksplisit mentransmisikan nilai ke jenis yang diinginkan:

Python

spark.read.table("table_name") \
  .selectExpr("hash(CAST(column_name AS BIGINT))")

Scala

spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
  .selectExpr("hash(CAST(a AS BIGINT))")

SQL

-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name

Batasan Lainnya

  • Anda tidak dapat menyediakan lokasi pelacakan skema menggunakan SQL saat streaming dari tabel Delta Lake dengan perubahan jenis.
  • Anda tidak dapat berbagi tabel dengan peluasan tipe diaktifkan kepada konsumen non-Databricks menggunakan Delta Sharing.