Freigeben über


Lesen und Schreiben von XML-Daten mithilfe der spark-xml-Bibliothek

Wichtig

Diese Dokumentation wurde eingestellt und wird unter Umständen nicht aktualisiert. Die in diesen Inhalten genannten Produkte, Dienste oder Technologien werden von Databricks nicht offiziell unterstützt oder getestet.

Native XML-Dateiformatunterstützung ist als Public Preview verfügbar. Weitere Informationen unter Lesen und Schreiben von XML-Dateien.

In diesem Artikel wird das Lesen und Schreiben einer XML-Datei als Apache Spark-Datenquelle beschrieben.

Anforderungen

  1. Erstellen Sie die Bibliothek spark-xml als Maven-Bibliothek. Geben Sie für die Maven-Koordinate Folgendes an:

    • Databricks Runtime 7.x und höhere Versionen: com.databricks:spark-xml_2.12:<release>

    Die aktuelle Version von <release> finden Sie unter spark-xml-Releases.

  2. Installieren der Bibliothek auf einem Cluster.

Beispiel

Das Beispiel in diesem Abschnitt verwendet die XML-Datei books.

  1. So rufen Sie die XML-Datei „books“ ab:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Laden Sie die Datei in DBFS hoch.

Lesen und Schreiben von XML-Daten

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Tastatur

  • Lesen
    • path: Speicherort von XML-Dateien. Akzeptiert standardmäßige Hadoop-Globbing-Ausdrücke.
    • rowTag: Das Zeilentag, das als Zeile behandelt werden soll. In dieser XML-Datei <books><book><book>...</books> wäre der Wert zum Beispiel book. Der Standardwert ist ROW.
    • samplingRatio: Samplingverhältnis für das abgeleitete Schema (0,0 ~ 1). Der Standardwert ist 1. Mögliche Typen sind StructType, ArrayType, StringType, LongType, DoubleType, BooleanType, TimestampType und NullType es sei denn, Sie geben ein Schema an.
    • excludeAttribute: Gibt an, ob Attribute in Elementen ausgeschlossen werden. Die Standardeinstellung ist "false".
    • nullValue: Der Wert, der als Wert behandelt werden null soll. Der Standardwert ist "".
    • mode: Der Modus für den Umgang mit beschädigten Datensätzen. Der Standardwert ist PERMISSIVE.
      • PERMISSIVE:
        • Wenn ein beschädigter Datensatz angezeigt wird, legt es alle Felder auf null fest und legt die falsch formatierte Zeichenfolge in ein neues Feld fest, das von columnNameOfCorruptRecord konfiguriert wird.
        • Wenn es auf ein Feld des falschen Datentyps stößt, setzt es das betreffende Feld auf null.
      • DROPMALFORMED: ignoriert beschädigte Datensätze.
      • FAILFAST: Hiermit wird eine Ausnahme ausgelöst, wenn beschädigte Datensätze erkannt werden.
    • inferSchema: wenn true, versucht, einen geeigneten Typ für jede resultierende DataFrame-Spalte, z. B. einen booleschen, numerischen oder Datumstyp, abgeleitet zu haben. Wenn false, sind alle resultierenden Spalten vom Zeichenfolgentyp. Der Standardwert ist true.
    • columnNameOfCorruptRecord: Der Name des neuen Felds, in dem falsch formatierte Zeichenfolgen gespeichert werden. Der Standardwert ist _corrupt_record.
    • attributePrefix: Das Präfix für Attribute, damit Attribute und Elemente unterschieden werden können. Dies ist das Präfix für Feldnamen. Der Standardwert ist _.
    • valueTag: Das Tag, das für den Wert verwendet wird, wenn Attribute in einem Element ohne untergeordnete Elemente enthalten sind. Der Standardwert ist _VALUE.
    • charset: Der Standardwert ist UTF-8 kann aber auf andere gültige Zeichensatznamen festgelegt werden.
    • ignoreSurroundingSpaces: Gibt an, ob Leerzeichen, die Werte umgeben, übersprungen werden sollen. Die Standardeinstellung ist "false".
    • rowValidationXSDPath: Pfad zu einer XSD-Datei, die verwendet wird, um den XML-Code für jede Zeile zu überprüfen. Zeilen, die nicht validiert werden können, werden wie oben beschrieben als Parse-Fehler behandelt. Die XSD wirkt sich nicht anderweitig auf das bereitgestellte oder abgeleitete Schema aus. Wenn derselbe lokale Pfad nicht bereits auf den Executors im Cluster sichtbar ist, sollten die XSD und alle anderen, von der sie abhängig sind, mit SparkContext.addFile den Spark-Executors hinzugefügt werden. In diesem Fall müssen Sie die lokale XSD /foo/bar.xsd verwenden, addFile("/foo/bar.xsd") aufrufen und "bar.xsd" als rowValidationXSDPath übergeben.
  • Schreiben
    • path: Speicherort zum Schreiben von Dateien.
    • rowTag: Das Zeilentag, das als Zeile behandelt werden soll. In dieser XML-Datei <books><book><book>...</books> wäre der Wert zum Beispiel book. Der Standardwert ist ROW.
    • rootTag: Das Stammtag, das als Stamm behandelt werden soll. In dieser XML-Datei <books><book><book>...</books> wäre der Wert zum Beispiel books. Der Standardwert ist ROWS.
    • nullValue: Der zu schreibende null-Wert. Der Standardwert ist die Zeichenfolge "null". Bei "null" werden keine Attribute und Elemente für Felder geschrieben.
    • attributePrefix: Das Präfix für Attribute, um Attribute und Elemente zu unterscheiden. Dies ist das Präfix für Feldnamen. Der Standardwert ist _.
    • valueTag: Das Tag, das für den Wert verwendet wird, wenn Attribute in einem Element ohne untergeordnete Elemente enthalten sind. Der Standardwert ist _VALUE.
    • compression: Komprimierungscodec, der beim Speichern in einer Datei verwendet werden soll. Sollte der vollqualifizierte Name einer implementierenden Klasse org.apache.hadoop.io.compress.CompressionCodec oder einer der von Groß- und Kleinschreibung unabhängigen Kurznamen (bzip2, gzip, lz4 und snappy) sein. Der Standardwert ist keine Komprimierung.

Unterstützt die verkürzte Namensverwendung. Sie können xml anstelle von com.databricks.spark.xml verwenden.

XSD-Support

Sie können einzelne Zeilen gegen ein XSD-Schema validieren, indem Sie rowValidationXSDPath verwenden.

Sie verwenden das Hilfsprogramm com.databricks.spark.xml.util.XSDToSchema, um ein Spark DataFrame-Schema aus einigen XSD-Dateien zu extrahieren. Es unterstützt nur einfache, komplexe und sequentielle Typen, nur grundlegende XSD-Funktionen und ist experimentell.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Parsen von verschachteltem XML

Obwohl in erster Linie verwendet, um eine XML-Datei in einen DataFrame zu konvertieren, können Sie auch die Methode from_xml verwenden, um XML in einer String-bewerteten Spalte in einem bestehenden DataFrame zu parsen und es als eine neue Spalte mit geparsten Ergebnissen als eine Struktur hinzuzufügen:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Hinweis

  • mode:
    • Wenn der Standardwert auf PERMISSIVE gesetzt ist, wird der Parse-Modus stattdessen auf DROPMALFORMED gesetzt. Wenn Sie eine Spalte im Schema für from_xml enthalten, die columnNameOfCorruptRecord entspricht, gibt der Modus PERMISSIVE falsch formatierte Datensätze an diese Spalte in der resultierenden Struktur aus.
    • Wenn diese Methode auf DROPMALFORMEDfestgelegt ist, führen XML-Werte, die nicht ordnungsgemäß analysiert werden, zu einem Wert null für die Spalte. Es werden keine Zeilen gelöscht.
  • from_xml konvertiert Arrays von Zeichenfolgen, die XML enthalten, in Arrays von analysierten Strukturen. Verwenden Sie stattdessen schema_of_xml_array.
  • from_xml_string ist eine Alternative zur Verwendung in UDFs, die direkt auf einer Zeichenfolge statt auf einer Spalte arbeitet.

Konvertierungsregeln

Aufgrund struktureller Unterschiede zwischen DataFrames und XML gibt es einige Konvertierungsregeln von XML-Daten in DataFrame und von DataFrame in XML-Daten. Sie können die Behandlung von Attributen mit der Option excludeAttribute deaktivieren.

Konvertieren von XML in DataFrame

  • Attribute: Attribute werden als Felder mit dem in der Option attributePrefix angegebenen Präfix konvertiert. Wenn attributePrefix_ ist, produziert das Dokument

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    das Schema:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Wenn ein Element Attribute, aber keine untergeordneten Elemente enthält, wird der Attributwert in ein separates Feld in der Option valueTag angegeben. Wenn valueTag_VALUE ist, produziert das Dokument

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    das Schema:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Konvertieren von DataFrame in XML

Das Schreiben einer XML-Datei aus DataFrame mit einem Feld ArrayType mit seinem Element ArrayType würde ein zusätzliches geschachtelte Feld für das Element haben. Dies geschieht nicht beim Lesen und Schreiben von XML-Daten, sondern beim Schreiben eines aus anderen Quellen gelesenen DataFrames. Daher hat roundtrip beim Lesen und Schreiben von XML-Dateien dieselbe Struktur, aber das Schreiben eines aus anderen Quellen gelesenen DataFrames kann eine andere Struktur haben.

Ein DataFrame mit dem Schema:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

und den Daten:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produziert die XML-Datei:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>