Läsa och skriva XML-filer

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

I den här artikeln beskrivs hur du läser och skriver XML-filer.

Utökningsbart Markup Language (XML) är ett markeringsspråk för formatering, lagring och delning av data i textformat. Den definierar en uppsättning regler för serialisering av data från dokument till godtyckliga datastrukturer.

Stöd för internt XML-filformat möjliggör inmatning, frågekörning och parsning av XML-data för batchbearbetning eller strömning. Den kan automatiskt härleda och utveckla schema- och datatyper, stöder SQL-uttryck som from_xmloch kan generera XML-dokument. Det kräver inte externa burkar och fungerar sömlöst med Auto Loader och read_filesCOPY INTO.

Behov

Databricks Runtime 14.3 och senare

Parsa XML-poster

XML-specifikationen kräver en välformulerad struktur. Den här specifikationen mappas dock inte direkt till tabellformat. Du måste ange alternativet rowTag för att ange XML-elementet som mappar till en DataFrameRow. Elementet rowTag blir den översta nivån struct. De underordnade elementen rowTag i blir fälten på den översta nivån struct.

Du kan ange schemat för den här posten eller låta den härledas automatiskt. Eftersom parsern endast undersöker elementen rowTag filtreras DTD och externa entiteter bort.

Följande exempel illustrerar schemainferens och parsning av en XML-fil med olika rowTag alternativ:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Läs XML-filen med rowTag alternativet "books":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Utdata:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Läs XML-filen med rowTag som "bok":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Utdata:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Alternativ för datakälla

Datakällans alternativ för XML kan anges på följande sätt:

En lista över alternativ finns i Alternativ för automatisk inläsning.

XSD-stöd

Du kan också verifiera varje XML-post på radnivå med en XML-schemadefinition (XSD). XSD-filen anges i alternativet rowValidationXSDPath . XSD påverkar inte det angivna eller härledda schemat. En post som misslyckas med valideringen markeras som "skadad" och hanteras baserat på alternativet för hantering av skadade poster som beskrivs i alternativavsnittet.

Du kan använda XSDToSchema för att extrahera ett Spark DataFrame-schema från en XSD-fil. Den stöder endast enkla, komplexa och sekvenstyper och stöder endast grundläggande XSD-funktioner.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

I följande tabell visas konverteringen av XSD-datatyper till Spark-datatyper:

XSD-datatyper Spark-datatyper
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, , , positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Parsa kapslad XML

XML-data i en strängvärdeskolumn i en befintlig DataFrame kan parsas med schema_of_xml och from_xml som returnerar schemat och de tolkade resultaten som nya struct kolumner. XML-data som skickas som ett argument till schema_of_xml och from_xml måste vara en enda välformulerad XML-post.

schema_of_xml

Syntax

schema_of_xml(xmlStr [, options] )

Argument

  • xmlStr: Ett STRING-uttryck som anger en enda välformulerad XML-post.
  • options: En valfri MAP<STRING,STRING> literal som anger direktiv.

Returer

En STRÄNG som innehåller en definition av en struct med n fält med strängar där kolumnnamnen härleds från XML-elementet och attributnamnen. Fältvärdena innehåller de härledda formaterade SQL-typerna.

from_xml

Syntax

from_xml(xmlStr, schema [, options])

Argument

  • xmlStr: Ett STRING-uttryck som anger en enda välformulerad XML-post.
  • schema: Ett STRING-uttryck eller anrop av schema_of_xml funktionen.
  • options: En valfri MAP<STRING,STRING> literal som anger direktiv.

Returer

En struct med fältnamn och typer som matchar schemadefinitionen. Schemat måste definieras som kommaavgränsat kolumnnamn och datatyppar som används i till exempel CREATE TABLE. De flesta alternativ som visas i alternativen för datakällan gäller med följande undantag:

  • rowTag: Eftersom det bara finns en XML-post är alternativet rowTag inte tillämpligt.
  • mode (standard: PERMISSIVE): Tillåter ett läge för att hantera skadade poster under parsning.
    • PERMISSIVE: När den möter en skadad post placerar du den felaktiga strängen i ett fält som konfigurerats av columnNameOfCorruptRecordoch anger felaktiga fält till null. Om du vill behålla skadade poster kan du ange ett strängtypfält med namnet columnNameOfCorruptRecord i ett användardefinierat schema. Om ett schema inte har fältet släpps skadade poster under parsningen. När du härleder ett schema lägger det implicit till ett columnNameOfCorruptRecord fält i ett utdataschema.
    • FAILFAST: Utlöser ett undantag när det möter skadade poster.

Strukturkonvertering

På grund av strukturskillnaderna mellan DataFrame och XML finns det vissa konverteringsregler från XML-data till DataFrame och från DataFrame till XML-data. Observera att hanteringsattribut kan inaktiveras med alternativet excludeAttribute.

Konvertering från XML till DataFrame

Attribut: Attribut konverteras som fält med rubrikprefixet attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

skapar ett schema nedan:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Teckendata i ett element som innehåller attribut eller underordnade element: Dessa parsas i fältet valueTag . Om det finns flera förekomster av teckendata konverteras fältet valueTag till en array typ.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

skapar ett schema nedan:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Konvertering från DataFrame till XML

Element som en matris i en matris: Skriva en XML-fil från DataFrame att ha ett fält ArrayType med dess element, liksom ArrayType ytterligare ett kapslat fält för elementet. Detta skulle inte inträffa vid läsning och skrivning av XML-data, utan när du skriver en DataFrame läsning från andra källor. Därför har tur och retur i läsning och skrivning av XML-filer samma struktur, men att skriva en DataFrame läsning från andra källor är möjligt att ha en annan struktur.

DataFrame med ett schema nedan:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

och med data nedan:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

skapar en XML-fil nedan:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Elementnamnet för den namnlösa matrisen DataFrame i anges av alternativet arrayElementName (standard: item).

Datakolumnen Räddad

Den räddade datakolumnen säkerställer att du aldrig förlorar eller går miste om data under ETL. Du kan aktivera den räddade datakolumnen för att samla in data som inte parsats eftersom ett eller flera fält i en post har något av följande problem:

  • Frånvarande från det angivna schemat
  • Matchar inte datatypen för det angivna schemat
  • Har ett ärendematchningsfel med fältnamnen i det angivna schemat

Den räddade datakolumnen returneras som ett JSON-dokument som innehåller kolumnerna som räddades och källfilens sökväg till posten. Om du vill ta bort källfilsökvägen från den räddade datakolumnen kan du ange följande SQL-konfiguration:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Du kan aktivera den räddade datakolumnen genom att ange alternativet rescuedDataColumn till ett kolumnnamn när du läser data, till exempel _rescued_data med spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

XML-parsern stöder tre lägen vid parsning av poster: PERMISSIVE, DROPMALFORMEDoch FAILFAST. När de används tillsammans med rescuedDataColumn, orsakar inte datatypsmatchningar att poster tas bort i DROPMALFORMED läge eller utlöser ett fel i FAILFAST läge. Endast skadade poster (ofullständig eller felaktig XML) tas bort eller utlöser fel.

Schemainferens och utveckling i Auto Loader

En detaljerad beskrivning av det här avsnittet och tillämpliga alternativ finns i Konfigurera schemainferens och utveckling i Automatisk inläsning. Du kan konfigurera automatisk inläsning för att automatiskt identifiera schemat för inlästa XML-data, så att du kan initiera tabeller utan att uttryckligen deklarera dataschemat och utveckla tabellschemat när nya kolumner introduceras. Detta eliminerar behovet av att manuellt spåra och tillämpa schemaändringar över tid.

Som standard försöker schemainferensen för automatisk inläsning undvika problem med schemautveckling på grund av typmatchningar. För format som inte kodar datatyper (JSON, CSV och XML) härleder Auto Loader alla kolumner som strängar, inklusive kapslade fält i XML-filer. Apache Spark DataFrameReader använder ett annat beteende för schemainferens och väljer datatyper för kolumner i XML-källor baserat på exempeldata. Om du vill aktivera det här beteendet med Auto Loader anger du alternativet cloudFiles.inferColumnTypes till true.

Automatisk inläsning identifierar tillägg av nya kolumner när dina data bearbetas. När Auto Loader identifierar en ny kolumn stoppas strömmen med en UnknownFieldException. Innan strömmen genererar det här felet utför Auto Loader schemainferens på den senaste mikrobatchen med data och uppdaterar schemaplatsen med det senaste schemat genom att slå samman nya kolumner till slutet av schemat. Datatyperna för befintliga kolumner förblir oförändrade. Auto Loader stöder olika lägen för schemautveckling, som du anger i alternativet cloudFiles.schemaEvolutionMode.

Du kan använda schematips för att framtvinga schemainformationen som du känner till och förväntar dig på ett härledt schema. När du vet att en kolumn är av en specifik datatyp, eller om du vill välja en mer allmän datatyp (till exempel en dubbel i stället för ett heltal), kan du ange ett godtyckligt antal tips för kolumndatatyper som en sträng med sql-schemaspecifikationssyntax. När den räddade datakolumnen är aktiverad läses fält med namnet i ett annat fall än schemat in till _rescued_data kolumnen. Du kan ändra det här beteendet genom att ange alternativet readerCaseSensitive till false, i vilket fall Auto Loader läser data på ett skiftlägesokänsligt sätt.

Exempel

Exemplen i det här avsnittet använder en XML-fil som är tillgänglig för nedladdning på Apache Spark GitHub-lagringsplatsen.

Läsa och skriva XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Du kan ange schemat manuellt när du läser data:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API för SQL

XML-datakällan kan härleda datatyper:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Du kan också ange kolumnnamn och typer i DDL. I det här fallet härleds inte schemat automatiskt.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Läs in XML med COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Läsa XML med radverifiering

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Parsa kapslad XML (from_xml och schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml och schema_of_xml med SQL API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Läsa in XML med automatisk inläsning

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Ytterligare resurser

Läsa och skriva XML-data med spark-xml-biblioteket