Sdílet prostřednictvím


Čtení a zápis dat XML pomocí spark-xml knihovny

Důležité

Tato dokumentace byla vyřazena a nemusí být aktualizována. Produkty, služby nebo technologie uvedené v obsahu nejsou oficiálně schváleny ani testovány službou Databricks.

Podpora nativního formátu souboru XML je k dispozici ve verzi Public Preview. Viz Čtení a zápis souborů XML.

Tento článek popisuje, jak číst a zapisovat soubor XML jako zdroj dat Apache Spark.

Požadavky

  1. Vytvořte knihovnu spark-xml jako knihovnu Maven. Pro souřadnici Maven zadejte:

    • Databricks Runtime 7.x a novější: com.databricks:spark-xml_2.12:<release>

    Podívejte se spark-xml na vydání nejnovější verze <release>.

  2. Nainstalujte knihovnu do clusteru.

Příklad

Příklad v této části používá soubor XML knih .

  1. Načtení souboru XML knih:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Nahrajte soubor do DBFS.

Čtení a zápis dat XML

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Možnosti

  • Číst
    • path: Umístění souborů XML. Přijímá standardní výrazy globbingu Hadoop.
    • rowTag: Značka řádku, která se má považovat za řádek. Například v tomto XML <books><book><book>...</books>by hodnota byla book. Výchozí hodnota je ROW.
    • samplingRatio: Poměr vzorkování pro odvození schématu (0,0 ~ 1). Výchozí hodnota je 1. Možné typy jsou StructType, , StringTypeDoubleTypeLongTypeBooleanTypeArrayTypeTimestampType a NullType, pokud nezadáte schéma.
    • excludeAttribute: Zda vyloučit atributy v prvcích. Výchozí hodnota je False.
    • nullValue: Hodnota, která se má považovat za null hodnotu. Výchozí hodnota je "".
    • mode: Režim práce s poškozenými záznamy. Výchozí hodnota je PERMISSIVE.
      • PERMISSIVE:
        • Když narazí na poškozený záznam, nastaví všechna pole na null a umístí poškozený řetězec do nového pole nakonfigurovaného uživatelem columnNameOfCorruptRecord.
        • Když narazí na pole nesprávného datového typu, nastaví pole pro přesměrování na nullhodnotu .
      • DROPMALFORMED: ignoruje poškozené záznamy.
      • FAILFAST: vyvolá výjimku, když zjistí poškozené záznamy.
    • inferSchema: Pokud truese pokusí odvodit odpovídající typ pro každý výsledný sloupec datového rámce, například logický, číselný nebo datový typ. Pokud falsejsou všechny výsledné sloupce typu řetězce. Výchozí hodnota je true.
    • columnNameOfCorruptRecord: Název nového pole, ve kterém jsou uloženy poškozené řetězce. Výchozí hodnota je _corrupt_record.
    • attributePrefix: Předpona atributů tak, aby se odlišily atributy a prvky. Toto je předpona pro názvy polí. Výchozí hodnota je _.
    • valueTag: Značka použitá pro hodnotu, pokud jsou atributy v elementu, který neobsahuje žádné podřízené prvky. Výchozí hodnota je _VALUE.
    • charset: Výchozí nastavení UTF-8 je ale možné nastavit na jiné platné názvy znakových sad.
    • ignoreSurroundingSpaces: Zda se mají vynechat prázdné znaky okolních hodnot. Výchozí hodnota je False.
    • rowValidationXSDPath: Cesta k souboru XSD, který slouží k ověření XML pro každý řádek. Řádky, které se nepodaří ověřit, se považují za parsované chyby jako výše. XSD jinak nemá vliv na zadané schéma nebo odvozené. Pokud stejná místní cesta ještě není viditelná u exekutorů v clusteru, pak XSD a všechny ostatní, na které závisí, by měly být přidány do exekutorů Spark pomocí SparkContext.addFile. V tomto případě použít místní XSD /foo/bar.xsd, volání addFile("/foo/bar.xsd") a předání "bar.xsd" jako rowValidationXSDPath.
  • Psát
    • path: Umístění pro zápis souborů.
    • rowTag: Značka řádku, která se má považovat za řádek. Například v tomto XML <books><book><book>...</books>by hodnota byla book. Výchozí hodnota je ROW.
    • rootTag: Kořenová značka, která má být považována za kořen. Například v tomto XML <books><book><book>...</books>by hodnota byla books. Výchozí hodnota je ROWS.
    • nullValue: Hodnota pro zápis null hodnoty. Výchozí hodnota je řetězec "null". Pokud "null", nezapisuje atributy a prvky pro pole.
    • attributePrefix: Předpona atributů k rozlišení atributů a prvků. Toto je předpona pro názvy polí. Výchozí hodnota je _.
    • valueTag: Značka použitá pro hodnotu, pokud jsou atributy v elementu, který neobsahuje žádné podřízené prvky. Výchozí hodnota je _VALUE.
    • compression: Komprese kodeku, který se použije při ukládání do souboru. Měl by být plně kvalifikovaný název třídy implementující org.apache.hadoop.io.compress.CompressionCodec nebo jeden z krátkých názvů bez rozlišování velkých a malých písmen (bzip2, gzip, lz4a snappy). Výchozí hodnota není komprese.

Podporuje zkrácené využití názvů; Místo toho můžete použít xml com.databricks.spark.xml.

Podpora XSD

Jednotlivé řádky můžete ověřit pomocí schématu XSD.rowValidationXSDPath

Pomocí nástroje com.databricks.spark.xml.util.XSDToSchema extrahujete schéma datového rámce Sparku z některých souborů XSD. Podporuje pouze jednoduché, složité a sekvenční typy, pouze základní funkce XSD a je experimentální.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Parsování vnořeného XML

I když se primárně používá k převodu souboru XML na datový rámec, můžete také použít metodu from_xml parsování XML ve sloupci s řetězcovou hodnotou v existujícím datovém rámci a přidat ho jako nový sloupec s parsovanými výsledky jako strukturou s:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Poznámka:

  • mode:
    • Pokud je nastavená hodnota PERMISSIVE, výchozí režim analýzy místo toho nastaví DROPMALFORMEDhodnotu . Pokud do schématu zahrnete sloupec odpovídající from_xml columnNameOfCorruptRecordtomuto sloupci, PERMISSIVE režim vypíše do výsledné struktury chybné záznamy.
    • Pokud je nastavená hodnota DROPMALFORMED, hodnoty XML, které se správně parsují, mají za null následek hodnotu sloupce. Nezahodí se žádné řádky.
  • from_xml převede pole řetězců obsahujících XML na pole parsovaných struktur. Místo toho použijte schema_of_xml_array.
  • from_xml_string je alternativou pro použití v uživatelem definovaných funkcích, které pracují s řetězcem přímo místo sloupce.

Pravidla převodu

Vzhledem k strukturálním rozdílům mezi datovými rámci a XML existují určitá pravidla převodu z dat XML na datový rámec a z datového rámce na data XML. Pomocí možnosti excludeAttributemůžete zakázat zpracování atributů .

Převod XML na datový rámec

  • Atributy: Atributy se převedou jako pole s předponou zadanou attributePrefix v možnosti. Pokud attributePrefix je _, dokument

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    vytvoří schéma:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Pokud prvek obsahuje atributy, ale žádné podřízené prvky, hodnota atributu je vložena do samostatného pole určeného valueTag v možnosti. Pokud valueTag je _VALUE, dokument

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    vytvoří schéma:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Převod datového rámce na XML

Zápis souboru XML z datového rámce s polem ArrayType s jeho elementem, stejně jako ArrayType by to bylo další vnořené pole pro element. K tomu nedojde při čtení a zápisu dat XML, ale při zápisu datového rámce načteného z jiných zdrojů. Proto zaokrouhlování při čtení a zápisu souborů XML má stejnou strukturu, ale zápis datového rámce načteného z jiných zdrojů je možné mít jinou strukturu.

Datový rámec se schématem:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

a data:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

vytvoří soubor XML:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>