Bagikan melalui


Memasukkan data sebagai tipe varian semi-terstruktur

Penting

Fitur ini ada di Pratinjau Publik.

Di Databricks Runtime 15.3 ke atas, Anda dapat menggunakan jenis VARIANT untuk mengolah data semi-terstruktur. Artikel ini menjelaskan perilaku dan menyediakan pola contoh untuk menyerap data dari penyimpanan objek cloud menggunakan Auto Loader dan COPY INTO, rekaman streaming dari kafka, dan perintah SQL untuk membuat tabel baru dengan data varian atau menyisipkan rekaman baru menggunakan jenis varian. Tabel berikut ini meringkas format file yang didukung dan dukungan versi Databricks Runtime:

Format berkas Versi Runtime Databricks yang didukung
JSON 15.3 ke atas
XML 16.4 ke atas
CSV 16.4 ke atas

Lihat Data Kueri Varian.

Membuat tabel dengan kolom varian

VARIANT adalah jenis SQL standar dalam Databricks Runtime 15.3 ke atas dan didukung oleh tabel yang didukung oleh Delta Lake. Tabel terkelola di Azure Databricks menggunakan Delta Lake secara default, sehingga Anda bisa membuat tabel kosong dengan satu VARIANT kolom menggunakan sintaks berikut:

CREATE TABLE table_name (variant_column VARIANT)

Secara bergantian, Anda dapat menggunakan PARSE_JSON fungsi pada string JSON atau FROM_XML fungsi pada string XML untuk menggunakan pernyataan CTAS untuk membuat tabel dengan kolom varian. Contoh berikut membuat tabel dengan dua kolom:

  • Kolom id yang diekstrak dari string JSON dalam bentuk tipe STRING.
  • Kolom variant_column berisi seluruh string JSON yang dikodekan sebagai VARIANT jenis.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Catatan

Databricks merekomendasikan ekstraksi dan penyimpanan bidang sebagai kolom non-varian yang Anda rencanakan untuk digunakan untuk mempercepat kueri dan mengoptimalkan tata letak penyimpanan.

VARIANT kolom tidak dapat digunakan untuk pengklusteran kunci, partisi, atau kunci urutan Z. Jenis data VARIANT tidak dapat digunakan untuk perbandingan, pengelompokan, pemesanan, dan operasi set. Untuk daftar lengkap batasan, lihat batasan .

Menyisipkan data menggunakan parse_json

Jika tabel target sudah berisi kolom yang dikodekan sebagai VARIANT, Anda bisa menggunakan parse_json untuk menyisipkan rekaman string JSON sebagai VARIANT, seperti dalam contoh berikut:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Phyton

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Menyisipkan data menggunakan from_xml

Jika tabel target sudah berisi kolom yang dikodekan sebagai VARIANT, Anda bisa menggunakan from_xml untuk menyisipkan rekaman string XML sebagai VARIANT. Contohnya:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Phyton

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Menyisipkan data menggunakan from_csv

Jika tabel target sudah berisi kolom yang dikodekan sebagai VARIANT, Anda bisa menggunakan from_xml untuk menyisipkan rekaman string XML sebagai VARIANT. Contohnya:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Phyton

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Memasukkan data dari penyimpanan objek cloud dalam bentuk varian data

Auto Loader dapat digunakan untuk memuat semua data dari sumber file yang didukung sebagai kolom tunggal VARIANT dalam tabel target. Karena VARIANT fleksibel terhadap perubahan skema dan jenis serta mempertahankan sensitivitas huruf besar-kecil dan nilai NULL yang ada di sumber data, pola ini kuat untuk sebagian besar skenario penyerapan dengan catatan berikut:

  • Rekaman cacat tidak dapat dikodekan menggunakan VARIANT tipe.
  • VARIANT jenis hanya dapat menyimpan rekaman hingga ukuran 16mb.

Catatan

Varian memperlakukan rekaman yang terlalu besar mirip dengan rekaman yang rusak. Dalam mode pemrosesan default PERMISSIVE , rekaman yang terlalu besar diambil di corruptRecordColumn.

Karena seluruh rekaman dicatat sebagai kolom tunggal VARIANT , tidak ada evolusi skema yang terjadi selama penyerapan dan rescuedDataColumn tidak didukung. Contoh berikut mengasumsikan bahwa tabel target sudah ada dengan satu VARIANT kolom.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Anda juga dapat menentukan VARIANT kapan menentukan skema atau meneruskan schemaHints. Data di bidang sumber yang direferensikan harus berisi rekaman yang valid. Contoh berikut menunjukkan sintaks ini:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Gunakan COPY INTO dengan varian

Databricks merekomendasikan penggunaan Auto Loader dibandingkan dengan COPY INTO jika tersedia.

COPY INTO mendukung penyerapan seluruh konten sumber data yang didukung sebagai satu kolom. Contoh berikut membuat tabel baru dengan satu VARIANT kolom lalu menggunakan COPY INTO untuk menyerap rekaman dari sumber file JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Alirkan data Kafka sebagai variasi

Banyak aliran Kafka mengodekan payload mereka menggunakan JSON. Menyerap aliran Kafka menggunakan VARIANT membuat beban kerja ini kuat terhadap perubahan skema.

Contoh berikut menunjukkan membaca sumber streaming Kafka, mengonversi key sebagai STRING dan value sebagai VARIANT, dan menulis ke tabel target.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)