Menyimpulkan dan mengembangkan skema menggunakan from_json dalam alur

Penting

Fitur ini sedang dalam Pratinjau Publik.

Artikel ini menjelaskan cara menyimpulkan dan mengembangkan skema blob JSON dengan from_json fungsi SQL di Lakeflow Spark Declarative Pipelines.

Gambaran Umum

Fungsi from_json SQL menguraikan kolom string JSON dan mengembalikan nilai struct. Saat digunakan di luar alur, Anda harus secara eksplisit memberikan skema nilai yang dikembalikan menggunakan schema argumen . Ketika digunakan dengan Lakeflow Spark Declarative Pipelines, Anda dapat mengaktifkan inferensi dan evolusi skema, yang secara otomatis mengelola skema nilai yang dikembalikan. Fitur ini menyederhanakan pengaturan awal (terutama ketika skema tidak diketahui) dan operasi yang sedang berlangsung ketika skema sering berubah. Ini memungkinkan pemrosesan blob JSON yang mulus dari sumber data streaming seperti Auto Loader, Kafka, atau Kinesis.

Secara khusus, ketika digunakan dalam alur, inferensi skema dan evolusi untuk from_json fungsi SQL dapat:

  • Mendeteksi bidang baru dalam rekaman JSON masuk (termasuk objek JSON berlapis)
  • Menyimpulkan jenis kolom dan mengasosiasikannya dengan jenis data Spark yang sesuai.
  • Secara otomatis mengembangkan skema untuk mengakomodasi bidang baru
  • Menangani data secara otomatis yang tidak sesuai dengan skema saat ini

Sintaks: Secara otomatis menyimpulkan dan mengembangkan skema

Untuk mengaktifkan inferensi skema dalam from_json pipeline, atur skema ke NULL dan tentukan opsi schemaLocationKey. Ini memungkinkannya untuk menyimpulkan dan melacak skema.

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

Phyton

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

Kueri dapat memiliki beberapa from_json ekspresi, tetapi setiap ekspresi harus memiliki ekspresi yang unik schemaLocationKey. Bagian schemaLocationKey juga harus unik per alur kerja.

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Phyton

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

Sintaks: Skema tetap

Jika Anda ingin menerapkan skema tertentu sebagai gantinya, Anda dapat menggunakan sintaks berikut from_json untuk mengurai string JSON menggunakan skema tersebut:

from_json(jsonStr, schema, [, options])

Sintaks ini dapat digunakan di lingkungan Azure Databricks apa pun, termasuk Alur Deklaratif Lakeflow Spark. Informasi selengkapnya tersedia di sini.

Inferensi Skema

from_json menyimpulkan skema dari batch pertama kolom data JSON dan secara internal mengindeksnya dengan schemaLocationKey (diperlukan).

Jika string JSON adalah objek tunggal (misalnya, {"id": 123, "name": "John"}), from_json menyimpulkan skema jenis STRUCT dan menambahkan rescuedDataColumn ke daftar bidang.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Namun, jika string JSON memiliki array tingkat atas (seperti ["id": 123, "name": "John"]), maka from_json membungkus ARRAY dalam STRUCT. Pendekatan ini memungkinkan penyelamatan data yang tidak kompatibel dengan skema yang disimpulkan. Anda memiliki opsi untuk menguraikan nilai-nilai array ke dalam baris terpisah dalam proses berikutnya.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Mengambil alih inferensi skema menggunakan petunjuk skema

Anda dapat secara opsional menyediakan schemaHints untuk memengaruhi bagaimana from_json menyimpulkan jenis kolom. Ini berguna ketika Anda tahu bahwa kolom adalah jenis data tertentu, atau jika Anda ingin memilih jenis data yang lebih umum (misalnya, ganda alih-alih bilangan bulat). Anda dapat memberikan jumlah petunjuk arbitrer untuk jenis data kolom menggunakan sintaks spesifikasi skema SQL. Semantik untuk petunjuk skema sama dengan petunjuk skema Auto Loader. Contohnya:

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

Ketika string JSON berisi ARRAY tingkat atas, string tersebut dibungkus dalam STRUCT. Dalam kasus ini, petunjuk skema diterapkan pada skema ARRAY, bukan pada STRUCT yang terbungkus. Misalnya, pertimbangkan string JSON dengan array tingkat atas seperti:

[{"id": 123, "name": "John"}]

Skema ARRAY yang disimpulkan dibungkus dalam STRUCT:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Untuk mengubah jenis iddata , tentukan petunjuk skema sebagai element.id STRING. Untuk menambahkan kolom baru tipe DOUBLE, tentukan element.new_col DOUBLE. Karena petunjuk ini, skema untuk array JSON tingkat atas menjadi:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Mengembangkan skema menggunakan schemaEvolutionMode

from_json mendeteksi penambahan kolom baru saat memproses data Anda. Saat from_json mendeteksi bidang baru, sistem memperbarui skema yang disimpulkan dengan menambahkan kolom baru di akhir skema yang ada. Tipe data kolom yang ada tetap tidak berubah. Setelah pembaruan skema, alur dimulai ulang secara otomatis dengan skema yang diperbarui.

from_json mendukung mode berikut untuk evolusi skema, yang Anda tetapkan menggunakan pengaturan opsional schemaEvolutionMode . Mode ini konsisten dengan Auto Loader.

schemaEvolutionMode Perilaku saat membaca kolom baru
addNewColumns (standar) Streaming gagal. Kolom baru ditambahkan ke skema. Kolom yang ada tidak mengembangkan jenis data.
rescue Skema tidak pernah berevolusi dan aliran tidak gagal karena perubahan skema. Semua kolom baru direkam di kolom data yang diselamatkan.
failOnNewColumns Streaming gagal. Aliran tidak dimulai ulang kecuali schemaHints diperbarui atau data yang menyinggung dihapus.
none Tidak mengubah skema, kolom baru diabaikan, dan data tidak diselamatkan kecuali opsi rescuedDataColumn diaktifkan. Aliran tidak gagal karena perubahan skema.

Contohnya:

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

Kolom data yang diselamatkan

Kolom data yang diselamatkan secara otomatis ditambahkan ke skema Anda sebagai _rescued_data. Anda dapat mengganti nama kolom dengan mengatur rescuedDataColumn opsi . Contohnya:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

Saat Anda memilih untuk menggunakan kolom data yang diselamatkan, kolom apa pun yang tidak cocok dengan skema yang disimpulkan akan diselamatkan alih-alih dihilangkan. Ini mungkin terjadi karena ketidakcocokan tipe data, kolom yang hilang dalam skema, atau perbedaan casing nama kolom.

Menangani rekaman yang rusak

Untuk menyimpan rekaman yang salah format dan tidak dapat diurai, tambahkan _corrupt_record kolom dengan mengatur petunjuk skema, seperti dalam contoh berikut:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Untuk mengganti nama kolom catatan yang rusak, atur columnNameOfCorruptRecord opsi .

Pengurai JSON mendukung tiga mode untuk menangani rekaman yang rusak:

Pengaturan Description
PERMISSIVE Untuk rekaman yang rusak, menempatkan string cacat ke dalam bidang yang dikonfigurasi oleh columnNameOfCorruptRecord dan mengatur bidang cacat ke null. Untuk menyimpan rekaman yang rusak, Anda bisa mengatur bidang jenis string bernama columnNameOfCorruptRecord dalam skema yang ditentukan pengguna. Jika skema tidak memiliki bidang , rekaman yang rusak akan dihilangkan selama penguraian. Ketika menyimpulkan skema, pengurai secara implisit menambahkan columnNameOfCorruptRecord bidang dalam skema output.
DROPMALFORMED Mengabaikan rekaman yang rusak.
Saat Anda menggunakan DROPMALFORMED mode dengan rescuedDataColumn, ketidakcocokan jenis data tidak menyebabkan rekaman dihilangkan. Hanya rekaman rusak yang dihilangkan, seperti JSON yang tidak lengkap atau salah bentuk.
FAILFAST Melempar pengecualian ketika pengurai menemui rekor yang rusak.
Saat Anda menggunakan FAILFAST mode dengan rescuedDataColumn, ketidakcocokan jenis data tidak menimbulkan kesalahan. Hanya catatan yang rusak yang melemparkan kesalahan, seperti JSON yang tidak lengkap atau salah format.

Merujuk ke bidang dalam output "from_json"

from_json menentukan skema selama eksekusi alur. Jika sebuah kueri hilir mengacu ke bidang from_json sebelum fungsi from_json berhasil dijalankan setidaknya satu kali, maka bidang tersebut tidak dapat diolah dan kueri dilewati. Dalam contoh berikut, analisis untuk kueri tabel perak akan dilewati hingga from_json fungsi dalam kueri perunggu telah dijalankan dan menyimpulkan skema.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

from_json Jika fungsi dan bidang yang disimpulkannya dirujuk dalam kueri yang sama, analisis mungkin gagal seperti dalam contoh berikut:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Anda dapat memperbaikinya dengan memindahkan referensi ke from_json bidang ke dalam kueri hilir (seperti contoh perunggu/perak di atas.) Atau, Anda dapat menentukan schemaHints yang berisi bidang yang dirujuk from_json . Contohnya:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Contoh: Secara otomatis menyimpulkan dan mengembangkan skema

Bagian ini menyediakan contoh kode untuk mengaktifkan inferensi dan evolusi skema otomatis menggunakan from_json di Lakeflow Spark Declarative Pipelines.

Membuat tabel streaming dari penyimpanan objek cloud

Contoh berikut menggunakan read_files sintaksis untuk membuat tabel streaming dari penyimpanan objek cloud.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Phyton

@dp.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

Membuat tabel streaming dari Kafka

Contoh berikut menggunakan read_kafka sintaksis untuk membuat tabel streaming dari Kafka.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

Phyton

@dp.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

Contoh: Skema tetap

Misalnya kode menggunakan from_json dengan skema tetap, lihat from_json fungsi.

FAQs

Bagian ini menjawab pertanyaan yang sering diajukan tentang inferensi skema dan dukungan evolusi dalam from_json fungsi.

Apa perbedaan antara from_json dan parse_json?

Fungsi parse_json mengembalikan VARIANT nilai dari string JSON.

VARIAN menyediakan cara yang fleksibel dan efisien untuk menyimpan data semi terstruktur. Ini menghindari inferensi dan evolusi skema dengan menghilangkan tipe yang ketat. Namun, jika Anda ingin menerapkan skema pada waktu tulis (misalnya, karena Anda memiliki skema yang relatif ketat), from_json mungkin merupakan opsi yang lebih baik.

Tabel berikut ini menjelaskan perbedaan antara from_json dan parse_json:

Function Kasus penggunaan Availability
from_json Evolusi skema dengan from_json mempertahankan skema. Ini berguna ketika:
  • Anda ingin menerapkan skema data Anda (misalnya, meninjau setiap perubahan skema sebelum mempertahankannya).
  • Anda ingin mengoptimalkan penyimpanan dan memerlukan latensi dan biaya kueri yang rendah.
  • Anda ingin gagal pada data dengan jenis yang tidak cocok.
  • Anda ingin mengekstrak hasil parsial dari rekaman JSON yang rusak dan menyimpan rekaman cacat di _corrupt_record kolom. Sebaliknya, penyerapan VARIAN mengembalikan kesalahan untuk JSON yang tidak valid.
Hanya tersedia dengan inferensi dan evolusi skema di Pipeline Deklaratif Lakeflow Spark
parse_json VARIAN sangat cocok untuk menyimpan data yang tidak perlu diskema. Contohnya:
  • Anda ingin menyimpan data semi terstruktur karena fleksibel.
  • Skema berubah terlalu cepat untuk mentransmisikannya ke dalam skema tanpa sering melakukan kegagalan streaming dan memulai ulang.
  • Anda tidak ingin gagal pada data dengan jenis yang tidak cocok. (Penyerapan VARIAN akan selalu berhasil untuk rekaman JSON yang valid—bahkan jika ada ketidakcocokan jenis.)
  • Pengguna Anda tidak ingin berurusan dengan kolom data yang diselamatkan yang berisi bidang yang tidak sesuai dengan skema.
Tersedia dengan dan tanpa Alur Deklaratif Lakeflow Spark

Dapatkah saya menggunakan from_json sintaks inferensi dan evolusi skema di luar Alur Deklaratif Lakeflow Spark?

Tidak, Anda tidak dapat menggunakan from_json inferensi skema dan sintaks evolusi di luar Alur Deklaratif Lakeflow Spark.

Bagaimana cara mengakses skema yang disimpulkan oleh from_json?

Lihat skema tabel streaming target.

Dapatkah saya meneruskan from_json skema dan juga melakukan evolusi?

Tidak, Anda tidak dapat melewati from_json skema dan juga melakukan evolusi. Namun, Anda dapat memberikan petunjuk skema untuk mengambil alih beberapa atau semua bidang yang disimpulkan oleh from_json.

Apa yang terjadi pada skema jika tabel sepenuhnya diperbarui?

Lokasi skema yang terkait dengan tabel dibersihkan, dan skema disimpulkan ulang dari awal.