Important
This feature is in Public Preview.
Extensible Markup Language (XML) is a markup language for formatting, storing, and sharing data in textual format. It defines a set of rules for serializing data ranging from documents to arbitrary data structures.
Azure Databricks supports XML for both reading and writing with Apache Spark, including automatic schema inference and evolution, row tag configuration, XSD validation, and SQL expressions like from_xml. Native XML support works with Auto Loader, read_files, and COPY INTO without requiring external jars.
Prerequisites
XML file format support requires Databricks Runtime 14.3 and above.
Options
Use the .option() and .options() methods of DataFrameReader and DataFrameWriter to configure XML data sources. For a complete list of supported options, see DataFrameReader XML options and DataFrameWriter XML options.
Parse XML records
The XML specification mandates a well-formed structure, but that structure doesn't map directly to a tabular format. You must specify the rowTag option to indicate the XML element that maps to a DataFrame Row. The rowTag element becomes the top-level struct. The child elements of rowTag become the fields of the top-level struct.
You can specify the schema for this record or let it be inferred automatically. Because the parser only examines the rowTag elements, DTD and external entities are filtered out.
The following examples illustrate schema inference and parsing of an XML file using different rowTag options:
Python
xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
val xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, true)
Read the XML file with rowTag option as "reviews":
Python
df = spark.read.option("rowTag", "reviews").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "reviews").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'reviews'
)
Output:
root
|-- review: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- comment: string (nullable = true)
| | |-- rating: string (nullable = true)
+----------------------------------------------------------------------------------------+
|review |
+----------------------------------------------------------------------------------------+
|[{r001, Alice, Amazing stay, highly recommend!, 5}, {r002, Bob, Great location..., 4}] |
+----------------------------------------------------------------------------------------+
Read the XML file with rowTag as "review":
Python
df = spark.read.option("rowTag", "review").format("xml").load(xmlPath)
# Infers four top-level fields and parses each `review` element into its own row:
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "review").xml(xmlPath)
// Infers four top-level fields and parses each `review` element into its own row:
df.printSchema()
df.show(truncate=false)
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review'
)
Output:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- comment: string (nullable = true)
|-- rating: string (nullable = true)
+----+------+--------------------------------+------+
|_id |author|comment |rating|
+----+------+--------------------------------+------+
|r001|Alice |Amazing stay, highly recommend! |5 |
|r002|Bob |Great location, very comfortable|4 |
+----+------+--------------------------------+------+
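The effect of the rowTag choice can also be seen without Spark. The following is a minimal sketch in plain Python using the standard library; it is not how the Spark XML reader is implemented, and the helper name count_rows is hypothetical. It only counts how many elements a given rowTag would select as rows in the sample document.

```python
import xml.etree.ElementTree as ET

XML_TEXT = """
<reviews>
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>
  <review id="r002">
    <author>Bob</author>
    <rating>4</rating>
    <comment>Great location, very comfortable</comment>
  </review>
</reviews>"""


def count_rows(xml_text: str, row_tag: str) -> int:
    """Count the elements that a given rowTag would select as rows (hypothetical helper)."""
    root = ET.fromstring(xml_text)
    # iter() walks the root element and all of its descendants in document order.
    return sum(1 for _ in root.iter(row_tag))


print(count_rows(XML_TEXT, "reviews"))  # 1
print(count_rows(XML_TEXT, "review"))   # 2
```

With rowTag set to reviews, the whole document is one row that holds an array of review structs. With rowTag set to review, each review becomes its own row.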
Set XML compression
When writing XML files to cloud storage, compression reduces storage costs. Text-based formats like XML compress effectively because they contain repetitive structure and character sequences.
Configure compression using the compression write option. The default is none.
| Codec | Description |
|---|---|
| none | No compression. Default. |
| bzip2 | High compression ratio, slowest option. Best for archival use cases where storage cost is the priority. |
| deflate | Higher compression ratio than snappy at the cost of additional CPU time. |
| gzip | Good compression ratio with wider tooling support than snappy. |
| lz4 | Optimized for speed with lower compression ratio. |
| snappy | Optimized for speed with moderate compression. Good for interactive workloads. |
| zstd | Good balance of speed and compression ratio; faster than deflate at similar or better ratios. |
For example, write Wanderbricks reviews to reviews_xml_compressed using gzip compression.
Python
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("xml").option("compression", "gzip").option("rowTag", "review").save("/Volumes/<catalog>/<schema>/<volume>/reviews_xml_compressed")
Scala
val df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("xml").option("compression", "gzip").option("rowTag", "review").save("/Volumes/<catalog>/<schema>/<volume>/reviews_xml_compressed")
SQL
CREATE TABLE reviews_compressed (author STRING, rating INT, comment STRING)
USING XML
OPTIONS (rowTag 'review', compression 'gzip');
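To get a feel for why repetitive XML compresses well, the following sketch compresses a synthetic document with Python's standard gzip module. It runs outside Spark, and the generated data is made up for illustration, so the sizes it prints say nothing about the Wanderbricks dataset or about the other codecs.

```python
import gzip

# Build a small, repetitive XML document shaped like the reviews example.
# The content is synthetic and exists only for this illustration.
rows = "".join(
    f'<review id="r{i:03d}"><author>user{i}</author><rating>5</rating></review>'
    for i in range(1000)
)
xml_bytes = f"<reviews>{rows}</reviews>".encode("utf-8")

compressed = gzip.compress(xml_bytes)

# Print the uncompressed and compressed sizes in bytes.
print(len(xml_bytes), len(compressed))

# Decompressing restores the original bytes exactly.
assert gzip.decompress(compressed) == xml_bytes
```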
Validate XML records with XSD
You can optionally validate each row-level XML record against an XML Schema Definition (XSD). Specify the XSD file in the rowValidationXSDPath option. The XSD does not otherwise affect the schema provided or inferred. A record that fails validation is marked as corrupted and handled according to the corrupt record handling mode described in the Options section.
You can use XSDToSchema to extract a Spark DataFrame schema from an XSD file. It supports only simple, complex, and sequence types, and only basic XSD functionality.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="review">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="rating" type="xs:integer" />
<xs:element name="comment" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
The following table shows the conversion of XSD data types to Spark data types:
| XSD Data Types | Spark Data Types |
|---|---|
| boolean | BooleanType |
| decimal | DecimalType |
| unsignedLong | DecimalType(38, 0) |
| double | DoubleType |
| float | FloatType |
| byte | ByteType |
| short, unsignedByte | ShortType |
| integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort | IntegerType |
| long, unsignedInt | LongType |
| date | DateType |
| dateTime | TimestampType |
| Others | StringType |
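The conversion table can be read as a lookup with a string fallback. The following sketch transcribes it into a plain Python dictionary. It is only an illustration of the mapping, not the XSDToSchema implementation, and the helper name spark_type_for is hypothetical.

```python
# Lookup table transcribed from the XSD-to-Spark conversion table in this section.
XSD_TO_SPARK = {
    "boolean": "BooleanType",
    "decimal": "DecimalType",
    "unsignedLong": "DecimalType(38, 0)",
    "double": "DoubleType",
    "float": "FloatType",
    "byte": "ByteType",
    "short": "ShortType",
    "unsignedByte": "ShortType",
    "integer": "IntegerType",
    "negativeInteger": "IntegerType",
    "nonNegativeInteger": "IntegerType",
    "nonPositiveInteger": "IntegerType",
    "positiveInteger": "IntegerType",
    "unsignedShort": "IntegerType",
    "long": "LongType",
    "unsignedInt": "LongType",
    "date": "DateType",
    "dateTime": "TimestampType",
}


def spark_type_for(xsd_type: str) -> str:
    """Return the Spark type name for an XSD type such as 'xs:integer' (hypothetical helper)."""
    # Drop a namespace prefix such as "xs:" before the lookup.
    local_name = xsd_type.split(":")[-1]
    # Any type not listed in the table falls back to StringType.
    return XSD_TO_SPARK.get(local_name, "StringType")


print(spark_type_for("xs:integer"))  # IntegerType
print(spark_type_for("xs:string"))   # StringType
```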
Parse nested XML
You can parse XML data in a string-valued column of an existing DataFrame with schema_of_xml and from_xml, which return the schema and the parsed results as new struct columns, respectively. The XML data passed to schema_of_xml and from_xml must be a single well-formed XML record.
schema_of_xml
Use schema_of_xml to infer the Spark schema from an XML string. Pass the result to from_xml to parse XML columns.
Syntax: schema_of_xml(xmlStr [, options])
| Argument | Required | Description |
|---|---|---|
| xmlStr | Yes | A STRING expression specifying a single well-formed XML record. |
| options | No | A MAP<STRING,STRING> literal specifying directives. |
Returns a STRING holding a definition of a struct with n fields of strings where the column names are derived from the XML element and attribute names. The field values hold the derived formatted SQL types.
from_xml
Use from_xml to parse a STRING column containing XML records into a struct. Provide a schema directly or use the output of schema_of_xml.
Syntax: from_xml(xmlStr, schema [, options])
| Argument | Required | Description |
|---|---|---|
| xmlStr | Yes | A STRING expression specifying a single well-formed XML record. |
| schema | Yes | A STRING expression or invocation of the schema_of_xml function. |
| options | No | A MAP<STRING,STRING> literal specifying directives. |
Returns a struct with field names and types matching the schema definition. Schema must be defined as comma-separated column name and data type pairs as used in, for example, CREATE TABLE. Most options shown in the Options section are applicable with the following exceptions:
- rowTag: Because there is only one XML record, the rowTag option is not applicable.
- mode (default: PERMISSIVE): Allows a mode for dealing with corrupt records during parsing.
  - PERMISSIVE: When it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets malformed fields to null. To keep corrupt records, you can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. When inferring a schema, it implicitly adds a columnNameOfCorruptRecord field in an output schema.
  - FAILFAST: Throws an exception when it meets corrupted records.
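The difference between the two modes can be sketched in plain Python. This is a simplified illustration under stated assumptions, not the from_xml implementation: the helper parse_record is hypothetical, the corrupt-record column name _corrupt_record is assumed for the sketch, and only flat child elements are handled.

```python
import xml.etree.ElementTree as ET

# Assumed column name for this sketch; in Spark it is configured by
# the columnNameOfCorruptRecord option.
CORRUPT_COLUMN = "_corrupt_record"


def parse_record(xml_str: str, mode: str = "PERMISSIVE") -> dict:
    """Parse one XML record into a flat dict of child-element text (hypothetical helper)."""
    try:
        root = ET.fromstring(xml_str)
    except ET.ParseError as exc:
        if mode == "FAILFAST":
            # FAILFAST: surface the malformed record immediately.
            raise ValueError("Malformed XML record") from exc
        # PERMISSIVE: keep the raw string in the corrupt-record column.
        return {CORRUPT_COLUMN: xml_str}
    record = {child.tag: child.text for child in root}
    # A well-formed record leaves the corrupt-record column empty.
    record[CORRUPT_COLUMN] = None
    return record


print(parse_record("<review><author>Alice</author></review>"))
print(parse_record("<review><author>Alice</review>"))
```

The second call passes a record with a mismatched closing tag. In PERMISSIVE mode the raw string is preserved instead of raising; calling it with mode="FAILFAST" raises an error.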
Examples
To parse an XML string column, use schema_of_xml to infer the schema, then pass it to from_xml:
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
"""
df = spark.createDataFrame([(1, xml_data)], ["review_id", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml, schema_of_xml, lit}
val xmlData = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>""".stripMargin
val df = Seq((1, xmlData)).toDF("review_id", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
To parse inline XML in SQL:
SELECT from_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>',
schema_of_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>')
);
Convert between XML and DataFrame structures
Because DataFrames and XML are structured differently, converting between them follows the rules described in this section. You can disable attribute handling with the excludeAttribute option.
Conversion from XML to DataFrame
When reading XML, Azure Databricks maps XML elements and attributes to DataFrame fields according to the following rules.
Attributes are converted to fields whose names start with the prefix set by the attributePrefix option. In the following example, the prefix is an underscore.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
This produces the following schema:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Character data in an element that also has attributes or child elements is parsed into the valueTag field. If the character data occurs more than once, the valueTag field is converted to an array type.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
This produces the following schema:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
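The two rules above can be sketched in plain Python with the standard library. This is an illustration only, not the Spark reader: the helper element_to_value is hypothetical, the defaults _ and _VALUE are taken from the schemas shown above, and repeated child elements (which would need array handling) are not covered.

```python
import xml.etree.ElementTree as ET


def element_to_value(elem, attribute_prefix="_", value_tag="_VALUE"):
    """Convert an element to a string (plain leaf) or a dict (hypothetical helper)."""
    children = list(elem)
    # Character data is the element's own text plus the "tail" text that
    # follows each child element; whitespace-only runs are ignored.
    raw_texts = [elem.text] + [child.tail for child in children]
    texts = [t.strip() for t in raw_texts if t and t.strip()]

    if not elem.attrib and not children:
        # A leaf without attributes maps to a simple string field.
        return texts[0] if texts else None

    record = {}
    for name, value in elem.attrib.items():
        # Attributes become fields named with the attribute prefix.
        record[attribute_prefix + name] = value
    if texts:
        # One occurrence stays a string; several occurrences become a list.
        record[value_tag] = texts[0] if len(texts) == 1 else texts
    for child in children:
        record[child.tag] = element_to_value(child, attribute_prefix, value_tag)
    return record


mixed = ET.fromstring("""
<one>
    <two myTwoAttrib="BBBBB">two</two>
    some value between elements
    <three>three</three>
    some other value between elements
</one>""")
print(element_to_value(mixed))
```

The printed dictionary has a list under _VALUE, a nested dictionary for two, and a plain string for three, which mirrors the array, struct, and string fields in the schema above.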
Conversion from DataFrame to XML
When writing a DataFrame to XML, certain nested structures require special handling due to differences between the DataFrame and XML data models.
If a DataFrame contains an ArrayType field whose element type is also ArrayType, writing it to XML produces an extra nesting level that is not present when round-tripping XML files. This only affects DataFrames sourced outside of XML. Reading XML files and writing them back preserves the original structure.
For example, a DataFrame with the following schema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
and the following data:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produces the following XML output:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
The element name of the unnamed array in the DataFrame is specified by the option arrayElementName (Default: item).
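As a rough illustration of this rule, the following plain-Python sketch serializes a list of lists the same way: one element per inner array, with each inner value wrapped in an element named by the array element name. The helper nested_array_to_xml is hypothetical and is not the Spark writer.

```python
import xml.etree.ElementTree as ET


def nested_array_to_xml(field_name, outer, array_element_name="item"):
    """Serialize a list of lists as one <field_name> element per inner list (hypothetical helper)."""
    fragments = []
    for inner in outer:
        parent = ET.Element(field_name)
        for value in inner:
            # Inner array values have no name of their own, so each one is
            # wrapped in an element named by array_element_name.
            ET.SubElement(parent, array_element_name).text = value
        fragments.append(ET.tostring(parent, encoding="unicode"))
    return "\n".join(fragments)


print(nested_array_to_xml("a", [["aa"], ["bb"]]))
```

The output has the same structure as the XML shown above, without the indentation. Passing a different array_element_name changes the name of the wrapper element.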
Enable the rescued data column
The rescued data column ensures that you never lose data during ETL. It captures any data that wasn't parsed because one or more fields in a record have one of the following issues:
- The field is absent from the provided schema.
- The field's value does not match the data type in the provided schema.
- The field's name differs in case from the field name in the provided schema.
The rescued data column is returned as a JSON document containing the columns that were rescued, and the source file path of the record.
To enable the rescued data column, set the rescuedDataColumn option to a column name when reading:
Python
df = spark.read.option("rowTag", "review").option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
Scala
val df = spark.read.option("rowTag", "review").option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_xml',
format => 'xml',
rowTag => 'review',
rescuedDataColumn => '_rescued_data'
)
To remove the source file path from the rescued data column, set:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
The XML parser supports three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records (incomplete or malformed XML) are dropped or throw errors.
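The three rescue conditions listed in this section can be sketched in plain Python. This is a simplified model, not the Databricks implementation: the helper rescue is hypothetical, the schema is represented as a mapping from field name to a Python type, and the _file_path key used for the source path is an assumed name for illustration.

```python
import json


def rescue(record: dict, schema: dict, source_path: str) -> dict:
    """Split a parsed record into schema columns and a rescued JSON document (hypothetical helper)."""
    row = {name: None for name in schema}
    rescued = {}
    for name, value in record.items():
        expected_type = schema.get(name)
        if expected_type is None:
            # Covers fields absent from the schema and case mismatches such as "Author".
            rescued[name] = value
            continue
        try:
            row[name] = expected_type(value)
        except (TypeError, ValueError):
            # The value does not fit the declared type.
            rescued[name] = value
    if rescued:
        rescued["_file_path"] = source_path  # assumed key name for this sketch
        row["_rescued_data"] = json.dumps(rescued)
    else:
        row["_rescued_data"] = None
    return row


schema = {"author": str, "rating": int}
print(rescue({"Author": "Alice", "rating": "five", "extra": "x"}, schema, "/tmp/reviews.xml"))
print(rescue({"author": "Bob", "rating": "4"}, schema, "/tmp/reviews.xml"))
```

In the first call, all three fields end up in the rescued JSON document and the schema columns stay null. In the second call, every field matches the schema, so nothing is rescued.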
Infer and evolve schema with Auto Loader
For a detailed discussion of this topic and applicable options, see Configure schema inference and evolution in Auto Loader. You can configure Auto Loader to automatically detect the schema of loaded XML data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time.
By default, Auto Loader schema inference seeks to avoid schema evolution issues due to type mismatches. For formats that don't encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings, including nested fields in XML files. The Apache Spark DataFrameReader uses a different behavior for schema inference, selecting data types for columns in XML sources based on sample data. To enable this behavior with Auto Loader, set the option cloudFiles.inferColumnTypes to true.
Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged. Auto Loader supports different modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode.
You can use schema hints to enforce the schema information that you know and expect on an inferred schema. When you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a double instead of an integer), you can provide an arbitrary number of hints for column data types as a string using SQL schema specification syntax.
When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data column. You can change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
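The evolution and hint behavior described in this section can be modeled in a few lines of plain Python. This is a conceptual sketch, not Auto Loader code: a schema is represented as a list of (column, type) pairs, and the helpers evolve_schema and apply_hints are hypothetical names.

```python
def evolve_schema(existing: list, inferred: list) -> list:
    """Append newly seen columns to the end; existing columns keep their types."""
    known = {name for name, _ in existing}
    new_columns = [(name, dtype) for name, dtype in inferred if name not in known]
    return existing + new_columns


def apply_hints(schema: list, hints: dict) -> list:
    """Override the type of any column that has a hint; leave the rest unchanged."""
    return [(name, hints.get(name, dtype)) for name, dtype in schema]


existing = [("_id", "string"), ("rating", "string")]
inferred = [("rating", "int"), ("_id", "string"), ("helpful_votes", "string")]

evolved = evolve_schema(existing, inferred)
print(evolved)
print(apply_hints(evolved, {"rating": "double"}))
```

The new helpful_votes column lands at the end of the schema, and rating keeps its existing string type until the hint changes it to double.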
Usage
The following examples use the Wanderbricks dataset to demonstrate reading and writing XML files using the Spark DataFrame API and SQL.
Read and write XML
Use the DataFrame API to write Wanderbricks reviews to XML and read them back.
Python
# Write Wanderbricks reviews to XML
df = spark.read.table("samples.wanderbricks.reviews")
df.write \
.format("xml") \
.option("rootTag", "reviews") \
.option("rowTag", "review") \
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
# Read the XML file back
df_read = spark.read \
.format("xml") \
.option("rowTag", "review") \
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df_read.show()
Scala
// Write Wanderbricks reviews to XML
val df = spark.read.table("samples.wanderbricks.reviews")
df.write
.format("xml")
.option("rootTag", "reviews")
.option("rowTag", "review")
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
// Read the XML file back
val dfRead = spark.read
.format("xml")
.option("rowTag", "review")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
dfRead.show()
R
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
You can manually specify the schema when reading data:
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.options(rowTag='review').xml('/Volumes/<catalog>/<schema>/<volume>/reviews.xml', schema=custom_schema)
df.show()
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)))
val df = spark.read.option("rowTag", "review").schema(customSchema).xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.show()
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("rating", "integer"),
structField("comment", "string"))
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", schema = customSchema, rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
Read and write XML with SQL
Use SQL DDL to create a table from an XML file. Azure Databricks infers column types automatically.
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
SELECT * FROM reviews;
You can also specify column names and types in DDL. In this case, the schema is not inferred automatically.
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews (_id string, author string, rating integer, comment string)
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
Load XML using COPY INTO
Use COPY INTO to load XML files from cloud storage into a Delta table.
DROP TABLE IF EXISTS reviews;
CREATE TABLE IF NOT EXISTS reviews;
COPY INTO reviews
FROM "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'review')
COPY_OPTIONS ('mergeSchema' = 'true');
Read XML with row validation
Use the rowValidationXSDPath option to validate each row against an XSD schema while reading.
Python
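xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"  # XSD file to validate each row against
df = (spark.read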
.format("xml")
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml"))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.printSchema
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review',
rowValidationXSDPath => '/Volumes/<catalog>/<schema>/<volume>/reviews.xsd'
)
Load XML with Auto Loader
Use Auto Loader to continuously ingest XML files from cloud storage into a Delta table with automatic schema inference and evolution.
Python
# inputPath, schemaPath, and checkPointPath are placeholders for your own storage locations.
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("reviews")
)
Scala
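import org.apache.spark.sql.streaming.Trigger
// inputPath, schemaPath, and checkPointPath are placeholders for your own storage locations.
val query = spark.readStream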
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow())
.toTable("reviews")
Additional resources
- Read and write XML data using the spark-xml library: For users who previously used the open-source Spark XML library, see the legacy integration guide.
- Read and write JSON files: If your data is semi-structured but not XML, JSON provides similar schema inference and nested data support with a simpler format.