Чтение и запись XML-файлов

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

В этой статье описывается, как считывать и записывать XML-файлы.

Расширяемый язык разметки (XML) — это язык разметки для форматирования, хранения и совместного использования данных в текстовом формате. Он определяет набор правил сериализации данных, начиная от документов до произвольных структур данных.

Поддержка собственного формата XML-файла позволяет получать, запрашивать и анализировать XML-данные для пакетной обработки или потоковой передачи. Он может автоматически выводить и развивать схемы и типы данных, поддерживать такие выражения SQL, как from_xmlи создавать XML-документы. Он не требует внешних jar-модулей и работает без проблем с автозагрузчиком read_files и COPY INTO.

Требования

Databricks Runtime 14.3 и более поздних версий

Анализ XML-записей

Спецификация XML требует хорошо сформированной структуры. Однако эта спецификация не сразу сопоставляет табличный формат. Необходимо указать rowTag параметр, указывающий XML-элемент, который сопоставляется с элементом DataFrameRow. Элемент rowTag становится верхним уровнем struct. Дочерние элементы rowTag становятся полями верхнего уровня struct.

Можно указать схему для этой записи или позволить ей автоматически выводить. Так как средство синтаксического анализа проверяет rowTag только элементы, DTD и внешние сущности отфильтровываются.

В следующих примерах показано вывод схемы и анализ XML-файла с помощью различных rowTag параметров:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Считывайте XML-файл с rowTag параметром "книги":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Выходные данные:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Считывайте XML-файл с rowTag "книгой":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Выходные данные:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Параметры источника данных

Параметры источника данных для XML можно указать следующим образом:

Список параметров см. в разделе "Параметры автозагрузчика".

Поддержка XSD

При необходимости можно проверить каждую xml-запись уровня строки с помощью определения схемы XML (XSD). XSD-файл указывается в параметре rowValidationXSDPath . В противном случае XSD не влияет на указанную или логически выведенную схему. Запись, которая завершается сбоем проверки, помечается как поврежденная и обрабатывается на основе параметра режима обработки поврежденных записей, описанного в разделе параметра.

Можно использовать XSDToSchema для извлечения схемы Кадра данных Spark из XSD-файла. Он поддерживает только простые, сложные и последовательные типы и поддерживает только основные функции XSD.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

В следующей таблице показано преобразование типов данных XSD в типы данных Spark:

Типы данных XSD Типы данных Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, , negativeIntegernonPositiveIntegernonNegativeIntegerpositiveInteger,unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Анализ вложенных XML-файлов

XML-данные в столбце с строковым значением в существующем кадре данных можно анализировать и schema_of_xmlfrom_xml возвращать схему и проанализированные результаты в виде новых struct столбцов. XML-данные, передаваемые в качестве аргумента schema_of_xml и from_xml должны быть одной хорошо сформированной XML-записью.

schema_of_xml

Синтаксис

schema_of_xml(xmlStr [, options] )

Аргументы

  • xmlStr: выражение STRING, указывающее одну хорошо сформированную XML-запись.
  • options: необязательный MAP<STRING,STRING> литерал, указывающий директивы.

Возвраты

Строка, содержащая определение структуры с n полями строк, в которых имена столбцов являются производными от XML-элемента и имен атрибутов. Значения полей содержат производные отформатированные типы SQL.

from_xml

Синтаксис

from_xml(xmlStr, schema [, options])

Аргументы

  • xmlStr: выражение STRING, указывающее одну хорошо сформированную XML-запись.
  • schema: выражение STRING или вызов schema_of_xml функции.
  • options: необязательный MAP<STRING,STRING> литерал, указывающий директивы.

Возвраты

Структура с типами и именами полей, соответствующими определению схемы. Схема должна быть определена как имя столбца с разделим запятыми и пары типов данных, как это используется, например CREATE TABLE. Большинство вариантов, отображаемых в параметрах источника данных, применимы к следующим исключениям:

  • rowTag: Так как существует только одна XML-запись, rowTag параметр неприменимо.
  • mode (по умолчанию: PERMISSIVE: разрешает режим для работы с поврежденными записями во время синтаксического анализа.
    • PERMISSIVE: когда она соответствует поврежденной записи, помещает недоформированную строку в поле, настроенное columnNameOfCorruptRecordи задает неправильно сформированные поля null. Чтобы сохранить поврежденные записи, можно задать поле типа строки с именем columnNameOfCorruptRecord в определяемой пользователем схеме. Если в схеме нет этого поля, то поврежденные записи удаляются во время синтаксического анализа. При выводе схемы поле columnNameOfCorruptRecord неявным образом добавляется в выходную схему.
    • FAILFAST: создает исключение при выполнении поврежденных записей.

Преобразование структуры

Из-за различий структуры между кадрами данных и XML существуют некоторые правила преобразования из XML-данных в DataFrame XML-данные и из DataFrame них. Обратите внимание, что обработка атрибутов может быть отключена с помощью параметра excludeAttribute.

Преобразование из XML в кадр данных

Атрибуты: атрибуты преобразуются в виде полей с префиксом attributePrefixзаголовка.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

создает схему ниже:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Символьные данные в элементе, содержащего атрибуты или дочерние элементы: они анализируются в valueTag поле. Если имеется несколько вхождений символьных данных, valueTag поле преобразуется в array тип.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

создает схему ниже:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Преобразование из кадра данных в XML

Элемент в виде массива: написание XML-файла из DataFrame поля ArrayType с его элементом, как ArrayType и дополнительное вложенное поле для элемента. Это не произойдет при чтении и написании XML-данных, но записи DataFrame чтения из других источников. Таким образом, циклический переход в чтение и запись XML-файлов имеет ту же структуру, но запись DataFrame чтения из других источников может иметь другую структуру.

Кадр данных со схемой ниже:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

и с данными ниже:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

создает XML-файл ниже:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Имя элемента неименованного массива в указанном DataFrame параметре arrayElementName (по умолчанию: item).

Столбец с восстановленными данными

Столбец с восстановленными данными гарантирует, что данные не будут потеряны или пропущены во время извлечения, преобразования и загрузки. Вы можете включить спасаемый столбец данных для записи любых данных, которые не были проанализированы, так как в одной или нескольких полях записи возникает одна из следующих проблем:

  • Отсутствует из предоставленной схемы
  • Не соответствует типу данных предоставленной схемы
  • Несоответствие регистра с именами полей в предоставленной схеме

Спасательный столбец данных возвращается в виде документа JSON, содержащего столбцы, которые были спасены, и путь к исходному файлу записи. Чтобы удалить путь к исходному файлу из спасаемого столбца данных, можно задать следующую конфигурацию SQL:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Вы можете активировать столбец восстановленных данных, задав в качестве значения параметра rescuedDataColumn имя столбца при считывании данных, например _rescued_data в spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Средство синтаксического анализа XML поддерживает три режима при анализе записей: PERMISSIVE, DROPMALFORMEDи FAILFAST. При использовании вместе с rescuedDataColumn несоответствие типов данных не приводит к удалению записей в режиме DROPMALFORMED или возникновению ошибки в режиме FAILFAST. Удаляются или возникают ошибки только поврежденных записей (неполные или неправильные XML-файлы).

Вывод и изменение схемы в Auto Loader

Подробные сведения об этом разделе и применимых параметрах см. в разделе "Настройка вывода схемы и эволюции" в автозагрузчике. Вы можете настроить автозагрузчик для автоматического обнаружения схемы загруженных XML-данных, позволяя инициализировать таблицы без явного объявления схемы данных и развивать схему таблицы в виде новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени.

По умолчанию вывод схемы автозагрузчика стремится избежать проблем с эволюцией схемы из-за несоответствия типов. Для форматов, которые не кодируют типы данных (JSON, CSV и XML), автозагрузчик выводит все столбцы в виде строк, включая вложенные поля в XML-файлах. Apache Spark DataFrameReader использует другое поведение для вывода схемы, выбирая типы данных для столбцов в XML-источниках на основе примеров данных. Чтобы включить это поведение с помощью автозагрузчика, задайте для параметра значение cloudFiles.inferColumnTypestrue.

Автозагрузчик обнаруживает добавление новых столбцов при обработке данных. Когда автозагрузчик обнаруживает новый столбец, поток останавливается с UnknownFieldExceptionпомощью . Прежде чем поток выдает эту ошибку, автозагрузчик выполняет вывод схемы в последней микропакете данных и обновляет расположение схемы с последней схемой, объединяя новые столбцы в конец схемы. Типы данных существующих столбцов останутся неизменными. Автозагрузчик поддерживает различные режимы эволюции схемы, заданные в параметре cloudFiles.schemaEvolutionMode.

Вы можете использовать подсказки схемы для принудительного применения сведений о схеме, которые вы знаете и ожидаете в выводимых схемах. Если вы знаете, что столбец имеет определенный тип данных или вы хотите выбрать более общий тип данных (например, двойной вместо целого числа), можно указать произвольное количество подсказок для типов данных столбцов в виде строки с помощью синтаксиса спецификации схемы SQL. При включении спасаемого столбца данных поля, именованные в случае, отличном от схемы, загружаются в _rescued_data столбец. Это поведение можно изменить, задав параметр readerCaseSensitivefalse, в котором автозагрузчик считывает данные без учета регистра.

Примеры

В примерах этого раздела используется XML-файл, доступный для скачивания в репозитории Apache Spark GitHub.

Чтение и запись XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Можно вручную указать схему при чтении данных:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API SQL

Источник данных XML может выводить типы данных:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Можно также указать имена и типы столбцов в DDL. В этом случае схема не выводится автоматически.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Загрузка XML с помощью COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Чтение XML с проверкой строк

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Синтаксический анализ вложенных XML (from_xml и schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml и schema_of_xml с ПОМОЩЬЮ API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Загрузка XML с помощью автозагрузчика

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Дополнительные ресурсы

Чтение и запись XML-данных с помощью библиотеки spark-xml