Partager via


Lire et écrire des fichiers XML

Important

Cette fonctionnalité est disponible en préversion publique.

Cet article explique comment lire et écrire des fichiers XML.

XML (Extensible Markup Language) est un langage de balisage pour la mise en forme, le stockage et le partage de données au format textuel. Il définit un ensemble de règles pour la sérialisation des données allant de documents à des structures de données arbitraires.

La prise en charge du format de fichier XML natif permet l’ingestion, l’interrogation et l’analyse des données XML pour le traitement par lots ou la diffusion en continu. Il peut déduire et faire évoluer automatiquement le schéma et les types de données, prend en charge des expressions SQL telles que from_xml, et peut générer des documents XML. Il ne nécessite pas de fichiers jar externes et fonctionne en toute transparence avec le chargeur automatique, read_files et COPY INTO. Vous pouvez éventuellement valider chaque enregistrement XML au niveau des lignes par rapport à une définition de schéma XML (XSD).

Spécifications

Databricks Runtime version 14.3 et supérieures

Analyser des enregistrements XML

La spécification XML impose une structure bien formée. Toutefois, cette spécification ne mappe pas immédiatement à un format tabulaire. Vous devez spécifier l’option rowTag permettant d’indiquer l’élément XML mappé à un DataFrameRow. L’élément rowTag devient le niveau supérieur struct. Les éléments enfants de rowTag deviennent les champs du niveau supérieur struct.

Vous pouvez spécifier le schéma de cet enregistrement ou le laisser être déduit automatiquement. Étant donné que l’analyseur examine uniquement les éléments rowTag, les entités DTD et externes sont filtrées.

Les exemples suivants illustrent l’inférence de schéma et l’analyse d’un fichier XML à l’aide de différentes options rowTag :

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Lisez le fichier XML avec l’option rowTag comme « livres » :

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Sortie :

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Lisez le fichier XML avec rowTag comme « livre » :

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Sortie :

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Options de source de données

Les options de source de données pour XML peuvent être spécifiées de la manière suivante :

Pour obtenir la liste des options, consultez Options du chargeur automatique.

Prise en charge XSD

Vous pouvez éventuellement valider chaque enregistrement XML au niveau des lignes par une définition de schéma XML (XSD). Le fichier XSD est spécifié dans l’option rowValidationXSDPath. Le XSD n’affecte pas le schéma fourni ou inféré. Un enregistrement qui échoue la validation est marqué comme « endommagé » et traité en fonction de l’option de mode de gestion des enregistrements endommagés décrite dans la section d’option.

Vous pouvez utiliser XSDToSchema pour extraire un schéma DataFrame Spark à partir d’un fichier XSD. Il prend uniquement en charge les types simples, complexes et séquences, et prend uniquement en charge les fonctionnalités XSD de base.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Le tableau suivant montre la conversion de types de données XSD en types de données Spark :

Type de données XSD Types de données Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Analyser les données XML imbriquées

Les données XML d’une colonne à valeur de chaîne dans un DataFrame existant peuvent être analysées avec schema_of_xml et from_xml qui retournent le schéma et les résultats analysés sous forme de nouvelles colonnes struct. Les données XML transmises en tant qu’argument à schema_of_xml et from_xml doivent être un enregistrement XML bien formé.

schema_of_xml

Syntaxe

schema_of_xml(xmlStr [, options] )

Arguments

  • xmlStr : expression STRING spécifiant un enregistrement XML bien formé unique.
  • options : un MAP<STRING,STRING> littéral optionnel spécifiant les directives.

Renvoie

Un STRING contenant une définition d’un struct avec n champs de chaînes où les noms des colonnes sont dérivés de l’élément XML et des noms d’attributs. Les valeurs de champ contiennent les types SQL mis en forme dérivés.

from_xml

Syntaxe

from_xml(xmlStr, schema [, options])

Arguments

  • xmlStr : expression STRING spécifiant un enregistrement XML bien formé unique.
  • schema : expression STRING ou appel de la fonction schema_of_xml.
  • options : un MAP<STRING,STRING> littéral optionnel spécifiant les directives.

Renvoie

Struct dont le nom et le type des champs correspondent à la définition de schéma. Le schéma doit être défini en tant que paires nom de colonne-type de données séparées par des virgules, comme dans CREATE TABLE. La plupart des options affichées dans les options de source de données s’appliquent, à l’exception des éléments suivants :

  • rowTag : étant donné qu’il n’existe qu’un seul enregistrement XML, l’option rowTag n’est pas applicable.
  • mode (PERMISSIVE par défaut) : autorise un mode de traitement des enregistrements endommagés pendant l’analyse.
    • PERMISSIVE : en présence d’un enregistrement endommagé, place la chaîne malformée dans un champ configuré par columnNameOfCorruptRecord et définit les champs malformés sur null. Pour conserver les enregistrements corrompus, vous pouvez définir un champ de type chaîne nommé columnNameOfCorruptRecord dans un schéma défini par l’utilisateur. Si le schéma est dépourvu de ce champ, les enregistrements endommagés sont supprimés au cours de l’analyse. Lors de l’inférence d’un schéma, un champ columnNameOfCorruptRecord est ajouté implicitement dans un schéma de sortie.
    • FAILFAST : lève une exception en présence d’enregistrements endommagés.

Conversion de structure

En raison des différences de structure entre DataFrame et XML, il existe certaines règles de conversion de données XML vers DataFrame et de DataFrame vers les données XML. Notez que la gestion des attributs peut être désactivée avec l’option excludeAttribute.

Conversion de XML en DataFrame

Attributs : les attributs sont convertis en tant que champs avec le préfixe de titre attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produit un schéma ci-dessous :

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Données de caractères dans un élément contenant des attributs ou des éléments enfants : Celles-ci sont analysées dans le champ valueTag. S’il existe plusieurs occurrences de données caractères, le champ valueTag est converti en type array.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produit un schéma ci-dessous :

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversion de DataFrame en XML

Élément comme tableau dans un tableau : l’écriture d’un fichier XML à partir de DataFrame avec un champ ArrayType et son élément comme ArrayType aurait un champ imbriqué supplémentaire pour l’élément. Cela ne se produirait pas lors de la lecture et de l’écriture de données XML, mais l’écriture d’un DataFrame lu à partir d’autres sources. Par conséquent, l’aller-retour dans la lecture et l’écriture des fichiers XML a la même structure, mais l’écriture d’un DataFrame lu à partir d’autres sources peut avoir une structure différente.

DataFrame avec un schéma ci-dessous :

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

et avec les données ci-dessous :

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produit un fichier XML ci-dessous :

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Le nom de l’élément du tableau sans nom dans le DataFrame est spécifié par l’option arrayElementName (par défaut : item).

Colonne de données récupérées

La colonne des données récupérées garantit que vous ne perdez pas ou ne manquez pas les données pendant l’opération ETL. Vous pouvez activer la colonne de données récupérées pour capturer les données qui n’ont pas été analysées, car un ou plusieurs champs d’un enregistrement présentent l’un des problèmes suivants :

  • Absent du schéma fourni
  • Ne correspond pas au type de données du schéma fourni
  • A une incompatibilité de casse avec les noms de champs dans le schéma fourni

La colonne des données récupérées est renvoyée sous la forme d’un document JSON contenant les colonnes récupérées, ainsi que le chemin d’accès au fichier source de l’enregistrement. Pour supprimer le chemin du fichier source de la colonne de données récupérées, vous pouvez définir la configuration SQL suivante :

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Vous pouvez activer la colonne de données sauvées en définissant l’option rescuedDataColumn sur un nom de colonne, comme _rescued_data avec spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

L’analyseur XML prend en charge trois modes d’analyse des enregistrements : PERMISSIVE, DROPMALFORMED et FAILFAST. En cas d’utilisation avec rescuedDataColumn, les discordances de type de données n’entraînent pas de suppression d’enregistrements en mode DROPMALFORMED, ou de génération d’erreur en mode FAILFAST. Seuls les enregistrements endommagés (enregistrements XML malformés ou incomplets) sont annulés ou génèrent des erreurs.

Inférence et évolution de schéma dans le chargeur automatique

Pour une présentation détaillée de cette rubrique et des options applicables, consultez Configurer l’inférence de schéma et l’évolution dans le chargeur automatique. Vous pouvez configurer le chargeur automatique pour détecter automatiquement le schéma des données XML chargées, ce qui vous permet d’initialiser des tables sans déclarer explicitement le schéma de données et de faire évoluer le schéma de table à mesure que de nouvelles colonnes sont introduites. Cela élimine la nécessité de suivre et d’appliquer manuellement les modifications de schéma au fil du temps.

Par défaut, l’inférence de schéma Auto Loader cherche à éviter les problèmes d’évolution du schéma en raison d’incompatibilités de type. Pour les formats qui n’encodent pas les types de données (JSON, CSV et XML), le chargeur automatique déduit toutes les colonnes sous forme de chaînes, y compris les champs imbriqués dans les fichiers XML. Apache Spark DataFrameReader utilise un comportement différent pour l’inférence de schéma, en sélectionnant des types de données pour les colonnes dans des sources XML en fonction d’exemples de données. Pour activer ce comportement avec Auto Loader, définissez l’option cloudFiles.inferColumnTypes sur true.

Auto Loader détecte l’ajout de nouvelles colonnes lors du traitement de vos données. Lorsque le chargeur automatique détecte une nouvelle colonne, le flux s’arrête avec un UnknownFieldException. Avant que votre stream ne génère cette erreur, Auto Loader effectue l’inférence de schéma sur le dernier micro-batch de données et met à jour l’emplacement du schéma avec le schéma le plus récent en fusionnant les nouvelles colonnes à la fin du schéma. Les types de données des colonnes existantes restent inchangés. Le chargeur automatique prend en charge différents modes pour l’évolution des schémas, que vous définissez dans l’option cloudFiles.schemaEvolutionMode.

Vous pouvez utiliser des indicateurs de schéma pour appliquer les informations de schéma que vous connaissez et attendez sur un schéma déduit. Lorsque vous savez qu’une colonne est d’un type de données spécifique, ou si vous voulez choisir un type de données plus général (par exemple, un double au lieu d’un entier), vous pouvez fournir un nombre arbitraire d’indices pour les types de données de colonne sous forme de chaîne en utilisant la syntaxe de spécification de schéma SQL. Lorsque la colonne de données récupérées est activée, les champs nommés dans une casse autre que celle du schéma sont chargés dans la colonne _rescued_data. Vous pouvez modifier ce comportement en définissant l’option readerCaseSensitive sur false, auquel cas le chargeur automatique lit les données sans respect de la casse.

Exemples

Les exemples de cette section utilisent un fichier XML disponible en téléchargement dans le référentiel GitHub Apache Spark.

Lire et écrire XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Vous pouvez spécifier manuellement le schéma lors de la lecture des données :

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API SQL

La source de données XML peut déduire les types de données :

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Vous pouvez également spécifier des noms de colonnes et des types dans DDL. Dans ce cas, le schéma n’est pas déduit automatiquement.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Charger du code XML à l’aide de COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Lire du code XML avec validation de ligne

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analyser le code XML imbriqué (from_xml et schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml et schema_of_xml avec l’API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Charger du code XML avec le chargeur automatique

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Ressources supplémentaires

Lire et écrire des données XML à l’aide de la bibliothèque spark-xml