分享方式:


讀取和寫入 XML 檔案

重要

這項功能處於公開預覽狀態

本文說明如何讀取和寫入 XML 檔案。

可延伸標記語言 (XML) 是一種標記語言,用於格式化、儲存及共用文字格式的數據。 它會定義一組規則,以串行化從檔到任意數據結構的數據。

原生 XML 檔案格式支援可讓您擷取、查詢和剖析 XML 數據,以進行批處理或串流。 它可以自動推斷和演進架構和數據類型、支援 SQL 運算式,例如 from_xml,而且可以產生 XML 檔。 它不需要外部 jar,且可順暢地與自動載入器及 read_files COPY INTO搭配運作。 您可以選擇性地針對 XML 架構定義 (XSD) 驗證每個資料列層級 XML 記錄。

需求

Databricks Runtime 14.3 和更新版本

剖析 XML 記錄

XML 規格規定格式正確的結構。 不過,此規格不會立即對應至表格式格式。 您必須指定 rowTag 選項,以指出對應至 DataFrame Row的 XML 專案。 元素 rowTag 會變成最上層 struct。 的子項目 rowTag 會成為最上層 struct的欄位。

您可以指定此記錄的架構,或讓它自動推斷。 因為剖析器只會檢查 rowTag 元素,因此會篩選掉 DTD 和外部實體。

下列範例說明使用不同 rowTag 選項剖析 XML 檔案的架構推斷和剖析:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

以 「books」 選項讀取 XML 檔案 rowTag

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

輸出:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

以 「book」 讀取 XML rowTag 檔案:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

輸出:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

數據源選項

XML 的數據來源選項可以指定下列方式:

  • .option/.options下列方法:
    • DataFrameReader
    • DataFrameWriter
    • DataStreamReader
    • DataStreamWriter
  • 下列內建函式:
  • OPTIONS CREATE TABLE USING DATA_SOURCE 的 子句

如需選項清單,請參閱 自動載入器選項

XSD 支援

您可以選擇性地驗證 XML 架構定義 (XSD) 的每個資料列層級 XML 記錄。 在選項中 rowValidationXSDPath 指定 XSD 檔案。 XSD 不會影響提供的架構或推斷。 驗證失敗的記錄會標示為「損毀」,並根據選項區段中所述的損毀記錄處理模式選項進行處理。

您可以使用 XSDToSchema 從 XSD 檔案擷取 Spark DataFrame 架構。 它只支持簡單、複雜和循序類型,而且只支援基本的 XSD 功能。

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

下表顯示 XSD 資料類型轉換成 Spark 資料類型:

XSD 數據類型 Spark 資料類型
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer、、negativeIntegernonNegativeIntegernonPositiveInteger、、positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

剖析巢狀 XML

現有 DataFrame 中字串值資料行中的 XML 數據可以剖析, schema_of_xml 並以 from_xmlstruct 數據行傳回架構和剖析的結果。 做為自變數傳遞至 schema_of_xml 的 XML 數據,而且 from_xml 必須是單一格式正確的 XML 記錄。

schema_of_xml

語法

schema_of_xml(xmlStr [, options] )

引數

  • xmlStr:指定單一格式正確的 XML 記錄的 STRING 運算式。
  • options:指定指示詞的選擇性常 MAP<STRING,STRING> 值。

傳回

STRING,包含具有 n 個字串字段的結構定義,其中數據行名稱衍生自 XML 元素和屬性名稱。 域值會保存衍生的格式化 SQL 類型。

from_xml

語法

from_xml(xmlStr, schema [, options])

引數

  • xmlStr:指定單一格式正確的 XML 記錄的 STRING 運算式。
  • schema:函式的 schema_of_xml STRING 運算式或調用。
  • options:指定指示詞的選擇性常 MAP<STRING,STRING> 值。

傳回

結構,具有符合架構定義的功能變數名稱和型別。 架構必須定義為逗號分隔的資料列名稱和資料類型群組,例如 CREATE TABLE 資料來源選項中顯示的 大部分選項 都適用下列例外狀況:

  • rowTag:因為只有一個 XML 記錄,因此 rowTag 選項不適用。
  • mode (預設值: ): PERMISSIVE允許在剖析期間處理損毀記錄的模式。
    • PERMISSIVE:當它符合損毀的記錄時,會將格式錯誤的字串放入 所設定 columnNameOfCorruptRecord的欄位,並將格式不正確的欄位設定為 null。 若要保留損毀的記錄,您可以在使用者定義的架構中設定名為 columnNameOfCorruptRecord 的字串類型字段。 如果架構沒有 欄位,則會在剖析期間卸除損毀的記錄。 在推斷架構時,它會隱含地在輸出架構中加入 columnNameOfCorruptRecord 字段。
    • FAILFAST:遇到損毀的記錄時擲回例外狀況。

結構轉換

由於 DataFrame 和 XML 之間的結構差異,因此有一些從 XML 數據到 DataFrame DataFrame XML 數據的轉換規則。 請注意,使用 選項 excludeAttribute可以停用處理屬性。

從 XML 轉換成 DataFrame

屬性:屬性會轉換成標題前置詞attributePrefix為 的欄位。

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

會產生下列架構:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

包含 attribute(s) 或子元素的專案中的字元數據: 這些數據會剖析成 valueTag 欄位。 如果有多個字元數據出現,欄位 valueTagarray 轉換成類型。

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

會產生下列架構:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

從 DataFrame 轉換成 XML

元素做為陣列中的陣列:撰寫 XML 檔案 DataFrame 時,將具有具有其元素的欄位 ArrayType ,如同 ArrayType 為專案加上額外的巢狀字段。 這不會發生在讀取和寫入 XML 數據,而是從其他來源寫入 DataFrame 讀取時發生。 因此,讀取和寫入 XML 檔案的往返都有相同的結構,但從其他來源寫入 DataFrame 讀取可能會有不同的結構。

具有下列架構的 DataFrame:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

以及下列資料:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

會產生下列 XML 檔案:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

DataFrame 未命名陣列的元素名稱是由 選項 arrayElementName 指定 (預設值: item)。

已獲救的數據行

已獲救的數據行可確保您在 ETL 期間永遠不會遺失或錯過數據。 您可以啟用已獲救的數據行來擷取未剖析的任何數據,因為記錄中的一或多個字段有下列其中一個問題:

  • 不存在提供的架構
  • 不符合所提供架構的數據類型
  • 與所提供架構中的功能變數名稱不符

已獲救的數據行會以 JSON 檔的形式傳回,其中包含已獲救的數據行,以及記錄的來源檔案路徑。 若要從已獲救的數據行中移除來源檔案路徑,您可以設定下列 SQL 組態:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

您可以在讀取資料時,將 選項 rescuedDataColumn 設定為數據行名稱,例如 _rescued_data 使用 spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)來啟用已獲救的數據行。

剖析記錄時,XML 剖析器支援三種模式: PERMISSIVEDROPMALFORMEDFAILFAST。 搭配 rescuedDataColumn使用 時,數據類型不符會導致記錄在模式中 DROPMALFORMED 卸除或以 FAILFAST 模式擲回錯誤。 只有損毀的記錄(不完整或格式不正確的 XML)會卸除或擲回錯誤。

自動載入器中的架構推斷和演進

如需本主題和適用選項的詳細討論,請參閱 設定自動載入器中的架構推斷和演進。 您可以設定自動載入器來自動偵測已載入之 XML 數據的架構,讓您不需要明確宣告數據架構並隨著新數據行匯入而演進數據表架構,即可初始化數據表。 這樣就不需要在一段時間內手動追蹤和套用架構變更。

根據預設,自動載入器架構推斷會因為類型不符而尋求避免架構演進問題。 對於未編碼數據類型的格式(JSON、CSV 和 XML),自動載入器會將所有數據行推斷為字串,包括 XML 檔案中的巢狀字段。 Apache Spark DataFrameReader 會針對架構推斷使用不同的行為,根據範例數據選取 XML 來源中數據行的數據類型。 若要使用自動載入器開啟此行為,請將 選項 cloudFiles.inferColumnTypes 設定為 true

自動載入器會在處理您的數據時偵測新數據行的新增。 當自動載入器偵測到新的數據行時,數據流會以 UnknownFieldException停止。 在數據流擲回此錯誤之前,自動載入器會先對最新的微批次數據執行架構推斷,並藉由將新數據行合併至架構結尾,以最新的架構來更新架構位置。 現有數據行的數據類型保持不變。 自動載入器支援架構演進的不同模式,您可以在 選項 cloudFiles.schemaEvolutionMode中設定。

您可以使用 架構提示 來強制執行所知道且預期於推斷架構上的架構資訊。 當您知道資料行是特定資料類型,或者如果您想要選擇更一般數據類型(例如雙精度浮點數而非整數),您可以使用 SQL 架構規格語法,為數據行數據類型提供任意數目的提示作為字元串。 啟用獲救的數據行時,在架構以外的案例中命名的欄位會載入至數據 _rescued_data 行。 您可以將 選項 readerCaseSensitive 設定為 false來變更此行為,在此情況下,自動載入器會以不區分大小寫的方式讀取數據。

範例

本節中的範例會使用可在 Apache Spark GitHub 存放庫中下載的 XML 檔案。

讀取和寫入 XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

讀取資料時,您可以手動指定架構:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

XML 資料來源可以推斷資料型態:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

您也可以在 DDL 中指定資料行名稱和類型。 在此情況下,不會自動推斷架構。

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

使用 COPY INTO 載入 XML

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

讀取具有數據列驗證的 XML

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

剖析巢狀 XML (from_xml 和 schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

使用 SQL API from_xml和schema_of_xml

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

使用自動載入器載入 XML

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

其他資源

使用spark-xml 連結庫讀取和寫入 XML 數據