Leggere e scrivere dati XML usando la spark-xml
libreria
Importante
Questa documentazione è stata ritirata e potrebbe non essere aggiornata. I prodotti, i servizi o le tecnologie menzionati int il suo contenuto non sono ufficialmente approvati o testati da Databricks.
Il supporto del formato di file XML nativo è disponibile come anteprima pubblica. Vedere Leggere e scrivere file XML.
Questo articolo descrive come leggere e scrivere un file XML come origine dati Apache Spark.
Creare la
spark-xml
libreria come libreria Maven. Per la coordinata Maven, specificare:- Databricks Runtime 7.x e versioni successive:
com.databricks:spark-xml_2.12:<release>
Vedere
spark-xml
Versioni per la versione più recente di<release>
.- Databricks Runtime 7.x e versioni successive:
Installare la libreria in un cluster.
L'esempio in questa sezione utilizza il file XML libri.
Recuperare il file XML dei libri:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
Caricare il file in DBFS.
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
- Leggi
-
path
: percorso dei file XML. Accetta le espressioni globbing standard di Hadoop. -
rowTag
: il tag di riga da trattare come una riga. In questo codice XML<books><book><book>...</books>
, ad esempio, il valore saràbook
. Il valore predefinito èROW
. -
samplingRatio
: rapporto di campionamento per l'inferenza dello schema (0,0 ~ 1). Il valore predefinito è 1. I tipi possibili sonoStructType
,ArrayType
StringType
,LongType
DoubleType
BooleanType
TimestampType
eNullType
, a meno che non si fornisca uno schema. -
excludeAttribute
: indica se escludere gli attributi negli elementi. Il valore predefinito è false. -
nullValue
: valore da considerare comenull
. Il valore predefinito è""
. -
mode
: modalità per la gestione dei record danneggiati. Il valore predefinito èPERMISSIVE
.-
PERMISSIVE
:- Quando rileva un record danneggiato, imposta tutti i campi su
null
e inserisce la stringa in formato non valido in un nuovo campo configurato dacolumnNameOfCorruptRecord
. - Quando rileva un campo del tipo di dati errato, imposta il campo che causa l'errore su
null
.
- Quando rileva un record danneggiato, imposta tutti i campi su
-
DROPMALFORMED
: ignora i record danneggiati. -
FAILFAST
: genera un'eccezione quando rileva record danneggiati.
-
-
inferSchema
: setrue
, tenta di dedurre un tipo appropriato per ogni colonna dataframe risultante, ad esempio un tipo booleano, numerico o date. Sefalse
, tutte le colonne risultanti sono di tipo stringa. Il valore predefinito ètrue
. -
columnNameOfCorruptRecord
: nome del nuovo campo in cui vengono archiviate stringhe in formato non valido. Il valore predefinito è_corrupt_record
. -
attributePrefix
: prefisso per gli attributi in modo da distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è_
. -
valueTag
: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è_VALUE
. -
charset
: il valore predefinito èUTF-8
ma può essere impostato su altri nomi di set di caratteri validi. -
ignoreSurroundingSpaces
: indica se gli spazi vuoti circostanti devono essere ignorati. Il valore predefinito è false. -
rowValidationXSDPath
: percorso di un file XSD usato per convalidare il codice XML per ogni riga. Le righe che non riescono a convalidare vengono considerate come gli errori di analisi come sopra. L'XSD non influisce in caso contrario sullo schema fornito o dedotto. Se lo stesso percorso locale non è già visibile negli executor nel cluster, il file XSD e gli altri da cui dipende devono essere aggiunti agli executor Spark con SparkContext.addFile. In questo caso, per usare XSD/foo/bar.xsd
locale, chiamareaddFile("/foo/bar.xsd")
e passare"bar.xsd"
comerowValidationXSDPath
.
-
- Scrittura
-
path
: percorso in cui scrivere file. -
rowTag
: Etichetta di riga da trattare come una riga. In questo codice XML<books><book><book>...</books>
, ad esempio, il valore saràbook
. Il valore predefinito èROW
. -
rootTag
: tag radice da considerare come radice. In questo codice XML<books><book><book>...</books>
, ad esempio, il valore saràbooks
. Il valore predefinito èROWS
. -
nullValue
: valore da scriverenull
. Il valore predefinito è la stringa"null"
. Quando"null"
, non scrive attributi ed elementi per i campi. -
attributePrefix
: prefisso per gli attributi per distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è_
. -
valueTag
: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è_VALUE
. -
compression
: codec di compressione da usare durante il salvataggio nel file. Deve essere il nome completo di una classe che implementaorg.apache.hadoop.io.compress.CompressionCodec
o uno dei nomi brevi senza distinzione tra maiuscole e minuscole (bzip2
,gzip
,lz4
esnappy
). Il valore predefinito è nessuna compressione.
-
Supporta l'utilizzo abbreviato dei nomi; È possibile usare xml
anziché com.databricks.spark.xml
.
È possibile convalidare singole righe rispetto a uno schema XSD usando rowValidationXSDPath
.
Usare l'utilità com.databricks.spark.xml.util.XSDToSchema
per estrarre uno schema di dataframe Spark da alcuni file XSD. Supporta solo tipi semplici, complessi e di sequenza, solo funzionalità XSD di base ed è sperimentale.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Sebbene venga usato principalmente per convertire un file XML in un dataframe, è anche possibile usare il from_xml
metodo per analizzare il codice XML in una colonna con valori di stringa in un dataframe esistente e aggiungerlo come nuova colonna con risultati analizzati come struct con:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Nota
-
mode
:- Se impostato su
PERMISSIVE
, il valore predefinito, la modalità di analisi viene invece impostata suDROPMALFORMED
. Se includi una colonna nello schema perfrom_xml
che corrisponde acolumnNameOfCorruptRecord
, allora la modalitàPERMISSIVE
produce record malformati che vengono inseriti in quella colonna nella struct risultante. - Se impostato su
DROPMALFORMED
, i valori XML che non analizzano correttamente generano unnull
valore per la colonna. Nessuna riga viene eliminata.
- Se impostato su
-
from_xml
converte matrici di stringhe contenenti XML in matrici di struct analizzati. Utilizzare inveceschema_of_xml_array
. -
from_xml_string
è un'alternativa da utilizzare in funzioni definite dall'utente che operano direttamente su una stringa anziché su una colonna.
A causa delle differenze strutturali tra dataframe e XML, esistono alcune regole di conversione da dati XML a dataframe e da dataframe a dati XML. È possibile disabilitare la gestione degli attributi con l'opzione excludeAttribute
.
Attributi: gli attributi vengono convertiti come campi con il prefisso specificato nell'opzione
attributePrefix
. SeattributePrefix
è_
, il documento<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
produce lo schema:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Se un elemento ha attributi ma nessun elemento figlio, il valore dell'attributo viene inserito in un campo separato specificato nell'opzione
valueTag
. SevalueTag
è_VALUE
, il documento<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
produce lo schema:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
Scrittura di un file XML da un DataFrame con un campo ArrayType
che ha come elemento ArrayType
, avrebbe un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma nella scrittura di un dataframe letto da altre origini. Pertanto, il roundtrip nella lettura e nella scrittura di file XML ha la stessa struttura, ma è possibile che la scrittura di un DataFrame letto da altre origini abbia una struttura diversa.
DataFrame con lo schema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
e dati:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
Produce il file XML:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>