Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Důležité
Tato funkce je ve verzi Public Preview.
Tento článek popisuje, jak číst a zapisovat soubory XML.
Jazyk XML (Extensible Markup Language) je jazyk značek pro formátování, ukládání a sdílení dat v textovém formátu. Definuje sadu pravidel pro serializaci dat od dokumentů po libovolné datové struktury.
Nativní podpora formátu souboru XML umožňuje příjem dat, dotazování a analýzu dat XML pro dávkové zpracování nebo streamování. Dokáže automaticky odvodit a vyvíjet schémata a datové typy, podporuje výrazy SQL, jako je from_xml, a může generovat dokumenty XML. Nevyžaduje externí soubory JAR a bezproblémově funguje s Auto Loader, read_files a COPY INTO. Volitelně můžete ověřit každý záznam XML na úrovni řádků proti definici schématu XML (XSD).
Požadavky
Databricks Runtime 14.3 a novější
Analýza záznamů XML
Specifikace XML vyžaduje dobře formátovanou strukturu. Tato specifikace se ale nehodí přímo do tabulkového formátu. Je nutné zadat rowTag možnost označit XML element, který mapuje na DataFrameRow. Prvek rowTag se stane nejvyšší úrovní struct. Podřízené prvky rowTag se stanou poli nejvyšší úrovně struct.
Můžete zadat schéma pro tento záznam nebo ho nechat automaticky odvodit. Vzhledem k tomu, že analyzátor zkoumá rowTag pouze prvky, odfiltrují se DTD a externí entity.
Následující příklady ilustrují odvozování schématu a parsování souboru XML pomocí různých rowTag možností:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Přečtěte si soubor XML s rowTag možností "books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Výstup:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Soubor XML rowTag si můžete přečíst jako "knihu":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Výstup:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Možnosti zdroje dat
Možnosti zdroje dat pro XML lze zadat následujícími způsoby:
- Metody
.option/.optionsnásledujících:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Následující předdefinované funkce:
- Klauzule
OPTIONSCREATE TABLE POUŽITÍM DATA_SOURCE
Seznam možností najdete v tématu Možnosti automatického zavaděče.
Podpora XSD
Volitelně můžete ověřit každý záznam XML na úrovni řádků definicí schématu XML (XSD). V možnosti je zadaný rowValidationXSDPath soubor XSD. XSD jinak neovlivňuje poskytnuté nebo odvozené schéma. Záznam, který selže, je označený jako poškozený a zpracován na základě možnosti režimu zpracování poškozených záznamů popsaných v části možnosti.
Můžete použít XSDToSchema k extrakci schématu datového rámce Spark ze souboru XSD. Podporuje pouze jednoduché, složité a sekvenční typy a podporuje pouze základní funkce XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Následující tabulka ukazuje převod datových typů XSD na datové typy Spark:
| Datové typy XSD | Datové typy Sparku |
|---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short, unsignedByte |
ShortType |
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, , positiveIntegerunsignedShort |
IntegerType |
long, unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Parsování vnořeného XML
Data XML ve sloupci s řetězcovou hodnotou v existujícím datovém rámci je možné analyzovat schema_of_xml a from_xml která vrací schéma a analyzované výsledky jako nové struct sloupce. Data XML předaná jako argument schema_of_xml a from_xml musí se jednat o jeden správně formátovaný záznam XML.
schema_of_xml
Syntaxe
schema_of_xml(xmlStr [, options] )
Argumenty
-
xmlStr: Výraz STRING určující jeden správně formátovaný záznam XML. -
options: VolitelnýMAP<STRING,STRING>literál určující direktivy.
Vrácení
ŘETĚZEC obsahující definici struktury s n poli řetězců, ve kterých jsou názvy sloupců odvozeny od elementu XML a názvů atributů. Hodnoty polí obsahují odvozené formátované typy SQL.
from_xml
Syntaxe
from_xml(xmlStr, schema [, options])
Argumenty
-
xmlStr: Výraz STRING určující jeden správně formátovaný záznam XML. -
schema: Výraz STRING nebo vyvoláníschema_of_xmlfunkce. -
options: VolitelnýMAP<STRING,STRING>literál určující direktivy.
Vrácení
Struktura s názvy polí a typy odpovídající definici schématu. Schéma musí být definováno jako názvy sloupců oddělených čárkami a páry datových typů, které se používají například CREATE TABLE. Většina možností zobrazených v možnostech zdroje dat platí s následujícími výjimkami:
-
rowTag: Protože existuje pouze jeden záznam XML,rowTagmožnost není použitelná. -
mode(výchozí:PERMISSIVE): Umožňuje režim zpracování poškozených záznamů během analýzy.-
PERMISSIVE: Když splňuje poškozený záznam, umístí poškozený řetězec do pole nakonfigurovanéhocolumnNameOfCorruptRecordpomocí a nastaví poškozená pole nanull. Chcete-li zachovat poškozené záznamy, můžete nastavit pole typu řetězce pojmenovanécolumnNameOfCorruptRecordv uživatelsky definovaném schématu. Pokud schéma pole neobsahuje, během analýzy zahodí poškozené záznamy. Při odvození schématu implicitně přidá do výstupního schématu polecolumnNameOfCorruptRecord. -
FAILFAST: Vyvolá výjimku, když splňuje poškozené záznamy.
-
Převod struktury
Vzhledem k rozdílům ve struktuře mezi datovým rámcem DataFrame a XML existují určitá pravidla převodu z dat XML do DataFrame a z DataFrame dat XML. Všimněte si, že zpracování atributů lze zakázat pomocí možnosti excludeAttribute.
Převod z XML na datový rámec
Atributy: Atributy jsou převedeny jako pole s předponou nadpisu attributePrefix.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
vytvoří následující schéma:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Znaková data v elementu, který obsahuje atributy nebo podřízené elementy: Tyto prvky jsou analyzovány do valueTag pole. Pokud existuje více výskytů znakových dat, valueTag pole se převede na array typ.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
vytvoří následující schéma:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Převod z datového rámce na XML
Element jako pole v matici: Zápis souboru XML z DataFrame pole ArrayType s jeho elementem, stejně jako ArrayType by měl další vnořené pole pro prvek. K tomu nedojde při čtení a zápisu dat XML, ale při zápisu DataFrame čtení z jiných zdrojů. Proto zaokrouhlování při čtení a zápisu souborů XML má stejnou strukturu, ale zápis DataFrame čtení z jiných zdrojů je možné mít jinou strukturu.
Datový rámec s následujícím schématem:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
a s daty níže:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
vytvoří soubor XML níže:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Název prvku nepojmenovaného pole v poli DataFrame je určen možností arrayElementName (Výchozí: item).
Sloupec zachráněných dat
Sloupec zachráněných dat zajišťuje, že během ETL nikdy nepřijdete o data nebo je nezmeškáte. Můžete povolit sloupec pro obnovená data, aby zachytil všechna data, která nebyla analyzována, protože jedno nebo více polí v záznamu má jeden z následujících problémů:
- Chybí ze zadaného schématu.
- Neodpovídá datovému typu zadaného schématu.
- Neshoda velkých a velkých písmen s názvy polí v zadaném schématu
Uložený datový sloupec se vrátí jako dokument JSON obsahující sloupce, které byly uloženy, a cestu ke zdrojovému souboru záznamu. Chcete-li odebrat cestu ke zdrojovému souboru ze sloupce zachráněných dat, můžete nastavit následující konfiguraci SQL:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Záchranný datový sloupec můžete povolit nastavením možnosti rescuedDataColumn na název sloupce při čtení dat, například _rescued_data s spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).
Analyzátor XML podporuje při analýze záznamů tři režimy: PERMISSIVE, DROPMALFORMEDa FAILFAST. Při použití společně s datovým rescuedDataColumntypem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED režimu nebo vyvolání chyby v FAILFAST režimu. Zahodí se pouze poškozené záznamy (neúplné nebo poškozené XML) nebo vyvolá chyby.
Odvození schématu a vývoj v automatickém zavaděče
Podrobnou diskuzi o tomto tématu a příslušných možnostech najdete v tématu Konfigurace odvozování schématu a vývoje v auto loaderu. Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat XML, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času.
Ve výchozím nastavení se při odvozování schématu automatického zavaděče snaží vyhnout problémům s vývojem schématu kvůli neshodám typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny sloupce jako řetězce, včetně vnořených polí v souborech XML. Apache Spark DataFrameReader používá jiné chování pro odvozování schématu a výběr datových typů pro sloupce ve zdrojích XML na základě ukázkových dat. Chcete-li toto chování povolit pomocí Auto Loaderu, nastavte možnost cloudFiles.inferColumnTypes na true.
Auto Loader zjistí přidání nových sloupců při zpracování dat. Když Auto Loader zjistí nový sloupec, datový proud se zastaví s UnknownFieldException. Než váš datový proud vyvolá tuto chybu, Auto Loader provede odvozování schématu na nejnovější mikrodávce dat a aktualizuje lokaci schématu pomocí nejnovějšího schématu sloučením nových sloupců na konec schématu. Datové typy existujících sloupců zůstávají beze změny. Auto Loader podporuje různé režimy pro vývoj schématu, který jste nastavili v možnosti cloudFiles.schemaEvolutionMode.
Pomocí nápovědy schématu můžete vynutit informace o schématu, které znáte a očekáváte u odvozeného schématu. Pokud víte, že sloupec je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například dvojité místo celého čísla), můžete zadat libovolný počet tipů pro datové typy sloupců jako řetězec pomocí syntaxe specifikace schématu SQL. Pokud je povolený sloupec s daty o záchraně, načtou se do _rescued_data sloupce pole pojmenovaná v jiném případě, než je schéma. Toto chování můžete změnit tak, že nastavíte možnost readerCaseSensitivefalsena možnost , v takovém případě Auto Loader čte data bez rozlišování malých a malých písmen.
Příklady
Příklady v této části používají soubor XML dostupný ke stažení v úložišti Apache Spark GitHub.
Čtení a zápis XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Při čtení dat můžete schéma zadat ručně:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Rozhraní API SQL
Zdroj dat XML může odvodit datové typy:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
V DDL můžete také zadat názvy a typy sloupců. V tomto případě není schéma odvozeno automaticky.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Načtení XML pomocí COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Čtení XML s ověřením řádků
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analýza vnořeného XML (from_xml a schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml a schema_of_xml s využitím rozhraní SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Načtení XML pomocí automatického zavaděče
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)