Поделиться через


Чтение и запись XML-данных с помощью библиотеки spark-xml

Внимание

Поддержка этой документации прекращена, она может больше не обновляться. Продукты, услуги или технологии, упомянутые в его содержимом, не поддерживаются официально или проверяются Databricks.

Поддержка формата собственного XML-файла доступна как общедоступная предварительная версия. См. статью "Чтение и запись XML-файлов".

В этой статье описано чтение и запись XML-файла в качестве источника данных Apache Spark.

Требования

  1. Создайте библиотеку spark-xml как библиотеку Maven. Для координаты Maven укажите:

    • Databricks Runtime 7.x и более поздних версий: com.databricks:spark-xml_2.12:<release>

    Дополнительные сведения см. в выпусках spark-xml последней <release>версии.

  2. Установите библиотеку в кластер.

Пример

В примере в этом разделе используется XML-файл books.

  1. Получите XML-файл books:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Отправьте файл в хранилище DBFS.

Чтение и запись XML-данных

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Параметры

  • Читать
    • path: расположение XML-файлов. Принимает стандартные выражения подстановки Hadoop.
    • rowTag: тег строки, который будет обрабатываться как строка. Например, в этом XML-файле <books><book><book>...</books> будет использоваться значение book. По умолчанию — ROW.
    • samplingRatio: коэффициент выборки для выведения схемы (0,0 ~ 1). По умолчанию 1. Допустимые типы: StructType, ArrayType, StringType, LongType, DoubleType, BooleanType, TimestampType и NullType, если схема не указана.
    • excludeAttribute: определяет, следует ли исключать атрибуты в элементах. По умолчанию — false.
    • nullValue: значение, обрабатываемое как значение null. По умолчанию — "".
    • mode: режим работы с поврежденными записями. По умолчанию — PERMISSIVE.
      • PERMISSIVE:
        • При обнаружении поврежденной записи устанавливает для всех полей значение null и помещает неправильную строку в новое поле, настроенное с помощью columnNameOfCorruptRecord.
        • При обнаружении поля с неверным типом данных устанавливает для поля, вызывающего ошибку, значение null.
      • DROPMALFORMED: игнорирует поврежденные записи.
      • FAILFAST: вызывает исключение при обнаружении поврежденных записей.
    • inferSchema: если true, пытается определить подходящий тип для каждого результирующего столбца DataFrame, например, логическое выражение, числовой тип или данные. Если false, все результирующие столбцы будут строкового типа. По умолчанию — true.
    • columnNameOfCorruptRecord: имя нового поля, в котором хранятся неправильные строки. По умолчанию — _corrupt_record.
    • attributePrefix: префикс атрибутов для различения атрибутов и элементов. Это префикс для имен полей. По умолчанию — _.
    • valueTag: тег, используемый для значения при наличии в элементе атрибутов, не имеющих дочерних элементов. По умолчанию — _VALUE.
    • charset: по умолчанию принимает значение UTF-8, но может быть задано другое допустимое имя кодировки.
    • ignoreSurroundingSpaces: определяет, следует ли пропускать пробелы вокруг значений. По умолчанию — false.
    • rowValidationXSDPath: путь к XSD-файлу, который используется для проверки XML для каждой строки. Строки, которые не удается проверить, обрабатываются как ошибки синтаксического анализа, как описано выше. В противном случае XSD не влияет на указанную или логически выведенную схему. Если тот же локальный путь еще не виден на исполнителях в кластере, то XSD и все остальные, от которых он зависит, должны быть добавлены в исполнители Spark с помощью SparkContext. addFile. В этом случае для использования локальной XSD /foo/bar.xsd следует вызвать addFile("/foo/bar.xsd") и передать "bar.xsd" как rowValidationXSDPath.
  • Писать
    • path: расположение для записи файлов.
    • rowTag: тег строки, который будет обрабатываться как строка. Например, в этом XML-файле <books><book><book>...</books> будет использоваться значение book. По умолчанию — ROW.
    • rootTag: корневой тег, который будет считаться корневым. Например, в этом XML-файле <books><book><book>...</books> будет использоваться значение books. По умолчанию — ROWS.
    • nullValue: значение для записи значения null. Значение по умолчанию — строка "null". Когда "null", атрибуты и элементы для полей не записываются.
    • attributePrefix: префикс атрибутов для различения атрибутов и элементов. Это префикс для имен полей. По умолчанию — _.
    • valueTag: тег, используемый для значения при наличии в элементе атрибутов, не имеющих дочерних элементов. По умолчанию — _VALUE.
    • compression: кодек сжатия, используемый при сохранении в файл. Должно быть полным именем класса, реализующего org.apache.hadoop.io.compress.CompressionCodec, или одним из коротких имен без учета регистра (bzip2, gzip, lz4 и snappy). По умолчанию сжатие не выполняется.

Поддерживает использование сокращенных имен; вместо xml можно использовать com.databricks.spark.xml.

Поддержка XSD

С помощью rowValidationXSDPath можно проверить отдельные строки в схеме XSD.

Используйте служебную программу com.databricks.spark.xml.util.XSDToSchema для извлечения схемы DataFrame Spark из определенных XSD-файлов. Она поддерживает только простые, сложные и последовательные типы, только основные функции XSD и является экспериментальной.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Анализ вложенных XML-файлов

Несмотря на то, что метод from_xml в основном используется для преобразования XML-файла в DataFrame, его можно также использовать для анализа XML-содержимого в столбце с строковыми значениями в существующем DataFrame и его добавления в качестве нового столбца с проанализированными результатами в виде структуры с помощью:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Примечание.

  • mode:
    • Если задано значение PERMISSIVE (параметр по умолчанию), вместо режима анализа используется режим по умолчанию — DROPMALFORMED. Если включить в схему для from_xml столбец, соответствующий columnNameOfCorruptRecord, то в режиме PERMISSIVE в этот столбец в результирующей структуре будут выводится неправильные записи.
    • Если задано значение DROPMALFORMED, XML-значения, которые были неправильно проанализированы, будут отражены в столбце в виде null. Строки не удаляются.
  • from_xml преобразует массивы строк, содержащих XML, в массивы проанализированных структур. Вместо этого используйте schema_of_xml_array.
  • from_xml_string является альтернативой для использования в UDF, которая предназначена для работы непосредственно со строкой, а не столбцом.

Правила преобразования

Из-за структурных различий между DataFrame и XML необходимо соблюдать некоторые правила преобразования данных из XML в DataFrame и из DataFrame в XML. Можно отключить обработку атрибутов с помощью параметра excludeAttribute.

Преобразование XML в DataFrame

  • Атрибуты: атрибуты преобразуются как поля с префиксом, указанным в параметре attributePrefix. Если attributePrefix имеет значение _, документ

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    создает схему:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Если у элемента есть атрибуты, но нет дочерних элементов, значение атрибута помещается в отдельное поле, указанное в параметре valueTag. Если valueTag имеет значение _VALUE, документ

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    создает схему:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Преобразование DataFrame в XML

Написание XML-файла из DataFrame ArrayType с полем с его элементом, как ArrayType и дополнительное вложенное поле для элемента. Это не произойдет при чтении и записи XML-данных, но при записи кадра данных из других источников. Таким образом, цикл чтения и записи XML-файлов имеет одинаковую структуру, но запись данных DataFrame, считанных из других источников, может иметь другую структуру.

DataFrame со схемой:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

и данными:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

создает XML-файл:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>