Čtení a zápis souborů XML

Důležité

Tato funkce je ve verzi Public Preview.

Tento článek popisuje, jak číst a zapisovat soubory XML.

Jazyk XML (Extensible Markup Language) je jazyk značek pro formátování, ukládání a sdílení dat v textovém formátu. Definuje sadu pravidel pro serializaci dat od dokumentů po libovolné datové struktury.

Nativní podpora formátu souboru XML umožňuje příjem dat, dotazování a analýzu dat XML pro dávkové zpracování nebo streamování. Může automaticky odvodit a vyvíjet schémata a datové typy, podporuje výrazy SQL jako from_xmla může generovat dokumenty XML. Nevyžaduje externí soubory JAR a bezproblémově funguje s automatickým zavaděčem read_files a COPY INTO.

Požadavky

Databricks Runtime 14.3 a novější

Analýza záznamů XML

Specifikace XML vyžaduje dobře formátovanou strukturu. Tato specifikace se ale okamžitě nemapuje na tabulkový formát. Je nutné zadat rowTag možnost označit XML element, který mapuje na DataFrameRow. Prvek rowTag se stane nejvyšší úrovní struct. Podřízené prvky rowTag se stanou poli nejvyšší úrovně struct.

Můžete zadat schéma pro tento záznam nebo ho nechat automaticky odvodit. Vzhledem k tomu, že analyzátor zkoumá rowTag pouze prvky, odfiltrují se DTD a externí entity.

Následující příklady ilustrují odvozování schématu a parsování souboru XML pomocí různých rowTag možností:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Přečtěte si soubor XML s rowTag možností "books":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Výstup:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Soubor XML rowTag si můžete přečíst jako "knihu":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Výstup:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Možnosti zdroje dat

Možnosti zdroje dat pro XML lze zadat následujícími způsoby:

Seznam možností najdete v tématu Možnosti automatického zavaděče.

Podpora XSD

Volitelně můžete ověřit každý záznam XML na úrovni řádků definicí schématu XML (XSD). V možnosti je zadaný rowValidationXSDPath soubor XSD. XSD jinak nemá vliv na zadané schéma nebo odvozené. Záznam, který selže, je označený jako poškozený a zpracován na základě možnosti režimu zpracování poškozených záznamů popsaných v části možnosti.

Můžete použít XSDToSchema k extrakci schématu datového rámce Spark ze souboru XSD. Podporuje pouze jednoduché, složité a sekvenční typy a podporuje pouze základní funkce XSD.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Následující tabulka ukazuje převod datových typů XSD na datové typy Spark:

Datové typy XSD Datové typy Sparku
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, , positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Parsování vnořeného XML

Data XML ve sloupci s řetězcovou hodnotou v existujícím datovém rámci je možné analyzovat schema_of_xml a from_xml která vrací schéma a analyzované výsledky jako nové struct sloupce. Data XML předaná jako argument schema_of_xml a from_xml musí se jednat o jeden správně formátovaný záznam XML.

schema_of_xml

Syntaxe

schema_of_xml(xmlStr [, options] )

Argumenty

  • xmlStr: Výraz STRING určující jeden správně formátovaný záznam XML.
  • options: Volitelný MAP<STRING,STRING> literál určující direktivy.

Vrácení

ŘETĚZEC obsahující definici struktury s n poli řetězců, ve kterých jsou názvy sloupců odvozeny od elementu XML a názvů atributů. Hodnoty polí obsahují odvozené formátované typy SQL.

from_xml

Syntaxe

from_xml(xmlStr, schema [, options])

Argumenty

  • xmlStr: Výraz STRING určující jeden správně formátovaný záznam XML.
  • schema: Výraz STRING nebo vyvolání schema_of_xml funkce.
  • options: Volitelný MAP<STRING,STRING> literál určující direktivy.

Vrácení

Struktura s názvy polí a typy odpovídající definici schématu. Schéma musí být definováno jako názvy sloupců oddělených čárkami a páry datových typů, které se používají například CREATE TABLE. Většina možností zobrazených v možnostech zdroje dat platí s následujícími výjimkami:

  • rowTag: Protože existuje pouze jeden záznam XML, rowTag možnost není použitelná.
  • mode (výchozí: PERMISSIVE): Umožňuje režim zpracování poškozených záznamů během analýzy.
    • PERMISSIVE: Když splňuje poškozený záznam, umístí poškozený řetězec do pole nakonfigurovaného columnNameOfCorruptRecordpomocí a nastaví poškozená pole na null. Chcete-li zachovat poškozené záznamy, můžete nastavit pole typu řetězce pojmenované columnNameOfCorruptRecord v uživatelsky definovaném schématu. Pokud schéma pole neobsahuje, během analýzy zahodí poškozené záznamy. Při odvození schématu implicitně přidá columnNameOfCorruptRecord pole ve výstupním schématu.
    • FAILFAST: Vyvolá výjimku, když splňuje poškozené záznamy.

Převod struktury

Vzhledem k rozdílům ve struktuře mezi datovým rámcem DataFrame a XML existují určitá pravidla převodu z dat XML do DataFrame a z DataFrame dat XML. Všimněte si, že zpracování atributů lze zakázat pomocí možnosti excludeAttribute.

Převod z XML na datový rámec

Atributy: Atributy jsou převedeny jako pole s předponou nadpisu attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

vytvoří následující schéma:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Znaková data v elementu, který obsahuje atributy nebo podřízené elementy: Tyto prvky jsou analyzovány do valueTag pole. Pokud existuje více výskytů znakových dat, valueTag pole se převede na array typ.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

vytvoří následující schéma:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Převod z datového rámce na XML

Element jako pole v matici: Zápis souboru XML z DataFrame pole ArrayType s jeho elementem, stejně jako ArrayType by měl další vnořené pole pro prvek. K tomu nedojde při čtení a zápisu dat XML, ale při zápisu DataFrame čtení z jiných zdrojů. Proto zaokrouhlování při čtení a zápisu souborů XML má stejnou strukturu, ale zápis DataFrame čtení z jiných zdrojů je možné mít jinou strukturu.

Datový rámec s následujícím schématem:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

a s daty níže:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

vytvoří soubor XML níže:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Název prvku nepojmenovaného pole v poli DataFrame je určen možností arrayElementName (Výchozí: item).

Sloupec zachráněných dat

Sloupec zachráněných dat zajišťuje, že během ETL nikdy nepřijdete o data nebo je nezmeškáte. Můžete povolit, aby zachytávalo všechna data, která nebyla analyzována, protože jedno nebo více polí v záznamu má jeden z následujících problémů:

  • Chybí ze zadaného schématu.
  • Neodpovídá datovému typu zadaného schématu.
  • Neshoda velkých a velkých písmen s názvy polí v zadaném schématu

Uložený datový sloupec se vrátí jako dokument JSON obsahující sloupce, které byly uloženy, a cestu ke zdrojovému souboru záznamu. Chcete-li odebrat cestu ke zdrojovému souboru ze sloupce zachráněných dat, můžete nastavit následující konfiguraci SQL:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Záchranný datový sloupec můžete povolit nastavením možnosti rescuedDataColumn na název sloupce při čtení dat, například _rescued_data pomocí spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Analyzátor XML podporuje při analýze záznamů tři režimy: PERMISSIVE, DROPMALFORMEDa FAILFAST. Při použití společně s datovým rescuedDataColumntypem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED režimu nebo vyvolání chyby v FAILFAST režimu. Zahodí se pouze poškozené záznamy (neúplné nebo poškozené XML) nebo vyvolá chyby.

Odvození schématu a vývoj v automatickém zavaděče

Podrobnou diskuzi o tomto tématu a příslušných možnostech najdete v tématu Konfigurace odvozování schématu a vývoje v auto loaderu. Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat XML, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času.

Ve výchozím nastavení se při odvozování schématu automatického zavaděče snaží vyhnout problémům s vývojem schématu kvůli neshodám typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny sloupce jako řetězce, včetně vnořených polí v souborech XML. Apache Spark DataFrameReader používá jiné chování pro odvozování schématu a výběr datových typů pro sloupce ve zdrojích XML na základě ukázkových dat. Chcete-li toto chování povolit pomocí automatického zavaděče, nastavte možnost cloudFiles.inferColumnTypes na truehodnotu .

Auto Loader zjistí přidání nových sloupců při zpracování dat. Když Auto Loader zjistí nový sloupec, datový proud se zastaví pomocí .UnknownFieldException Než datový proud vyvolá tuto chybu, auto loader provede odvozování schématu v nejnovější mikrodávce dat a aktualizuje umístění schématu s nejnovějším schématem sloučením nových sloupců na konec schématu. Datové typy existujících sloupců zůstávají beze změny. Auto Loader podporuje různé režimy pro vývoj schématu, který jste nastavili v možnosti cloudFiles.schemaEvolutionMode.

Pomocí nápovědy schématu můžete vynutit informace o schématu, které znáte a očekáváte u odvozeného schématu. Pokud víte, že sloupec je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například dvojité místo celého čísla), můžete zadat libovolný počet tipů pro datové typy sloupců jako řetězec pomocí syntaxe specifikace schématu SQL. Pokud je povolený sloupec s daty o záchraně, načtou se do _rescued_data sloupce pole pojmenovaná v jiném případě, než je schéma. Toto chování můžete změnit tak, že nastavíte možnost readerCaseSensitivefalsena možnost , v takovém případě Auto Loader čte data bez rozlišování malých a malých písmen.

Příklady

Příklady v této části používají soubor XML dostupný ke stažení v úložišti Apache Spark GitHub.

Čtení a zápis XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Při čtení dat můžete schéma zadat ručně:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Rozhraní API SQL

Zdroj dat XML může odvodit datové typy:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

V DDL můžete také zadat názvy a typy sloupců. V tomto případě není schéma odvozeno automaticky.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Načtení XML pomocí FUNKCE COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Čtení XML s ověřením řádků

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analýza vnořeného XML (from_xml a schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml a schema_of_xml s využitím rozhraní SQL API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Načtení XML pomocí automatického zavaděče

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Další materiály

Čtení a zápis dat XML pomocí knihovny spark-xml