Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Penting
Fitur ini ada di Pratinjau Publik.
Di Databricks Runtime 15.3 ke atas, Anda dapat menggunakan jenis VARIANT
untuk mengolah data semi-terstruktur. Artikel ini menjelaskan perilaku dan menyediakan pola contoh untuk menyerap data dari penyimpanan objek cloud menggunakan Auto Loader dan COPY INTO
, rekaman streaming dari kafka, dan perintah SQL untuk membuat tabel baru dengan data varian atau menyisipkan rekaman baru menggunakan jenis varian. Tabel berikut ini meringkas format file yang didukung dan dukungan versi Databricks Runtime:
Format berkas | Versi Runtime Databricks yang didukung |
---|---|
JSON | 15.3 ke atas |
XML | 16.4 ke atas |
CSV | 16.4 ke atas |
Lihat Data Kueri Varian.
Membuat tabel dengan kolom varian
VARIANT
adalah jenis SQL standar dalam Databricks Runtime 15.3 ke atas dan didukung oleh tabel yang didukung oleh Delta Lake. Tabel terkelola di Azure Databricks menggunakan Delta Lake secara default, sehingga Anda bisa membuat tabel kosong dengan satu VARIANT
kolom menggunakan sintaks berikut:
CREATE TABLE table_name (variant_column VARIANT)
Secara bergantian, Anda dapat menggunakan PARSE_JSON
fungsi pada string JSON atau FROM_XML
fungsi pada string XML untuk menggunakan pernyataan CTAS untuk membuat tabel dengan kolom varian. Contoh berikut membuat tabel dengan dua kolom:
- Kolom
id
yang diekstrak dari string JSON dalam bentuk tipeSTRING
. - Kolom
variant_column
berisi seluruh string JSON yang dikodekan sebagaiVARIANT
jenis.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Catatan
Databricks merekomendasikan ekstraksi dan penyimpanan bidang sebagai kolom non-varian yang Anda rencanakan untuk digunakan untuk mempercepat kueri dan mengoptimalkan tata letak penyimpanan.
VARIANT
kolom tidak dapat digunakan untuk pengklusteran kunci, partisi, atau kunci urutan Z. Jenis data VARIANT
tidak dapat digunakan untuk perbandingan, pengelompokan, pemesanan, dan operasi set. Untuk daftar lengkap batasan, lihat batasan .
Menyisipkan data menggunakan parse_json
Jika tabel target sudah berisi kolom yang dikodekan sebagai VARIANT
, Anda bisa menggunakan parse_json
untuk menyisipkan rekaman string JSON sebagai VARIANT
, seperti dalam contoh berikut:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Phyton
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Menyisipkan data menggunakan from_xml
Jika tabel target sudah berisi kolom yang dikodekan sebagai VARIANT
, Anda bisa menggunakan from_xml
untuk menyisipkan rekaman string XML sebagai VARIANT
. Contohnya:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Phyton
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Menyisipkan data menggunakan from_csv
Jika tabel target sudah berisi kolom yang dikodekan sebagai VARIANT
, Anda bisa menggunakan from_xml
untuk menyisipkan rekaman string XML sebagai VARIANT
. Contohnya:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Phyton
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Memasukkan data dari penyimpanan objek cloud dalam bentuk varian data
Auto Loader dapat digunakan untuk memuat semua data dari sumber file yang didukung sebagai kolom tunggal VARIANT
dalam tabel target. Karena VARIANT
fleksibel terhadap perubahan skema dan jenis serta mempertahankan sensitivitas huruf besar-kecil dan nilai NULL
yang ada di sumber data, pola ini kuat untuk sebagian besar skenario penyerapan dengan catatan berikut:
- Rekaman cacat tidak dapat dikodekan menggunakan
VARIANT
tipe. -
VARIANT
jenis hanya dapat menyimpan rekaman hingga ukuran 16mb.
Catatan
Varian memperlakukan rekaman yang terlalu besar mirip dengan rekaman yang rusak. Dalam mode pemrosesan default PERMISSIVE
, rekaman yang terlalu besar diambil di corruptRecordColumn
.
Karena seluruh rekaman dicatat sebagai kolom tunggal VARIANT
, tidak ada evolusi skema yang terjadi selama penyerapan dan rescuedDataColumn
tidak didukung. Contoh berikut mengasumsikan bahwa tabel target sudah ada dengan satu VARIANT
kolom.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Anda juga dapat menentukan VARIANT
kapan menentukan skema atau meneruskan schemaHints
. Data di bidang sumber yang direferensikan harus berisi rekaman yang valid. Contoh berikut menunjukkan sintaks ini:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Gunakan COPY INTO
dengan varian
Databricks merekomendasikan penggunaan Auto Loader dibandingkan dengan COPY INTO
jika tersedia.
COPY INTO
mendukung penyerapan seluruh konten sumber data yang didukung sebagai satu kolom. Contoh berikut membuat tabel baru dengan satu VARIANT
kolom lalu menggunakan COPY INTO
untuk menyerap rekaman dari sumber file JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Alirkan data Kafka sebagai variasi
Banyak aliran Kafka mengodekan payload mereka menggunakan JSON. Menyerap aliran Kafka menggunakan VARIANT
membuat beban kerja ini kuat terhadap perubahan skema.
Contoh berikut menunjukkan membaca sumber streaming Kafka, mengonversi key
sebagai STRING
dan value
sebagai VARIANT
, dan menulis ke tabel target.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)