XML-adatok olvasása és írása a spark-xml
kódtár használatával
Fontos
Ez a dokumentáció ki lett állítva, és lehet, hogy nem frissül. A tartalomban említett termékeket, szolgáltatásokat vagy technológiákat a Databricks hivatalosan nem támogatja vagy teszteli.
A natív XML-fájlformátum támogatása nyilvános előzetes verzióként érhető el. Lásd: XML-fájlok olvasása és írása.
Ez a cikk bemutatja, hogyan olvashat és írhat xml-fájlokat Apache Spark-adatforrásként.
Követelmények
Hozza létre a
spark-xml
tárat Maven-kódtárként. A Maven koordináta esetében adja meg a következőket:- Databricks Runtime 7.x és újabb verziók:
com.databricks:spark-xml_2.12:<release>
Tekintse meg
spark-xml
a legújabb verziójú kiadásokat<release>
.- Databricks Runtime 7.x és újabb verziók:
Telepítse a tárat egy fürtre.
Példa
A jelen szakaszban szereplő példa a könyvek XML-fájljában található.
A könyvek XML-fájljának lekérése:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
XML-adatok olvasása és írása
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
Beállítások
- Olvas
path
: AZ XML-fájlok helye. Szabványos Hadoop-globbing kifejezéseket fogad el.rowTag
: A sorként kezelendő sorcímke. Ebben az XML-ben<books><book><book>...</books>
például az érték az lennebook
. Az alapértelmezett szint aROW
.samplingRatio
: Mintavételezési arány a következtetési sémához (0,0 ~ 1). Az alapértelmezett érték 1. A lehetséges típusok aStructType
következők: ,ArrayType
,StringType
LongType
,DoubleType
,NullType
BooleanType
TimestampType
kivéve, ha sémát ad meg.excludeAttribute
: Az elemek attribútumainak kizárása. Az alapértelmezett érték a false (hamis).nullValue
: Az értékkéntnull
kezelendő érték. Az alapértelmezett szint a""
.mode
: A sérült rekordok kezelésének módja. Az alapértelmezett szint aPERMISSIVE
.PERMISSIVE
:- Ha sérült rekorddal találkozik, az összes mezőt
null
beállítja, és a hibásan formázott sztringet egy új, a program általcolumnNameOfCorruptRecord
konfigurált mezőbe helyezi. - Ha nem megfelelő adattípusú mezővel találkozik, a jogsértő mezőt a következőre
null
állítja: .
- Ha sérült rekorddal találkozik, az összes mezőt
DROPMALFORMED
: figyelmen kívül hagyja a sérült rekordokat.FAILFAST
: kivételt eredményez, ha sérült rekordokat észlel.
inferSchema
: iftrue
, az eredményül kapott DataFrame-oszlopokhoz , például logikai, numerikus vagy dátumtípushoz próbál megfelelő típust kikövetkeztetni. Hafalse
, az összes eredményül kapott oszlop sztring típusú. Az alapértelmezett szint atrue
.columnNameOfCorruptRecord
: Annak az új mezőnek a neve, amelyben a helytelen formátumú sztringek vannak tárolva. Az alapértelmezett szint a_corrupt_record
.attributePrefix
: Az attribútumok előtagja az attribútumok és elemek megkülönböztetése érdekében. Ez a mezőnevek előtagja. Az alapértelmezett szint a_
.valueTag
: Az értékhez használt címke, ha olyan elem attribútumai vannak, amelyek nem tartalmaznak gyermekelemeket. Az alapértelmezett szint a_VALUE
.charset
: Alapértelmezett érték,UTF-8
de más érvényes karakterkészletnevekre is beállítható.ignoreSurroundingSpaces
: Azt jelzi, hogy kihagyja-e az értékeket körülvevő szóközöket. Az alapértelmezett érték a false (hamis).rowValidationXSDPath
: Elérési út egy XSD-fájlhoz, amely az EGYES sorok XML-jének ellenőrzésére szolgál. A nem érvényesíthető sorok elemzési hibákként lesznek kezelve a fenti módon. Az XSD egyébként nem befolyásolja a megadott vagy kikövetkezett sémát. Ha ugyanez a helyi elérési út még nem látható a fürt végrehajtóinál, akkor az XSD-t és az attól függő többit hozzá kell adni a Spark-végrehajtókhoz a SparkContext.addFile fájllal. Ebben az esetben a helyi XSD/foo/bar.xsd
használatához hívjaaddFile("/foo/bar.xsd")
meg és adja át"bar.xsd"
a következőtrowValidationXSDPath
: .
- Ír
path
: Fájlok írásának helye.rowTag
: A sorként kezelendő sorcímke. Ebben az XML-ben<books><book><book>...</books>
például az érték az lennebook
. Az alapértelmezett szint aROW
.rootTag
: A gyökércímke, amely gyökérként kezelendő. Ebben az XML-ben<books><book><book>...</books>
például az érték az lennebooks
. Az alapértelmezett szint aROWS
.nullValue
: Az írandónull
érték. Az alapértelmezett érték a sztring"null"
. Amikor"null"
nem ír attribútumokat és elemeket a mezőkhöz.attributePrefix
: Az attribútumok és elemek megkülönböztetésére szolgáló attribútumok előtagja. Ez a mezőnevek előtagja. Az alapértelmezett szint a_
.valueTag
: Az értékhez használt címke, ha olyan elem attribútumai vannak, amelyek nem tartalmaznak gyermekelemeket. Az alapértelmezett szint a_VALUE
.compression
: Fájlba mentéskor használandó tömörítési kodek. Egy implementálóorg.apache.hadoop.io.compress.CompressionCodec
osztály teljes neve, vagy a kis- és nagybetűket nem megkülönböztető rövid nevek (bzip2
, ,gzip
,lz4
éssnappy
) egyikének kell lennie. Az alapértelmezett érték nem tömörítés.
Támogatja a rövidített névhasználatot; xml
Használhatja ahelyett, hogy com.databricks.spark.xml
.
XSD-támogatás
Az egyes sorokat egy XSD-sémán ellenőrizheti a következővel rowValidationXSDPath
: .
A segédprogrammal com.databricks.spark.xml.util.XSDToSchema
kinyerhet egy Spark DataFrame-sémát néhány XSD-fájlból. Csak egyszerű, összetett és szekvenciatípusokat támogat, csak alapszintű XSD-funkciókat, és kísérleti jellegű.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Beágyazott XML elemzése
Bár az XML-fájlok elsősorban DataFrame-fájllá alakítására szolgálnak, a from_xml
metódussal elemezheti az XML-t egy meglévő DataFrame-fájl sztringértékes oszlopában, és új oszlopként is hozzáadhatja az elemzési eredményekkel, a következőkkel:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Feljegyzés
mode
:- Ha az alapértelmezett értékre van
PERMISSIVE
állítva, az elemzési mód alapértelmezés szerint a következő leszDROPMALFORMED
. Ha olyan oszlopot tartalmaz a sémában, amely megfelel a sémánakfrom_xml
, akkorPERMISSIVE
acolumnNameOfCorruptRecord
mód hibásan formázott rekordokat ad ki az adott oszlopba az eredményül kapott szerkezetben. - Ha be van állítva,
DROPMALFORMED
az olyan XML-értékek, amelyek nem elemzik megfelelően, az oszlop értékét eredményeziknull
. A rendszer nem ejti el a sorokat.
- Ha az alapértelmezett értékre van
from_xml
AZ XML-t tartalmazó sztringtömböket elemzi strukturált szerkezetek tömbjeivé alakítja. Aschema_of_xml_array
használható helyette.from_xml_string
Egy alternatíva az olyan UDF-ekben való használatra, amelyek közvetlenül egy oszlop helyett egy sztringen működnek.
Konverziós szabályok
A DataFrames és az XML közötti szerkezeti különbségek miatt az XML-adatokról a DataFrame-re és a DataFrame-ről az XML-adatokra való konvertálásra vonatkozó szabályok léteznek. A beállítással excludeAttribute
letilthatja az attribútumok kezelését.
XML konvertálása DataFrame-gé
Attribútumok: Az attribútumok mezőkké alakulnak a beállításban
attributePrefix
megadott előtaggal. HaattributePrefix
igen_
, a dokumentum<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
hozza létre a sémát:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Ha egy elem rendelkezik attribútumokkal, de nem tartalmaznak gyermekelemeket, az attribútum értéke a beállításban
valueTag
megadott külön mezőben lesz elhelyezve. HavalueTag
igen_VALUE
, a dokumentum<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
hozza létre a sémát:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
DataFrame konvertálása XML-fájllá
Xml-fájl írása a DataFrame-ből egy olyan mezővel ArrayType
, amely az elemével rendelkezik, valamint ArrayType
egy további beágyazott mező az elemhez. Ez nem XML-adatok olvasása és írása során, hanem más forrásokból beolvasott DataFrame írása esetén fordul elő. Ezért az XML-fájlok olvasása és írása során a kerekítés ugyanazzal a struktúrával rendelkezik, de más forrásokból származó DataFrame-olvasás írása más struktúrával is rendelkezhet.
Adatkeret a sémával:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
és adatok:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
létrehozza az XML-fájlt:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>