Leggere in inglese

Condividi tramite


Leggere e scrivere dati XML usando la spark-xml libreria

Importante

Questa documentazione è stata ritirata e potrebbe non essere aggiornata. I prodotti, i servizi o le tecnologie menzionati int il suo contenuto non sono ufficialmente approvati o testati da Databricks.

Il supporto del formato di file XML nativo è disponibile come anteprima pubblica. Vedere Leggere e scrivere file XML.

Questo articolo descrive come leggere e scrivere un file XML come origine dati Apache Spark.

Requisiti

  1. Creare la spark-xml libreria come libreria Maven. Per la coordinata Maven, specificare:

    • Databricks Runtime 7.x e versioni successive: com.databricks:spark-xml_2.12:<release>

    Vedere spark-xmlVersioni per la versione più recente di <release>.

  2. Installare la libreria in un cluster.

Esempio

L'esempio in questa sezione utilizza il file XML libri.

  1. Recuperare il file XML dei libri:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Caricare il file in DBFS.

Leggere e scrivere dati XML

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Opzioni

  • Leggi
    • path: percorso dei file XML. Accetta le espressioni globbing standard di Hadoop.
    • rowTag: il tag di riga da trattare come una riga. In questo codice XML <books><book><book>...</books>, ad esempio, il valore sarà book. Il valore predefinito è ROW.
    • samplingRatio: rapporto di campionamento per l'inferenza dello schema (0,0 ~ 1). Il valore predefinito è 1. I tipi possibili sono StructType, ArrayTypeStringType, LongTypeDoubleTypeBooleanTypeTimestampTypee NullType, a meno che non si fornisca uno schema.
    • excludeAttribute: indica se escludere gli attributi negli elementi. Il valore predefinito è false.
    • nullValue: valore da considerare come null. Il valore predefinito è "".
    • mode: modalità per la gestione dei record danneggiati. Il valore predefinito è PERMISSIVE.
      • PERMISSIVE:
        • Quando rileva un record danneggiato, imposta tutti i campi su null e inserisce la stringa in formato non valido in un nuovo campo configurato da columnNameOfCorruptRecord.
        • Quando rileva un campo del tipo di dati errato, imposta il campo che causa l'errore su null.
      • DROPMALFORMED: ignora i record danneggiati.
      • FAILFAST: genera un'eccezione quando rileva record danneggiati.
    • inferSchema: se true, tenta di dedurre un tipo appropriato per ogni colonna dataframe risultante, ad esempio un tipo booleano, numerico o date. Se false, tutte le colonne risultanti sono di tipo stringa. Il valore predefinito è true.
    • columnNameOfCorruptRecord: nome del nuovo campo in cui vengono archiviate stringhe in formato non valido. Il valore predefinito è _corrupt_record.
    • attributePrefix: prefisso per gli attributi in modo da distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è _.
    • valueTag: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è _VALUE.
    • charset: il valore predefinito è UTF-8 ma può essere impostato su altri nomi di set di caratteri validi.
    • ignoreSurroundingSpaces: indica se gli spazi vuoti circostanti devono essere ignorati. Il valore predefinito è false.
    • rowValidationXSDPath: percorso di un file XSD usato per convalidare il codice XML per ogni riga. Le righe che non riescono a convalidare vengono considerate come gli errori di analisi come sopra. L'XSD non influisce in caso contrario sullo schema fornito o dedotto. Se lo stesso percorso locale non è già visibile negli executor nel cluster, il file XSD e gli altri da cui dipende devono essere aggiunti agli executor Spark con SparkContext.addFile. In questo caso, per usare XSD /foo/bar.xsdlocale, chiamare addFile("/foo/bar.xsd") e passare "bar.xsd" come rowValidationXSDPath.
  • Scrittura
    • path: percorso in cui scrivere file.
    • rowTag: Etichetta di riga da trattare come una riga. In questo codice XML <books><book><book>...</books>, ad esempio, il valore sarà book. Il valore predefinito è ROW.
    • rootTag: tag radice da considerare come radice. In questo codice XML <books><book><book>...</books>, ad esempio, il valore sarà books. Il valore predefinito è ROWS.
    • nullValue: valore da scrivere null . Il valore predefinito è la stringa "null". Quando "null", non scrive attributi ed elementi per i campi.
    • attributePrefix: prefisso per gli attributi per distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è _.
    • valueTag: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è _VALUE.
    • compression: codec di compressione da usare durante il salvataggio nel file. Deve essere il nome completo di una classe che implementa org.apache.hadoop.io.compress.CompressionCodec o uno dei nomi brevi senza distinzione tra maiuscole e minuscole (bzip2, gzip, lz4e snappy). Il valore predefinito è nessuna compressione.

Supporta l'utilizzo abbreviato dei nomi; È possibile usare xml anziché com.databricks.spark.xml.

Supporto XSD

È possibile convalidare singole righe rispetto a uno schema XSD usando rowValidationXSDPath.

Usare l'utilità com.databricks.spark.xml.util.XSDToSchema per estrarre uno schema di dataframe Spark da alcuni file XSD. Supporta solo tipi semplici, complessi e di sequenza, solo funzionalità XSD di base ed è sperimentale.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Analizzare XML annidato

Sebbene venga usato principalmente per convertire un file XML in un dataframe, è anche possibile usare il from_xml metodo per analizzare il codice XML in una colonna con valori di stringa in un dataframe esistente e aggiungerlo come nuova colonna con risultati analizzati come struct con:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Nota

  • mode:
    • Se impostato su PERMISSIVE, il valore predefinito, la modalità di analisi viene invece impostata su DROPMALFORMED. Se includi una colonna nello schema per from_xml che corrisponde a columnNameOfCorruptRecord, allora la modalità PERMISSIVE produce record malformati che vengono inseriti in quella colonna nella struct risultante.
    • Se impostato su DROPMALFORMED, i valori XML che non analizzano correttamente generano un null valore per la colonna. Nessuna riga viene eliminata.
  • from_xml converte matrici di stringhe contenenti XML in matrici di struct analizzati. Utilizzare invece schema_of_xml_array.
  • from_xml_string è un'alternativa da utilizzare in funzioni definite dall'utente che operano direttamente su una stringa anziché su una colonna.

Regole di conversione

A causa delle differenze strutturali tra dataframe e XML, esistono alcune regole di conversione da dati XML a dataframe e da dataframe a dati XML. È possibile disabilitare la gestione degli attributi con l'opzione excludeAttribute.

Convertire XML in DataFrame

  • Attributi: gli attributi vengono convertiti come campi con il prefisso specificato nell'opzione attributePrefix . Se attributePrefix è _, il documento

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    produce lo schema:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Se un elemento ha attributi ma nessun elemento figlio, il valore dell'attributo viene inserito in un campo separato specificato nell'opzione valueTag . Se valueTag è _VALUE, il documento

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    produce lo schema:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Convertire un dataframe in XML

Scrittura di un file XML da un DataFrame con un campo ArrayType che ha come elemento ArrayType, avrebbe un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma nella scrittura di un dataframe letto da altre origini. Pertanto, il roundtrip nella lettura e nella scrittura di file XML ha la stessa struttura, ma è possibile che la scrittura di un DataFrame letto da altre origini abbia una struttura diversa.

DataFrame con lo schema:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

e dati:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

Produce il file XML:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>