使用連結
本文說明如何將 XML 檔案讀取和寫入為 Apache Spark 數據源。
需求
將
spark-xml庫建立為Maven函式庫。 針對 Maven 座標,指定:- Databricks Runtime 7.x 和更新版本:
com.databricks:spark-xml_2.12:<release>
- Databricks Runtime 7.x 和更新版本:
範例
本節中的範例會使用 書籍 XML 文件。
擷取書籍 XML 檔案:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml將檔案上傳至 DBFS。
讀取和寫入 XML 數據
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
程式語言 Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
選項
- 閱讀
-
path:XML 檔案的位置。 接受標準Hadoop通配符表達式。 -
rowTag:用於標識為行的行標籤。 例如,在此 XML<books><book><book>...</books>中,值會是book。 預設值為ROW。 -
samplingRatio:推斷架構的取樣比例(0.0 ~ 1)。 預設 為 1。 除非您提供架構,否則可能的型別為StructType、ArrayType、StringType、LongTypeDoubleType、、BooleanTypeTimestampType和NullType。 -
excludeAttribute:是否要排除元素中的屬性。 預設為 False。 -
nullValue:視為null值的值。 預設值為""。 -
mode:處理損毀記錄的模式。 預設值為PERMISSIVE。-
PERMISSIVE:- 當它遇到損毀的記錄時,將所有欄位設定為
null,並將格式錯誤的字串放入設定在columnNameOfCorruptRecord的新欄位。 - 當它遇到錯誤資料類型的欄位時,請將違規欄位設定為
null。
- 當它遇到損毀的記錄時,將所有欄位設定為
-
DROPMALFORMED:忽略損毀的記錄。 -
FAILFAST:偵測到損毀的記錄時擲回例外狀況。
-
-
inferSchema:如果true為 ,則嘗試推斷每個產生的DataFrame數據行的適當類型,例如布爾值、數值或日期類型。 如果false,則所有產生的數據行都是字串類型。 預設值為true。 -
columnNameOfCorruptRecord:存放格式不正確字串的新欄位名稱。 預設值為_corrupt_record。 -
attributePrefix:屬性的前置詞,以便區分屬性和元素。 這是欄位名稱的前置詞。 預設值為_。 -
valueTag:當元素中沒有子元素的屬性時,用於值的標記。 預設值為_VALUE。 -
charset:預設值為 ,UTF-8但可以設定為其他有效的字元集名稱。 -
ignoreSurroundingSpaces:是否應該略過周圍值的空格符。 預設為 False。 -
rowValidationXSDPath:XSD 檔案的路徑,用來驗證每個數據列的 XML。 無法驗證的資料列視為上述剖析錯誤。 XSD 不會以其他方式影響已提供或推斷的架構。 如果叢集中的執行程式上還看不到相同的本機路徑,則應使用 SparkContext.addFile 將 XSD 及其相依的所有其他檔案新增至 Spark 執行程式。 在這種情況下,若要使用本機 XSD/foo/bar.xsd,請呼叫addFile("/foo/bar.xsd")並將"bar.xsd"傳遞為rowValidationXSDPath。
-
- 寫
-
path:要寫入檔案的位置。 -
rowTag:用於標識為行的行標籤。 例如,在此 XML<books><book><book>...</books>中,值會是book。 預設值為ROW。 -
rootTag:要視為根目錄的根標記。 例如,在此 XML<books><book><book>...</books>中,值會是books。 預設值為ROWS。 -
nullValue:要寫入的null值。 預設值為字串"null"。 當 為 時"null",它不會寫入欄位的屬性和元素。 -
attributePrefix:屬性的前置詞,用來區分屬性和元素。 這是欄位名稱的前置詞。 預設值為_。 -
valueTag:當元素中沒有子元素的屬性時,用於值的標記。 預設值為_VALUE。 -
compression:儲存至檔案時要使用的壓縮編解碼器。 應該是實作org.apache.hadoop.io.compress.CompressionCodec類別的完整名稱,或是不區分大小寫的簡短名稱之一 (bzip2、gzip、lz4和snappy)。 預設值為無壓縮。
-
支援縮短的名稱使用方式;您可以使用 xml ,而不是 com.databricks.spark.xml。
XSD 支援
您可以使用 rowValidationXSDPath 驗證個別資料列是否符合 XSD 架構。
您可以使用 公用程式 com.databricks.spark.xml.util.XSDToSchema ,從 某些 XSD 檔案擷取 Spark DataFrame 架構。 它只支持簡單、複雜和循序類型,只有基本的 XSD 功能,而且是實驗性的。
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
剖析巢狀 XML
雖然主要用來將 XML 檔案轉換成 DataFrame,但您也可以使用 from_xml 方法,在現有 DataFrame 的字串值數據行中剖析 XML,並將它新增為具有剖析結果的新數據行作為結構:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
注意
-
mode:- 如果設定為
PERMISSIVE(預設值),則剖析模式會自動變為DROPMALFORMED。 如果您在架構中包含from_xml符合columnNameOfCorruptRecord的數據行,則PERMISSIVE模式會將格式不正確的記錄輸出至結果結構中的該數據行。 - 如果設定為
DROPMALFORMED,則未正確剖析的 XML 值會使該欄的值為null。 不會刪除任何資料列。
- 如果設定為
-
from_xml將包含 XML 的字串數位轉換成剖析結構陣列。 請改用schema_of_xml_array。 -
from_xml_string是用於 UDF 的替代方案,它直接在字串上運作,而不是在數據列上。
轉換規則
由於 DataFrame 與 XML 之間的結構差異,從 XML 數據轉換成 DataFrame,以及從 DataFrame 轉換成 XML 數據有一些轉換規則。 您可以使用 選項 excludeAttribute停用處理屬性。
將 XML 轉換為 DataFrame
屬性:屬性會轉換成具有 選項中所指定前置詞的
attributePrefix欄位。 如果attributePrefix為_,則文檔<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>會產生架構:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)如果專案具有屬性,但沒有子專案,則屬性值會放在 選項中指定的
valueTag個別欄位。 如果valueTag為_VALUE,則文檔<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>會產生架構:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
將 DataFrame 轉換為 XML
從 DataFrame 撰寫 XML 檔案,當有一個欄位 ArrayType 且其元素為 ArrayType 時,該元素會有一個額外的巢狀欄位。 這種情況不會發生在讀取和寫入 XML 數據的過程中,而是會發生在寫入從其他來源讀取的 DataFrame 的時候。 因此,讀取和寫入 XML 檔案的過程會保持相同的結構,但從其他來源讀取的 DataFrame 可能會有不同的結構。
具有架構的資料框:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
資料:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
會產生 XML 檔案:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>