Lesen und Schreiben von XML-Daten mithilfe der spark-xml
-Bibliothek
Wichtig
Diese Dokumentation wurde eingestellt und wird unter Umständen nicht aktualisiert. Die in diesen Inhalten genannten Produkte, Dienste oder Technologien werden von Databricks nicht offiziell unterstützt oder getestet.
Native XML-Dateiformatunterstützung ist als Public Preview verfügbar. Weitere Informationen unter Lesen und Schreiben von XML-Dateien.
In diesem Artikel wird das Lesen und Schreiben einer XML-Datei als Apache Spark-Datenquelle beschrieben.
Anforderungen
Erstellen Sie die Bibliothek
spark-xml
als Maven-Bibliothek. Geben Sie für die Maven-Koordinate Folgendes an:- Databricks Runtime 7.x und höhere Versionen:
com.databricks:spark-xml_2.12:<release>
Die aktuelle Version von
<release>
finden Sie unterspark-xml
-Releases.- Databricks Runtime 7.x und höhere Versionen:
Installieren der Bibliothek auf einem Cluster.
Beispiel
Das Beispiel in diesem Abschnitt verwendet die XML-Datei books.
So rufen Sie die XML-Datei „books“ ab:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
Laden Sie die Datei in DBFS hoch.
Lesen und Schreiben von XML-Daten
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
Tastatur
- Lesen
path
: Speicherort von XML-Dateien. Akzeptiert standardmäßige Hadoop-Globbing-Ausdrücke.rowTag
: Das Zeilentag, das als Zeile behandelt werden soll. In dieser XML-Datei<books><book><book>...</books>
wäre der Wert zum Beispielbook
. Der Standardwert istROW
.samplingRatio
: Samplingverhältnis für das abgeleitete Schema (0,0 ~ 1). Der Standardwert ist 1. Mögliche Typen sindStructType
,ArrayType
,StringType
,LongType
,DoubleType
,BooleanType
,TimestampType
undNullType
es sei denn, Sie geben ein Schema an.excludeAttribute
: Gibt an, ob Attribute in Elementen ausgeschlossen werden. Die Standardeinstellung ist "false".nullValue
: Der Wert, der als Wert behandelt werdennull
soll. Der Standardwert ist""
.mode
: Der Modus für den Umgang mit beschädigten Datensätzen. Der Standardwert istPERMISSIVE
.PERMISSIVE
:- Wenn ein beschädigter Datensatz angezeigt wird, legt es alle Felder auf
null
fest und legt die falsch formatierte Zeichenfolge in ein neues Feld fest, das voncolumnNameOfCorruptRecord
konfiguriert wird. - Wenn es auf ein Feld des falschen Datentyps stößt, setzt es das betreffende Feld auf
null
.
- Wenn ein beschädigter Datensatz angezeigt wird, legt es alle Felder auf
DROPMALFORMED
: ignoriert beschädigte Datensätze.FAILFAST
: Hiermit wird eine Ausnahme ausgelöst, wenn beschädigte Datensätze erkannt werden.
inferSchema
: wenntrue
, versucht, einen geeigneten Typ für jede resultierende DataFrame-Spalte, z. B. einen booleschen, numerischen oder Datumstyp, abgeleitet zu haben. Wennfalse
, sind alle resultierenden Spalten vom Zeichenfolgentyp. Der Standardwert isttrue
.columnNameOfCorruptRecord
: Der Name des neuen Felds, in dem falsch formatierte Zeichenfolgen gespeichert werden. Der Standardwert ist_corrupt_record
.attributePrefix
: Das Präfix für Attribute, damit Attribute und Elemente unterschieden werden können. Dies ist das Präfix für Feldnamen. Der Standardwert ist_
.valueTag
: Das Tag, das für den Wert verwendet wird, wenn Attribute in einem Element ohne untergeordnete Elemente enthalten sind. Der Standardwert ist_VALUE
.charset
: Der Standardwert istUTF-8
kann aber auf andere gültige Zeichensatznamen festgelegt werden.ignoreSurroundingSpaces
: Gibt an, ob Leerzeichen, die Werte umgeben, übersprungen werden sollen. Die Standardeinstellung ist "false".rowValidationXSDPath
: Pfad zu einer XSD-Datei, die verwendet wird, um den XML-Code für jede Zeile zu überprüfen. Zeilen, die nicht validiert werden können, werden wie oben beschrieben als Parse-Fehler behandelt. Die XSD wirkt sich nicht anderweitig auf das bereitgestellte oder abgeleitete Schema aus. Wenn derselbe lokale Pfad nicht bereits auf den Executors im Cluster sichtbar ist, sollten die XSD und alle anderen, von der sie abhängig sind, mit SparkContext.addFile den Spark-Executors hinzugefügt werden. In diesem Fall müssen Sie die lokale XSD/foo/bar.xsd
verwenden,addFile("/foo/bar.xsd")
aufrufen und"bar.xsd"
alsrowValidationXSDPath
übergeben.
- Schreiben
path
: Speicherort zum Schreiben von Dateien.rowTag
: Das Zeilentag, das als Zeile behandelt werden soll. In dieser XML-Datei<books><book><book>...</books>
wäre der Wert zum Beispielbook
. Der Standardwert istROW
.rootTag
: Das Stammtag, das als Stamm behandelt werden soll. In dieser XML-Datei<books><book><book>...</books>
wäre der Wert zum Beispielbooks
. Der Standardwert istROWS
.nullValue
: Der zu schreibendenull
-Wert. Der Standardwert ist die Zeichenfolge"null"
. Bei"null"
werden keine Attribute und Elemente für Felder geschrieben.attributePrefix
: Das Präfix für Attribute, um Attribute und Elemente zu unterscheiden. Dies ist das Präfix für Feldnamen. Der Standardwert ist_
.valueTag
: Das Tag, das für den Wert verwendet wird, wenn Attribute in einem Element ohne untergeordnete Elemente enthalten sind. Der Standardwert ist_VALUE
.compression
: Komprimierungscodec, der beim Speichern in einer Datei verwendet werden soll. Sollte der vollqualifizierte Name einer implementierenden Klasseorg.apache.hadoop.io.compress.CompressionCodec
oder einer der von Groß- und Kleinschreibung unabhängigen Kurznamen (bzip2
,gzip
,lz4
undsnappy
) sein. Der Standardwert ist keine Komprimierung.
Unterstützt die verkürzte Namensverwendung. Sie können xml
anstelle von com.databricks.spark.xml
verwenden.
XSD-Support
Sie können einzelne Zeilen gegen ein XSD-Schema validieren, indem Sie rowValidationXSDPath
verwenden.
Sie verwenden das Hilfsprogramm com.databricks.spark.xml.util.XSDToSchema
, um ein Spark DataFrame-Schema aus einigen XSD-Dateien zu extrahieren. Es unterstützt nur einfache, komplexe und sequentielle Typen, nur grundlegende XSD-Funktionen und ist experimentell.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Parsen von verschachteltem XML
Obwohl in erster Linie verwendet, um eine XML-Datei in einen DataFrame zu konvertieren, können Sie auch die Methode from_xml
verwenden, um XML in einer String-bewerteten Spalte in einem bestehenden DataFrame zu parsen und es als eine neue Spalte mit geparsten Ergebnissen als eine Struktur hinzuzufügen:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Hinweis
mode
:- Wenn der Standardwert auf
PERMISSIVE
gesetzt ist, wird der Parse-Modus stattdessen aufDROPMALFORMED
gesetzt. Wenn Sie eine Spalte im Schema fürfrom_xml
enthalten, diecolumnNameOfCorruptRecord
entspricht, gibt der ModusPERMISSIVE
falsch formatierte Datensätze an diese Spalte in der resultierenden Struktur aus. - Wenn diese Methode auf
DROPMALFORMED
festgelegt ist, führen XML-Werte, die nicht ordnungsgemäß analysiert werden, zu einem Wertnull
für die Spalte. Es werden keine Zeilen gelöscht.
- Wenn der Standardwert auf
from_xml
konvertiert Arrays von Zeichenfolgen, die XML enthalten, in Arrays von analysierten Strukturen. Verwenden Sie stattdessenschema_of_xml_array
.from_xml_string
ist eine Alternative zur Verwendung in UDFs, die direkt auf einer Zeichenfolge statt auf einer Spalte arbeitet.
Konvertierungsregeln
Aufgrund struktureller Unterschiede zwischen DataFrames und XML gibt es einige Konvertierungsregeln von XML-Daten in DataFrame und von DataFrame in XML-Daten. Sie können die Behandlung von Attributen mit der Option excludeAttribute
deaktivieren.
Konvertieren von XML in DataFrame
Attribute: Attribute werden als Felder mit dem in der Option
attributePrefix
angegebenen Präfix konvertiert. WennattributePrefix
_
ist, produziert das Dokument<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
das Schema:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Wenn ein Element Attribute, aber keine untergeordneten Elemente enthält, wird der Attributwert in ein separates Feld in der Option
valueTag
angegeben. WennvalueTag
_VALUE
ist, produziert das Dokument<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
das Schema:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
Konvertieren von DataFrame in XML
Das Schreiben einer XML-Datei aus DataFrame mit einem Feld ArrayType
mit seinem Element ArrayType
würde ein zusätzliches geschachtelte Feld für das Element haben. Dies geschieht nicht beim Lesen und Schreiben von XML-Daten, sondern beim Schreiben eines aus anderen Quellen gelesenen DataFrames. Daher hat roundtrip beim Lesen und Schreiben von XML-Dateien dieselbe Struktur, aber das Schreiben eines aus anderen Quellen gelesenen DataFrames kann eine andere Struktur haben.
Ein DataFrame mit dem Schema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
und den Daten:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produziert die XML-Datei:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>