Read and write XML files

Important

This feature is in Public Preview.

This article describes how to read and write XML files.

Extensible Markup Language (XML) is a markup language for formatting, storing, and sharing data in textual format. It defines a set of rules for serializing data ranging from documents to arbitrary data structures.

Native XML file format support enables ingestion, querying, and parsing of XML data for batch processing or streaming. It can automatically infer and evolve schema and data types, supports SQL expressions like from_xml, and can generate XML documents. It doesn’t require external jars and works seamlessly with Auto Loader, read_files and COPY INTO. You can optionally validate each row-level XML record against an XML Schema Definition (XSD).

Requirements

Databricks Runtime 14.3 and above

Parse XML records

The XML specification mandates a well-formed structure. However, this specification doesn’t immediately map to a tabular format. You must specify the rowTag option to indicate the XML element that maps to a DataFrame Row. The rowTag element becomes the top-level struct. The child elements of rowTag become the fields of the top-level struct.

You can specify the schema for this record or let it be inferred automatically. Because the parser only examines the rowTag elements, DTD and external entities are filtered out.

The following examples illustrate schema inference and parsing of an XML file using different rowTag options:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, true) // overwrite the file if it already exists, as in the Python example

Read the XML file with rowTag option as “books”:

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Output:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Read the XML file with rowTag as “book”:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
df.printSchema()
df.show(truncate=false)

Output:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Data source options

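Data source options for XML can be specified in the following ways:

  • The .option and .options methods of the DataFrame reader and writer, including streaming reads with Auto Loader.
  • The options argument of built-in functions such as from_xml and schema_of_xml.
  • The OPTIONS clause of CREATE TABLE ... USING XML and the FORMAT_OPTIONS clause of COPY INTO.

For a list of options, see Auto Loader options.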

XSD support

You can optionally validate each row-level XML record against an XML Schema Definition (XSD). Specify the XSD file in the rowValidationXSDPath option. The XSD does not otherwise affect the schema provided or inferred. A record that fails validation is marked as “corrupted” and handled according to the corrupt record handling mode option described in the options section.

You can use XSDToSchema to extract a Spark DataFrame schema from an XSD file. It supports only simple, complex, and sequence types, and only basic XSD functionality.

Scala

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

The following table shows the conversion of XSD data types to Spark data types:

XSD data types | Spark data types
boolean | BooleanType
decimal | DecimalType
unsignedLong | DecimalType(38, 0)
double | DoubleType
float | FloatType
byte | ByteType
short, unsignedByte | ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort | IntegerType
long, unsignedInt | LongType
date | DateType
dateTime | TimestampType
Others | StringType
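The conversion table can be read as a lookup with a string fallback. The following is a minimal sketch in plain Python, not the XSDToSchema implementation: the names XSD_TO_SPARK and spark_type_for are hypothetical, and the values are Spark type names written as strings rather than Spark type objects.

```python
# Illustrative lookup that mirrors the conversion table.
# XSD_TO_SPARK and spark_type_for are hypothetical names, not part of XSDToSchema.
XSD_TO_SPARK = {
    "boolean": "BooleanType",
    "decimal": "DecimalType",
    "unsignedLong": "DecimalType(38, 0)",
    "double": "DoubleType",
    "float": "FloatType",
    "byte": "ByteType",
    "short": "ShortType",
    "unsignedByte": "ShortType",
    "integer": "IntegerType",
    "negativeInteger": "IntegerType",
    "nonNegativeInteger": "IntegerType",
    "nonPositiveInteger": "IntegerType",
    "positiveInteger": "IntegerType",
    "unsignedShort": "IntegerType",
    "long": "LongType",
    "unsignedInt": "LongType",
    "date": "DateType",
    "dateTime": "TimestampType",
}


def spark_type_for(xsd_type: str) -> str:
    """Return the Spark type name for an XSD type such as 'xs:decimal'."""
    local_name = xsd_type.split(":")[-1]  # drop a namespace prefix like 'xs:'
    # Any type missing from the table falls back to StringType ("Others").
    return XSD_TO_SPARK.get(local_name, "StringType")


print(spark_type_for("xs:date"))    # DateType
print(spark_type_for("xs:anyURI"))  # StringType
```

Applied to the books.xsd example, price (xs:decimal) would map to DecimalType and publish_date (xs:date) to DateType.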

Parse nested XML

XML data in a string-valued column of an existing DataFrame can be parsed with schema_of_xml, which returns the schema, and from_xml, which returns the parsed results as new struct columns. XML data passed as an argument to schema_of_xml and from_xml must be a single well-formed XML record.

schema_of_xml

Syntax

schema_of_xml(xmlStr [, options] )

Arguments

  • xmlStr: A STRING expression specifying a single well-formed XML record.
  • options: An optional MAP<STRING,STRING> literal specifying directives.

Returns

A STRING holding a definition of a struct with n fields of strings where the column names are derived from the XML element and attribute names. The field values hold the derived formatted SQL types.

from_xml

Syntax

from_xml(xmlStr, schema [, options])

Arguments

  • xmlStr: A STRING expression specifying a single well-formed XML record.
  • schema: A STRING expression or invocation of the schema_of_xml function.
  • options: An optional MAP<STRING,STRING> literal specifying directives.

Returns

A struct with field names and types matching the schema definition. Schema must be defined as comma-separated column name and data type pairs as used in, for example, CREATE TABLE. Most options shown in the data source options are applicable with the following exceptions:

  • rowTag: Because there is only one XML record, the rowTag option is not applicable.
  • mode (default: PERMISSIVE): Allows a mode for dealing with corrupt records during parsing.
    • PERMISSIVE: When it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets malformed fields to null. To keep corrupt records, you can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. When inferring a schema, it implicitly adds a columnNameOfCorruptRecord field in an output schema.
    • FAILFAST: Throws an exception when it meets corrupted records.
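The difference between the two modes can be sketched outside Spark. The following plain-Python example is an illustration only, not the parser's implementation: parse_record is a hypothetical helper, the standard library's ElementTree stands in for the XML parser, and the corrupt-record column name _corrupt_record is an assumption chosen for the example. Unlike the real PERMISSIVE mode, the sketch always includes the corrupt-record column.

```python
import xml.etree.ElementTree as ET


def parse_record(xml_str, fields, mode="PERMISSIVE", corrupt_column="_corrupt_record"):
    """Parse one XML record into a dict with one entry per expected field.

    Hypothetical helper: `fields` plays the role of the schema's field names.
    """
    try:
        root = ET.fromstring(xml_str)
    except ET.ParseError as err:
        if mode == "FAILFAST":
            # FAILFAST: surface the malformed record as an error.
            raise ValueError(f"Malformed XML record: {err}") from err
        # PERMISSIVE: null out the fields and keep the raw text.
        row = {name: None for name in fields}
        row[corrupt_column] = xml_str
        return row
    # Well-formed record: read each expected child element's text.
    row = {name: root.findtext(name) for name in fields}
    row[corrupt_column] = None
    return row


good = parse_record("<book><title>Maeve Ascendant</title></book>", ["title"])
bad = parse_record("<book><title>Maeve Ascendant</book>", ["title"])
print(good)
print(bad)
```

The second record has a mismatched closing tag, so in PERMISSIVE mode its title is null and the raw string is preserved; with mode="FAILFAST" the same call raises instead.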

Structure conversion

Because DataFrames and XML are structured differently, conversion rules apply when converting XML data to a DataFrame and a DataFrame to XML data. You can disable attribute handling with the excludeAttribute option.

Conversion from XML to DataFrame

Attributes: Attributes are converted to fields whose names begin with the prefix set by the attributePrefix option. For example:

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produces the following schema:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Character data in an element containing attribute(s) or child element(s): These are parsed into the valueTag field. If there are multiple occurrences of character data, the valueTag field is converted to an array type.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produces the following schema:

root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
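The two rules in this section (prefixed attribute fields, and character data collected under the value tag) can be mimicked with the Python standard library. This is a rough sketch for building intuition, not the Spark parser: element_to_value is a hypothetical function, the defaults "_" and "_VALUE" are taken from the schemas shown in this section, and repeated child tags are not merged into arrays as a real reader would need to do.

```python
import xml.etree.ElementTree as ET


def element_to_value(elem, attribute_prefix="_", value_tag="_VALUE"):
    """Convert an element to a string (leaf) or a dict (struct-like value)."""
    children = list(elem)
    # Non-blank character data: the element's own text plus the text that
    # follows each child (ElementTree stores that as the child's `tail`).
    pieces = [elem.text] + [child.tail for child in children]
    texts = [p.strip() for p in pieces if p and p.strip()]
    if not elem.attrib and not children:
        return texts[0] if texts else None  # plain leaf element
    # Attributes become fields with the attribute prefix.
    row = {attribute_prefix + key: value for key, value in elem.attrib.items()}
    if texts:
        # One occurrence stays a string; several become a list.
        row[value_tag] = texts[0] if len(texts) == 1 else texts
    for child in children:
        # Simplification: a repeated tag overwrites the earlier value.
        row[child.tag] = element_to_value(child, attribute_prefix, value_tag)
    return row


xml_record = """<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>"""

converted = element_to_value(ET.fromstring(xml_record))
print(converted)
```

For the record above, the result holds a two-item list under _VALUE, a nested dict for two containing _VALUE and _myTwoAttrib, and a plain string for three, which matches the shape of the schema shown.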

Conversion from DataFrame to XML

Element as an array in an array: Writing an XML file from a DataFrame that has an ArrayType field whose elements are also ArrayType adds an extra nested field for the element. This does not happen when you read XML data and write it back; it happens only when you write a DataFrame read from other sources. A round trip of reading and writing XML files therefore preserves the structure, whereas writing a DataFrame read from other sources can produce a different structure.

A DataFrame with the following schema:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

and the following data:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produces the following XML file:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

The element name of the unnamed array in the DataFrame is specified by the option arrayElementName (Default: item).
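The shape of that output can be reproduced with a short standard-library sketch. It is an illustration of the rule only, not the Spark writer: nested_array_to_xml is a hypothetical function, and its array_element_name parameter merely imitates the arrayElementName option.

```python
import xml.etree.ElementTree as ET


def nested_array_to_xml(field_name, nested, array_element_name="item"):
    """Serialize an array-of-arrays field as one element per inner array."""
    parts = []
    for inner in nested:
        outer = ET.Element(field_name)  # one <a> per inner array
        for value in inner:
            # Each unnamed inner element gets the configured element name.
            ET.SubElement(outer, array_element_name).text = value
        parts.append(ET.tostring(outer, encoding="unicode"))
    return "".join(parts)


print(nested_array_to_xml("a", [["aa"], ["bb"]]))
# <a><item>aa</item></a><a><item>bb</item></a>
```

Passing array_element_name="entry" would produce entry elements instead of item elements inside each a element.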

Rescued data column

The rescued data column ensures that you never lose or miss out on data during ETL. You can enable the rescued data column to capture any data that wasn’t parsed because one or more fields in a record have one of the following issues:

  • Absent from the provided schema
  • Does not match the data type of the provided schema
  • Has a case mismatch with the field names in the provided schema

The rescued data column is returned as a JSON document containing the columns that were rescued, and the source file path of the record. To remove the source file path from the rescued data column, you can set the following SQL configuration:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

You can enable the rescued data column by setting the option rescuedDataColumn to a column name when reading data, such as _rescued_data with spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

The XML parser supports three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records (incomplete or malformed XML) are dropped or throw errors.
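The three rescue conditions listed in this section can be sketched in plain Python. This is a simplified model rather than the Databricks implementation: apply_schema is a hypothetical function, Python types stand in for Spark types, and the _file_path key used for the source path inside the JSON document is an assumption made for the example.

```python
import json


def apply_schema(record, schema, source_path, rescued_column="_rescued_data"):
    """Split a parsed record into schema columns and a JSON rescued-data column.

    record: dict of field name -> raw string value
    schema: dict of field name -> Python type (stand-in for a Spark type)
    """
    row = {name: None for name in schema}
    rescued = {}
    for name, raw in record.items():
        if name not in schema:
            # Field absent from the schema, or present with different casing.
            rescued[name] = raw
            continue
        try:
            row[name] = schema[name](raw)
        except (TypeError, ValueError):
            rescued[name] = raw  # value does not match the schema's type
    if rescued:
        rescued["_file_path"] = source_path  # assumed key name
        row[rescued_column] = json.dumps(rescued)
    else:
        row[rescued_column] = None
    return row


row = apply_schema(
    {"title": "Maeve Ascendant", "price": "abc", "Author": "Corets, Eva", "isbn": "1"},
    {"title": str, "price": float, "author": str},
    "dbfs:/tmp/books.xml",
)
print(row)
```

Here price fails the type conversion, Author differs from author only by case, and isbn is not in the schema, so all three land in the rescued JSON while title is parsed normally.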

Schema inference and evolution in Auto Loader

For a detailed discussion of this topic and applicable options, see Configure schema inference and evolution in Auto Loader. You can configure Auto Loader to automatically detect the schema of loaded XML data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time.

By default, Auto Loader schema inference seeks to avoid schema evolution issues due to type mismatches. For formats that don’t encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings, including nested fields in XML files. The Apache Spark DataFrameReader uses a different behavior for schema inference, selecting data types for columns in XML sources based on sample data. To enable this behavior with Auto Loader, set the option cloudFiles.inferColumnTypes to true.
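The contrast between the two inference behaviors can be illustrated with a toy function. This sketch is a deliberate simplification and not how Auto Loader or the DataFrameReader infers types: infer_column_type is a hypothetical name, only two numeric types are tried, and the infer_column_types flag merely imitates the effect of cloudFiles.inferColumnTypes.

```python
def infer_column_type(samples, infer_column_types=False):
    """Pick a type name for a column from sample string values."""
    if not infer_column_types:
        return "string"  # every column is treated as a string
    # Try the narrowest type first and widen until all samples convert.
    for type_name, caster in (("bigint", int), ("double", float)):
        try:
            for value in samples:
                caster(value)
            return type_name
        except ValueError:
            continue
    return "string"


prices = ["5.95", "3"]
print(infer_column_type(prices))                           # string
print(infer_column_type(prices, infer_column_types=True))  # double
```

A real reader also considers types such as dates, timestamps, and booleans, which this sketch leaves out.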

Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged. Auto Loader supports different modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode.
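The merge step described above (new columns appended at the end, existing column types left unchanged) can be expressed as a small function. This is a sketch of the rule only, with hypothetical names; it does not model the schema location, the UnknownFieldException, or the different evolution modes.

```python
def merge_schema(current, inferred):
    """Append columns that are new in `inferred` to the end of `current`.

    Both arguments are lists of (column_name, type_name) pairs.
    Returns the merged schema and the names of the added columns.
    """
    known = {name for name, _ in current}
    new_columns = [(name, dtype) for name, dtype in inferred if name not in known]
    # Existing columns keep their position and type, even if the newly
    # inferred type differs.
    return current + new_columns, [name for name, _ in new_columns]


merged, added = merge_schema(
    [("_id", "string"), ("price", "string")],
    [("price", "double"), ("genre", "string")],
)
print(merged)  # [('_id', 'string'), ('price', 'string'), ('genre', 'string')]
print(added)   # ['genre']
```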

You can use schema hints to enforce the schema information that you know and expect on an inferred schema. When you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a double instead of an integer), you can provide an arbitrary number of hints for column data types as a string using SQL schema specification syntax. When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data column. You can change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
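As a rough model of how hints act on an inferred schema, the following sketch overrides inferred types with the types named in a hints string. The function apply_schema_hints is hypothetical and simplified: it handles only simple type names (a type containing a comma, such as a decimal with precision and scale, would need a real SQL parser), it matches column names case-sensitively, and it ignores hints for columns that were not inferred.

```python
def apply_schema_hints(inferred, hints):
    """Override inferred column types with the types given in a hints string.

    inferred: list of (column_name, type_name) pairs
    hints: comma-separated "name type" pairs, for example "price double"
    """
    overrides = {}
    for hint in hints.split(","):
        name, dtype = hint.strip().split(None, 1)  # split on first whitespace
        overrides[name] = dtype.strip()
    # Hinted columns take the hinted type; all others keep the inferred type.
    return [(name, overrides.get(name, dtype)) for name, dtype in inferred]


inferred = [("_id", "string"), ("price", "int"), ("publish_date", "string")]
print(apply_schema_hints(inferred, "price double, publish_date date"))
# [('_id', 'string'), ('price', 'double'), ('publish_date', 'date')]
```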

Examples

The examples in this section use an XML file available for download in the Apache Spark GitHub repo.

Read and write XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

You can manually specify the schema when reading data:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
# Pass the schema defined above; the original referenced an undefined name, customSchema.
df = spark.read.options(rowTag='book').xml('books.xml', schema=custom_schema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

XML data source can infer data types:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

You can also specify column names and types in DDL. In this case, the schema is not inferred automatically.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Load XML using COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Read XML with row validation

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Parse nested XML (from_xml and schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml and schema_of_xml with SQL API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Load XML with Auto Loader

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()) // process all available data, then stop
  .toTable("table_name")

Additional resources

Read and write XML data using the spark-xml library