Bagikan melalui


Membaca dan menulis file XML

Penting

Fitur ini ada di Pratinjau Publik.

Artikel ini menjelaskan cara membaca dan menulis file XML.

Extensible Markup Language (XML) adalah bahasa markup untuk memformat, menyimpan, dan berbagi data dalam format tekstual. Ini mendefinisikan sekumpulan aturan untuk menserialisasikan data mulai dari dokumen hingga struktur data sewenang-wenang.

Dukungan format file XML asli memungkinkan penyerapan, kueri, dan penguraian data XML untuk pemrosesan atau streaming batch. Ini dapat secara otomatis menyimpulkan dan mengembangkan skema dan jenis data, mendukung ekspresi SQL seperti from_xml, dan dapat menghasilkan dokumen XML. Ini tidak memerlukan jar eksternal dan bekerja dengan mulus dengan Auto Loader, read_files dan COPY INTO.

Persyaratan

Databricks Runtime 14.3 ke atas

Mengurai rekaman XML

Spesifikasi XML mengamanatkan struktur yang terbentuk dengan baik. Namun, spesifikasi ini tidak segera dipetakan ke format tabular. Anda harus menentukan rowTag opsi untuk menunjukkan elemen XML yang memetakan ke DataFrameRow. Elemen rowTag menjadi tingkat structatas . Elemen turunan dari rowTag menjadi bidang tingkat structatas .

Anda dapat menentukan skema untuk rekaman ini atau membiarkannya disimpulkan secara otomatis. Karena pengurai hanya memeriksa rowTag elemen, DTD dan entitas eksternal difilter.

Contoh berikut mengilustrasikan inferensi skema dan penguraian file XML menggunakan opsi yang berbeda rowTag :

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Baca file XML dengan rowTag opsi sebagai "buku":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Output:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Baca file XML dengan rowTag sebagai "buku":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Output:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opsi sumber data

Opsi sumber data untuk XML dapat ditentukan dengan cara berikut:

Untuk daftar opsi, lihat Opsi Auto Loader.

Dukungan XSD

Anda dapat memvalidasi setiap rekaman XML tingkat baris secara opsional dengan Definisi Skema XML (XSD). File XSD ditentukan dalam rowValidationXSDPath opsi . XSD tidak memengaruhi skema yang disediakan atau disimpulkan. Rekaman yang gagal validasi ditandai sebagai "rusak" dan ditangani berdasarkan opsi mode penanganan rekaman yang rusak yang dijelaskan di bagian opsi.

Anda dapat menggunakan XSDToSchema untuk mengekstrak skema Spark DataFrame dari file XSD. Ini hanya mendukung jenis sederhana, kompleks, dan urutan, dan hanya mendukung fungsionalitas XSD dasar.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Tabel berikut ini memperlihatkan konversi jenis data XSD ke jenis data Spark:

Jenis Data XSD Tipe Data Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, , negativeIntegernonNegativeInteger, nonPositiveInteger, , positiveInteger,unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Mengurai XML bertumpuk

Data XML dalam kolom bernilai string di DataFrame yang ada dapat diurai dengan schema_of_xml dan from_xml yang mengembalikan skema dan hasil yang diurai sebagai kolom baru struct . Data XML diteruskan sebagai argumen ke schema_of_xml dan from_xml harus menjadi satu catatan XML yang terbentuk dengan baik.

schema_of_xml

Sintaksis

schema_of_xml(xmlStr [, options] )

Argumen

  • xmlStr: Ekspresi STRING yang menentukan satu rekaman XML yang terbentuk dengan baik.
  • options: Arahan penentuan harfiah opsional MAP<STRING,STRING> .

Kembali

STRING yang memegang definisi struct dengan n bidang string tempat nama kolom berasal dari elemen XML dan nama atribut. nilai bidang menyimpan jenis SQL terformat turunan.

from_xml

Sintaksis

from_xml(xmlStr, schema [, options])

Argumen

  • xmlStr: Ekspresi STRING yang menentukan satu rekaman XML yang terbentuk dengan baik.
  • schema: Ekspresi STRING atau pemanggilan schema_of_xml fungsi.
  • options: Arahan penentuan harfiah opsional MAP<STRING,STRING> .

Kembali

Struktur dengan nama bidang dan jenis yang cocok dengan definisi skema. Skema harus didefinisikan sebagai nama kolom dan pasangan jenis data yang dipisahkan koma seperti yang digunakan dalam, misalnya, CREATE TABLE. Sebagian besar opsi yang diperlihatkan dalam opsi sumber data berlaku dengan pengecualian berikut:

  • rowTag: Karena hanya ada satu catatan XML, rowTag opsi tidak berlaku.
  • mode (default: PERMISSIVE): Memungkinkan mode untuk menangani rekaman yang rusak selama penguraian.
    • PERMISSIVE: Ketika memenuhi rekaman yang rusak, menempatkan string cacat ke dalam bidang yang dikonfigurasi oleh columnNameOfCorruptRecord, dan mengatur bidang cacat ke null. Untuk menyimpan rekaman yang rusak, Anda bisa mengatur bidang jenis string bernama columnNameOfCorruptRecord dalam skema yang ditentukan pengguna. Jika skema tidak memiliki bidang, skema akan menghilangkan rekaman yang rusak selama penguraian. Saat inferensi skema, skema secara implisit menambahkan columnNameOfCorruptRecord bidang dalam skema output.
    • FAILFAST: Melemparkan pengecualian ketika memenuhi rekaman yang rusak.

Konversi struktur

Karena perbedaan struktur antara DataFrame dan XML, ada beberapa aturan konversi dari data XML ke DataFrame dan dari DataFrame ke data XML. Perhatikan bahwa menangani atribut dapat dinonaktifkan dengan opsi excludeAttribute.

Konversi dari XML ke DataFrame

Atribut: Atribut dikonversi sebagai bidang dengan awalan attributePrefixjudul .

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

menghasilkan skema di bawah ini:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Data karakter dalam elemen yang berisi atribut atau elemen turunan: Ini diurai ke valueTag dalam bidang . Jika ada beberapa kemunculan data karakter, bidang dikonversi valueTag menjadi jenis array .

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

menghasilkan skema di bawah ini:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Konversi dari DataFrame ke XML

Elemen sebagai array dalam array: Menulis file XML dari DataFrame memiliki bidang ArrayType dengan elemennya seperti ArrayType yang akan memiliki bidang berlapis tambahan untuk elemen . Ini tidak akan terjadi dalam membaca dan menulis data XML tetapi menulis DataFrame bacaan dari sumber lain. Oleh karena itu, pulang-pergi dalam membaca dan menulis file XML memiliki struktur yang sama tetapi menulis DataFrame baca dari sumber lain dimungkinkan untuk memiliki struktur yang berbeda.

DataFrame dengan skema di bawah ini:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

dan dengan data di bawah ini:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

menghasilkan file XML di bawah ini:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Nama elemen array yang tidak disebutkan namanya dalam DataFrame ditentukan oleh opsi arrayElementName (Default: item).

Kolom data yang diselamatkan

Kolom data yang disimpan memastikan bahwa Anda tidak pernah kehilangan atau ketinggalan data selama ETL. Anda bisa mengaktifkan kolom data yang diselamatkan untuk mengambil data apa pun yang tidak diurai karena satu atau beberapa bidang dalam rekaman memiliki salah satu masalah berikut:

  • Tidak ada dari skema yang disediakan
  • Tidak cocok dengan tipe data dari skema yang disediakan
  • Memiliki ketidakcocokan kasus dengan nama bidang dalam skema yang disediakan

Kolom data yang diselamatkan dikembalikan sebagai dokumen JSON yang berisi kolom yang diselamatkan, dan jalur file sumber rekaman. Untuk menghapus jalur file sumber dari kolom data yang diselamatkan, Anda dapat mengatur konfigurasi SQL berikut:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Anda bisa mengaktifkan kolom data yang diselamatkan dengan mengatur opsi rescuedDataColumn ke nama kolom, seperti _rescued_data dengan spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Pengurai XML mendukung tiga mode saat mengurai rekaman: PERMISSIVE, , DROPMALFORMEDdan FAILFAST. Ketika digunakan bersama dengan rescuedDataColumn, ketidakcocokan tipe data tidak menyebabkan catatan dijatuhkan dalam mode DROPMALFORMED atau melemparkan kesalahan dalam mode FAILFAST. Hanya rekaman rusak (XML yang tidak lengkap atau cacat) yang dihilangkan atau melempar kesalahan.

Inferensi skema dan evolusi di Auto Loader

Untuk diskusi terperinci tentang topik ini dan opsi yang berlaku, lihat Mengonfigurasi inferensi dan evolusi skema di Auto Loader. Anda dapat mengonfigurasi Auto Loader untuk secara otomatis mendeteksi skema data XML yang dimuat, memungkinkan Anda menginisialisasi tabel tanpa secara eksplisit mendeklarasikan skema data dan mengembangkan skema tabel saat kolom baru diperkenalkan. Ini menghilangkan kebutuhan untuk melacak dan menerapkan perubahan skema secara manual dari waktu ke waktu.

Secara default, inferensi skema Auto Loader berusaha menghindari masalah evolusi skema karena ketidakcocokan jenis. Untuk format yang tidak mengodekan jenis data (JSON, CSV, dan XML), Auto Loader menyimpulkan semua kolom sebagai string, termasuk bidang berlapis dalam file XML. Apache Spark DataFrameReader menggunakan perilaku yang berbeda untuk inferensi skema, memilih jenis data untuk kolom di sumber XML berdasarkan data sampel. Untuk mengaktifkan perilaku ini dengan Auto Loader, atur opsi cloudFiles.inferColumnTypes ke true.

Auto Loader mendeteksi penambahan kolom baru saat memproses data Anda. Saat Auto Loader mendeteksi kolom baru, aliran berhenti dengan UnknownFieldException. Sebelum aliran Anda melemparkan kesalahan ini, Auto Loader melakukan inferensi skema pada batch mikro data terbaru dan memperbarui lokasi skema dengan skema terbaru dengan menggabungkan kolom baru ke akhir skema. Jenis data kolom yang ada tetap tidak berubah. Auto Loader mendukung mode yang berbeda untuk evolusi skema, yang Anda tetapkan dalam opsi cloudFiles.schemaEvolutionMode.

Anda dapat menggunakan petunjuk skema untuk memberlakukan informasi skema yang Anda ketahui dan harapkan pada skema yang disimpulkan. Ketika Anda tahu bahwa kolom adalah jenis data tertentu, atau jika Anda ingin memilih jenis data yang lebih umum (misalnya, ganda alih-alih bilangan bulat), Anda dapat memberikan jumlah petunjuk segan-segan untuk jenis data kolom sebagai string menggunakan sintaks spesifikasi skema SQL. Saat kolom data yang diselamatkan diaktifkan, bidang bernama dalam kasus selain dari skema dimuat ke _rescued_data kolom. Anda dapat mengubah perilaku ini dengan mengatur opsi readerCaseSensitive ke false, dalam hal ini Auto Loader membaca data dengan cara yang tidak peka huruf besar/kecil.

Contoh

Contoh di bagian ini menggunakan file XML yang tersedia untuk diunduh di repositori Apache Spark GitHub.

Baca dan tulis XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Anda dapat menentukan skema secara manual saat membaca data:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

Sumber data XML dapat menyimpulkan jenis data:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Anda juga dapat menentukan nama dan jenis kolom di DDL. Dalam hal ini, skema tidak disimpulkan secara otomatis.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Muat XML menggunakan COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Membaca XML dengan validasi baris

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Mengurai XML berlapis (from_xml dan schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml dan schema_of_xml dengan SQL API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Muat XML dengan Auto Loader

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Sumber Daya Tambahan:

Membaca dan menulis data XML menggunakan pustaka spark-xml