قراءة ملفات XML وكتابتها
توضح هذه المقالة كيفية قراءة ملفات XML وكتابتها.
لغة التمييز الموسعة (XML) هي لغة ترميز لتنسيق البيانات وتخزينها ومشاركتها بتنسيق نصي. وهو يحدد مجموعة من القواعد لتسلسل البيانات التي تتراوح بين المستندات وهياكل البيانات العشوائية.
يتيح دعم تنسيق ملف XML الأصلي استيعاب بيانات XML والاستعلام فيها وتحليلها لمعالجة الدفعات أو دفقها. يمكنه استنتاج وتطوير أنواع المخططات والبيانات تلقائيا، ويدعم تعبيرات SQL مثل from_xml
، ويمكنه إنشاء مستندات XML. لا يتطلب جرة خارجية ويعمل بسلاسة مع أداة التحميل التلقائي و read_files
COPY INTO
. يمكنك التحقق اختياريا من صحة كل سجل XML على مستوى الصف مقابل تعريف مخطط XML (XSD).
المتطلبات
Databricks Runtime 14.3 وما فوق
تحليل سجلات XML
تفرض مواصفات XML بنية جيدة التكوين. ومع ذلك، لا يتم تعيين هذه المواصفات على الفور إلى تنسيق جدولي. يجب تحديد rowTag
الخيار للإشارة إلى عنصر XML الذي يعين إلى DataFrame
Row
. rowTag
يصبح العنصر المستوى struct
الأعلى . تصبح العناصر التابعة لحقول rowTag
المستوى struct
الأعلى .
يمكنك تحديد مخطط هذا السجل أو السماح بالاستدلال عليه تلقائيا. نظرا لأن المحلل يفحص rowTag
العناصر فقط، تتم تصفية DTD والكيانات الخارجية.
توضح الأمثلة التالية استنتاج المخطط وتحليل ملف XML باستخدام خيارات مختلفة rowTag
:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
اقرأ ملف XML مع rowTag
خيار "الكتب":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
إخراج:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
اقرأ ملف XML باستخدام rowTag
ك "كتاب":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
إخراج:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
خيارات مصدر البيانات
يمكن تحديد خيارات مصدر البيانات ل XML بالطرق التالية:
.option/.options
أساليب ما يلي:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- الدالات المضمنة التالية:
OPTIONS
عبارة CREATE TABLE USING DATA_SOURCE
للحصول على قائمة بالخيارات، راجع خيارات التحميل التلقائي.
دعم XSD
يمكنك التحقق اختياريا من صحة كل سجل XML على مستوى الصف بواسطة تعريف مخطط XML (XSD). تم تحديد ملف XSD في rowValidationXSDPath
الخيار . لا يؤثر XSD على المخطط المقدم أو المستنتج. يتم وضع علامة على السجل الذي يفشل في التحقق من الصحة على أنه "تالف" ويتم التعامل معه استنادا إلى خيار وضع معالجة السجلات التالف الموضح في قسم الخيار.
يمكنك استخدام XSDToSchema
لاستخراج مخطط Spark DataFrame من ملف XSD. وهو يدعم أنواع بسيطة ومعقدة وتسلسلية فقط، ويدعم فقط وظائف XSD الأساسية.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
يوضح الجدول التالي تحويل أنواع بيانات XSD إلى أنواع بيانات Spark:
أنواع بيانات XSD | أنواع بيانات Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer ، negativeInteger ، nonNegativeInteger ، nonPositiveInteger ، ، positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
تحليل XML المتداخل
يمكن تحليل بيانات XML في عمود ذي قيمة سلسلة في DataFrame موجود مع schema_of_xml
والتي from_xml
ترجع المخطط والنتائج التي تم تحليلها كأعمدة جديدة struct
. تم تمرير بيانات XML كوسيطة إلى schema_of_xml
from_xml
ويجب أن تكون سجل XML واحد جيد التكوين.
schema_of_xml
إعراب
schema_of_xml(xmlStr [, options] )
الوسيطات
xmlStr
: تعبير STRING يحدد سجل XML واحد مكون بشكل جيد.options
: توجيه تحديد حرفي اختياريMAP<STRING,STRING>
.
مرتجعات
سلسلة تحمل تعريف بنية مع حقول n من السلاسل حيث يتم اشتقاق أسماء الأعمدة من عنصر XML وأسماء السمات. تحتوي قيم الحقول على أنواع SQL المنسقة المشتقة.
from_xml
إعراب
from_xml(xmlStr, schema [, options])
الوسيطات
xmlStr
: تعبير STRING يحدد سجل XML واحد مكون بشكل جيد.schema
: تعبير STRING أو استدعاء الدالةschema_of_xml
.options
: توجيه تحديد حرفي اختياريMAP<STRING,STRING>
.
مرتجعات
بنية بأسماء الحقول وأنواع مطابقة لتعريف المخطط. يجب تعريف المخطط على أنه اسم عمود مفصول بفواصل وأزواج أنواع البيانات كما هو مستخدم في، على سبيل المثال، CREATE TABLE
. تنطبق معظم الخيارات الموضحة في خيارات مصدر البيانات مع الاستثناءات التالية:
rowTag
: نظرا لوجود سجل XML واحد فقط،rowTag
فإن الخيار غير قابل للتطبيق.mode
(افتراضي:PERMISSIVE
): يسمح بوضع للتعامل مع السجلات التالفة أثناء التحليل.PERMISSIVE
: عندما يفي بسجل تالف، يضع السلسلة التي تم تكوينها بشكل غير سليم في حقل تم تكوينه بواسطةcolumnNameOfCorruptRecord
، ويعين الحقول التي تم تكوينها بشكل غير سليم إلىnull
. للاحتفاظ بالسجلات التالفة، يمكنك تعيين حقل نوع سلسلة يسمىcolumnNameOfCorruptRecord
في مخطط معرف من قبل المستخدم. إذا لم يكن المخطط يحتوي على الحقل، فإنه يسقط سجلات تالفة أثناء التحليل. عند الاستدلال على مخطط، فإنه يضيف ضمنيا حقلاcolumnNameOfCorruptRecord
في مخطط إخراج.FAILFAST
: يطرح استثناء عندما يفي بالسجلات التالفة.
تحويل البنية
نظرا لاختلافات البنية بين DataFrame وXML، هناك بعض قواعد التحويل من بيانات XML من DataFrame
DataFrame
وإلى بيانات XML. لاحظ أنه يمكن تعطيل معالجة السمات باستخدام الخيار excludeAttribute
.
التحويل من XML إلى DataFrame
السمات: يتم تحويل السمات كالحقول مع بادئة attributePrefix
العنوان .
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
ينتج مخططا أدناه:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
بيانات الأحرف في عنصر يحتوي على سمة (سمات) أو عنصر (عناصر) تابعة: يتم تحليلها في valueTag
الحقل. إذا كان هناك تكرارات متعددة لبيانات الأحرف valueTag
، يتم تحويل الحقل إلى array
نوع.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
ينتج مخططا أدناه:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
التحويل من DataFrame إلى XML
عنصر كصفيف في صفيف: كتابة ملف XML من DataFrame
وجود حقل ArrayType
مع عنصره كما ArrayType
سيكون له حقل متداخل إضافي للعنصر. لن يحدث هذا في قراءة وكتابة بيانات XML ولكن كتابة DataFrame
قراءة من مصادر أخرى. لذلك، فإن التنقل الدائري في قراءة ملفات XML وكتابتها له نفس البنية ولكن كتابة DataFrame
قراءة من مصادر أخرى من الممكن أن يكون لها بنية مختلفة.
DataFrame مع مخطط أدناه:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
ومع البيانات أدناه:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
ينتج ملف XML أدناه:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
يتم تحديد اسم عنصر الصفيف غير المسمى في DataFrame
بواسطة الخيار arrayElementName
(الافتراضي: item
).
عمود البيانات المنقذة
يضمن عمود البيانات الذي تم إنقاذه عدم فقدان البيانات أو فقدانها أثناء ETL. يمكنك تمكين عمود البيانات الذي تم إنقاذه لالتقاط أي بيانات لم يتم تحليلها لأن حقلا واحدا أو أكثر في السجل يحتوي على إحدى المشكلات التالية:
- غير موجود في المخطط المقدم
- لا يتطابق مع نوع بيانات المخطط المقدم
- عدم تطابق حالة مع أسماء الحقول في المخطط المتوفر
يتم إرجاع عمود البيانات الذي تم إنقاذه كمستند JSON يحتوي على الأعمدة التي تم إنقاذها، ومسار الملف المصدر للسجل. لإزالة مسار الملف المصدر من عمود البيانات المنقذة، يمكنك تعيين تكوين SQL التالي:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
يمكنك تمكين عمود البيانات الذي تم إنقاذه عن طريق تعيين الخيار rescuedDataColumn
إلى اسم عمود عند قراءة البيانات، مثل _rescued_data
مع spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
يدعم محلل XML ثلاثة أوضاع عند تحليل السجلات: PERMISSIVE
و DROPMALFORMED
و.FAILFAST
عند استخدامها مع rescuedDataColumn
، لا تتسبب عدم تطابقات نوع البيانات في إسقاط السجلات في DROPMALFORMED
الوضع أو طرح خطأ في FAILFAST
الوضع. يتم إسقاط السجلات التالفة فقط (XML غير المكتملة أو المشوهة) أو طرح الأخطاء.
استنتاج المخطط وتطوره في Auto Loader
للحصول على مناقشة مفصلة لهذا الموضوع والخيارات القابلة للتطبيق، راجع تكوين استنتاج المخطط وتطوره في Auto Loader. يمكنك تكوين "المحمل التلقائي" للكشف تلقائيا عن مخطط بيانات XML المحملة، ما يسمح لك بتهيئة الجداول دون الإعلان صراحة عن مخطط البيانات وتطوير مخطط الجدول مع تقديم أعمدة جديدة. وهذا يلغي الحاجة إلى تعقب تغييرات المخطط وتطبيقها يدويا بمرور الوقت.
بشكل افتراضي، يسعى استنتاج مخطط التحميل التلقائي إلى تجنب مشكلات تطور المخطط بسبب عدم تطابق النوع. بالنسبة للتنسيقات التي لا تقوم بترميز أنواع البيانات (JSON وCSV وXML)، يستنتج المحمل التلقائي جميع الأعمدة كسلاسل، بما في ذلك الحقول المتداخلة في ملفات XML. يستخدم Apache Spark DataFrameReader
سلوكا مختلفا لاستدلال المخطط، وتحديد أنواع البيانات للأعمدة في مصادر XML استنادا إلى بيانات العينة. لتمكين هذا السلوك باستخدام "المحمل التلقائي"، قم بتعيين الخيار cloudFiles.inferColumnTypes
إلى true
.
يكتشف "المحمل التلقائي" إضافة أعمدة جديدة أثناء معالجة بياناتك. عندما يكتشف "المحمل التلقائي" عمودا جديدا، يتوقف الدفق باستخدام UnknownFieldException
. قبل أن يطرح الدفق هذا الخطأ، ينفذ Auto Loader استنتاج المخطط على أحدث دفعة صغيرة من البيانات ويحدث موقع المخطط بأحدث مخطط عن طريق دمج أعمدة جديدة إلى نهاية المخطط. تظل أنواع بيانات الأعمدة الموجودة دون تغيير. يدعم Auto Loader أوضاعا مختلفة لتطور المخطط، والتي قمت بتعيينها في الخيار cloudFiles.schemaEvolutionMode
.
يمكنك استخدام تلميحات المخطط لفرض معلومات المخطط التي تعرفها وتتوقعها على مخطط مستنتج. عندما تعرف أن العمود من نوع بيانات معين، أو إذا كنت تريد اختيار نوع بيانات أكثر عمومية (على سبيل المثال، مزدوج بدلا من عدد صحيح)، يمكنك توفير عدد عشوائي من التلميحات لأنواع بيانات الأعمدة كسلسلة باستخدام بناء جملة مواصفات مخطط SQL. عند تمكين عمود البيانات الذي تم إنقاذه، يتم تحميل الحقول المسماة _rescued_data
في حالة أخرى غير تلك الخاصة بالمخطط إلى العمود. يمكنك تغيير هذا السلوك عن طريق تعيين الخيار readerCaseSensitive
إلى false
، وفي هذه الحالة يقرأ "المحمل التلقائي" البيانات بطريقة غير حساسة لحالة الأحرف.
الأمثلة
تستخدم الأمثلة في هذا القسم ملف XML المتوفر للتنزيل في Apache Spark GitHub repo.
قراءة وكتابة XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
يمكنك تحديد المخطط يدويا عند قراءة البيانات:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
واجهة برمجة تطبيقات SQL
يمكن لمصدر بيانات XML استنتاج أنواع البيانات:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
يمكنك أيضا تحديد أسماء الأعمدة وأنواعها في DDL. في هذه الحالة، لا يتم استنتاج المخطط تلقائيا.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
تحميل XML باستخدام COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
قراءة XML مع التحقق من صحة الصف
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
تحليل XML المتداخل (from_xml schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml schema_of_xml مع واجهة برمجة تطبيقات SQL
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
تحميل XML باستخدام أداة التحميل التلقائي
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)