XML-fájlok olvasása és írása
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
Ez a cikk az XML-fájlok olvasását és írását ismerteti.
A bővíthető korrektúranyelv (XML) az adatok szöveges formátumban történő formázására, tárolására és megosztására szolgáló korrektúranyelv. Az adatok szerializálására vonatkozó szabályokat határoz meg a dokumentumoktól az tetszőleges adatstruktúrákig.
A natív XML-fájlformátum támogatása lehetővé teszi az XML-adatok kötegelt feldolgozáshoz vagy streameléshez való betöltését, lekérdezését és elemzését. Képes automatikusan sémákat és adattípusokat kikövetkeztetni és fejleszteni, támogatja az OLYAN SQL-kifejezéseket, mint például from_xml
az XML-dokumentumok létrehozása. Nem igényel külső üvegeket, és zökkenőmentesen működik az Automatikus betöltővel read_files
és COPY INTO
a . Igény szerint ellenőrizheti az egyes sorszintű XML-rekordokat egy XML-sémadefiníció (XSD) alapján.
Követelmények
Databricks Runtime 14.3 és újabb verziók
XML-rekordok elemzése
Az XML-specifikáció egy jól formázott struktúrát ad meg. Ez a specifikáció azonban nem képez le azonnal táblázatos formátumot. Meg kell adnia annak az rowTag
XML-elemnek a jelölését, amely egy DataFrame
Row
. Az rowTag
elem lesz a legfelső szintű struct
. A gyermekelemek rowTag
a felső szint struct
mezőivé válnak.
Megadhatja a rekord sémáját, vagy hagyhatja, hogy az automatikusan következtethető legyen. Mivel az elemző csak az elemeket vizsgálja, a rendszer kiszűri a rowTag
DTD-t és a külső entitásokat.
Az alábbi példák egy XML-fájl sémakövetkeztetését és elemzését szemléltetik különböző rowTag
beállításokkal:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Olvassa el az XML-fájlt a "könyvek" lehetőséggel rowTag
:
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Hozam:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Olvassa el az XML-fájlt rowTag
könyvként:
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Hozam:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Adatforrás beállításai
Az XML adatforrás-beállításai a következő módokon adhatók meg:
- A
.option/.options
következők módszerei:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- A következő beépített függvények:
- A
OPTIONS
CREATE TABLE USING DATA_SOURCE záradéka
A beállítások listáját az Automatikus betöltő beállításai című témakörben találja.
XSD-támogatás
Igény szerint az egyes sorszintű XML-rekordokat egy XML-sémadefinícióval (XSD) ellenőrizheti. Az XSD-fájl meg van adva a rowValidationXSDPath
beállításban. Az XSD egyébként nem befolyásolja a megadott vagy kikövetkezett sémát. Az ellenőrzés meghiúsuló rekordja "sérültként" van megjelölve, és a beállítás szakaszban leírt sérült rekordkezelési mód alapján lesz kezelve.
A Spark DataFrame-sémákat XSD-fájlból is kinyerheti XSDToSchema
. Csak egyszerű, összetett és szekvenciatípusokat támogat, és csak az alapszintű XSD-funkciókat támogatja.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Az alábbi táblázat az XSD-adattípusok Spark-adattípusokra való konvertálását mutatja be:
XSD-adattípusok | Spark-adattípusok |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Beágyazott XML elemzése
A meglévő DataFrame sztringértékű oszlopában lévő XML-adatok elemezhetők schema_of_xml
, és from_xml
új oszlopként struct
visszaadják a sémát és az elemzési eredményeket. Az argumentumként schema_of_xml
from_xml
átadott XML-adatoknak egyetlen jól formázott XML-rekordnak kell lenniük.
schema_of_xml
Szintaxis
schema_of_xml(xmlStr [, options] )
Argumentumok
xmlStr
: Sztringkifejezés, amely egyetlen jól formázott XML-rekordot határoz meg.options
: Egy nem kötelezőMAP<STRING,STRING>
konstans, amely irányelveket határoz meg.
Visszatérési érték
Olyan karakterlánc, amely egy struktúra definícióját tartalmazza n sztringmezőkkel, ahol az oszlopnevek az XML-elemből és az attribútumnevekből származnak. A mezőértékek a származtatott formázott SQL-típusokat tartják.
from_xml
Szintaxis
from_xml(xmlStr, schema [, options])
Argumentumok
xmlStr
: Sztringkifejezés, amely egyetlen jól formázott XML-rekordot határoz meg.schema
: A függvény SZTRING-kifejezése vagy meghívásaschema_of_xml
.options
: Egy nem kötelezőMAP<STRING,STRING>
konstans, amely irányelveket határoz meg.
Visszatérési érték
A sémadefiníciónak megfelelő mezőneveket és típusokat tartalmazó szerkezet. A sémát vesszővel tagolt oszlopnévként és adattípuspárként kell definiálni, például CREATE TABLE
. Az adatforrás beállításaiban megjelenő legtöbb lehetőség a következő kivételekkel alkalmazható:
rowTag
: Mivel csak egy XML-rekord van, arowTag
beállítás nem alkalmazható.mode
(alapértelmezett:PERMISSIVE
): Lehetővé teszi a sérült rekordok elemzés közbeni kezelését.PERMISSIVE
: Ha egy sérült rekordnak megfelel, a hibásan formázott sztringet egy, a program általcolumnNameOfCorruptRecord
konfigurált mezőbe helyezi, és a hibásan formázott mezőket a következőrenull
állítja. A sérült rekordok megőrzéséhez beállíthat egy felhasználó által definiált sémában elnevezettcolumnNameOfCorruptRecord
sztring típusú mezőt. Ha egy séma nem rendelkezik a mezővel, az elemzés során a sérült rekordokat elveti. Séma következtetése esetén implicit módon hozzáad egy mezőt egycolumnNameOfCorruptRecord
kimeneti sémához.FAILFAST
: Kivételt eredményez, ha sérült rekordoknak felel meg.
Struktúraátalakítás
A DataFrame és az XML közötti struktúrabeli különbségek miatt van néhány konverziós szabály az XML-adatokról DataFrame
az XML-adatokra és az xml-adatokra DataFrame
. Vegye figyelembe, hogy az attribútumok kezelése letiltható a beállítással excludeAttribute
.
Konvertálás XML-ről DataFrame-re
Attribútumok: Az attribútumok a címsorelőtaggal attributePrefix
rendelkező mezőkké alakulnak.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
az alábbi sémát hozza létre:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Attribútumot vagy gyermekelemet tartalmazó elem karakteradatai: Ezek a mezőkbe vannak valueTag
elemezve. Ha a karakteradatok több előfordulása is előfordul, a valueTag
mező típussá array
lesz konvertálva.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
az alábbi sémát hozza létre:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Konvertálás DataFrame-ről XML-re
Elem tömbként egy tömbben: Xml-fájl írása abból, DataFrame
hogy egy mező ArrayType
az elemével ArrayType
együtt egy további beágyazott mezővel rendelkezik az elemhez. Ez nem az XML-adatok olvasása és írása, hanem más forrásokból történő olvasás DataFrame
esetén fordul elő. Ezért az XML-fájlok olvasása és írása során a roundtrip ugyanazzal a struktúrával rendelkezik, de más forrásokból származó olvasás írása DataFrame
más struktúrával is rendelkezhet.
DataFrame az alábbi sémával:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
és az alábbi adatokkal:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
az alábbi XML-fájlt hozza létre:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
A névtelen tömb elemnevét a DataFrame
beállítás arrayElementName
adja meg (Alapértelmezett: item
).
Mentett adatoszlop
A mentett adatoszlop gondoskodik arról, hogy az ETL során soha ne veszítsen el vagy maradjon ki az adatokból. Engedélyezheti, hogy a mentett adatoszlop rögzítse azokat az adatokat, amelyeket nem elemeztek, mert egy rekord egy vagy több mezője az alábbi problémák egyikével jár:
- Hiányzik a megadott sémából
- Nem egyezik a megadott séma adattípusával
- A megadott séma mezőneveivel nem egyezik meg a kis- és nagybetűk
A mentett adatoszlop JSON-dokumentumként lesz visszaadva, amely a mentett oszlopokat és a rekord forrásfájl-elérési útját tartalmazza. Ha el szeretné távolítani a forrásfájl elérési útját a mentett adatoszlopból, a következő SQL-konfigurációt állíthatja be:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
A mentett adatoszlopot úgy engedélyezheti, hogy az adatok olvasásakor oszlopnévre állítja a beállítást rescuedDataColumn
, például _rescued_data
a következővel spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
: .
Az XML-elemző három módot támogat rekordok elemzésekor: PERMISSIVE
, DROPMALFORMED
és FAILFAST
. Ha együtt rescuedDataColumn
használják, az adattípus eltérései nem okoznak rekordokat módban, DROPMALFORMED
és nem jeleznek hibát FAILFAST
módban. A rendszer csak sérült rekordokat (hiányos vagy hibás formátumú XML)elvet vagy hibát jelez.
Sémakövetkeztetés és -fejlesztés az Automatikus betöltőben
A témakör és az alkalmazható lehetőségek részletes ismertetését lásd: Sémakövetkeztetés és -fejlesztés konfigurálása az Automatikus betöltőben. Az Automatikus betöltőt úgy konfigurálhatja, hogy automatikusan észlelje a betöltött XML-adatok sémáját, így anélkül inicializálhatja a táblákat, hogy explicit módon deklarálja az adatsémát, és új oszlopok bevezetésekor továbbfejleszti a táblázatsémát. Ez szükségtelenné teszi a sémamódosítások manuális nyomon követését és alkalmazását az idő függvényében.
Az automatikus betöltő sémakövetkeztetése alapértelmezés szerint a típuseltérések miatti sémafejlődési problémák elkerülésére törekszik. Olyan formátumok esetén, amelyek nem kódolnak adattípusokat (JSON, CSV és XML), az Automatikus betöltő minden oszlopot sztringként von le, beleértve az XML-fájlok beágyazott mezőit is. Az Apache Spark DataFrameReader
eltérő viselkedést használ a sémakövetkeztetéshez, és mintaadatok alapján választja ki az XML-források oszlopainak adattípusait. Ha engedélyezni szeretné ezt a viselkedést az Automatikus betöltővel, állítsa a beállítást a következőre cloudFiles.inferColumnTypes
true
: .
Az Automatikus betöltő észleli az új oszlopok hozzáadását az adatok feldolgozása során. Amikor az Automatikus betöltő új oszlopot észlel, a stream egy UnknownFieldException
. Mielőtt a stream ezt a hibát észleli, az Automatikus betöltő sémakövetkeztetést hajt végre a legújabb mikro-adatkötegen, és frissíti a séma helyét a legújabb sémával az új oszlopoknak a séma végéhez való egyesítésével. A meglévő oszlopok adattípusai változatlanok maradnak. Az Automatikus betöltő a sémafejlődés különböző módjait támogatja, amelyeket a beállításban cloudFiles.schemaEvolutionMode
állított be.
Sématippekkel kikényszerítheti azokat a sémainformációkat , amelyeket egy következtetett sémában ismer és vár. Ha tudja, hogy egy oszlop adott adattípusú, vagy ha általánosabb adattípust (például egész szám helyett dupla) szeretne választani, tetszőleges számú tippet adhat meg az oszlop adattípusaihoz sztringként az SQL-séma specifikációjának szintaxisával. Ha a mentett adatoszlop engedélyezve van, a rendszer betölti az oszlopba a sématól _rescued_data
eltérő esetben elnevezett mezőket. Ezt a viselkedést úgy módosíthatja, hogy a beállítást readerCaseSensitive
a következőre false
állítja: ebben az esetben az Automatikus betöltő kis- és nagybetűket nem érzékelyítő módon olvassa be az adatokat.
Példák
Az ebben a szakaszban szereplő példák egy, az Apache Spark GitHub-adattárban letölthető XML-fájlt használnak.
XML olvasása és írása
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Adatok olvasásakor manuálisan is megadhatja a sémát:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
Az XML-adatforrás adattípusokra következtethet:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
A DDL-ben oszlopneveket és -típusokat is megadhat. Ebben az esetben a séma nem lesz automatikusan kikövetkeztetve.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
XML betöltése a COPY INTO használatával
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
XML olvasása sorérvényesítéssel
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Beágyazott XML elemzése (from_xml és schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml és schema_of_xml az SQL API-val
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
XML betöltése automatikus betöltővel
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)
További erőforrások
XML-adatok olvasása és írása a Spark-XML-kódtár használatával
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: