Share via


XML-bestanden lezen en schrijven

Belangrijk

Deze functie is beschikbaar als openbare preview.

In dit artikel wordt beschreven hoe u XML-bestanden leest en schrijft.

Extensible Markup Language (XML) is een opmaaktaal voor het opmaken, opslaan en delen van gegevens in tekstindeling. Hiermee definieert u een set regels voor het serialiseren van gegevens, variërend van documenten tot willekeurige gegevensstructuren.

Systeemeigen XML-bestandsindeling biedt ondersteuning voor opname, query's en parseren van XML-gegevens voor batchverwerking of streaming. Het kan automatisch schema- en gegevenstypen afleiden en ontwikkelen, ONDERSTEUNT SQL-expressies zoals from_xmlen kan XML-documenten genereren. Het vereist geen externe jar's en werkt naadloos met Auto Loader, read_files en COPY INTO.

Vereisten

Databricks Runtime 14.3 en hoger

XML-records parseren

XML-specificatie vereist een goed gevormde structuur. Deze specificatie wordt echter niet onmiddellijk toegewezen aan een tabellaire indeling. U moet de rowTag optie opgeven om het XML-element aan te geven dat wordt toegewezen aan een DataFrameRow. Het rowTag element wordt het hoogste niveau struct. De onderliggende elementen worden rowTag de velden van het hoogste niveau struct.

U kunt het schema voor deze record opgeven of deze automatisch laten afleiden. Omdat de parser alleen de rowTag elementen onderzoekt, worden DTD en externe entiteiten uitgefilterd.

In de volgende voorbeelden ziet u schemadeductie en parsering van een XML-bestand met behulp van verschillende rowTag opties:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Lees het XML-bestand met rowTag de optie 'boeken':

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Uitvoer:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Lees het XML-bestand met rowTag als 'boek':

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Uitvoer:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opties voor gegevensbronnen

Opties voor gegevensbronnen voor XML kunnen op de volgende manieren worden opgegeven:

Zie Opties voor automatisch laden voor een lijst met opties.

XSD-ondersteuning

U kunt desgewenst elke XML-record op rijniveau valideren door een XML-schemadefinitie (XSD). Het XSD-bestand wordt opgegeven in de rowValidationXSDPath optie. De XSD heeft verder geen invloed op het opgegeven of afgeleid schema. Een record die mislukt door de validatie, wordt gemarkeerd als beschadigd en verwerkt op basis van de optie voor het verwerken van beschadigde records die in de optiesectie wordt beschreven.

U kunt een Spark DataFrame-schema uit XSDToSchema een XSD-bestand extraheren. Het ondersteunt alleen eenvoudige, complexe en reekstypen en biedt alleen ondersteuning voor eenvoudige XSD-functionaliteit.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

In de volgende tabel ziet u de conversie van XSD-gegevenstypen naar Spark-gegevenstypen:

XSD-gegevenstypen Spark-gegevenstypen
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, , negativeIntegernonNegativeInteger, nonPositiveInteger, , , positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Geneste XML parseren

XML-gegevens in een kolom met tekenreekswaarden in een bestaand DataFrame kunnen worden geparseerd met schema_of_xml en from_xml die het schema en de geparseerde resultaten als nieuwe struct kolommen retourneren. XML-gegevens die als argument schema_of_xml worden doorgegeven en from_xml moeten één goed opgemaakt XML-record zijn.

schema_of_xml

Syntaxis

schema_of_xml(xmlStr [, options] )

Argumenten

  • xmlStr: Een TEKENREEKS-expressie die één goed opgemaakt XML-record aangeeft.
  • options: Een optionele letterlijke MAP<STRING,STRING> waarde die instructies aangeeft.

Retouren

Een TEKENREEKS met een definitie van een struct met n velden met tekenreeksen waarvan de kolomnamen zijn afgeleid van het XML-element en de kenmerknamen. De veldwaarden bevatten de afgeleide, opgemaakte SQL-typen.

from_xml

Syntaxis

from_xml(xmlStr, schema [, options])

Argumenten

  • xmlStr: Een TEKENREEKS-expressie die één goed opgemaakt XML-record aangeeft.
  • schema: Een TEKENREEKS-expressie of aanroep van de schema_of_xml functie.
  • options: Een optionele letterlijke MAP<STRING,STRING> waarde die instructies aangeeft.

Retouren

Een struct met veldnamen en -typen die overeenkomen met de schemadefinitie. Het schema moet worden gedefinieerd als door komma's gescheiden kolomnaam en gegevenstypeparen zoals wordt gebruikt in, bijvoorbeeld CREATE TABLE. De meeste opties die worden weergegeven in de opties voor gegevensbronnen zijn van toepassing met de volgende uitzonderingen:

  • rowTag: Omdat er slechts één XML-record is, is de rowTag optie niet van toepassing.
  • mode (standaard: PERMISSIVE): Hiermee kan een modus worden gebruikt voor het verwerken van beschadigde records tijdens het parseren.
    • PERMISSIVE: Wanneer deze voldoet aan een beschadigde record, plaatst u de ongeldige tekenreeks in een veld dat is geconfigureerd door columnNameOfCorruptRecorden stelt u onjuiste velden in op null. Als u beschadigde records wilt behouden, kunt u een tekenreekstypeveld instellen met de naam columnNameOfCorruptRecord in een door de gebruiker gedefinieerd schema. Als een schema het veld niet heeft, worden beschadigde records verwijderd tijdens het parseren. Bij het uitstellen van een schema wordt impliciet een columnNameOfCorruptRecord veld in een uitvoerschema toegevoegd.
    • FAILFAST: Genereert een uitzondering wanneer deze voldoet aan beschadigde records.

Structuurconversie

Vanwege de structuurverschillen tussen DataFrame en XML zijn er enkele conversieregels van XML-gegevens naar DataFrame en van DataFrame naar XML-gegevens. Houd er rekening mee dat afhandelingskenmerken kunnen worden uitgeschakeld met de optie excludeAttribute.

Conversie van XML naar DataFrame

Kenmerken: Kenmerken worden geconverteerd als velden met het kopvoorvoegsel attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produceert hieronder een schema:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Tekengegevens in een element met kenmerken of onderliggende elementen: deze worden geparseerd in het valueTag veld. Als er meerdere exemplaren van tekengegevens zijn, wordt het valueTag veld geconverteerd naar een array type.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produceert hieronder een schema:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversie van DataFrame naar XML

Element als een matrix in een matrix: een XML-bestand schrijven van DataFrame een veld ArrayType met het bijbehorende element, zoals ArrayType een extra geneste veld voor het element zou hebben. Dit gebeurt niet bij het lezen en schrijven van XML-gegevens, maar het schrijven van een DataFrame leesbewerking uit andere bronnen. Daarom heeft roundtrip in het lezen en schrijven van XML-bestanden dezelfde structuur, maar het schrijven van een DataFrame leesbewerking uit andere bronnen is mogelijk om een andere structuur te hebben.

DataFrame met een schema hieronder:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

en met de onderstaande gegevens:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produceert hieronder een XML-bestand:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

De elementnaam van de niet-benoemde matrix in de DataFrame matrix wordt opgegeven door de optie arrayElementName (standaard: item).

Kolom met geredde gegevens

De kolom met geredde gegevens zorgt ervoor dat u nooit gegevens kwijtraakt of mist tijdens ETL. U kunt de opgeslagen gegevenskolom inschakelen om gegevens vast te leggen die niet zijn geparseerd omdat een of meer velden in een record een van de volgende problemen hebben:

  • Afwezig uit het opgegeven schema
  • Komt niet overeen met het gegevenstype van het opgegeven schema
  • Komt niet overeen met de veldnamen in het opgegeven schema

De kolom met geredde gegevens wordt geretourneerd als een JSON-document met de kolommen die zijn gered en het bronbestandspad van de record. Als u het bronbestandspad uit de kolom met geredde gegevens wilt verwijderen, kunt u de volgende SQL-configuratie instellen:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

U kunt de kolom met geredde gegevens inschakelen door de optie rescuedDataColumn in te stellen op een kolomnaam bij het lezen van gegevens, zoals _rescued_data bij spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

De XML-parser ondersteunt drie modi bij het parseren van records: PERMISSIVE, DROPMALFORMEDen FAILFAST. Wanneer gegevenstypen samen met rescuedDataColumnelkaar worden gebruikt, komen records niet overeen in DROPMALFORMED de modus of veroorzaken ze een fout in FAILFAST de modus. Alleen beschadigde records (onvolledige of onjuiste XML) worden verwijderd of fouten veroorzaakt.

Schemadeductie en evolutie in Auto Loader

Zie Schemadeductie en evolutie configureren in AutoLoader voor een gedetailleerde bespreking van dit onderwerp en de toepasselijke opties. U kunt automatisch laden configureren om het schema van geladen XML-gegevens automatisch te detecteren, zodat u tabellen kunt initialiseren zonder expliciet het gegevensschema te declareren en het tabelschema te ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd. Hierdoor hoeft u geen schemawijzigingen handmatig bij te houden en toe te passen in de loop van de tijd.

Standaard wordt in deductie van het automatisch laadprogramma gezocht om problemen met de ontwikkeling van schema's te voorkomen als gevolg van niet-overeenkomende typen. Voor indelingen die geen gegevenstypen coderen (JSON, CSV en XML), worden alle kolommen als tekenreeksen afgeleid, inclusief geneste velden in XML-bestanden. Apache Spark DataFrameReader gebruikt een ander gedrag voor schemadeductie en selecteert gegevenstypen voor kolommen in XML-bronnen op basis van voorbeeldgegevens. Als u dit gedrag wilt inschakelen met automatisch laden, stelt u de optie cloudFiles.inferColumnTypes in op true.

Auto Loader detecteert de toevoeging van nieuwe kolommen terwijl deze uw gegevens verwerkt. Wanneer automatisch laden een nieuwe kolom detecteert, stopt de stroom met een UnknownFieldException. Voordat uw stream deze fout genereert, voert Auto Loader schemadeductie uit op de meest recente microbatch met gegevens en werkt de schemalocatie bij met het nieuwste schema door nieuwe kolommen samen te voegen aan het einde van het schema. De gegevenstypen van bestaande kolommen blijven ongewijzigd. Auto Loader ondersteunt verschillende modi voor de ontwikkeling van schema's, die u in de optie cloudFiles.schemaEvolutionModeinstelt.

U kunt schemahints gebruiken om de schema-informatie af te dwingen die u kent en verwacht in een afgeleid schema. Wanneer u weet dat een kolom van een specifiek gegevenstype is of als u een meer algemeen gegevenstype wilt kiezen (bijvoorbeeld een dubbele waarde in plaats van een geheel getal), kunt u een willekeurig aantal hints voor kolomgegevenstypen opgeven als een tekenreeks met de syntaxis van de SQL-schemaspecificatie. Wanneer de kolom met geredde gegevens is ingeschakeld, worden velden met de naam in een ander geval dan dat van het schema geladen in de _rescued_data kolom. U kunt dit gedrag wijzigen door de optie readerCaseSensitive in te falsestellen op, in welk geval Automatisch laadprogramma gegevens op een niet-hoofdlettergevoelige manier leest.

Voorbeelden

In de voorbeelden in deze sectie wordt een XML-bestand gebruikt dat kan worden gedownload in de Apache Spark GitHub-opslagplaats.

XML lezen en schrijven

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

U kunt het schema handmatig opgeven bij het lezen van gegevens:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL-API

XML-gegevensbron kan gegevenstypen afleiden:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

U kunt ook kolomnamen en -typen opgeven in DDL. In dit geval wordt het schema niet automatisch afgeleid.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

XML laden met COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

XML lezen met rijvalidatie

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Geneste XML parseren (from_xml en schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml en schema_of_xml met SQL API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

XML laden met automatisch laadprogramma

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Aanvullende bronnen

XML-gegevens lezen en schrijven met behulp van de spark-XML-bibliotheek