قراءة ملفات XML وكتابتها

هام

هذه الميزة في المعاينة العامة.

توضح هذه المقالة كيفية قراءة ملفات XML وكتابتها.

لغة التمييز الموسعة (XML) هي لغة ترميز لتنسيق البيانات وتخزينها ومشاركتها بتنسيق نصي. وهو يحدد مجموعة من القواعد لتسلسل البيانات التي تتراوح بين المستندات وهياكل البيانات العشوائية.

يتيح دعم تنسيق ملف XML الأصلي استيعاب بيانات XML والاستعلام فيها وتحليلها لمعالجة الدفعات أو دفقها. يمكنه استنتاج وتطوير أنواع المخططات والبيانات تلقائيا، ويدعم تعبيرات SQL مثل from_xml، ويمكنه إنشاء مستندات XML. لا يتطلب جرة خارجية ويعمل بسلاسة مع أداة التحميل التلقائي و read_files COPY INTO. يمكنك التحقق اختياريا من صحة كل سجل XML على مستوى الصف مقابل تعريف مخطط XML (XSD).

المتطلبات

Databricks Runtime 14.3 وما فوق

تحليل سجلات XML

تفرض مواصفات XML بنية جيدة التكوين. ومع ذلك، لا يتم تعيين هذه المواصفات على الفور إلى تنسيق جدولي. يجب تحديد rowTag الخيار للإشارة إلى عنصر XML الذي يعين إلى DataFrame Row. rowTag يصبح العنصر المستوى structالأعلى . تصبح العناصر التابعة لحقول rowTag المستوى structالأعلى .

يمكنك تحديد مخطط هذا السجل أو السماح بالاستدلال عليه تلقائيا. نظرا لأن المحلل يفحص rowTag العناصر فقط، تتم تصفية DTD والكيانات الخارجية.

توضح الأمثلة التالية استنتاج المخطط وتحليل ملف XML باستخدام خيارات مختلفة rowTag :

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

اقرأ ملف XML مع rowTag خيار "الكتب":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

إخراج:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

اقرأ ملف XML باستخدام rowTag ك "كتاب":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

إخراج:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

خيارات مصدر البيانات

يمكن تحديد خيارات مصدر البيانات ل XML بالطرق التالية:

للحصول على قائمة بالخيارات، راجع خيارات التحميل التلقائي.

دعم XSD

يمكنك التحقق اختياريا من صحة كل سجل XML على مستوى الصف بواسطة تعريف مخطط XML (XSD). تم تحديد ملف XSD في rowValidationXSDPath الخيار . لا يؤثر XSD على المخطط المقدم أو المستنتج. يتم وضع علامة على السجل الذي يفشل في التحقق من الصحة على أنه "تالف" ويتم التعامل معه استنادا إلى خيار وضع معالجة السجلات التالف الموضح في قسم الخيار.

يمكنك استخدام XSDToSchema لاستخراج مخطط Spark DataFrame من ملف XSD. وهو يدعم أنواع بسيطة ومعقدة وتسلسلية فقط، ويدعم فقط وظائف XSD الأساسية.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

يوضح الجدول التالي تحويل أنواع بيانات XSD إلى أنواع بيانات Spark:

أنواع بيانات XSD أنواع بيانات Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer، negativeInteger، nonNegativeInteger، nonPositiveInteger، ، positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

تحليل XML المتداخل

يمكن تحليل بيانات XML في عمود ذي قيمة سلسلة في DataFrame موجود مع schema_of_xml والتي from_xml ترجع المخطط والنتائج التي تم تحليلها كأعمدة جديدة struct . تم تمرير بيانات XML كوسيطة إلى schema_of_xml from_xml ويجب أن تكون سجل XML واحد جيد التكوين.

schema_of_xml

إعراب

schema_of_xml(xmlStr [, options] )

الوسيطات

  • xmlStr: تعبير STRING يحدد سجل XML واحد مكون بشكل جيد.
  • options: توجيه تحديد حرفي اختياري MAP<STRING,STRING> .

مرتجعات

سلسلة تحمل تعريف بنية مع حقول n من السلاسل حيث يتم اشتقاق أسماء الأعمدة من عنصر XML وأسماء السمات. تحتوي قيم الحقول على أنواع SQL المنسقة المشتقة.

from_xml

إعراب

from_xml(xmlStr, schema [, options])

الوسيطات

  • xmlStr: تعبير STRING يحدد سجل XML واحد مكون بشكل جيد.
  • schema: تعبير STRING أو استدعاء الدالة schema_of_xml .
  • options: توجيه تحديد حرفي اختياري MAP<STRING,STRING> .

مرتجعات

بنية بأسماء الحقول وأنواع مطابقة لتعريف المخطط. يجب تعريف المخطط على أنه اسم عمود مفصول بفواصل وأزواج أنواع البيانات كما هو مستخدم في، على سبيل المثال، CREATE TABLE. تنطبق معظم الخيارات الموضحة في خيارات مصدر البيانات مع الاستثناءات التالية:

  • rowTag: نظرا لوجود سجل XML واحد فقط، rowTag فإن الخيار غير قابل للتطبيق.
  • mode (افتراضي: PERMISSIVE): يسمح بوضع للتعامل مع السجلات التالفة أثناء التحليل.
    • PERMISSIVE: عندما يفي بسجل تالف، يضع السلسلة التي تم تكوينها بشكل غير سليم في حقل تم تكوينه بواسطة columnNameOfCorruptRecord، ويعين الحقول التي تم تكوينها بشكل غير سليم إلى null. للاحتفاظ بالسجلات التالفة، يمكنك تعيين حقل نوع سلسلة يسمى columnNameOfCorruptRecord في مخطط معرف من قبل المستخدم. إذا لم يكن المخطط يحتوي على الحقل، فإنه يسقط سجلات تالفة أثناء التحليل. عند الاستدلال على مخطط، فإنه يضيف ضمنيا حقلا columnNameOfCorruptRecord في مخطط إخراج.
    • FAILFAST: يطرح استثناء عندما يفي بالسجلات التالفة.

تحويل البنية

نظرا لاختلافات البنية بين DataFrame وXML، هناك بعض قواعد التحويل من بيانات XML من DataFrame DataFrame وإلى بيانات XML. لاحظ أنه يمكن تعطيل معالجة السمات باستخدام الخيار excludeAttribute.

التحويل من XML إلى DataFrame

السمات: يتم تحويل السمات كالحقول مع بادئة attributePrefixالعنوان .

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

ينتج مخططا أدناه:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

بيانات الأحرف في عنصر يحتوي على سمة (سمات) أو عنصر (عناصر) تابعة: يتم تحليلها في valueTag الحقل. إذا كان هناك تكرارات متعددة لبيانات الأحرف valueTag ، يتم تحويل الحقل إلى array نوع.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

ينتج مخططا أدناه:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

التحويل من DataFrame إلى XML

عنصر كصفيف في صفيف: كتابة ملف XML من DataFrame وجود حقل ArrayType مع عنصره كما ArrayType سيكون له حقل متداخل إضافي للعنصر. لن يحدث هذا في قراءة وكتابة بيانات XML ولكن كتابة DataFrame قراءة من مصادر أخرى. لذلك، فإن التنقل الدائري في قراءة ملفات XML وكتابتها له نفس البنية ولكن كتابة DataFrame قراءة من مصادر أخرى من الممكن أن يكون لها بنية مختلفة.

DataFrame مع مخطط أدناه:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

ومع البيانات أدناه:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

ينتج ملف XML أدناه:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

يتم تحديد اسم عنصر الصفيف غير المسمى في DataFrame بواسطة الخيار arrayElementName (الافتراضي: item).

عمود البيانات المنقذة

يضمن عمود البيانات الذي تم إنقاذه عدم فقدان البيانات أو فقدانها أثناء ETL. يمكنك تمكين عمود البيانات الذي تم إنقاذه لالتقاط أي بيانات لم يتم تحليلها لأن حقلا واحدا أو أكثر في السجل يحتوي على إحدى المشكلات التالية:

  • غير موجود في المخطط المقدم
  • لا يتطابق مع نوع بيانات المخطط المقدم
  • عدم تطابق حالة مع أسماء الحقول في المخطط المتوفر

يتم إرجاع عمود البيانات الذي تم إنقاذه كمستند JSON يحتوي على الأعمدة التي تم إنقاذها، ومسار الملف المصدر للسجل. لإزالة مسار الملف المصدر من عمود البيانات المنقذة، يمكنك تعيين تكوين SQL التالي:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

يمكنك تمكين عمود البيانات الذي تم إنقاذه عن طريق تعيين الخيار rescuedDataColumn إلى اسم عمود عند قراءة البيانات، مثل _rescued_data مع spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

يدعم محلل XML ثلاثة أوضاع عند تحليل السجلات: PERMISSIVEو DROPMALFORMEDو.FAILFAST عند استخدامها مع rescuedDataColumn، لا تتسبب عدم تطابقات نوع البيانات في إسقاط السجلات في DROPMALFORMED الوضع أو طرح خطأ في FAILFAST الوضع. يتم إسقاط السجلات التالفة فقط (XML غير المكتملة أو المشوهة) أو طرح الأخطاء.

استنتاج المخطط وتطوره في Auto Loader

للحصول على مناقشة مفصلة لهذا الموضوع والخيارات القابلة للتطبيق، راجع تكوين استنتاج المخطط وتطوره في Auto Loader. يمكنك تكوين "المحمل التلقائي" للكشف تلقائيا عن مخطط بيانات XML المحملة، ما يسمح لك بتهيئة الجداول دون الإعلان صراحة عن مخطط البيانات وتطوير مخطط الجدول مع تقديم أعمدة جديدة. وهذا يلغي الحاجة إلى تعقب تغييرات المخطط وتطبيقها يدويا بمرور الوقت.

بشكل افتراضي، يسعى استنتاج مخطط التحميل التلقائي إلى تجنب مشكلات تطور المخطط بسبب عدم تطابق النوع. بالنسبة للتنسيقات التي لا تقوم بترميز أنواع البيانات (JSON وCSV وXML)، يستنتج المحمل التلقائي جميع الأعمدة كسلاسل، بما في ذلك الحقول المتداخلة في ملفات XML. يستخدم Apache Spark DataFrameReader سلوكا مختلفا لاستدلال المخطط، وتحديد أنواع البيانات للأعمدة في مصادر XML استنادا إلى بيانات العينة. لتمكين هذا السلوك باستخدام "المحمل التلقائي"، قم بتعيين الخيار cloudFiles.inferColumnTypes إلى true.

يكتشف "المحمل التلقائي" إضافة أعمدة جديدة أثناء معالجة بياناتك. عندما يكتشف "المحمل التلقائي" عمودا جديدا، يتوقف الدفق باستخدام UnknownFieldException. قبل أن يطرح الدفق هذا الخطأ، ينفذ Auto Loader استنتاج المخطط على أحدث دفعة صغيرة من البيانات ويحدث موقع المخطط بأحدث مخطط عن طريق دمج أعمدة جديدة إلى نهاية المخطط. تظل أنواع بيانات الأعمدة الموجودة دون تغيير. يدعم Auto Loader أوضاعا مختلفة لتطور المخطط، والتي قمت بتعيينها في الخيار cloudFiles.schemaEvolutionMode.

يمكنك استخدام تلميحات المخطط لفرض معلومات المخطط التي تعرفها وتتوقعها على مخطط مستنتج. عندما تعرف أن العمود من نوع بيانات معين، أو إذا كنت تريد اختيار نوع بيانات أكثر عمومية (على سبيل المثال، مزدوج بدلا من عدد صحيح)، يمكنك توفير عدد عشوائي من التلميحات لأنواع بيانات الأعمدة كسلسلة باستخدام بناء جملة مواصفات مخطط SQL. عند تمكين عمود البيانات الذي تم إنقاذه، يتم تحميل الحقول المسماة _rescued_data في حالة أخرى غير تلك الخاصة بالمخطط إلى العمود. يمكنك تغيير هذا السلوك عن طريق تعيين الخيار readerCaseSensitive إلى false، وفي هذه الحالة يقرأ "المحمل التلقائي" البيانات بطريقة غير حساسة لحالة الأحرف.

الأمثلة

تستخدم الأمثلة في هذا القسم ملف XML المتوفر للتنزيل في Apache Spark GitHub repo.

قراءة وكتابة XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

يمكنك تحديد المخطط يدويا عند قراءة البيانات:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

واجهة برمجة تطبيقات SQL

يمكن لمصدر بيانات XML استنتاج أنواع البيانات:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

يمكنك أيضا تحديد أسماء الأعمدة وأنواعها في DDL. في هذه الحالة، لا يتم استنتاج المخطط تلقائيا.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

تحميل XML باستخدام COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

قراءة XML مع التحقق من صحة الصف

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

تحليل XML المتداخل (from_xml schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml schema_of_xml مع واجهة برمجة تطبيقات SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

تحميل XML باستخدام أداة التحميل التلقائي

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

الموارد الإضافية

قراءة بيانات XML وكتابتها باستخدام مكتبة spark-xml