Odczytywanie i zapisywanie plików XML

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

W tym artykule opisano sposób odczytywania i zapisywania plików XML.

Extensible Markup Language (XML) to język znaczników do formatowania, przechowywania i udostępniania danych w formacie tekstowym. Definiuje zestaw reguł serializacji danych, od dokumentów do dowolnych struktur danych.

Natywna obsługa formatu plików XML umożliwia pozyskiwanie, wykonywanie zapytań i analizowanie danych XML na potrzeby przetwarzania wsadowego lub przesyłania strumieniowego. Może automatycznie wnioskować i rozwijać schemat i typy danych, obsługuje wyrażenia SQL, takie jak from_xml, i mogą generować dokumenty XML. Nie wymaga on zewnętrznych plików jar i bezproblemowo współpracuje z Auto Loader, read_files i COPY INTO. Opcjonalnie można zweryfikować każdy rekord XML na poziomie wiersza względem definicji schematu XML (XSD).

Wymagania

Środowisko Databricks Runtime w wersji 14.3 lub nowszej

Analizowanie rekordów XML

Specyfikacja XML nakazuje dobrze sformułowaną strukturę. Jednak ta specyfikacja nie przekłada się bezpośrednio na format tabelaryczny. Należy określić opcję rowTag, aby wskazać element XML, który odpowiada elementowi DataFrameRow. Element rowTag staje się najwyższym poziomem struct. Elementy podrzędne rowTag stają się polami najwyższego poziomu struct.

Możesz określić schemat dla tego rekordu lub zezwolić na automatyczne wnioskowanie. Ponieważ parser sprawdza tylko elementy rowTag, deklaracje DTD i encje zewnętrzne są odfiltrowywane.

W poniższych przykładach przedstawiono wnioskowanie schematu i analizowanie pliku XML przy użyciu różnych opcji rowTag:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Skala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Odczytaj plik XML za pomocą opcji rowTag jako „books”:

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Skala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Wyjście:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Odczytaj plik XML, traktując rowTag jako „książka”:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Skala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Wyjście:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opcje źródła danych

.option() Użyj metod .options() i DataFrameReader i DataFrameWriter , aby skonfigurować źródła danych XML. Aby uzyskać pełną listę obsługiwanych opcji, zobacz DataFrameReader Opcje XML i DataFrameWriter opcje XML.

Obsługa XSD

Opcjonalnie można zweryfikować każdy rekord XML na poziomie wiersza za pomocą definicji schematu XML (XSD). Plik XSD jest określony w rowValidationXSDPath opcji . XSD nie ma w inny sposób wpływu na podany lub wywnioskowany schemat. Rekord, który kończy się niepowodzeniem walidacji, jest oznaczony jako "uszkodzony" i obsługiwany na podstawie opcji trybu obsługi uszkodzonych rekordów opisanych w sekcji opcji.

Możesz użyć XSDToSchema do wyodrębnienia schematu ramki danych Spark z pliku XSD. Obsługuje tylko proste, złożone i sekwencyjne typy i obsługuje tylko podstawowe funkcje XSD.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

W poniższej tabeli przedstawiono konwersję typów danych XSD na typy danych platformy Spark:

Typy danych XSD Typy danych platformy Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, , negativeInteger, nonNegativeInteger, nonPositiveInteger, , positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Analizowanie zagnieżdżonego kodu XML

Dane XML w kolumnie typu ciąg wartościowy w istniejącej tabeli danych mogą być analizowane przy użyciu schema_of_xml i from_xml, co zwraca schemat i przeanalizowane wyniki jako nowe kolumny struct. Dane XML przekazywane jako argument do schema_of_xml i from_xml muszą być pojedynczym dobrze sformułowanym rekordem XML.

schema_of_xml

Składnia

schema_of_xml(xmlStr [, options] )

Argumenty

  • xmlStr: wyrażenie STRING określające pojedynczy poprawnie sformułowany rekord XML.
  • options: opcjonalny literał MAP<STRING,STRING> określający dyrektywy.

Zwroty

Ciąg zawierający definicję struktury z n polami ciągów, w których nazwy kolumn pochodzą z elementów XML i nazw atrybutów. Wartości pól przechowują pochodne sformatowane typy SQL.

from_xml

Składnia

from_xml(xmlStr, schema [, options])

Argumenty

  • xmlStr: wyrażenie STRING określające pojedynczy poprawnie sformułowany rekord XML.
  • schema: wyrażenie STRING lub wywołanie funkcji schema_of_xml.
  • options: To opcjonalny literał MAP<STRING,STRING> określający dyrektywy.

Zwroty

Struktura z nazwami pól i typami pasującymi do definicji schematu. Schemat musi być zdefiniowany jako nazwa kolumny rozdzielanej przecinkami i pary typów danych, które są używane na przykład CREATE TABLE. Większość opcji wyświetlanych w opcjach źródła danych ma zastosowanie z następującymi wyjątkami:

  • rowTag: Ponieważ istnieje tylko jeden rekord XML, rowTag opcja nie ma zastosowania.
  • mode (ustawienie domyślne: PERMISSIVE): umożliwia tryb radzenia sobie z uszkodzonymi rekordami podczas analizowania.
    • PERMISSIVE: Gdy napotka uszkodzony rekord, umieszcza nieprawidłowy ciąg w polu skonfigurowanym przez columnNameOfCorruptRecord i ustawia nieprawidłowe pola na null. Aby zachować uszkodzone rekordy, można ustawić pole typu ciągu o nazwie columnNameOfCorruptRecord w schemacie zdefiniowanym przez użytkownika. Jeśli schemat nie ma pola, usuwa uszkodzone rekordy podczas analizowania. Podczas wnioskowania schematu niejawnie dodaje pole columnNameOfCorruptRecord w schemacie wyjściowym.
    • FAILFAST: zgłasza wyjątek, gdy napotyka uszkodzone rekordy.

Konwersja struktury

Ze względu na różnice w strukturze między DataFrame a XML istnieją pewne reguły konwersji danych XML do DataFrame oraz z DataFrame do danych XML. Należy pamiętać, że atrybuty obsługi można wyłączyć za pomocą opcji excludeAttribute.

Konwersja z xml na ramkę danych

Atrybuty: Atrybuty są przekształcane w pola z prefiksem w nagłówku attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

tworzy poniższy schemat:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dane znaków w elemecie zawierającym atrybuty lub elementy podrzędne: są one analizowane w valueTag polu. Jeśli występuje wiele danych znakowych, pole valueTag jest konwertowane do typu array.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

tworzy poniższy schemat:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Konwersja z ramki danych na XML

Element jako tablica w tablicy: Tworzenie pliku XML z DataFrame zawierającego pole ArrayType, którego element jako ArrayType będzie miał dodatkowe zagnieżdżone pole dla elementu. Nie zdarzyłoby się to podczas odczytu i zapisu danych XML, lecz przy zapisywaniu elementu DataFrame, odczytanego z innych źródeł. W związku z tym odczyt i ponowny zapis plików XML zachowują tę samą strukturę, ale zapis elementu DataFrame, odczytanego z innych źródeł, może mieć inną strukturę.

Ramka danych ze schematem poniżej:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

i z poniższymi danymi:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

tworzy poniższy plik XML:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Nazwa elementu tablicy bez nazwy w DataFrame jest określana przez opcję arrayElementName (Wartość domyślna: item).

Uratowana kolumna danych

Uratowana kolumna danych gwarantuje, że nigdy nie utracisz ani nie przegapisz danych podczas etl. Możesz włączyć uratowaną kolumnę danych, aby przechwycić wszystkie dane, które nie zostały przeanalizowane, ponieważ co najmniej jedno pole w rekordzie ma jeden z następujących problemów:

  • Brak podanego schematu
  • Nie jest zgodny z typem danych podanego schematu
  • Ma niezgodność wielkości liter z nazwami pól w podanym schemacie

Uratowana kolumna danych jest zwracana jako dokument JSON zawierający kolumny, które zostały uratowane, oraz ścieżkę pliku źródłowego rekordu. Aby usunąć ścieżkę pliku źródłowego z uratowanej kolumny danych, możesz ustawić następującą konfigurację SQL:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Skala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Możesz aktywować kolumnę odzyskanych danych, ustawiając opcję rescuedDataColumn na nazwę kolumny podczas odczytu danych, na przykład _rescued_data korzystając z spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Analizator XML obsługuje trzy tryby podczas analizowania rekordów: PERMISSIVE, DROPMALFORMEDi FAILFAST. Gdy jest używane razem z rescuedDataColumn, niezgodności typów danych nie powodują odrzucania rekordów w trybie DROPMALFORMED ani zgłoszenia błędu w trybie FAILFAST. Tylko uszkodzone rekordy (niekompletny lub niepoprawny składniowo XML) są odrzucane lub powodują błędy.

Wnioskowanie schematu i ewolucja w Auto Loaderze

Aby szczegółowo omówić ten temat i odpowiednie opcje, zobacz Konfigurowanie wnioskowania schematu i ewolucji w automatycznym module ładującym. Możesz skonfigurować moduł automatycznego ładowania w celu automatycznego wykrywania schematu załadowanych danych XML, co umożliwia inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie.

Domyślnie wnioskowanie schematu przez Auto Loader ma na celu unikanie problemów z ewolucją schematu wynikających z niezgodności typów. W przypadku formatów, które nie kodują typów danych (JSON, CSV i XML), moduł ładujący automatycznie wywnioskuje wszystkie kolumny jako ciągi, w tym pola zagnieżdżone w plikach XML. Usługa Apache Spark DataFrameReader używa innego zachowania do wnioskowania schematu, wybierając typy danych dla kolumn w źródłach XML na podstawie przykładowych danych. Aby włączyć to zachowanie za pomocą modułu automatycznego ładowania, ustaw opcję cloudFiles.inferColumnTypes na true.

Moduł automatycznego ładowania wykrywa dodanie nowych kolumn podczas przetwarzania danych. Gdy funkcja automatycznego ładowania wykryje nową kolumnę, strumień zatrzymuje się z kodem UnknownFieldException. Przed wystąpieniem tego błędu, Auto Loader przeprowadza wnioskowanie schematu na najnowszym mikropakiecie danych i aktualizuje lokalizację schematu przy użyciu najnowszego schematu, dodając nowe kolumny na końcu schematu. Typy danych istniejących kolumn pozostają niezmienione. Moduł automatycznego ładowania obsługuje różne tryby ewolucji schematu, które są ustawiane w opcji cloudFiles.schemaEvolutionMode.

Możesz użyć wskazówek schematu , aby wymusić informacje o schemacie, które znasz i których oczekujesz na wywnioskowanym schemacie. Jeśli wiesz, że kolumna ma określony typ danych lub jeśli chcesz wybrać bardziej ogólny typ danych (na przykład podwójne zamiast liczby całkowitej), możesz podać dowolną liczbę wskazówek dla typów danych kolumn jako ciąg przy użyciu składni specyfikacji schematu SQL. Po włączeniu uratowanej kolumny danych pola o nazwie w przypadku innym niż schemat są ładowane do kolumny _rescued_data. To zachowanie można zmienić, ustawiając opcję readerCaseSensitive na false; wówczas Auto Loader odczytuje dane bez rozróżniania wielkości liter.

Przykłady

Przykłady w tej sekcji używają pliku XML dostępnego do pobrania w repozytorium GitHub platformy Apache Spark.

Odczytywanie i zapisywanie kodu XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Skala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Schemat można określić ręcznie podczas odczytywania danych:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Skala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

Źródło danych XML może wnioskować typy danych:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Można również określić nazwy i typy kolumn w języku DDL. W takim przypadku schemat nie jest automatycznie wnioskowany.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Ładowanie kodu XML przy użyciu COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Odczytywanie kodu XML z walidacją wierszy

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Skala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analizowanie zagnieżdżonego kodu XML (from_xml i schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Skala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml i schema_of_xml przy użyciu interfejsu API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Ładowanie kodu XML za pomocą modułu ładującego automatycznego

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Skala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

Dodatkowe zasoby

Odczytywanie i zapisywanie danych XML przy użyciu biblioteki spark-xml