Pola pemuatan data umum

Auto Loader menyederhanakan sejumlah tugas penyerapan data umum. Referensi cepat ini memberikan contoh untuk beberapa pola populer.

Menyerap data dari penyimpanan objek cloud sebagai varian

Auto Loader dapat memuat semua data dari sumber file yang didukung sebagai kolom tunggal VARIANT dalam tabel target. Karena VARIANT fleksibel terhadap perubahan skema dan jenis serta mempertahankan sensitivitas huruf besar/kecil dan nilai NULL yang ada di sumber data, pola ini kuat untuk sebagian besar skenario ingesti. Untuk informasi lebih lanjut, lihat Mengimpor data dari penyimpanan objek cloud sebagai varian.

Memfilter direktori atau file menggunakan pola glob

Pola "Glob" dapat digunakan untuk memfilter direktori dan file di dalam jalur.

Pola Deskripsi
? Cocok dengan karakter tunggal apa pun
* Cocok dengan nol atau lebih karakter
[abc] Cocok dengan satu karakter dari kumpulan karakter {a,b,c}.
[a-z] Cocok dengan satu karakter dari rentang karakter {a... z}.
[^a] Mencocokkan satu karakter yang bukan dari kumpulan karakter atau rentang {a}. Perhatikan bahwa karakter ^ harus muncul segera di sebelah kanan braket pembuka.
{ab,cd} Mencocokkan string dari kumpulan string {ab, cd}.
{ab,c{de, fh}} Mencocokkan string dari himpunan string {ab, cde, cfh}.

path Gunakan untuk menyediakan pola awalan, misalnya:

Phyton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("/Volumes/catalog_name/schema_name/volume_name/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("/Volumes/catalog_name/schema_name/volume_name/*/files")

Anda harus menggunakan opsi pathGlobFilter untuk secara eksplisit menyediakan pola akhiran. path hanya menyediakan filter awalan. Misalnya, jika Anda hanya ingin mengurai png file dalam direktori yang berisi file dengan akhiran yang berbeda, Anda dapat melakukan hal berikut:

Phyton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load("/Volumes/catalog_name/schema_name/volume_name/path")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")

Catatan

Perilaku globbing default Auto Loader berbeda dari perilaku default sumber file Spark lainnya. Tambahkan .option("cloudFiles.useStrictGlobber", "true") ke pembacaan Anda untuk menggunakan globbing yang sesuai dengan perilaku default Spark terhadap sumber file. Lihat tabel berikut ini untuk informasi selengkapnya tentang globbing:

Pola Jalur file Globber bawaan Globber yang ketat
/a/b /a/b/c/file.txt Ya Ya
/a/b file.txt /a/b_dir/c/ Tidak Tidak
/a/b /a/b.txt Tidak Tidak
/a/b/ /a/b.txt Tidak Tidak
/a/*/c/ /a/b/c/file.txt Ya Ya
/a/*/c/ /a/b/c/d/file.txt Ya Ya
/a/*/c/ /a/b/x/y/c/file.txt Ya Tidak
/a/*/c /a/b/c_file.txt Ya Tidak
/a/*/c/ /a/b/c_file.txt Ya Tidak
/a/*/c/ /a/*/cookie/file.txt Ya Tidak
/a/b* /a/b.txt Ya Ya
/a/b* /a/b/file.txt Ya Ya
/a/{0.txt,1.txt} /a/0.txt Ya Ya
/a/*/{0.txt,1.txt} /a/0.txt Tidak Tidak
/a/b/[cde-h]/i/ file.txt /a/b/c/i/ Ya Ya

Aktifkan ETL yang mudah

Cara mudah untuk memasukkan data Anda ke Delta Lake tanpa kehilangan data apa pun adalah dengan menggunakan pola berikut dan mengaktifkan inferensi skema dengan Auto Loader. Databricks merekomendasikan untuk menjalankan kode berikut dalam pekerjaan Azure Databricks agar dapat menghidupkan ulang aliran Anda secara otomatis saat skema data sumber Anda berubah. Secara default, skema disimpulkan sebagai jenis string, kesalahan penguraian apa pun (seharusnya tidak ada jika semuanya tetap sebagai string) akan dialihkan ke _rescued_data, dan kolom baru apa pun akan menggagalkan aliran dan memperbarui skema.

Phyton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Mencegah kehilangan data dalam data yang terstruktur dengan baik

Ketika Anda mengetahui skema Anda tetapi ingin mengambil data yang tidak terduga, Databricks merekomendasikan penggunaan rescuedDataColumn.

Phyton

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Jika Anda ingin aliran Anda menghentikan pemrosesan saat bidang baru diperkenalkan yang tidak sesuai dengan skema Anda, Anda dapat menambahkan:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Mengaktifkan alur data semi-terstruktur yang fleksibel

Saat Anda menerima data dari vendor yang memperkenalkan kolom baru ke informasi yang mereka berikan, Anda mungkin tidak menyadari dengan tepat kapan mereka melakukannya, atau Anda mungkin tidak memiliki bandwidth untuk memperbarui alur data Anda. Anda sekarang dapat memanfaatkan evolusi skema untuk menghidupkan ulang aliran dan membiarkan Auto Loader memperbarui skema yang disimpulkan secara otomatis. Anda juga dapat memanfaatkan schemaHints untuk beberapa bidang "tanpa skema" yang mungkin disediakan vendor.

Phyton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/Volumes/catalog_name/schema_name/volume_name/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/Volumes/catalog_name/schema_name/volume_name/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Mengubah data JSON bertumpuk

Karena Auto Loader menyimpulkan kolom JSON tingkat atas sebagai string, Anda dapat dibiarkan dengan objek JSON berlapis yang memerlukan transformasi lebih lanjut. Anda dapat menggunakan API akses data semi terstruktur untuk mengubah konten JSON yang kompleks lebih lanjut.

Phyton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Menyimpulkan data JSON berlapis

Saat Anda memiliki data berlapis, Anda bisa menggunakan opsi cloudFiles.inferColumnTypes untuk menyimpulkan struktur berlapis data Anda dan jenis kolom lainnya.

Phyton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")

Memuat file CSV tanpa header

Contoh berikut menunjukkan cara memuat file CSV tanpa header menggunakan Auto Loader. Gunakan rescuedDataColumn untuk mengambil data apa pun yang tidak cocok dengan skema yang disediakan.

Phyton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # ensure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Menerapkan skema pada file CSV dengan header

Contoh berikut menunjukkan cara menerapkan skema pada file CSV yang menyertakan header. Gunakan rescuedDataColumn untuk mengambil data apa pun yang tidak cocok dengan skema yang disediakan.

Phyton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Memasukkan data gambar atau biner ke Delta Lake untuk pembelajaran mesin

Setelah data disimpan di Delta Lake, Anda dapat menjalankan inferensi terdistribusi pada data. Lihat Melakukan inferensi terdistribusi menggunakan panda UDF.

Phyton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("/Volumes/catalog_name/schema_name/volume_name/images") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("/Volumes/catalog_name/schema_name/volume_name/images")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Sintaks Pemuat Otomatis untuk Alur Deklaratif Lakeflow Spark

Lakeflow Spark Declarative Pipelines menyediakan sintaks Python yang sedikit dimodifikasi untuk Auto Loader dan menambahkan dukungan SQL untuk Auto Loader. Contoh berikut menggunakan Auto Loader untuk membuat himpunan data dari file JSON menggunakan himpunan data pemesanan perjalanan sampel Wanderbricks :

Phyton

@dp.table
def booking_updates():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
  )

@dp.table
def reviews():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews")
  )

SQL

CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true
)

CREATE OR REFRESH STREAMING TABLE reviews
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews",
  format => "json",
  multiLine => true
)

Anda dapat menggunakan opsi format yang didukung untuk Auto Loader. Opsi untuk read_files merupakan pasangan kunci-nilai. Untuk detail tentang format dan opsi yang didukung, lihat opsi .

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

Contoh berikut membaca file JSON multibaris dengan inferensi jenis kolom diaktifkan:

CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true,
  inferColumnTypes => true
)

Anda dapat menggunakan schema untuk menentukan format secara manual; Anda harus menentukan schema untuk format yang tidak mendukung inferensi skema:

Phyton

@dp.table
def booking_updates_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
  )

SQL

CREATE OR REFRESH STREAMING TABLE booking_updates_raw
AS SELECT *
FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true,
  schema => "booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP"
)

Catatan

Alur Deklaratif Lakeflow Spark secara otomatis mengonfigurasi dan mengelola skema dan direktori titik pemeriksaan saat menggunakan Auto Loader untuk membaca file. Namun, jika Anda mengonfigurasi salah satu direktori ini secara manual, melakukan penyegaran penuh tidak memengaruhi konten direktori yang dikonfigurasi. Databricks merekomendasikan untuk menggunakan direktori yang dikonfigurasi secara otomatis untuk menghindari efek samping yang tidak terduga selama pemrosesan.

Langkah berikutnya