Megosztás a következőn keresztül:


XML-adatok olvasása és írása a spark-xml kódtár használatával

Fontos

Ez a dokumentáció ki lett állítva, és lehet, hogy nem frissül. A tartalomban említett termékeket, szolgáltatásokat vagy technológiákat a Databricks hivatalosan nem támogatja vagy teszteli.

A natív XML-fájlformátum támogatása nyilvános előzetes verzióként érhető el. Lásd: XML-fájlok olvasása és írása.

Ez a cikk bemutatja, hogyan olvashat és írhat xml-fájlokat Apache Spark-adatforrásként.

Követelmények

  1. Hozza létre a spark-xml tárat Maven-kódtárként. A Maven koordináta esetében adja meg a következőket:

    • Databricks Runtime 7.x és újabb verziók: com.databricks:spark-xml_2.12:<release>

    Tekintse meg spark-xml a legújabb verziójú kiadásokat<release>.

  2. Telepítse a tárat egy fürtre.

Példa

A jelen szakaszban szereplő példa a könyvek XML-fájljában található.

  1. A könyvek XML-fájljának lekérése:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Töltse fel a fájlt a DBFS-be.

XML-adatok olvasása és írása

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Beállítások

  • Olvas
    • path: AZ XML-fájlok helye. Szabványos Hadoop-globbing kifejezéseket fogad el.
    • rowTag: A sorként kezelendő sorcímke. Ebben az XML-ben <books><book><book>...</books>például az érték az lenne book. Az alapértelmezett szint a ROW.
    • samplingRatio: Mintavételezési arány a következtetési sémához (0,0 ~ 1). Az alapértelmezett érték 1. A lehetséges típusok a StructTypekövetkezők: , ArrayType, StringTypeLongType, DoubleType, NullTypeBooleanTypeTimestampType kivéve, ha sémát ad meg.
    • excludeAttribute: Az elemek attribútumainak kizárása. Az alapértelmezett érték a false (hamis).
    • nullValue: Az értékként null kezelendő érték. Az alapértelmezett szint a "".
    • mode: A sérült rekordok kezelésének módja. Az alapértelmezett szint a PERMISSIVE.
      • PERMISSIVE:
        • Ha sérült rekorddal találkozik, az összes mezőt null beállítja, és a hibásan formázott sztringet egy új, a program által columnNameOfCorruptRecordkonfigurált mezőbe helyezi.
        • Ha nem megfelelő adattípusú mezővel találkozik, a jogsértő mezőt a következőre nullállítja: .
      • DROPMALFORMED: figyelmen kívül hagyja a sérült rekordokat.
      • FAILFAST: kivételt eredményez, ha sérült rekordokat észlel.
    • inferSchema: if true, az eredményül kapott DataFrame-oszlopokhoz , például logikai, numerikus vagy dátumtípushoz próbál megfelelő típust kikövetkeztetni. Ha false, az összes eredményül kapott oszlop sztring típusú. Az alapértelmezett szint a true.
    • columnNameOfCorruptRecord: Annak az új mezőnek a neve, amelyben a helytelen formátumú sztringek vannak tárolva. Az alapértelmezett szint a _corrupt_record.
    • attributePrefix: Az attribútumok előtagja az attribútumok és elemek megkülönböztetése érdekében. Ez a mezőnevek előtagja. Az alapértelmezett szint a _.
    • valueTag: Az értékhez használt címke, ha olyan elem attribútumai vannak, amelyek nem tartalmaznak gyermekelemeket. Az alapértelmezett szint a _VALUE.
    • charset: Alapértelmezett érték, UTF-8 de más érvényes karakterkészletnevekre is beállítható.
    • ignoreSurroundingSpaces: Azt jelzi, hogy kihagyja-e az értékeket körülvevő szóközöket. Az alapértelmezett érték a false (hamis).
    • rowValidationXSDPath: Elérési út egy XSD-fájlhoz, amely az EGYES sorok XML-jének ellenőrzésére szolgál. A nem érvényesíthető sorok elemzési hibákként lesznek kezelve a fenti módon. Az XSD egyébként nem befolyásolja a megadott vagy kikövetkezett sémát. Ha ugyanez a helyi elérési út még nem látható a fürt végrehajtóinál, akkor az XSD-t és az attól függő többit hozzá kell adni a Spark-végrehajtókhoz a SparkContext.addFile fájllal. Ebben az esetben a helyi XSD /foo/bar.xsdhasználatához hívja addFile("/foo/bar.xsd") meg és adja át "bar.xsd" a következőt rowValidationXSDPath: .
  • Ír
    • path: Fájlok írásának helye.
    • rowTag: A sorként kezelendő sorcímke. Ebben az XML-ben <books><book><book>...</books>például az érték az lenne book. Az alapértelmezett szint a ROW.
    • rootTag: A gyökércímke, amely gyökérként kezelendő. Ebben az XML-ben <books><book><book>...</books>például az érték az lenne books. Az alapértelmezett szint a ROWS.
    • nullValue: Az írandó null érték. Az alapértelmezett érték a sztring "null". Amikor "null"nem ír attribútumokat és elemeket a mezőkhöz.
    • attributePrefix: Az attribútumok és elemek megkülönböztetésére szolgáló attribútumok előtagja. Ez a mezőnevek előtagja. Az alapértelmezett szint a _.
    • valueTag: Az értékhez használt címke, ha olyan elem attribútumai vannak, amelyek nem tartalmaznak gyermekelemeket. Az alapértelmezett szint a _VALUE.
    • compression: Fájlba mentéskor használandó tömörítési kodek. Egy implementáló org.apache.hadoop.io.compress.CompressionCodec osztály teljes neve, vagy a kis- és nagybetűket nem megkülönböztető rövid nevek (bzip2, , gzip, lz4és snappy) egyikének kell lennie. Az alapértelmezett érték nem tömörítés.

Támogatja a rövidített névhasználatot; xml Használhatja ahelyett, hogy com.databricks.spark.xml.

XSD-támogatás

Az egyes sorokat egy XSD-sémán ellenőrizheti a következővel rowValidationXSDPath: .

A segédprogrammal com.databricks.spark.xml.util.XSDToSchema kinyerhet egy Spark DataFrame-sémát néhány XSD-fájlból. Csak egyszerű, összetett és szekvenciatípusokat támogat, csak alapszintű XSD-funkciókat, és kísérleti jellegű.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Beágyazott XML elemzése

Bár az XML-fájlok elsősorban DataFrame-fájllá alakítására szolgálnak, a from_xml metódussal elemezheti az XML-t egy meglévő DataFrame-fájl sztringértékes oszlopában, és új oszlopként is hozzáadhatja az elemzési eredményekkel, a következőkkel:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Feljegyzés

  • mode:
    • Ha az alapértelmezett értékre van PERMISSIVEállítva, az elemzési mód alapértelmezés szerint a következő lesz DROPMALFORMED. Ha olyan oszlopot tartalmaz a sémában, amely megfelel a sémánakfrom_xml, akkor PERMISSIVE a columnNameOfCorruptRecordmód hibásan formázott rekordokat ad ki az adott oszlopba az eredményül kapott szerkezetben.
    • Ha be van állítva, DROPMALFORMEDaz olyan XML-értékek, amelyek nem elemzik megfelelően, az oszlop értékét eredményezik null . A rendszer nem ejti el a sorokat.
  • from_xml AZ XML-t tartalmazó sztringtömböket elemzi strukturált szerkezetek tömbjeivé alakítja. A schema_of_xml_array használható helyette.
  • from_xml_string Egy alternatíva az olyan UDF-ekben való használatra, amelyek közvetlenül egy oszlop helyett egy sztringen működnek.

Konverziós szabályok

A DataFrames és az XML közötti szerkezeti különbségek miatt az XML-adatokról a DataFrame-re és a DataFrame-ről az XML-adatokra való konvertálásra vonatkozó szabályok léteznek. A beállítással excludeAttributeletilthatja az attribútumok kezelését.

XML konvertálása DataFrame-gé

  • Attribútumok: Az attribútumok mezőkké alakulnak a beállításban attributePrefix megadott előtaggal. Ha attributePrefix igen _, a dokumentum

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    hozza létre a sémát:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Ha egy elem rendelkezik attribútumokkal, de nem tartalmaznak gyermekelemeket, az attribútum értéke a beállításban valueTag megadott külön mezőben lesz elhelyezve. Ha valueTag igen _VALUE, a dokumentum

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    hozza létre a sémát:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

DataFrame konvertálása XML-fájllá

Xml-fájl írása a DataFrame-ből egy olyan mezővel ArrayType , amely az elemével rendelkezik, valamint ArrayType egy további beágyazott mező az elemhez. Ez nem XML-adatok olvasása és írása során, hanem más forrásokból beolvasott DataFrame írása esetén fordul elő. Ezért az XML-fájlok olvasása és írása során a kerekítés ugyanazzal a struktúrával rendelkezik, de más forrásokból származó DataFrame-olvasás írása más struktúrával is rendelkezhet.

Adatkeret a sémával:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

és adatok:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

létrehozza az XML-fájlt:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>