Чтение и запись XML-данных с помощью библиотеки spark-xml
Внимание
Поддержка этой документации прекращена, она может больше не обновляться. Продукты, услуги или технологии, упомянутые в его содержимом, не поддерживаются официально или проверяются Databricks.
Поддержка формата собственного XML-файла доступна как общедоступная предварительная версия. См. статью "Чтение и запись XML-файлов".
В этой статье описано чтение и запись XML-файла в качестве источника данных Apache Spark.
Требования
Создайте библиотеку
spark-xml
как библиотеку Maven. Для координаты Maven укажите:- Databricks Runtime 7.x и более поздних версий:
com.databricks:spark-xml_2.12:<release>
Дополнительные сведения см. в выпусках
spark-xml
последней<release>
версии.- Databricks Runtime 7.x и более поздних версий:
Установите библиотеку в кластер.
Пример
В примере в этом разделе используется XML-файл books.
Получите XML-файл books:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
Отправьте файл в хранилище DBFS.
Чтение и запись XML-данных
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
Параметры
- Читать
path
: расположение XML-файлов. Принимает стандартные выражения подстановки Hadoop.rowTag
: тег строки, который будет обрабатываться как строка. Например, в этом XML-файле<books><book><book>...</books>
будет использоваться значениеbook
. По умолчанию —ROW
.samplingRatio
: коэффициент выборки для выведения схемы (0,0 ~ 1). По умолчанию 1. Допустимые типы:StructType
,ArrayType
,StringType
,LongType
,DoubleType
,BooleanType
,TimestampType
иNullType
, если схема не указана.excludeAttribute
: определяет, следует ли исключать атрибуты в элементах. По умолчанию — false.nullValue
: значение, обрабатываемое как значениеnull
. По умолчанию —""
.mode
: режим работы с поврежденными записями. По умолчанию —PERMISSIVE
.PERMISSIVE
:- При обнаружении поврежденной записи устанавливает для всех полей значение
null
и помещает неправильную строку в новое поле, настроенное с помощьюcolumnNameOfCorruptRecord
. - При обнаружении поля с неверным типом данных устанавливает для поля, вызывающего ошибку, значение
null
.
- При обнаружении поврежденной записи устанавливает для всех полей значение
DROPMALFORMED
: игнорирует поврежденные записи.FAILFAST
: вызывает исключение при обнаружении поврежденных записей.
inferSchema
: еслиtrue
, пытается определить подходящий тип для каждого результирующего столбца DataFrame, например, логическое выражение, числовой тип или данные. Еслиfalse
, все результирующие столбцы будут строкового типа. По умолчанию —true
.columnNameOfCorruptRecord
: имя нового поля, в котором хранятся неправильные строки. По умолчанию —_corrupt_record
.attributePrefix
: префикс атрибутов для различения атрибутов и элементов. Это префикс для имен полей. По умолчанию —_
.valueTag
: тег, используемый для значения при наличии в элементе атрибутов, не имеющих дочерних элементов. По умолчанию —_VALUE
.charset
: по умолчанию принимает значениеUTF-8
, но может быть задано другое допустимое имя кодировки.ignoreSurroundingSpaces
: определяет, следует ли пропускать пробелы вокруг значений. По умолчанию — false.rowValidationXSDPath
: путь к XSD-файлу, который используется для проверки XML для каждой строки. Строки, которые не удается проверить, обрабатываются как ошибки синтаксического анализа, как описано выше. В противном случае XSD не влияет на указанную или логически выведенную схему. Если тот же локальный путь еще не виден на исполнителях в кластере, то XSD и все остальные, от которых он зависит, должны быть добавлены в исполнители Spark с помощью SparkContext. addFile. В этом случае для использования локальной XSD/foo/bar.xsd
следует вызватьaddFile("/foo/bar.xsd")
и передать"bar.xsd"
какrowValidationXSDPath
.
- Писать
path
: расположение для записи файлов.rowTag
: тег строки, который будет обрабатываться как строка. Например, в этом XML-файле<books><book><book>...</books>
будет использоваться значениеbook
. По умолчанию —ROW
.rootTag
: корневой тег, который будет считаться корневым. Например, в этом XML-файле<books><book><book>...</books>
будет использоваться значениеbooks
. По умолчанию —ROWS
.nullValue
: значение для записи значенияnull
. Значение по умолчанию — строка"null"
. Когда"null"
, атрибуты и элементы для полей не записываются.attributePrefix
: префикс атрибутов для различения атрибутов и элементов. Это префикс для имен полей. По умолчанию —_
.valueTag
: тег, используемый для значения при наличии в элементе атрибутов, не имеющих дочерних элементов. По умолчанию —_VALUE
.compression
: кодек сжатия, используемый при сохранении в файл. Должно быть полным именем класса, реализующегоorg.apache.hadoop.io.compress.CompressionCodec
, или одним из коротких имен без учета регистра (bzip2
,gzip
,lz4
иsnappy
). По умолчанию сжатие не выполняется.
Поддерживает использование сокращенных имен; вместо xml
можно использовать com.databricks.spark.xml
.
Поддержка XSD
С помощью rowValidationXSDPath
можно проверить отдельные строки в схеме XSD.
Используйте служебную программу com.databricks.spark.xml.util.XSDToSchema
для извлечения схемы DataFrame Spark из определенных XSD-файлов. Она поддерживает только простые, сложные и последовательные типы, только основные функции XSD и является экспериментальной.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Анализ вложенных XML-файлов
Несмотря на то, что метод from_xml
в основном используется для преобразования XML-файла в DataFrame, его можно также использовать для анализа XML-содержимого в столбце с строковыми значениями в существующем DataFrame и его добавления в качестве нового столбца с проанализированными результатами в виде структуры с помощью:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Примечание.
mode
:- Если задано значение
PERMISSIVE
(параметр по умолчанию), вместо режима анализа используется режим по умолчанию —DROPMALFORMED
. Если включить в схему дляfrom_xml
столбец, соответствующийcolumnNameOfCorruptRecord
, то в режимеPERMISSIVE
в этот столбец в результирующей структуре будут выводится неправильные записи. - Если задано значение
DROPMALFORMED
, XML-значения, которые были неправильно проанализированы, будут отражены в столбце в видеnull
. Строки не удаляются.
- Если задано значение
from_xml
преобразует массивы строк, содержащих XML, в массивы проанализированных структур. Вместо этого используйтеschema_of_xml_array
.from_xml_string
является альтернативой для использования в UDF, которая предназначена для работы непосредственно со строкой, а не столбцом.
Правила преобразования
Из-за структурных различий между DataFrame и XML необходимо соблюдать некоторые правила преобразования данных из XML в DataFrame и из DataFrame в XML. Можно отключить обработку атрибутов с помощью параметра excludeAttribute
.
Преобразование XML в DataFrame
Атрибуты: атрибуты преобразуются как поля с префиксом, указанным в параметре
attributePrefix
. ЕслиattributePrefix
имеет значение_
, документ<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
создает схему:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Если у элемента есть атрибуты, но нет дочерних элементов, значение атрибута помещается в отдельное поле, указанное в параметре
valueTag
. ЕслиvalueTag
имеет значение_VALUE
, документ<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
создает схему:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
Преобразование DataFrame в XML
Написание XML-файла из DataFrame ArrayType
с полем с его элементом, как ArrayType
и дополнительное вложенное поле для элемента. Это не произойдет при чтении и записи XML-данных, но при записи кадра данных из других источников. Таким образом, цикл чтения и записи XML-файлов имеет одинаковую структуру, но запись данных DataFrame, считанных из других источников, может иметь другую структуру.
DataFrame со схемой:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
и данными:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
создает XML-файл:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>