Lesen und Schreiben von XML-Dateien
Wichtig
Dieses Feature befindet sich in der Public Preview.
In diesem Artikel wird beschrieben, wie XML-Dateien gelesen und geschrieben werden.
Extensible Markup Language (XML) ist eine Markupsprache zum Formatieren, Speichern und Freigeben von Daten im Textformat. Sie definiert verschiedene Regeln zum Serialisieren von Daten, die von Dokumenten bis hin zu beliebigen Datenstrukturen reichen.
Die native XML-Dateiformatunterstützung ermöglicht das Erfassen, Abfragen und Analysieren von XML-Daten für die Batchverarbeitung oder das Streaming. Sie kann Schema- und Datentypen automatisch ableiten und entwickeln, SQL-Ausdrücke wie from_xml
unterstützen und XML-Dokumente generieren. Sie erfordert keine externen JAR-Dateien und arbeitet nahtlos mit Auto Loader, read_files
und COPY INTO
. Sie können optional jeden XML-Datensatz auf Zeilenebene anhand einer XML-Schemadefinition (XSD) überprüfen.
Anforderungen
Databricks Runtime 14.3 und höher
Parsen von XML-Datensätzen
Die XML-Spezifikation schreibt eine wohlgeformte Struktur vor. Diese Spezifikation wird jedoch nicht sofort einem Tabellenformat zugeordnet. Sie müssen mit der Option rowTag
das XML-Element angeben, das DataFrame
Row
zugeordnet wird. Das rowTag
-Element wird zur Struktur (struct
) der obersten Ebene. Die untergeordneten Elemente von rowTag
werden zu den Feldern der Struktur (struct
) der obersten Ebene.
Sie können das Schema für diesen Datensatz angeben oder es automatisch ableiten lassen. Da der Parser nur die rowTag
-Elemente untersucht, werden DTD und externe Entitäten herausgefiltert.
Die folgenden Beispiele veranschaulichen den Schemarückschluss und die Analyse einer XML-Datei mit unterschiedlichen rowTag
-Optionen:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Lesen Sie die XML-Datei mit der rowTag
-Option „books“:
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Ausgabe:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Lesen Sie die XML-Datei mit der rowTag
-Option „book“:
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Ausgabe:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Datenquellenoptionen
Datenquellenoptionen für XML können wie folgt angegeben werden:
- Mit den
.option/.options
-Methoden folgender Elemente:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Mit den folgenden integrierten Funktionen:
- Mit der
OPTIONS
-Klausel von CREATE TABLE USING DATA_SOURCE
Eine Liste der Optionen finden Sie unter Optionen für Auto Loader.
XSD-Support
Sie können optional jeden XML-Datensatz auf Zeilenebene anhand einer XML-Schemadefinition (XSD) überprüfen. Die XSD-Datei wird in der Option rowValidationXSDPath
angegeben. Die XSD wirkt sich nicht anderweitig auf das bereitgestellte oder abgeleitete Schema aus. Ein Datensatz, für den bei der Überprüfung ein Fehler auftritt, wird als „beschädigt“ markiert und basierend auf der im Optionsabschnitt beschriebenen Option für den Umgang mit beschädigten Datensätze behandelt.
Sie können XSDToSchema
verwenden, um ein Spark DataFrame-Schema aus einer XSD-Datei zu extrahieren. Es unterstützt nur einfache, komplexe und sequentielle Typen und nur grundlegende XSD-Funktionen.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Die folgende Tabelle zeigt die Konvertierung von XSD-Datentypen in Spark-Datentypen:
XSD-Datentypen | Spark-Datentypen |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , positiveInteger , unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Parsen von verschachteltem XML
XML-Daten in einer Spalte mit Zeichenfolgenwerten in einem vorhandenen Datenrahmen können mit schema_of_xml
und from_xml
analysiert werden. Dabei werden das Schema und die analysierten Ergebnisse als neue struct
-Spalten zurückgegeben. XML-Daten, die als Argument an schema_of_xml
und from_xml
übergeben werden, müssen ein einzelner wohlgeformter XML-Datensatz sein.
schema_of_xml
Syntax
schema_of_xml(xmlStr [, options] )
Argumente
xmlStr
: Ein STRING-Ausdruck, der einen einzelnen wohlgeformten XML-Datensatz angibtoptions
: Ein optionalesMAP<STRING,STRING>
-Literal, das Anweisungen angibt.
Rückgabe
Ein STRING-Wert mit der Definition einer Struktur mit n Zeichenfolgenfeldern, in denen die Spaltennamen von den XML-Elementnamen und -Attributnamen abgeleitet werden. Die Feldwerte enthalten die abgeleiteten formatierten SQL Typen.
from_xml
Syntax
from_xml(xmlStr, schema [, options])
Argumente
xmlStr
: Ein STRING-Ausdruck, der einen einzelnen wohlgeformten XML-Datensatz angibtschema
: Ein STRING-Ausdruck oder ein Aufruf derschema_of_xml
-Funktionoptions
: Ein optionalesMAP<STRING,STRING>
-Literal, das Anweisungen angibt.
Rückgabe
Eine Struktur mit Feldnamen und Typen, die mit der Schemadefinition übereinstimmen. Das Schema muss als Spaltennamen- und Datentyppaare mit Kommas als Trennzeichen definiert werden, z. B wie in CREATE TABLE
. Die meisten Optionen in den Datenquellenoptionen sind mit folgenden Ausnahmen anwendbar:
rowTag
: Da nur ein XML-Datensatz vorhanden ist, kann die OptionrowTag
nicht angewendet werden.mode
(Standardwert:PERMISSIVE
): Hiermit wird die Behandlung von beschädigten Datensätzen beim Parsen festgelegt.PERMISSIVE
: Wenn ein beschädigter Datensatz erkannt wird, wird die falsch formatierte Zeichenfolge in ein durchcolumnNameOfCorruptRecord
konfiguriertes Feld eingefügt, und die falsch formatierten Felder werden aufnull
festgelegt. Um beschädigte Datensätze beizubehalten, können Sie ein Zeichenfolgenfeld namenscolumnNameOfCorruptRecord
in einem benutzerdefinierten Schema festlegen. Wenn das Feld nicht im Schema vorhanden ist, werden beschädigte Datensätze bei der Analyse gelöscht. Beim Ableiten eines Schemas wird in einem Ausgabeschema implizit eincolumnNameOfCorruptRecord
-Feld hinzugefügt.FAILFAST
: Hiermit wird eine Ausnahme ausgelöst, wenn beschädigte Datensätze erkannt werden.
Strukturkonvertierung
Aufgrund struktureller Unterschiede zwischen DataFrames und XML gibt es einige Konvertierungsregeln von XML-Daten in DataFrame
und von DataFrame
in XML-Daten. Beachten Sie, dass die Behandlung von Attributen mit der Option excludeAttribute
deaktiviert werden kann.
Konvertieren von XML in DataFrame
Attribute: Attribute werden als Felder mit dem Headerpräfix attributePrefix
konvertiert.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
erzeugt folgendes Schema:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Zeichendaten in einem Element mit Attributen oder untergeordneten Elementen: Diese werden in das Feld valueTag
geparst. Bei mehreren Vorkommen von Zeichendaten wird das Feld valueTag
in einen array
-Typ konvertiert.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
erzeugt folgendes Schema:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Konvertieren von DataFrame in XML
Element als Array in einem Array: Beim Schreiben einer XML-Datei aus DataFrame
mit dem Feld ArrayType
und dem Element ArrayType
wäre ein zusätzliches geschachtelte Feld für das Element vorhanden. Dies geschieht nicht beim Lesen und Schreiben von XML-Daten, sondern beim Schreiben eines aus anderen Quellen gelesenen DataFrame
-Elements. Daher hat ein Roundtrip beim Lesen und Schreiben von XML-Dateien dieselbe Struktur, aber beim Schreiben eines aus anderen Quellen gelesenen DataFrame
-Elements kann eine andere Struktur verwendet werden.
DataFrame mit folgendem Schema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
und mit folgenden Daten:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
erzeugt folgende XML-Datei:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Der Elementname des nicht benannten Arrays in DataFrame
wird durch die Option arrayElementName
angegeben (Standardwert: item
).
Spalte „Gerettete Daten“
Die Spalte für gerettete Daten stellt sicher, dass während des ETL-Prozesses keine Daten verloren gehen oder übergangen werden. Sie können die Spalte für wiederhergestellte Daten aktivieren, um alle Daten zu erfassen, die nicht geparst wurden, weil mindestens ein Feld in einem Datensatz eins der folgenden Probleme aufweisen:
- Fehlt im bereitgestellten Schema.
- Stimmt nicht mit dem Datentyp des bereitgestellten Schemas überein.
- Weist einen Konflikt bei Groß-/Kleinschreibung mit den Feldnamen im angegebenen Schema auf.
Die gerettete Datenspalte wird als JSON-Dokument zurückgegeben, das die geretteten Spalten und den Quelldateipfad des Datensatzes enthält. Um den Quelldateipfad aus der Spalte mit wiederhergestellten Daten zu entfernen, können Sie die SQL-Konfiguration festlegen_
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Sie können die gerettete Datenspalte aktivieren, indem Sie die Option rescuedDataColumn
beim Lesen von Daten auf einen Spaltennamen setzen, z. B. _rescued_data
mit spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Der XML-Parser unterstützt drei Modi beim Parsen von Datensätzen: PERMISSIVE
, DROPMALFORMED
und FAILFAST
. Bei Verwendung mit rescuedDataColumn
führen Datentypkonflikte nicht dazu, dass Datensätze im Modus DROPMALFORMED
gelöscht werden oder im Modus FAILFAST
einen Fehler auslösen. Nur beschädigte Datensätze (unvollständige oder falsch formatierte XML-Elemente) werden verworfen oder führen zu Fehlern.
Schemarückschluss und -entwicklung im Autoloader
Eine ausführliche Erläuterung dieses Themas und anwendbarer Optionen finden Sie unter Schemarückschluss und -entwicklung im Autoloader konfigurieren. Sie können den Autoloader so konfigurieren, dass das Schema der geladenen XML-Daten automatisch erkannt wird. Dadurch können Sie Tabellen initialisieren, ohne das Datenschema explizit zu deklarieren, und das Tabellenschema entwickeln, wenn neue Spalten eingeführt werden. Dadurch wird die Notwendigkeit beseitigt, Schemaänderungen im Laufe der Zeit manuell nachverfolgen und anwenden zu müssen.
Standardmäßig versucht der Autoloader-Schemarückschluss, Schemaentwicklungsprobleme aufgrund von Typkonflikten zu vermeiden. Für Formate, die Datentypen (JSON, CSV und XML) nicht codieren, führt der Autoloader einen Rückschluss für alle Spalten als Zeichenfolgen aus (einschließlich geschachtelter Felder in XML-Dateien). DataFrameReader
in Apache Spark verwendet ein anderes Verhalten für Schemarückschlüsse: Für Spalten in XML-Quellen werden auf Grundlage von Beispieldaten Datentypen auswählt. Um dieses Verhalten im Autoloader zu aktivieren, legen Sie die Option cloudFiles.inferColumnTypes
auf true
fest.
Autoloader erkennt das Hinzufügen neuer Spalten, während er Ihre Daten verarbeitet. Wenn Autoloader eine neue Spalte erkennt, wird der Stream mit einer UnknownFieldException
beendet. Bevor Ihr Stream diesen Fehler auslöst, führt Autoloader einen Schemarückschluss für den letzten Mikrobatch von Daten durch und aktualisiert den Schemaspeicherort mit dem neuesten Schema, indem neue Spalten am Ende des Schemas zusammengeführt werden. Die Datentypen vorhandener Spalten bleiben unverändert. Der Autoloader unterstützt die folgenden Modi für die Schemaentwicklung, die Sie in der Option cloudFiles.schemaEvolutionMode
festlegen.
Sie können Schemahinweise verwenden, um die Schemainformationen zu erzwingen, die Sie kennen und für ein rückgeschlossenes Schema erwarten. Wenn Sie wissen, dass eine Spalte einen bestimmten Datentyp hat, oder wenn Sie einen noch allgemeineren Datentyp wählen möchten (z. B. „double“ statt „integer“), können Sie eine beliebige Anzahl von Hinweisen für die Datentypen von Spalten als eine Zeichenfolge mithilfe der SQL-Schemaspezifikationssyntax angeben. Wenn die Spalte für wiederhergestellte Daten aktiviert ist, werden Felder, deren Name eine andere Groß-/Kleinschreibung als das Schema aufweist, in die Spalte _rescued_data
geladen. Sie können dieses Verhalten ändern, indem Sie die Option readerCaseSensitive
auf false
festlegen. In diesem Fall werden vom Autoloader Daten gelesen, ohne dass zwischen Groß- und Kleinschreibung unterschieden wird.
Beispiele
Die Beispiele in diesem Abschnitt verwenden eine XML-Datei, die im GitHub-Repository für Apache Spark zum Download zur Verfügung steht.
Lesen und Schreiben von XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Sie können das Schema beim Lesen von Daten manuell angeben:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL-API
Die XML-Datenquelle kann Datentypen ableiten:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
Sie können auch Spaltennamen und -typen in DDL angeben. In diesem Fall wird das Schema nicht automatisch abgeleitet.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Laden von XML mithilfe von COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Lesen von XML mit Zeilenüberprüfung
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Parsen geschachtelter XML-Daten („from_xml“ und „schema_of_xml“)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
„from_xml“ und „schema_of_xml“ mit SQL-API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Laden von XML mit dem Autoloader
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)
Zusätzliche Ressourcen
Lesen und Schreiben von XML-Daten mithilfe der Spark-XML-Bibliothek