Bagikan melalui


Lihat Mengonfigurasikan inferensi dan evolusi skema di Auto Loader.

Anda dapat mengonfigurasi Auto Loader untuk secara otomatis mendeteksi skema data yang dimuat, memungkinkan Anda menginisialisasi tabel tanpa secara eksplisit mendeklarasikan skema data dan mengembangkan skema tabel saat kolom baru diperkenalkan. Ini menghilangkan kebutuhan untuk melacak dan menerapkan perubahan skema secara manual dari waktu ke waktu.

Loader Otomatis juga akan "menyelamatkan" data yang tidak terduga (misalnya, dari jenis data yang berbeda) di kolom blob JSON, yang dapat Anda pilih untuk diakses nanti dengan menggunakan API akses data semi-terstruktur.

Format berikut didukung untuk inferensi dan evolusi skema:

Format file Versi yang didukung
JSON Semua versi
CSV Semua versi
XML Databricks Runtime 14.3 LTS ke atas
Avro Databricks Runtime 10.4 LTS ke atas
Parquet Databricks Runtime 11.3 LTS ke atas
ORC Tidak didukung
Text Tidak berlaku (skema tetap)
Binaryfile Tidak berlaku (skema tetap)

Sintaks untuk inferensi dan evolusi skema

Menentukan direktori target untuk opsi cloudFiles.schemaLocation memungkinkan inferensi dan evolusi skema. Anda dapat memilih untuk menggunakan direktori yang sama dengan yang Anda tentukan untuk checkpointLocation. Jika Anda menggunakan Tabel Langsung Delta, Azure Databricks mengelola lokasi skema dan informasi titik pemeriksaan lainnya secara otomatis.

Catatan

Jika Anda memiliki lebih dari satu lokasi data sumber yang dimuat ke dalam tabel target, setiap beban kerja penyerapan Auto Loader memerlukan titik pemeriksaan streaming terpisah.

Contoh berikut menggunakan parquet untuk cloudFiles.format. Gunakan csv, avro, atau json untuk sumber file lainnya. Semua pengaturan lain untuk membaca dan menulis tetap sama untuk perilaku default untuk setiap format.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Bagaimana cara kerja inferensi skema Auto Loader?

Untuk menyimpulkan skema saat pertama kali membaca data, Auto Loader mengambil sampel file 50 GB atau 1000 pertama yang ditemukannya, batas mana pun yang disilangkan terlebih dahulu. Auto Loader menyimpan informasi skema dalam direktori _schemas di yang dikonfigurasi cloudFiles.schemaLocation untuk melacak perubahan skema pada data input dari waktu ke waktu.

Catatan

Untuk mengubah ukuran sampel yang digunakan, Anda dapat mengatur konfigurasi SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(string byte, misalnya 10gb)

dan

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(bilangan bulat)

Secara default, inferensi skema Auto Loader berusaha menghindari masalah evolusi skema karena ketidakcocokan jenis. Untuk format yang tidak mengodekan jenis data (JSON, CSV, dan XML), Auto Loader menyimpulkan semua kolom sebagai string (termasuk bidang berlapis dalam file JSON). Untuk format dengan skema yang diketik (Parquet dan Avro), Auto Loader mengambil sampel subset file dan menggabungkan skema file individual. Perilaku ini dirangkum dalam tabel berikut:

Format file Jenis data default yang disimpulkan
JSON String
CSV String
XML String
Avro Jenis yang dikodekan dalam skema Avro
Parquet Jenis yang dikodekan dalam skema Parquet

Apache Spark DataFrameReader menggunakan perilaku yang berbeda untuk inferensi skema, memilih jenis data untuk kolom di sumber JSON, CSV, dan XML berdasarkan data sampel. Untuk mengaktifkan perilaku ini dengan Auto Loader, atur opsi cloudFiles.inferColumnTypes ke true.

Catatan

Saat menyimpulkan skema untuk data CSV, Auto Loader mengasumsikan bahwa file berisi header. Jika file CSV Anda tidak berisi header, berikan opsi .option("header", "false"). Selain itu, Auto Loader menggabungkan skema semua file dalam sampel untuk menghasilkan skema global. Auto Loader kemudian dapat membaca setiap file sesuai dengan headernya dan mengurai CSV dengan benar.

Catatan

Saat kolom memiliki jenis data yang berbeda dalam dua file Parquet, Auto Loader memilih jenis terluas. Anda dapat menggunakan skemaHints untuk mengambil alih pilihan ini. Saat Anda menentukan petunjuk skema, Auto Loader tidak mentransmisikan kolom ke jenis yang ditentukan, melainkan memberi tahu pembaca Parquet untuk membaca kolom sebagai jenis yang ditentukan. Dalam kasus ketidakcocokan, kolom diselamatkan di kolom data yang diselamatkan.

Bagaimana cara kerja evolusi skema Auto Loader?

Auto Loader mendeteksi penambahan kolom baru saat memproses data Anda. Saat Auto Loader mendeteksi kolom baru, aliran berhenti dengan UnknownFieldException. Sebelum aliran Anda melemparkan kesalahan ini, Auto Loader melakukan inferensi skema pada batch mikro data terbaru dan memperbarui lokasi skema dengan skema terbaru dengan menggabungkan kolom baru ke akhir skema. Jenis data kolom yang ada tetap tidak berubah.

Databricks merekomendasikan untuk mengonfigurasi aliran Auto Loader dengan Pekerjaan Databricks untuk memulai ulang secara otomatis setelah perubahan skema tersebut.

Auto Loader mendukung mode berikut untuk evolusi skema, yang Anda atur dalam opsi cloudFiles.schemaEvolutionMode:

Mode Perilaku saat membaca kolom baru
addNewColumns (default) Streaming gagal. Kolom baru ditambahkan ke skema. Kolom yang ada tidak mengembangkan jenis data.
rescue Skema tidak pernah berevolusi dan aliran tidak gagal karena perubahan skema. Semua kolom baru direkam di kolom data yang diselamatkan.
failOnNewColumns Streaming gagal. Aliran tidak dimulai ulang kecuali skema yang disediakan diperbarui, atau file data yang menyinggung dihapus.
none Tidak mengembangkan skema, kolom baru diabaikan, dan data tidak diselamatkan kecuali rescuedDataColumn opsi diatur. Aliran tidak gagal karena perubahan skema.

Bagaimana cara kerja partisi dengan Auto Loader?

Auto Loader mencoba menyimpulkan kolom partisi dari struktur direktori data yang mendasar jika data ditata dalam pemartisian gaya Apache Hive. Misalnya, jalur base_path/event=click/date=2021-04-01/f0.json file menghasilkan inferensi date dan event sebagai kolom partisi. Jika struktur direktori yang mendasar berisi partisi Apache Hive yang bertentangan atau tidak berisi partisi gaya Apache Hive, kolom partisi diabaikan.

File biner (binaryFile) dan text format file memiliki skema data tetap, tetapi mendukung inferensi kolom partisi. Databricks merekomendasikan pengaturan cloudFiles.schemaLocation untuk format file ini. Ini menghindari potensi kesalahan atau kehilangan informasi dan mencegah inferensi kolom partisi setiap kali Pemuat Otomatis dimulai.

Kolom partisi tidak dipertimbangkan untuk evolusi skema. Jika Anda memiliki struktur direktori awal seperti base_path/event=click/date=2021-04-01/f0.json, lalu mulai menerima file baru sebagai base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader mengabaikan kolom jam. Untuk menangkap informasi untuk kolom partisi baru, atur cloudFiles.partitionColumns ke event,date,hour.

Catatan

Opsi cloudFiles.partitionColumns mengambil daftar nama kolom yang dipisahkan koma. Hanya kolom yang ada sebagai key=value pasangan dalam struktur direktori Anda yang diurai.

Apa kolom data yang diselamatkan?

Saat Auto Loader menyimpulkan skema, kolom data yang diselamatkan secara otomatis ditambahkan ke skema Anda sebagai _rescued_data. Anda dapat mengganti nama kolom atau memasukkannya dalam kasus di mana Anda memberikan skema dengan mengatur opsi rescuedDataColumn.

Kolom data yang diselamatkan memastikan bahwa kolom yang tidak cocok dengan skema diselamatkan alih-alih dihilangkan. Kolom data yang diselamatkan berisi data apa pun yang tidak diurai karena alasan berikut:

  • Kolom hilang dari skema.
  • Ketidakcocokan jenis.
  • Ketidakcocokan kasus.

Kolom data yang diselamatkan berisi JSON yang berisi kolom yang diselamatkan dan jalur file sumber rekaman.

Catatan

Pengurai JSON dan CSV mendukung tiga mode saat mengurai baris: PERMISSIVE, DROPMALFORMED, dan FAILFAST. Ketika digunakan bersama dengan rescuedDataColumn, ketidakcocokan tipe data tidak menyebabkan catatan dijatuhkan dalam mode DROPMALFORMED atau melemparkan kesalahan dalam mode FAILFAST. Hanya rekaman rusak yang dihilangkan atau melempar kesalahan, seperti JSON atau CSV yang tidak lengkap atau cacat. Jika Anda menggunakan badRecordsPath saat mengurai JSON, ketidakcocokan tipe data tidak akan dianggap sebagai baris buruk saat menggunakan rescuedDataColumn. Hanya baris JSON yang tidak lengkap dan cacat yang disimpan di badRecordsPath.

Mengubah perilaku peka huruf besar/kecil

Kecuali jika kepekaan huruf besar/kecil diaktifkan, kolom abc, Abc, dan ABC dianggap sebagai kolom yang sama untuk tujuan inferensi skema. Kasus yang dipilih segan-segan dan tergantung pada data sampel. Anda dapat menggunakan petunjuk skema untuk memberlakukan huruf besar/kecil mana yang harus digunakan. Setelah pilihan dibuat dan skema disimpulkan, Auto Loader tidak mempertimbangkan varian casing yang tidak dipilih konsisten dengan skema.

Saat kolom data yang diselamatkan diaktifkan, bidang bernama dalam kasus selain dari skema dimuat ke _rescued_data kolom. Ubah perilaku ini dengan mengatur opsi readerCaseSensitive ke false, dalam hal ini Auto Loader membaca data dengan cara yang tidak peka huruf besar/kecil.

Mengambil alih inferensi skema dengan petunjuk skema

Anda dapat menggunakan petunjuk skema untuk memberlakukan informasi skema yang Anda ketahui dan harapkan pada skema yang disimpulkan. Saat Anda tahu bahwa kolom adalah tipe data tertentu, atau jika Anda ingin memilih jenis data yang lebih umum (misalnya, double bukan integer), Anda dapat memberikan jumlah petunjuk segan untuk jenis data kolom sebagai string menggunakan sintaks spesifikasi skema SQL, seperti berikut ini:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Lihat dokumentasi pada jenis data untuk daftar jenis data yang didukung.

Jika kolom tidak ada di awal aliran, Anda juga dapat menggunakan petunjuk skema untuk menambahkan kolom tersebut ke skema yang disimpulkan.

Berikut adalah contoh skema yang disimpulkan untuk melihat perilaku tersebut dengan petunjuk skema.

Skema yang diinferensikan:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Dengan menentukan petunjuk skema berikut:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

Anda mendapatkan:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Catatan

Dukungan petunjuk skema Array dan Map tersedia di Databricks Runtime 9.1 LTS dan di atasnya.

Berikut adalah contoh skema yang disimpulkan dengan jenis data kompleks untuk melihat perilaku dengan petunjuk skema.

Skema yang diinferensikan:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Dengan menentukan petunjuk skema berikut:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

Anda mendapatkan:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Catatan

Petunjuk skema hanya digunakan jika Anda tidak memberikan skema ke Auto Loader. Anda dapat menggunakan petunjuk skema apakah cloudFiles.inferColumnTypes diaktifkan atau dinonaktifkan.