XML-fájlok olvasása és írása

Fontos

Ez a funkció a nyilvános előzetes verzióban érhető el.

Ez a cikk az XML-fájlok olvasását és írását ismerteti.

A bővíthető korrektúranyelv (XML) az adatok szöveges formátumban történő formázására, tárolására és megosztására szolgáló korrektúranyelv. Az adatok szerializálására vonatkozó szabályokat határoz meg a dokumentumoktól az tetszőleges adatstruktúrákig.

A natív XML-fájlformátum támogatása lehetővé teszi az XML-adatok kötegelt feldolgozáshoz vagy streameléshez való betöltését, lekérdezését és elemzését. Képes automatikusan sémákat és adattípusokat kikövetkeztetni és fejleszteni, támogatja az OLYAN SQL-kifejezéseket, mint például from_xmlaz XML-dokumentumok létrehozása. Nem igényel külső üvegeket, és zökkenőmentesen működik az Automatikus betöltővel read_files és COPY INTOa .

Követelmények

Databricks Runtime 14.3 és újabb verziók

XML-rekordok elemzése

Az XML-specifikáció egy jól formázott struktúrát ad meg. Ez a specifikáció azonban nem képez le azonnal táblázatos formátumot. Meg kell adnia annak az rowTag XML-elemnek a jelölését, amely egy DataFrameRow. Az rowTag elem lesz a legfelső szintű struct. A gyermekelemek rowTag a felső szint structmezőivé válnak.

Megadhatja a rekord sémáját, vagy hagyhatja, hogy az automatikusan következtethető legyen. Mivel az elemző csak az elemeket vizsgálja, a rendszer kiszűri a rowTag DTD-t és a külső entitásokat.

Az alábbi példák egy XML-fájl sémakövetkeztetését és elemzését szemléltetik különböző rowTag beállításokkal:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Olvassa el az XML-fájlt a "könyvek" lehetőséggel rowTag :

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Kimenet:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Olvassa el az XML-fájlt rowTag könyvként:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Kimenet:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Adatforrás beállításai

Az XML adatforrás-beállításai a következő módokon adhatók meg:

  • A .option/.options következők módszerei:
    • DataFrameReader
    • DataFrameWriter
    • DataStreamReader
    • DataStreamWriter
  • A következő beépített függvények:
  • A OPTIONS CREATE TABLE USING DATA_SOURCE záradéka

A beállítások listáját az Automatikus betöltő beállításai című témakörben találja.

XSD-támogatás

Igény szerint az egyes sorszintű XML-rekordokat egy XML-sémadefinícióval (XSD) ellenőrizheti. Az XSD-fájl meg van adva a rowValidationXSDPath beállításban. Az XSD egyébként nem befolyásolja a megadott vagy kikövetkezett sémát. Az ellenőrzés meghiúsuló rekordja "sérültként" van megjelölve, és a beállítás szakaszban leírt sérült rekordkezelési mód alapján lesz kezelve.

A Spark DataFrame-sémákat XSD-fájlból is kinyerheti XSDToSchema . Csak egyszerű, összetett és szekvenciatípusokat támogat, és csak az alapszintű XSD-funkciókat támogatja.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Az alábbi táblázat az XSD-adattípusok Spark-adattípusokra való konvertálását mutatja be:

XSD-adattípusok Spark-adattípusok
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveIntegerpositiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Beágyazott XML elemzése

A meglévő DataFrame sztringértékű oszlopában lévő XML-adatok elemezhetők schema_of_xml , és from_xml új oszlopként struct visszaadják a sémát és az elemzési eredményeket. Az argumentumként schema_of_xmlfrom_xml átadott XML-adatoknak egyetlen jól formázott XML-rekordnak kell lenniük.

schema_of_xml

Szintaxis

schema_of_xml(xmlStr [, options] )

Argumentumok

  • xmlStr: Sztringkifejezés, amely egyetlen jól formázott XML-rekordot határoz meg.
  • options: Egy nem kötelező MAP<STRING,STRING> konstans, amely irányelveket határoz meg.

Visszatérési érték

Olyan karakterlánc, amely egy struktúra definícióját tartalmazza n sztringmezőkkel, ahol az oszlopnevek az XML-elemből és az attribútumnevekből származnak. A mezőértékek a származtatott formázott SQL-típusokat tartják.

from_xml

Szintaxis

from_xml(xmlStr, schema [, options])

Argumentumok

  • xmlStr: Sztringkifejezés, amely egyetlen jól formázott XML-rekordot határoz meg.
  • schema: A függvény SZTRING-kifejezése vagy meghívása schema_of_xml .
  • options: Egy nem kötelező MAP<STRING,STRING> konstans, amely irányelveket határoz meg.

Visszatérési érték

A sémadefiníciónak megfelelő mezőneveket és típusokat tartalmazó szerkezet. A sémát vesszővel tagolt oszlopnévként és adattípuspárként kell definiálni, például CREATE TABLE. Az adatforrás beállításaiban megjelenő legtöbb lehetőség a következő kivételekkel alkalmazható:

  • rowTag: Mivel csak egy XML-rekord van, a rowTag beállítás nem alkalmazható.
  • mode (alapértelmezett: PERMISSIVE): Lehetővé teszi a sérült rekordok elemzés közbeni kezelését.
    • PERMISSIVE: Ha egy sérült rekordnak megfelel, a hibásan formázott sztringet egy, a program által columnNameOfCorruptRecordkonfigurált mezőbe helyezi, és a hibásan formázott mezőket a következőre nullállítja. A sérült rekordok megőrzéséhez beállíthat egy felhasználó által definiált sémában elnevezett columnNameOfCorruptRecord sztring típusú mezőt. Ha egy séma nem rendelkezik a mezővel, az elemzés során a sérült rekordokat elveti. Séma következtetése esetén implicit módon hozzáad egy mezőt egy columnNameOfCorruptRecord kimeneti sémához.
    • FAILFAST: Kivételt eredményez, ha sérült rekordoknak felel meg.

Struktúraátalakítás

A DataFrame és az XML közötti struktúrabeli különbségek miatt van néhány konverziós szabály az XML-adatokról DataFrame az XML-adatokra és az xml-adatokra DataFrame . Vegye figyelembe, hogy az attribútumok kezelése letiltható a beállítással excludeAttribute.

Konvertálás XML-ről DataFrame-re

Attribútumok: Az attribútumok a címsorelőtaggal attributePrefixrendelkező mezőkké alakulnak.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

az alábbi sémát hozza létre:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Attribútumot vagy gyermekelemet tartalmazó elem karakteradatai: Ezek a mezőkbe vannak valueTag elemezve. Ha a karakteradatok több előfordulása is előfordul, a valueTag mező típussá array lesz konvertálva.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

az alábbi sémát hozza létre:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Konvertálás DataFrame-ről XML-re

Elem tömbként egy tömbben: Xml-fájl írása abból, DataFrame hogy egy mező ArrayType az elemével ArrayType együtt egy további beágyazott mezővel rendelkezik az elemhez. Ez nem az XML-adatok olvasása és írása, hanem más forrásokból történő olvasás DataFrame esetén fordul elő. Ezért az XML-fájlok olvasása és írása során a roundtrip ugyanazzal a struktúrával rendelkezik, de más forrásokból származó olvasás írása DataFrame más struktúrával is rendelkezhet.

DataFrame az alábbi sémával:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

és az alábbi adatokkal:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

az alábbi XML-fájlt hozza létre:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

A névtelen tömb elemnevét a DataFrame beállítás arrayElementName adja meg (Alapértelmezett: item).

Mentett adatoszlop

A mentett adatoszlop gondoskodik arról, hogy az ETL során soha ne veszítsen el vagy maradjon ki az adatokból. Engedélyezheti, hogy a mentett adatoszlop rögzítse azokat az adatokat, amelyeket nem elemeztek, mert egy rekord egy vagy több mezője az alábbi problémák egyikével jár:

  • Hiányzik a megadott sémából
  • Nem egyezik a megadott séma adattípusával
  • A megadott séma mezőneveivel nem egyezik meg a kis- és nagybetűk

A mentett adatoszlop JSON-dokumentumként lesz visszaadva, amely a mentett oszlopokat és a rekord forrásfájl-elérési útját tartalmazza. Ha el szeretné távolítani a forrásfájl elérési útját a mentett adatoszlopból, a következő SQL-konfigurációt állíthatja be:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

A mentett adatoszlopot úgy engedélyezheti, hogy az adatok olvasásakor oszlopnévre állítja a beállítást rescuedDataColumn , például _rescued_data a következővel spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>): .

Az XML-elemző három módot támogat rekordok elemzésekor: PERMISSIVE, DROPMALFORMEDés FAILFAST. Ha együtt rescuedDataColumnhasználják, az adattípus eltérései nem okoznak rekordokat módban, DROPMALFORMED és nem jeleznek hibát FAILFAST módban. A rendszer csak sérült rekordokat (hiányos vagy hibás formátumú XML)elvet vagy hibát jelez.

Sémakövetkeztetés és -fejlesztés az Automatikus betöltőben

A témakör és az alkalmazható lehetőségek részletes ismertetését lásd: Sémakövetkeztetés és -fejlesztés konfigurálása az Automatikus betöltőben. Az Automatikus betöltőt úgy konfigurálhatja, hogy automatikusan észlelje a betöltött XML-adatok sémáját, így anélkül inicializálhatja a táblákat, hogy explicit módon deklarálja az adatsémát, és új oszlopok bevezetésekor továbbfejleszti a táblázatsémát. Ez szükségtelenné teszi a sémamódosítások manuális nyomon követését és alkalmazását az idő függvényében.

Az automatikus betöltő sémakövetkeztetése alapértelmezés szerint a típuseltérések miatti sémafejlődési problémák elkerülésére törekszik. Olyan formátumok esetén, amelyek nem kódolnak adattípusokat (JSON, CSV és XML), az Automatikus betöltő minden oszlopot sztringként von le, beleértve az XML-fájlok beágyazott mezőit is. Az Apache Spark DataFrameReader eltérő viselkedést használ a sémakövetkeztetéshez, és mintaadatok alapján választja ki az XML-források oszlopainak adattípusait. Ha engedélyezni szeretné ezt a viselkedést az Automatikus betöltővel, állítsa a beállítást a következőre cloudFiles.inferColumnTypestrue: .

Az Automatikus betöltő észleli az új oszlopok hozzáadását az adatok feldolgozása során. Amikor az Automatikus betöltő új oszlopot észlel, a stream egy UnknownFieldException. Mielőtt a stream ezt a hibát észleli, az Automatikus betöltő sémakövetkeztetést hajt végre a legújabb mikro-adatkötegen, és frissíti a séma helyét a legújabb sémával az új oszlopoknak a séma végéhez való egyesítésével. A meglévő oszlopok adattípusai változatlanok maradnak. Az Automatikus betöltő a sémafejlődés különböző módjait támogatja, amelyeket a beállításban cloudFiles.schemaEvolutionModeállított be.

Sématippekkel kikényszerítheti azokat a sémainformációkat , amelyeket egy következtetett sémában ismer és vár. Ha tudja, hogy egy oszlop adott adattípusú, vagy ha általánosabb adattípust (például egész szám helyett dupla) szeretne választani, tetszőleges számú tippet adhat meg az oszlop adattípusaihoz sztringként az SQL-séma specifikációjának szintaxisával. Ha a mentett adatoszlop engedélyezve van, a rendszer betölti az oszlopba a sématól _rescued_data eltérő esetben elnevezett mezőket. Ezt a viselkedést úgy módosíthatja, hogy a beállítást readerCaseSensitive a következőre falseállítja: ebben az esetben az Automatikus betöltő kis- és nagybetűket nem érzékelyítő módon olvassa be az adatokat.

Példák

Az ebben a szakaszban szereplő példák egy, az Apache Spark GitHub-adattárban letölthető XML-fájlt használnak.

XML olvasása és írása

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Adatok olvasásakor manuálisan is megadhatja a sémát:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

Az XML-adatforrás adattípusokra következtethet:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

A DDL-ben oszlopneveket és -típusokat is megadhat. Ebben az esetben a séma nem lesz automatikusan kikövetkeztetve.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

XML betöltése a COPY INTO használatával

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

XML olvasása sorérvényesítéssel

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Beágyazott XML elemzése (from_xml és schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml és schema_of_xml az SQL API-val

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

XML betöltése automatikus betöltővel

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

További erőforrások

XML-adatok olvasása és írása a Spark-XML-kódtár használatával