Чтение и запись XML-файлов
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
В этой статье описывается, как считывать и записывать XML-файлы.
Расширяемый язык разметки (XML) — это язык разметки для форматирования, хранения и совместного использования данных в текстовом формате. Он определяет набор правил сериализации данных, начиная от документов до произвольных структур данных.
Поддержка собственного формата XML-файла позволяет получать, запрашивать и анализировать XML-данные для пакетной обработки или потоковой передачи. Он может автоматически выводить и развивать схемы и типы данных, поддерживать такие выражения SQL, как from_xml
и создавать XML-документы. Он не требует внешних jar-модулей и работает без проблем с автозагрузчиком read_files
и COPY INTO
. При необходимости можно проверить каждую xml-запись уровня строки в определении схемы XML (XSD).
Требования
Databricks Runtime 14.3 и более поздних версий
Анализ XML-записей
Спецификация XML требует хорошо сформированной структуры. Однако эта спецификация не сразу сопоставляет табличный формат. Необходимо указать rowTag
параметр, указывающий XML-элемент, который сопоставляется с элементом DataFrame
Row
. Элемент rowTag
становится верхним уровнем struct
. Дочерние элементы rowTag
становятся полями верхнего уровня struct
.
Можно указать схему для этой записи или позволить ей автоматически выводить. Так как средство синтаксического анализа проверяет rowTag
только элементы, DTD и внешние сущности отфильтровываются.
В следующих примерах показано вывод схемы и анализ XML-файла с помощью различных rowTag
параметров:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Считывайте XML-файл с rowTag
параметром "книги":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Выходные данные:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Считывайте XML-файл с rowTag
"книгой":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Выходные данные:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Параметры источника данных
Параметры источника данных для XML можно указать следующим образом:
- Методы
.option/.options
следующих методов:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Следующие встроенные функции:
- Предложение
OPTIONS
CREATE TABLE USING DATA_SOURCE
Список параметров см. в разделе "Параметры автозагрузчика".
Поддержка XSD
При необходимости можно проверить каждую xml-запись уровня строки с помощью определения схемы XML (XSD). XSD-файл указывается в параметре rowValidationXSDPath
. В противном случае XSD не влияет на указанную или логически выведенную схему. Запись, которая завершается сбоем проверки, помечается как поврежденная и обрабатывается на основе параметра режима обработки поврежденных записей, описанного в разделе параметра.
Можно использовать XSDToSchema
для извлечения схемы Кадра данных Spark из XSD-файла. Он поддерживает только простые, сложные и последовательные типы и поддерживает только основные функции XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
В следующей таблице показано преобразование типов данных XSD в типы данных Spark:
Типы данных XSD | Типы данных Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , , negativeInteger nonPositiveInteger nonNegativeInteger positiveInteger ,unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Анализ вложенных XML-файлов
XML-данные в столбце с строковым значением в существующем кадре данных можно анализировать и schema_of_xml
from_xml
возвращать схему и проанализированные результаты в виде новых struct
столбцов. XML-данные, передаваемые в качестве аргумента schema_of_xml
и from_xml
должны быть одной хорошо сформированной XML-записью.
schema_of_xml
Синтаксис
schema_of_xml(xmlStr [, options] )
Аргументы
xmlStr
: выражение STRING, указывающее одну хорошо сформированную XML-запись.options
: необязательныйMAP<STRING,STRING>
литерал, указывающий директивы.
Возвраты
Строка, содержащая определение структуры с n полями строк, в которых имена столбцов являются производными от XML-элемента и имен атрибутов. Значения полей содержат производные отформатированные типы SQL.
from_xml
Синтаксис
from_xml(xmlStr, schema [, options])
Аргументы
xmlStr
: выражение STRING, указывающее одну хорошо сформированную XML-запись.schema
: выражение STRING или вызовschema_of_xml
функции.options
: необязательныйMAP<STRING,STRING>
литерал, указывающий директивы.
Возвраты
Структура с типами и именами полей, соответствующими определению схемы. Схема должна быть определена как имя столбца с разделим запятыми и пары типов данных, как это используется, например CREATE TABLE
. Большинство вариантов, отображаемых в параметрах источника данных, применимы к следующим исключениям:
rowTag
: Так как существует только одна XML-запись,rowTag
параметр неприменимо.mode
(по умолчанию:PERMISSIVE
: разрешает режим для работы с поврежденными записями во время синтаксического анализа.PERMISSIVE
: когда она соответствует поврежденной записи, помещает недоформированную строку в поле, настроенноеcolumnNameOfCorruptRecord
и задает неправильно сформированные поляnull
. Чтобы сохранить поврежденные записи, можно задать поле типа строки с именемcolumnNameOfCorruptRecord
в определяемой пользователем схеме. Если в схеме нет этого поля, то поврежденные записи удаляются во время синтаксического анализа. При выводе схемы полеcolumnNameOfCorruptRecord
неявным образом добавляется в выходную схему.FAILFAST
: создает исключение при выполнении поврежденных записей.
Преобразование структуры
Из-за различий структуры между кадрами данных и XML существуют некоторые правила преобразования из XML-данных в DataFrame
XML-данные и из DataFrame
них. Обратите внимание, что обработка атрибутов может быть отключена с помощью параметра excludeAttribute
.
Преобразование из XML в кадр данных
Атрибуты: атрибуты преобразуются в виде полей с префиксом attributePrefix
заголовка.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
создает схему ниже:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Символьные данные в элементе, содержащего атрибуты или дочерние элементы: они анализируются в valueTag
поле. Если имеется несколько вхождений символьных данных, valueTag
поле преобразуется в array
тип.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
создает схему ниже:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Преобразование из кадра данных в XML
Элемент в виде массива: написание XML-файла из DataFrame
поля ArrayType
с его элементом, как ArrayType
и дополнительное вложенное поле для элемента. Это не произойдет при чтении и написании XML-данных, но записи DataFrame
чтения из других источников. Таким образом, циклический переход в чтение и запись XML-файлов имеет ту же структуру, но запись DataFrame
чтения из других источников может иметь другую структуру.
Кадр данных со схемой ниже:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
и с данными ниже:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
создает XML-файл ниже:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Имя элемента неименованного массива в указанном DataFrame
параметре arrayElementName
(по умолчанию: item
).
Столбец с восстановленными данными
Столбец с восстановленными данными гарантирует, что данные не будут потеряны или пропущены во время извлечения, преобразования и загрузки. Вы можете включить спасаемый столбец данных для записи любых данных, которые не были проанализированы, так как в одной или нескольких полях записи возникает одна из следующих проблем:
- Отсутствует из предоставленной схемы
- Не соответствует типу данных предоставленной схемы
- Несоответствие регистра с именами полей в предоставленной схеме
Спасательный столбец данных возвращается в виде документа JSON, содержащего столбцы, которые были спасены, и путь к исходному файлу записи. Чтобы удалить путь к исходному файлу из спасаемого столбца данных, можно задать следующую конфигурацию SQL:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Вы можете активировать столбец восстановленных данных, задав в качестве значения параметра rescuedDataColumn
имя столбца при считывании данных, например _rescued_data
в spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Средство синтаксического анализа XML поддерживает три режима при анализе записей: PERMISSIVE
, DROPMALFORMED
и FAILFAST
. При использовании вместе с rescuedDataColumn
несоответствие типов данных не приводит к удалению записей в режиме DROPMALFORMED
или возникновению ошибки в режиме FAILFAST
. Удаляются или возникают ошибки только поврежденных записей (неполные или неправильные XML-файлы).
Вывод и изменение схемы в Auto Loader
Подробные сведения об этом разделе и применимых параметрах см. в разделе "Настройка вывода схемы и эволюции" в автозагрузчике. Вы можете настроить автозагрузчик для автоматического обнаружения схемы загруженных XML-данных, позволяя инициализировать таблицы без явного объявления схемы данных и развивать схему таблицы в виде новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени.
По умолчанию вывод схемы автозагрузчика стремится избежать проблем с эволюцией схемы из-за несоответствия типов. Для форматов, которые не кодируют типы данных (JSON, CSV и XML), автозагрузчик выводит все столбцы в виде строк, включая вложенные поля в XML-файлах. Apache Spark DataFrameReader
использует другое поведение для вывода схемы, выбирая типы данных для столбцов в XML-источниках на основе примеров данных. Чтобы включить это поведение с помощью автозагрузчика, задайте для параметра значение cloudFiles.inferColumnTypes
true
.
Автозагрузчик обнаруживает добавление новых столбцов при обработке данных. Когда автозагрузчик обнаруживает новый столбец, поток останавливается с UnknownFieldException
помощью . Прежде чем поток выдает эту ошибку, автозагрузчик выполняет вывод схемы в последней микропакете данных и обновляет расположение схемы с последней схемой, объединяя новые столбцы в конец схемы. Типы данных существующих столбцов останутся неизменными. Автозагрузчик поддерживает различные режимы эволюции схемы, заданные в параметре cloudFiles.schemaEvolutionMode
.
Вы можете использовать подсказки схемы для принудительного применения сведений о схеме, которые вы знаете и ожидаете в выводимых схемах. Если вы знаете, что столбец имеет определенный тип данных или вы хотите выбрать более общий тип данных (например, двойной вместо целого числа), можно указать произвольное количество подсказок для типов данных столбцов в виде строки с помощью синтаксиса спецификации схемы SQL. При включении спасаемого столбца данных поля, именованные в случае, отличном от схемы, загружаются в _rescued_data
столбец. Это поведение можно изменить, задав параметр readerCaseSensitive
false
, в котором автозагрузчик считывает данные без учета регистра.
Примеры
В примерах этого раздела используется XML-файл, доступный для скачивания в репозитории Apache Spark GitHub.
Чтение и запись XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Можно вручную указать схему при чтении данных:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
API SQL
Источник данных XML может выводить типы данных:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
Можно также указать имена и типы столбцов в DDL. В этом случае схема не выводится автоматически.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Загрузка XML с помощью COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Чтение XML с проверкой строк
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Синтаксический анализ вложенных XML (from_xml и schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml и schema_of_xml с ПОМОЩЬЮ API SQL
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Загрузка XML с помощью автозагрузчика
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)