Share via


Lesen und Schreiben von XML-Dateien

Wichtig

Dieses Feature befindet sich in der Public Preview.

In diesem Artikel wird beschrieben, wie XML-Dateien gelesen und geschrieben werden.

Extensible Markup Language (XML) ist eine Markupsprache zum Formatieren, Speichern und Freigeben von Daten im Textformat. Sie definiert verschiedene Regeln zum Serialisieren von Daten, die von Dokumenten bis hin zu beliebigen Datenstrukturen reichen.

Die native XML-Dateiformatunterstützung ermöglicht das Erfassen, Abfragen und Analysieren von XML-Daten für die Batchverarbeitung oder das Streaming. Sie kann Schema- und Datentypen automatisch ableiten und entwickeln, SQL-Ausdrücke wie from_xml unterstützen und XML-Dokumente generieren. Sie erfordert keine externen JAR-Dateien und arbeitet nahtlos mit Auto Loader, read_files und COPY INTO.

Anforderungen

Databricks Runtime 14.3 und höher

Parsen von XML-Datensätzen

Die XML-Spezifikation schreibt eine wohlgeformte Struktur vor. Diese Spezifikation wird jedoch nicht sofort einem Tabellenformat zugeordnet. Sie müssen mit der Option rowTag das XML-Element angeben, das DataFrameRow zugeordnet wird. Das rowTag-Element wird zur Struktur (struct) der obersten Ebene. Die untergeordneten Elemente von rowTag werden zu den Feldern der Struktur (struct) der obersten Ebene.

Sie können das Schema für diesen Datensatz angeben oder es automatisch ableiten lassen. Da der Parser nur die rowTag-Elemente untersucht, werden DTD und externe Entitäten herausgefiltert.

Die folgenden Beispiele veranschaulichen den Schemarückschluss und die Analyse einer XML-Datei mit unterschiedlichen rowTag-Optionen:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Lesen Sie die XML-Datei mit der rowTag-Option „books“:

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Ausgabe:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Lesen Sie die XML-Datei mit der rowTag-Option „book“:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Ausgabe:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Datenquellenoptionen

Datenquellenoptionen für XML können wie folgt angegeben werden:

Eine Liste der Optionen finden Sie unter Optionen für Auto Loader.

XSD-Support

Sie können optional jeden XML-Datensatz auf Zeilenebene anhand einer XML-Schemadefinition (XSD) überprüfen. Die XSD-Datei wird in der Option rowValidationXSDPath angegeben. Die XSD wirkt sich nicht anderweitig auf das bereitgestellte oder abgeleitete Schema aus. Ein Datensatz, für den bei der Überprüfung ein Fehler auftritt, wird als „beschädigt“ markiert und basierend auf der im Optionsabschnitt beschriebenen Option für den Umgang mit beschädigten Datensätze behandelt.

Sie können XSDToSchema verwenden, um ein Spark DataFrame-Schema aus einer XSD-Datei zu extrahieren. Es unterstützt nur einfache, komplexe und sequentielle Typen und nur grundlegende XSD-Funktionen.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Die folgende Tabelle zeigt die Konvertierung von XSD-Datentypen in Spark-Datentypen:

XSD-Datentypen Spark-Datentypen
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Parsen von verschachteltem XML

XML-Daten in einer Spalte mit Zeichenfolgenwerten in einem vorhandenen Datenrahmen können mit schema_of_xml und from_xml analysiert werden. Dabei werden das Schema und die analysierten Ergebnisse als neue struct-Spalten zurückgegeben. XML-Daten, die als Argument an schema_of_xml und from_xml übergeben werden, müssen ein einzelner wohlgeformter XML-Datensatz sein.

schema_of_xml

Syntax

schema_of_xml(xmlStr [, options] )

Argumente

  • xmlStr: Ein STRING-Ausdruck, der einen einzelnen wohlgeformten XML-Datensatz angibt
  • options: Ein optionales MAP<STRING,STRING>-Literal, das Anweisungen angibt.

Rückgabe

Ein STRING-Wert mit der Definition einer Struktur mit n Zeichenfolgenfeldern, in denen die Spaltennamen von den XML-Elementnamen und -Attributnamen abgeleitet werden. Die Feldwerte enthalten die abgeleiteten formatierten SQL Typen.

from_xml

Syntax

from_xml(xmlStr, schema [, options])

Argumente

  • xmlStr: Ein STRING-Ausdruck, der einen einzelnen wohlgeformten XML-Datensatz angibt
  • schema: Ein STRING-Ausdruck oder ein Aufruf der schema_of_xml-Funktion
  • options: Ein optionales MAP<STRING,STRING>-Literal, das Anweisungen angibt.

Rückgabe

Eine Struktur mit Feldnamen und Typen, die mit der Schemadefinition übereinstimmen. Das Schema muss als Spaltennamen- und Datentyppaare mit Kommas als Trennzeichen definiert werden, z. B wie in CREATE TABLE. Die meisten Optionen in den Datenquellenoptionen sind mit folgenden Ausnahmen anwendbar:

  • rowTag: Da nur ein XML-Datensatz vorhanden ist, kann die Option rowTag nicht angewendet werden.
  • mode (Standardwert: PERMISSIVE): Hiermit wird die Behandlung von beschädigten Datensätzen beim Parsen festgelegt.
    • PERMISSIVE: Wenn ein beschädigter Datensatz erkannt wird, wird die falsch formatierte Zeichenfolge in ein durch columnNameOfCorruptRecord konfiguriertes Feld eingefügt, und die falsch formatierten Felder werden auf null festgelegt. Um beschädigte Datensätze beizubehalten, können Sie ein Zeichenfolgenfeld namens columnNameOfCorruptRecord in einem benutzerdefinierten Schema festlegen. Wenn das Feld nicht im Schema vorhanden ist, werden beschädigte Datensätze bei der Analyse gelöscht. Beim Ableiten eines Schemas wird in einem Ausgabeschema implizit ein columnNameOfCorruptRecord-Feld hinzugefügt.
    • FAILFAST: Hiermit wird eine Ausnahme ausgelöst, wenn beschädigte Datensätze erkannt werden.

Strukturkonvertierung

Aufgrund struktureller Unterschiede zwischen DataFrames und XML gibt es einige Konvertierungsregeln von XML-Daten in DataFrame und von DataFrame in XML-Daten. Beachten Sie, dass die Behandlung von Attributen mit der Option excludeAttribute deaktiviert werden kann.

Konvertieren von XML in DataFrame

Attribute: Attribute werden als Felder mit dem Headerpräfix attributePrefix konvertiert.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

erzeugt folgendes Schema:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Zeichendaten in einem Element mit Attributen oder untergeordneten Elementen: Diese werden in das Feld valueTag geparst. Bei mehreren Vorkommen von Zeichendaten wird das Feld valueTag in einen array-Typ konvertiert.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

erzeugt folgendes Schema:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Konvertieren von DataFrame in XML

Element als Array in einem Array: Beim Schreiben einer XML-Datei aus DataFrame mit dem Feld ArrayType und dem Element ArrayType wäre ein zusätzliches geschachtelte Feld für das Element vorhanden. Dies geschieht nicht beim Lesen und Schreiben von XML-Daten, sondern beim Schreiben eines aus anderen Quellen gelesenen DataFrame-Elements. Daher hat ein Roundtrip beim Lesen und Schreiben von XML-Dateien dieselbe Struktur, aber beim Schreiben eines aus anderen Quellen gelesenen DataFrame-Elements kann eine andere Struktur verwendet werden.

DataFrame mit folgendem Schema:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

und mit folgenden Daten:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

erzeugt folgende XML-Datei:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Der Elementname des nicht benannten Arrays in DataFrame wird durch die Option arrayElementName angegeben (Standardwert: item).

Spalte „Gerettete Daten“

Die Spalte für gerettete Daten stellt sicher, dass während des ETL-Prozesses keine Daten verloren gehen oder übergangen werden. Sie können die Spalte für wiederhergestellte Daten aktivieren, um alle Daten zu erfassen, die nicht geparst wurden, weil mindestens ein Feld in einem Datensatz eins der folgenden Probleme aufweisen:

  • Fehlt im bereitgestellten Schema.
  • Stimmt nicht mit dem Datentyp des bereitgestellten Schemas überein.
  • Weist einen Konflikt bei Groß-/Kleinschreibung mit den Feldnamen im angegebenen Schema auf.

Die gerettete Datenspalte wird als JSON-Dokument zurückgegeben, das die geretteten Spalten und den Quelldateipfad des Datensatzes enthält. Um den Quelldateipfad aus der Spalte mit wiederhergestellten Daten zu entfernen, können Sie die SQL-Konfiguration festlegen_

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Sie können die gerettete Datenspalte aktivieren, indem Sie die Option rescuedDataColumn beim Lesen von Daten auf einen Spaltennamen setzen, z. B. _rescued_data mit spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Der XML-Parser unterstützt drei Modi beim Parsen von Datensätzen: PERMISSIVE, DROPMALFORMED und FAILFAST. Bei Verwendung mit rescuedDataColumn führen Datentypkonflikte nicht dazu, dass Datensätze im Modus DROPMALFORMED gelöscht werden oder im Modus FAILFAST einen Fehler auslösen. Nur beschädigte Datensätze (unvollständige oder falsch formatierte XML-Elemente) werden verworfen oder führen zu Fehlern.

Schemarückschluss und -entwicklung im Autoloader

Eine ausführliche Erläuterung dieses Themas und anwendbarer Optionen finden Sie unter Schemarückschluss und -entwicklung im Autoloader konfigurieren. Sie können den Autoloader so konfigurieren, dass das Schema der geladenen XML-Daten automatisch erkannt wird. Dadurch können Sie Tabellen initialisieren, ohne das Datenschema explizit zu deklarieren, und das Tabellenschema entwickeln, wenn neue Spalten eingeführt werden. Dadurch wird die Notwendigkeit beseitigt, Schemaänderungen im Laufe der Zeit manuell nachverfolgen und anwenden zu müssen.

Standardmäßig versucht der Autoloader-Schemarückschluss, Schemaentwicklungsprobleme aufgrund von Typkonflikten zu vermeiden. Für Formate, die Datentypen (JSON, CSV und XML) nicht codieren, führt der Autoloader einen Rückschluss für alle Spalten als Zeichenfolgen aus (einschließlich geschachtelter Felder in XML-Dateien). DataFrameReader in Apache Spark verwendet ein anderes Verhalten für Schemarückschlüsse: Für Spalten in XML-Quellen werden auf Grundlage von Beispieldaten Datentypen auswählt. Um dieses Verhalten im Autoloader zu aktivieren, legen Sie die Option cloudFiles.inferColumnTypes auf true fest.

Autoloader erkennt das Hinzufügen neuer Spalten, während er Ihre Daten verarbeitet. Wenn Autoloader eine neue Spalte erkennt, wird der Stream mit einer UnknownFieldException beendet. Bevor Ihr Stream diesen Fehler auslöst, führt Autoloader einen Schemarückschluss für den letzten Mikrobatch von Daten durch und aktualisiert den Schemaspeicherort mit dem neuesten Schema, indem neue Spalten am Ende des Schemas zusammengeführt werden. Die Datentypen vorhandener Spalten bleiben unverändert. Der Autoloader unterstützt die folgenden Modi für die Schemaentwicklung, die Sie in der Option cloudFiles.schemaEvolutionMode festlegen.

Sie können Schemahinweise verwenden, um die Schemainformationen zu erzwingen, die Sie kennen und für ein rückgeschlossenes Schema erwarten. Wenn Sie wissen, dass eine Spalte einen bestimmten Datentyp hat, oder wenn Sie einen noch allgemeineren Datentyp wählen möchten (z. B. „double“ statt „integer“), können Sie eine beliebige Anzahl von Hinweisen für die Datentypen von Spalten als eine Zeichenfolge mithilfe der SQL-Schemaspezifikationssyntax angeben. Wenn die Spalte für wiederhergestellte Daten aktiviert ist, werden Felder, deren Name eine andere Groß-/Kleinschreibung als das Schema aufweist, in die Spalte _rescued_data geladen. Sie können dieses Verhalten ändern, indem Sie die Option readerCaseSensitive auf false festlegen. In diesem Fall werden vom Autoloader Daten gelesen, ohne dass zwischen Groß- und Kleinschreibung unterschieden wird.

Beispiele

Die Beispiele in diesem Abschnitt verwenden eine XML-Datei, die im GitHub-Repository für Apache Spark zum Download zur Verfügung steht.

Lesen und Schreiben von XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Sie können das Schema beim Lesen von Daten manuell angeben:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL-API

Die XML-Datenquelle kann Datentypen ableiten:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Sie können auch Spaltennamen und -typen in DDL angeben. In diesem Fall wird das Schema nicht automatisch abgeleitet.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Laden von XML mithilfe von COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Lesen von XML mit Zeilenüberprüfung

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Parsen geschachtelter XML-Daten („from_xml“ und „schema_of_xml“)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

„from_xml“ und „schema_of_xml“ mit SQL-API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Laden von XML mit dem Autoloader

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Zusätzliche Ressourcen

Lesen und Schreiben von XML-Daten mithilfe der Spark-XML-Bibliothek