XML dosyalarını okuma ve yazma
Önemli
Bu özellik Genel Önizlemededir.
Bu makalede XML dosyalarının nasıl okunduğu ve yazıldığı açıklanır.
Genişletilebilir biçimlendirme dili (XML), verileri metin biçiminde biçimlendirmeye, depolamaya ve paylaşmaya yönelik bir işaretleme dilidir. Belgelerden rastgele veri yapılarına kadar değişen verileri seri hale getirmek için bir dizi kural tanımlar.
Yerel XML dosya biçimi desteği, xml verilerini toplu işleme veya akış için alma, sorgulama ve ayrıştırma işlemlerini etkinleştirir. Şema ve veri türlerini otomatik olarak çıkarıp geliştirebilir, gibi from_xml
SQL ifadelerini destekler ve XML belgeleri oluşturabilir. Dış jar gerektirmez ve Otomatik Yükleyici read_files
ve COPY INTO
ile sorunsuz çalışır. İsteğe bağlı olarak her satır düzeyi XML kaydını bir XML Şema Tanımına (XSD) göre doğrulayabilirsiniz.
Gereksinimler
Databricks Runtime 14.3 ve üzeri
XML kayıtlarını ayrıştırma
XML belirtimi iyi biçimlendirilmiş bir yapıyı zorunlu kullanır. Ancak, bu belirtim hemen tablosal bir biçimle eşlenemez. bir ile rowTag
eşleyen XML öğesini belirtmek için DataFrame
Row
seçeneğini belirtmeniz gerekir. rowTag
öğesi en üst düzey struct
olur. öğesinin rowTag
alt öğeleri, en üst düzey struct
alanının alanları haline gelir.
Bu kaydın şemasını belirtebilir veya otomatik olarak çıkarılmalarını sağlayabilirsiniz. Ayrıştırıcı yalnızca öğeleri incelediğinden rowTag
, DTD ve dış varlıklar filtrelenir.
Aşağıdaki örneklerde, farklı rowTag
seçenekler kullanılarak xml dosyasının şema çıkarımı ve ayrıştırılması gösterilmektedir:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
XML dosyasını "kitaplar" seçeneğiyle rowTag
okuyun:
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Çıktı:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
XML dosyasını rowTag
"book" olarak okuyun:
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Çıktı:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Veri kaynağı seçenekleri
XML için veri kaynağı seçenekleri aşağıdaki yollarla belirtilebilir:
- Aşağıdaki
.option/.options
yöntemleri:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Aşağıdaki yerleşik işlevler:
OPTIONS
CREATE TABLE USING DATA_SOURCE yan tümcesi
Seçeneklerin listesi için bkz . Otomatik Yükleyici seçenekleri.
XSD desteği
İsteğe bağlı olarak her satır düzeyi XML kaydını bir XML Şema Tanımı (XSD) ile doğrulayabilirsiniz. Seçeneğinde rowValidationXSDPath
XSD dosyası belirtilir. XSD, sağlanan veya çıkarılmış şemayı başka şekilde etkilemez. Doğrulamada başarısız olan bir kayıt "bozuk" olarak işaretlenir ve seçenek bölümünde açıklanan bozuk kayıt işleme modu seçeneğine göre işlenir.
Bir XSD dosyasından Spark DataFrame şeması ayıklamak için kullanabilirsiniz XSDToSchema
. Yalnızca basit, karmaşık ve sıra türlerini destekler ve yalnızca temel XSD işlevlerini destekler.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Aşağıdaki tabloda XSD veri türlerinin Spark veri türlerine dönüştürülmesi gösterilmektedir:
XSD Veri Türleri | Spark Veri Türleri |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , , nonPositiveInteger , positiveInteger , unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
İç içe XML ayrıştırma
Mevcut DataFrame'deki dize değerli bir sütundaki XML verileri ile schema_of_xml
ayrıştırılabilir ve from_xml
şemayı ve ayrıştırılan sonuçları yeni struct
sütunlar olarak döndürür. ve bağımsız schema_of_xml
from_xml
değişkeni olarak geçirilen XML verileri tek bir iyi biçimlendirilmiş XML kaydı olmalıdır.
schema_of_xml
Söz dizimi
schema_of_xml(xmlStr [, options] )
Bağımsız Değişkenler
xmlStr
: İyi biçimlendirilmiş tek bir XML kaydı belirten STRING ifadesi.options
: İsteğe bağlıMAP<STRING,STRING>
değişmez değer belirtme yönergeleri.
İadeler
Sütun adlarının XML öğesinden ve öznitelik adlarından türetildiği n dize alanı içeren bir yapının tanımını tutan STRING. Alan değerleri türetilmiş biçimlendirilmiş SQL türlerini barındırır.
from_xml
Söz dizimi
from_xml(xmlStr, schema [, options])
Bağımsız Değişkenler
xmlStr
: İyi biçimlendirilmiş tek bir XML kaydı belirten STRING ifadesi.schema
: BIR STRING ifadesi veya işlevi çağırmaschema_of_xml
.options
: İsteğe bağlıMAP<STRING,STRING>
değişmez değer belirtme yönergeleri.
İadeler
Şema tanımıyla eşleşen alan adlarını ve türlerini içeren bir yapı. Şema, içinde kullanıldığı gibi virgülle ayrılmış sütun adı ve veri türü çiftleri olarak tanımlanmalıdır. Örneğin, CREATE TABLE
. Veri kaynağı seçeneklerinde gösterilen çoğu seçenek aşağıdaki özel durumlar için geçerlidir:
rowTag
: Yalnızca bir XML kaydı olduğundan seçeneğirowTag
geçerli değildir.mode
(varsayılan:PERMISSIVE
): Ayrıştırma sırasında bozuk kayıtlarla ilgilenmek için bir moda izin verir.PERMISSIVE
: Bozuk bir kaydı karşıladığında, hatalı biçimlendirilmiş dizeyi tarafındancolumnNameOfCorruptRecord
yapılandırılan bir alana yerleştirir ve hatalı biçimlendirilmiş alanları olaraknull
ayarlar. Bozuk kayıtları tutmak için, kullanıcı tanımlı şemada adlıcolumnNameOfCorruptRecord
bir dize türü alanı ayarlayabilirsiniz. Bir şemanın alanı yoksa, ayrıştırma sırasında bozuk kayıtları bırakır. Şema çıkarıldığında, bir çıktı şemasına örtük olarak bircolumnNameOfCorruptRecord
alan ekler.FAILFAST
: Bozuk kayıtları karşıladığında bir özel durum oluşturur.
Yapı dönüştürme
DataFrame ile XML arasındaki yapı farklılıkları nedeniyle, XML verilerinden XML verilerine ve DataFrame
XML verilerine DataFrame
bazı dönüştürme kuralları vardır. Öznitelikleri işlemenin seçeneğiyle excludeAttribute
devre dışı bırakılabildiğini unutmayın.
XML'den DataFrame'e dönüştürme
Öznitelikler: Öznitelikler, başlık ön ekine attributePrefix
sahip alanlar olarak dönüştürülür.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
aşağıdaki şemayı oluşturur:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Öznitelik veya alt öğe içeren bir öğedeki karakter verileri: Bunlar alana ayrıştırılır valueTag
. Karakter verilerinin birden çok tekrarı varsa, valueTag
alan bir array
türe dönüştürülür.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
aşağıdaki şemayı oluşturur:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
DataFrame'den XML'ye dönüştürme
Dizideki bir dizi olarak öğe: Öğesiyle birlikte bir alana ArrayType
sahip bir DataFrame
XML dosyası yazma, öğesi ArrayType
için ek bir iç içe alan olması gibi. Bu durum XML verilerini okurken ve yazarken değil, diğer kaynaklardan okuma yazarken DataFrame
meydana gelir. Bu nedenle, XML dosyalarını okuma ve yazmada gidiş dönüş aynı yapıya sahiptir, ancak başka kaynaklardan okuma yazmak DataFrame
farklı bir yapıya sahip olabilir.
Aşağıdaki şemaya sahip DataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
ve aşağıdaki verilerle:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
aşağıda bir XML dosyası oluşturur:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
içindeki DataFrame
adlandırılmamış dizinin öğe adı seçeneğiyle arrayElementName
belirtilir (Varsayılan: item
).
Kurtarılan veri sütunu
Kurtarılan veri sütunu, ETL sırasında verileri asla kaybetmenizi veya kaçırmamanızı sağlar. Bir kayıttaki bir veya daha fazla alanda aşağıdaki sorunlardan biri olduğundan, kurtarılan veri sütununu ayrıştırılmayan verileri yakalamak için etkinleştirebilirsiniz:
- Sağlanan şemada yok
- Sağlanan şemanın veri türüyle eşleşmiyor
- Sağlanan şemadaki alan adlarıyla büyük/küçük harf uyuşmazlığı var
Kurtarılan veri sütunu, kurtarılan sütunları ve kaydın kaynak dosya yolunu içeren bir JSON belgesi olarak döndürülür. Kurtarılan veri sütunundan kaynak dosya yolunu kaldırmak için aşağıdaki SQL yapılandırmasını ayarlayabilirsiniz:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Ile gibi _rescued_data
spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
verileri okurken seçeneğini rescuedDataColumn
bir sütun adı olarak ayarlayarak kurtarılan veri sütununu etkinleştirebilirsiniz.
XML ayrıştırıcısı kayıtları ayrıştırırken üç modu destekler: PERMISSIVE
, DROPMALFORMED
ve FAILFAST
. ile rescuedDataColumn
birlikte kullanıldığında, veri türü uyuşmazlıkları kayıtların modda DROPMALFORMED
bırakılmasına veya modda hata FAILFAST
oluşturmasına neden olmaz. Yalnızca bozuk kayıtlar (eksik veya yanlış biçimlendirilmiş XML) bırakılır veya hata oluşturur.
Otomatik Yükleyici'de şema çıkarımı ve evrimi
Bu konu başlığı ve geçerli seçenekler hakkında ayrıntılı bilgi için bkz . Otomatik Yükleyici'de şema çıkarımı ve evrimini yapılandırma. Otomatik Yükleyici'yi, yüklenen XML verilerinin şemasını otomatik olarak algılayarak veri şemasını açıkça bildirmeden tabloları başlatmanıza ve yeni sütunlar kullanıma sunulduğunda tablo şemasını geliştirmenize olanak tanıyacak şekilde yapılandırabilirsiniz. Bu, şema değişikliklerini zaman içinde el ile izleme ve uygulama gereksinimini ortadan kaldırır.
Varsayılan olarak, Otomatik Yükleyici şema çıkarımı, tür uyuşmazlıkları nedeniyle şema evrimi sorunlarından kaçınmayı arar. Veri türlerini (JSON, CSV ve XML) kodlamamış biçimler için Otomatik Yükleyici, XML dosyalarındaki iç içe yerleştirilmiş alanlar da dahil olmak üzere tüm sütunları dize olarak çıkarsar. Apache Spark DataFrameReader
şema çıkarımı için farklı bir davranış kullanır ve örnek verilere göre XML kaynaklarındaki sütunlar için veri türlerini seçer. Otomatik Yükleyici ile bu davranışı etkinleştirmek için seçeneğini cloudFiles.inferColumnTypes
olarak true
ayarlayın.
Otomatik Yükleyici, verilerinizi işlerken yeni sütunların eklenmesini algılar. Otomatik Yükleyici yeni bir sütun algıladığında akış ile UnknownFieldException
durdurulur. Akışınız bu hatayı oluşturmadan önce, Otomatik Yükleyici en son mikro veri toplu işleminde şema çıkarımını gerçekleştirir ve şemanın sonuna yeni sütunları birleştirerek şema konumunu en son şemayla güncelleştirir. Mevcut sütunların veri türleri değişmeden kalır. Otomatik Yükleyici, seçeneğinde cloudFiles.schemaEvolutionMode
ayarladığınız şema evrimi için farklı modları destekler.
Bildiğiniz ve çıkarım yapılan bir şemada beklediğiniz şema bilgilerini zorunlu kılmak için şema ipuçlarını kullanabilirsiniz. Sütunun belirli bir veri türünde olduğunu biliyorsanız veya daha genel bir veri türü (örneğin, tamsayı yerine çift) seçmek istiyorsanız, SQL şema belirtimi söz dizimini kullanarak sütun veri türleri için dize olarak rastgele sayıda ipucu sağlayabilirsiniz. Kurtarılan veri sütunu etkinleştirildiğinde, şemanın dışında bir durumda adlandırılan alanlar sütuna _rescued_data
yüklenir. Seçeneğini olarak ayarlayarak readerCaseSensitive
false
bu davranışı değiştirebilirsiniz. Bu durumda Otomatik Yükleyici verileri büyük/küçük harfe duyarlı olmayan bir şekilde okur.
Örnekler
Bu bölümdeki örneklerde Apache Spark GitHub deposunda indirilebilen bir XML dosyası kullanılmaktadır.
XML okuma ve yazma
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Verileri okurken şemayı el ile belirtebilirsiniz:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API'si
XML veri kaynağı veri türlerini çıkarsayabilir:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
DDL'de sütun adlarını ve türlerini de belirtebilirsiniz. Bu durumda şema otomatik olarak çıkarılmaz.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
COPY INTO kullanarak XML yükleme
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Satır doğrulama ile XML okuma
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
İç içe XML ayrıştırma (from_xml ve schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQL API ile from_xml ve schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Otomatik Yükleyici ile XML Yükleme
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)
Ek kaynaklar
Spark-xml kitaplığını kullanarak XML verilerini okuma ve yazma