Leitura e gravação de arquivos XML

Importante

Esse recurso está em uma versão prévia.

Este artigo descreve como fazer a leitura e a gravação de arquivos XML.

Extensible Markup Language (XML) é uma linguagem de marcação para formatação, armazenamento e compartilhamento de dados em formato textual. Ela define um conjunto de regras para serializar dados que variam de documentos a estruturas de dados arbitrárias.

O suporte nativo ao formato de arquivo XML permite a ingestão, a consulta e a análise de dados XML para processamento em lote ou streaming. Ele pode inferir e evoluir automaticamente esquemas e tipos de dados, dá suporte para expressões SQL como from_xml e pode gerar documentos XML. Não requer jars externos e funciona perfeitamente com o Carregador Automático, read_files e COPY INTO.

Requisitos

Azure Databricks Runtime 14.3 e superior

Analisar registros XML

A especificação do XML exige uma estrutura bem formada. Entretanto, essa especificação não é imediatamente mapeada em um formato tabular. Você deve especificar a opção rowTag para indicar o elemento XML que mapeia para um DataFrameRow. O elemento rowTag torna-se o struct de nível superior. Os elementos filhos de rowTag tornam-se os campos do struct de nível superior.

É possível especificar o esquema para esse registro ou permitir que ele seja inferido automaticamente. Como o analisador examina apenas os elementos rowTag, a DTD e as entidades externas são filtradas.

Os exemplos a seguir ilustram a inferência de esquema e a análise de um arquivo XML usando diferentes opções rowTag:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Leitura do arquivo XML com a opção rowTag como "livros":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Saída:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Leitura do arquivo XML com rowTag como "livro":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Saída:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opções de fonte de dados

As opções de fonte de dados para XML podem ser especificadas das seguintes maneiras:

Para obter uma lista de opções, veja Opções do carregador automático.

Suporte a XSD

Opcionalmente, é possível validar cada registro XML em nível de linha por meio de uma Definição de Esquema XML (XSD). O arquivo XSD é especificado na opção rowValidationXSDPath. O XSD não afeta o esquema fornecido ou inferido. Um registro com falha na validação é marcado como "corrompido" e tratado com base na opção de modo de tratamento de registros corrompidos descrita na seção de opções.

Você pode usar XSDToSchema para extrair um esquema DataFrame do Spark de um arquivo XSD. Ele tem suporte apenas para tipos simples, complexos e sequenciais, e dá suporte apenas para a funcionalidade básica do XSD.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

A tabela a seguir mostra a conversão de tipos de dados XSD em tipos de dados do Spark:

Tipos de dados XSD Tipos de dados do Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Analisar XML aninhado

Os dados XML em uma coluna com valor de cadeia de caracteres em um DataFrame existente podem ser analisados com schema_of_xml e from_xml que retornam o esquema e os resultados analisados como novas colunas struct. Os dados XML passados como argumento para schema_of_xml e from_xml devem ser um único registro XML bem formado.

schema_of_xml

Sintaxe

schema_of_xml(xmlStr [, options] )

Argumentos

  • xmlStr: uma expressão de CADEIA DE CARACTERES que especifica um único registro XML bem formado.
  • options: uma MAP<STRING,STRING> literal opcional especificando diretivas.

Retorna

Uma CADEIA DE CARACTERES que contém uma definição de struct com n campos de cadeias de caracteres em que os nomes das colunas são derivados dos nomes de elementos e atributos XML. Os valores dos campos contêm os tipos SQL formatados derivados.

from_xml

Sintaxe

from_xml(xmlStr, schema [, options])

Argumentos

  • xmlStr: uma expressão de CADEIA DE CARACTERES que especifica um único registro XML bem formado.
  • schema: uma expressão de CADEIA DE CARACTERES ou invocação da função schema_of_xml.
  • options: uma MAP<STRING,STRING> literal opcional especificando diretivas.

Retorna

Uma struct com nomes de campos e tipos correspondentes à definição de esquema. O esquema deve ser definido como pares de nomes de colunas e tipos de dados separados por vírgulas, como usado, por exemplo, em CREATE TABLE. A maioria das opções mostradas nas opções da fonte de dados é aplicável, com as seguintes exceções:

  • rowTag: como há apenas um registro XML, a opção rowTag não é aplicável.
  • mode (padrão: PERMISSIVE): permite um modo para lidar com registros corrompidos durante a análise.
    • PERMISSIVE: quando um registro corrompido e encontrado, a cadeia de caracteres malformada é colocada em um campo configurado por columnNameOfCorruptRecord e os campos malformados são definidos como null. Para manter os registros corrompidos, é possível configurar um campo do tipo cadeia de caracteres chamado columnNameOfCorruptRecord em um esquema definido pelo usuário. Se o esquema não tiver o campo, ele removerá os registros corrompidos durante a análise. Ao inferir um esquema, ele adiciona implicitamente um campo columnNameOfCorruptRecord a um esquema de saída.
    • FAILFAST: lança uma exceção quando encontra registros corrompidos.

Conversão de estrutura

Devido às diferenças de estrutura entre DataFrame e XML, existem algumas regras de conversão de dados XML para DataFrame e de DataFrame para dados XML. Observe que os atributos de tratamento podem ser desabilitados com a opção excludeAttribute.

Conversão de XML para DataFrame

Atributos: os atributos são convertidos como campos com o prefixo de título attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produz o esquema abaixo:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dados de caracteres em um elemento contendo atributo(s) ou elemento(s) filho(s): Esses dados são analisados no campo valueTag. Se houver várias ocorrências de dados de caracteres, o campo valueTag será convertido em um tipo array.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produz o esquema abaixo:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversão de DataFrame para XML

Elemento como uma matriz em uma matriz: a gravação de um arquivo XML de DataFrame com um campo ArrayType e seu elemento como ArrayType teria um campo aninhado adicional para o elemento. Isso não aconteceria ao ler e gravar dados XML, mas ao gravar uma leitura DataFrame de outras fontes. Portanto, a ida e volta na leitura e gravação de arquivos XML tem a mesma estrutura, mas a gravação de uma leitura DataFrame de outras fontes pode ter uma estrutura diferente.

DataFrame com um esquema abaixo:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

e com os dados abaixo:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produz um arquivo XML abaixo:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

O nome do elemento da matriz sem nome em DataFrame é especificado pela opção arrayElementName (Padrão: item).

Coluna de dados resgatados

A coluna de dados resgatados garante que você nunca perca ou ignore dados durante o ETL. Você pode habilitar a coluna de dados resgatados para capturar quaisquer dados que não foram analisados porque um ou mais campos em um registro têm um dos seguintes problemas:

  • Ausente do esquema fornecido
  • Não corresponde ao tipo de dados do esquema fornecido
  • Tem uma incompatibilidade de maiúsculas e minúsculas com os nomes de campo no esquema fornecido

A coluna de dados resgatada é retornada como um documento JSON que contém as colunas que foram resgatadas e o caminho do arquivo de origem do registro. Para remover o caminho do arquivo de origem da coluna de dados resgatados, você pode definir a seguinte configuração SQL:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

É possível habilitar a coluna de dados resgatados definindo a opção rescuedDataColumn como um nome de coluna ao ler os dados, como _rescued_data com spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

O analisador XML dá suporte para três modos ao analisar registros: PERMISSIVE, DROPMALFORMED e FAILFAST. Quando usado junto a rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam removidos no modo DROPMALFORMED ou geram um erro no modo FAILFAST. Somente registros corrompidos (XML incompleto ou malformado) são descartados ou geram erros.

Inferência e evolução de esquemas no Carregador Automático

Para obter uma discussão detalhada sobre esse tópico e as opções aplicáveis, consulte Configurar a inferência e evolução do esquema no Carregador Automático. Você pode configurar o Carregador Automático para detectar automaticamente o esquema dos dados XML carregados, permitindo que você inicialize tabelas sem declarar explicitamente o esquema de dados e evolua o esquema da tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de controlar e aplicar manualmente as alterações de esquema ao longo do tempo.

Por padrão, a inferência de esquema do Carregador Automático busca evitar problemas de evolução do esquema devido a incompatibilidades de tipo. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Carregador Automático infere todas as colunas como cadeias de caracteres, incluindo campos aninhados em arquivos XML. O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes XML com base em dados de amostra. Para habilitar esse comportamento com o Carregador Automático, defina a opção cloudFiles.inferColumnTypes como true.

O Carregador Automático detecta a adição de novas colunas à medida que processa seus dados. Quando o Carregador Automático detecta uma nova coluna, o fluxo é interrompido com um UnknownFieldException. Antes de o fluxo gerar esse erro, o Carregador Automático executa a inferência de esquema no microlote de dados mais recente e atualiza o local do esquema com o esquema mais recente, mesclando as novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. O carregador automático dá suporte para diferentes modos de evolução do esquema, que são definidos na opção cloudFiles.schemaEvolutionMode.

Você pode usar as dicas de esquema para impor as informações de esquema que você conhece e espera em um esquema inferido. Quando você sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um duplo em vez de um inteiro), é possível fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando a sintaxe de especificação do esquema SQL. Quando a coluna de dados resgatados está habilitada, os campos nomeados em um caso diferente daquele do esquema são carregados na coluna _rescued_data. Você pode alterar esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Carregador Automático faz a leitura dos dados sem distinção entre maiúsculas e minúsculas.

Exemplos

Os exemplos desta seção usam um arquivo XML disponível para download no repositório GitHub do Apache Spark.

Ler e gravar XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Você pode especificar manualmente o esquema ao ler dados:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API do SQL

A fonte de dados XML pode inferir tipos de dados:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Você também pode especificar nomes e tipos de colunas na DDL. Nesse caso, o esquema não é inferido automaticamente.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Carregar o XML usando COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Leitura do XML com validação de linha

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analisar o XML aninhado (from_xml e schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml e schema_of_xml com a API do SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Carregar o XML com o Carregador Automático

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Recursos adicionais

Leitura e gravação de dados XML usando a biblioteca spark-xml